152 lines
5.3 KiB
Python
152 lines
5.3 KiB
Python
|
import torch
|
||
|
import torch.nn as nn
|
||
|
import torch.nn.functional as F
|
||
|
import numpy as np
|
||
|
import sys
|
||
|
|
||
|
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2

    Two 3x3 convolutions (padding 1, no bias), each followed by batch
    normalisation and an in-place ReLU, stored as ``self.double_conv``.

    Args:
        in_channels: channels of the tensor fed to the first convolution.
        out_channels: channels produced by the second convolution.
        mid_channels: channels between the two convolutions; any falsy
            value (``None`` or ``0``) falls back to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        mid_channels = mid_channels or out_channels
        # Layer order is kept exactly so existing checkpoints
        # (keys ``double_conv.0.weight`` ... ``double_conv.4.*``) still load.
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Apply both conv/BN/ReLU stages to ``x`` and return the result."""
        # The previous version ran the first convolution a second time only
        # to print one element of its output; that debugging leftover doubled
        # the cost of the first layer and wrote to stdout on every call.
        return self.double_conv(x)
|
||
|
|
||
|
|
||
|
class Down(nn.Module):
    """Encoder stage: 2x2 max-pooling followed by a DoubleConv.

    Args:
        in_channels: channels of the incoming tensor.
        out_channels: channels produced by the DoubleConv.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        conv = DoubleConv(in_channels, out_channels)
        # Index 0 is the pool and index 1 the conv block, as before.
        self.maxpool_conv = nn.Sequential(pool, conv)

    def forward(self, x):
        """Return ``x`` after pooling and the double convolution."""
        pooled_and_convolved = self.maxpool_conv(x)
        return pooled_and_convolved
|
||
|
|
||
|
|
||
|
class Up(nn.Module):
    """Decoder stage: upscale, pad to the skip tensor, concatenate, DoubleConv.

    Args:
        in_channels: channels of the concatenated tensor given to the
            DoubleConv (and of the input to the transposed convolution).
        out_channels: channels produced by the DoubleConv.
        bilinear: when true, upscale with ``nn.Upsample`` and let the
            DoubleConv reduce channels through ``in_channels // 2``;
            otherwise upscale with a stride-2 ``nn.ConvTranspose2d`` that
            halves the channel count.
    """

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        half = in_channels // 2

        if bilinear:
            # Upsample has no weights, so the normal convolutions do the
            # channel reduction via the mid_channels argument.
            upscaler = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            mid = half
        else:
            upscaler = nn.ConvTranspose2d(in_channels, half, kernel_size=2, stride=2)
            mid = None

        self.up = upscaler
        self.conv = DoubleConv(in_channels, out_channels, mid)

    def forward(self, x1, x2):
        """Upscale ``x1``, align it with ``x2`` and convolve their concatenation.

        ``x1`` is the tensor to upscale and ``x2`` the skip connection; the
        result is ``self.conv`` applied to ``cat([x2, padded x1], dim=1)``.
        """
        upscaled = self.up(x1)

        # Dimensions 2 and 3 are the ones padded below (height and width of
        # an N, C, H, W tensor).
        gap_h = x2.shape[2] - upscaled.shape[2]
        gap_w = x2.shape[3] - upscaled.shape[3]
        before_w = gap_w // 2
        before_h = gap_h // 2

        # F.pad takes the last dimension first: (left, right, top, bottom).
        # Any odd remainder goes to the right/bottom side.
        # If you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        padding = [before_w, gap_w - before_w, before_h, gap_h - before_h]
        upscaled = F.pad(upscaled, padding)

        merged = torch.cat([x2, upscaled], dim=1)
        return self.conv(merged)
|
||
|
|
||
|
|
||
|
class OutConv(nn.Module):
    """Final projection: a single 1x1 convolution (with bias).

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels of the returned tensor.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        projection = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.conv = projection

    def forward(self, x):
        """Return ``x`` passed through the 1x1 convolution."""
        projected = self.conv(x)
        return projected
|
||
|
|
||
|
|
||
|
class UNet(nn.Module):
    """U-Net: one input DoubleConv, four Down stages, four Up stages, one OutConv.

    Args:
        n_channels: channels of the input tensor (fed to the first DoubleConv).
        n_classes: channels produced by the final OutConv.
        bilinear: forwarded to every Up stage; when true the widths of the
            deepest stages are divided by two (``factor = 2``).
    """

    def __init__(self, n_channels, n_classes, bilinear=False):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear
        # Plain attribute (not a module/parameter/buffer), so toggling it
        # never changes the state-dict keys.
        self._checkpointing = False

        factor = 2 if bilinear else 1

        # Sub-modules are created in the original order so parameter
        # registration (and random initialisation order) is unchanged.
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def _run_stage(self, stage, *inputs):
        """Call ``stage(*inputs)``, through activation checkpointing if enabled."""
        # getattr keeps instances created before this flag existed (for
        # example whole-model pickles) working.
        if getattr(self, "_checkpointing", False):
            # Local import: the file's top-level imports do not bring the
            # checkpoint function into scope.
            from torch.utils.checkpoint import checkpoint

            # Non-reentrant mode is used so that parameters still receive
            # gradients when the network input itself does not require grad.
            return checkpoint(stage, *inputs, use_reentrant=False)
        return stage(*inputs)

    def forward(self, x):
        """Run the encoder, then the decoder with skip connections; return logits."""
        run = self._run_stage
        x1 = run(self.inc, x)
        x2 = run(self.down1, x1)
        x3 = run(self.down2, x2)
        x4 = run(self.down3, x3)
        x5 = run(self.down4, x4)
        x = run(self.up1, x5, x4)
        x = run(self.up2, x, x3)
        x = run(self.up3, x, x2)
        x = run(self.up4, x, x1)
        logits = run(self.outc, x)
        return logits

    def use_checkpointing(self):
        """Enable activation checkpointing for every stage of ``forward``.

        The previous implementation called ``torch.utils.checkpoint(...)``,
        which is a module and therefore raised ``TypeError``. The stages are
        now left in place and ``forward`` routes each of them through
        ``torch.utils.checkpoint.checkpoint``. Each stage's forward is
        re-run during the backward pass in exchange for lower memory use.
        """
        self._checkpointing = True
|
||
|
|
||
|
# Shape the raw float32 input file is reshaped to before inference.
imgSize = (1, 3, 640, 959)


def main(argv):
    """Run one U-Net inference from a raw float32 file and dump the raw output.

    Args:
        argv: command line; ``argv[1]`` is the input file read with
            ``np.fromfile(..., dtype=np.float32)`` and ``argv[2]`` is the
            file the output tensor's bytes are written to.

    Returns:
        Process exit status: 0 on success, 2 when arguments are missing.
    """
    if len(argv) < 3:
        # Previously a missing argument surfaced as a bare IndexError.
        prog = argv[0] if argv else "unet"
        print("usage: {} <input.bin> <output.bin>".format(prog), file=sys.stderr)
        return 2

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    inputData = np.fromfile(argv[1], dtype=np.float32)
    inputTensor = torch.from_numpy(inputData).to(device).reshape(imgSize)

    print("input data size : ", inputTensor.shape)

    model = UNet(3, 2, False).to(device).eval()
    # NOTE(review): torch.load unpickles the file; only load MODEL.pth from a
    # trusted source (consider weights_only=True where the installed torch
    # supports it).
    state_dict = torch.load('MODEL.pth', map_location=device)
    # 'mask_values' is not a model parameter; drop it (if present) so
    # load_state_dict does not see an unexpected key.
    state_dict.pop('mask_values', None)
    model.load_state_dict(state_dict)

    # Last element of the input; equals index [0][2][639][958] for imgSize.
    print("input : {0:.6f}".format(inputTensor[-1, -1, -1, -1]))

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        outputData = model(inputTensor)

    with open(argv[2], 'wb') as f:
        f.write(outputData.cpu().numpy().tobytes())

    print("output data size : ", outputData.shape)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
|