Paul Engstler
Initial commit
92f0e98
raw
history blame
17.9 kB
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from . import resnet, resnext
try:
from lib.nn import SynchronizedBatchNorm2d
except ImportError:
from torch.nn import BatchNorm2d as SynchronizedBatchNorm2d
class SegmentationModuleBase(nn.Module):
def __init__(self):
super(SegmentationModuleBase, self).__init__()
@staticmethod
def pixel_acc(pred, label, ignore_index=-1):
_, preds = torch.max(pred, dim=1)
valid = (label != ignore_index).long()
acc_sum = torch.sum(valid * (preds == label).long())
pixel_sum = torch.sum(valid)
acc = acc_sum.float() / (pixel_sum.float() + 1e-10)
return acc
@staticmethod
def part_pixel_acc(pred_part, gt_seg_part, gt_seg_object, object_label, valid):
mask_object = (gt_seg_object == object_label)
_, pred = torch.max(pred_part, dim=1)
acc_sum = mask_object * (pred == gt_seg_part)
acc_sum = torch.sum(acc_sum.view(acc_sum.size(0), -1), dim=1)
acc_sum = torch.sum(acc_sum * valid)
pixel_sum = torch.sum(mask_object.view(mask_object.size(0), -1), dim=1)
pixel_sum = torch.sum(pixel_sum * valid)
return acc_sum, pixel_sum
@staticmethod
def part_loss(pred_part, gt_seg_part, gt_seg_object, object_label, valid):
mask_object = (gt_seg_object == object_label)
loss = F.nll_loss(pred_part, gt_seg_part * mask_object.long(), reduction='none')
loss = loss * mask_object.float()
loss = torch.sum(loss.view(loss.size(0), -1), dim=1)
nr_pixel = torch.sum(mask_object.view(mask_object.shape[0], -1), dim=1)
sum_pixel = (nr_pixel * valid).sum()
loss = (loss * valid.float()).sum() / torch.clamp(sum_pixel, 1).float()
return loss
class SegmentationModule(SegmentationModuleBase):
def __init__(self, net_enc, net_dec, labeldata, loss_scale=None):
super(SegmentationModule, self).__init__()
self.encoder = net_enc
self.decoder = net_dec
self.crit_dict = nn.ModuleDict()
if loss_scale is None:
self.loss_scale = {"object": 1, "part": 0.5, "scene": 0.25, "material": 1}
else:
self.loss_scale = loss_scale
# criterion
self.crit_dict["object"] = nn.NLLLoss(ignore_index=0) # ignore background 0
self.crit_dict["material"] = nn.NLLLoss(ignore_index=0) # ignore background 0
self.crit_dict["scene"] = nn.NLLLoss(ignore_index=-1) # ignore unlabelled -1
# Label data - read from json
self.labeldata = labeldata
object_to_num = {k: v for v, k in enumerate(labeldata['object'])}
part_to_num = {k: v for v, k in enumerate(labeldata['part'])}
self.object_part = {object_to_num[k]:
[part_to_num[p] for p in v]
for k, v in labeldata['object_part'].items()}
self.object_with_part = sorted(self.object_part.keys())
self.decoder.object_part = self.object_part
self.decoder.object_with_part = self.object_with_part
def forward(self, feed_dict, *, seg_size=None):
if seg_size is None: # training
if feed_dict['source_idx'] == 0:
output_switch = {"object": True, "part": True, "scene": True, "material": False}
elif feed_dict['source_idx'] == 1:
output_switch = {"object": False, "part": False, "scene": False, "material": True}
else:
raise ValueError
pred = self.decoder(
self.encoder(feed_dict['img'], return_feature_maps=True),
output_switch=output_switch
)
# loss
loss_dict = {}
if pred['object'] is not None: # object
loss_dict['object'] = self.crit_dict['object'](pred['object'], feed_dict['seg_object'])
if pred['part'] is not None: # part
part_loss = 0
for idx_part, object_label in enumerate(self.object_with_part):
part_loss += self.part_loss(
pred['part'][idx_part], feed_dict['seg_part'],
feed_dict['seg_object'], object_label, feed_dict['valid_part'][:, idx_part])
loss_dict['part'] = part_loss
if pred['scene'] is not None: # scene
loss_dict['scene'] = self.crit_dict['scene'](pred['scene'], feed_dict['scene_label'])
if pred['material'] is not None: # material
loss_dict['material'] = self.crit_dict['material'](pred['material'], feed_dict['seg_material'])
loss_dict['total'] = sum([loss_dict[k] * self.loss_scale[k] for k in loss_dict.keys()])
# metric
metric_dict= {}
if pred['object'] is not None:
metric_dict['object'] = self.pixel_acc(
pred['object'], feed_dict['seg_object'], ignore_index=0)
if pred['material'] is not None:
metric_dict['material'] = self.pixel_acc(
pred['material'], feed_dict['seg_material'], ignore_index=0)
if pred['part'] is not None:
acc_sum, pixel_sum = 0, 0
for idx_part, object_label in enumerate(self.object_with_part):
acc, pixel = self.part_pixel_acc(
pred['part'][idx_part], feed_dict['seg_part'], feed_dict['seg_object'],
object_label, feed_dict['valid_part'][:, idx_part])
acc_sum += acc
pixel_sum += pixel
metric_dict['part'] = acc_sum.float() / (pixel_sum.float() + 1e-10)
if pred['scene'] is not None:
metric_dict['scene'] = self.pixel_acc(
pred['scene'], feed_dict['scene_label'], ignore_index=-1)
return {'metric': metric_dict, 'loss': loss_dict}
else: # inference
output_switch = {"object": True, "part": True, "scene": True, "material": True}
pred = self.decoder(self.encoder(feed_dict['img'], return_feature_maps=True),
output_switch=output_switch, seg_size=seg_size)
return pred
def conv3x3(in_planes, out_planes, stride=1, has_bias=False):
"3x3 convolution with padding"
return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
padding=1, bias=has_bias)
def conv3x3_bn_relu(in_planes, out_planes, stride=1):
return nn.Sequential(
conv3x3(in_planes, out_planes, stride),
SynchronizedBatchNorm2d(out_planes),
nn.ReLU(inplace=True),
)
class ModelBuilder:
def __init__(self):
pass
# custom weights initialization
@staticmethod
def weights_init(m):
classname = m.__class__.__name__
if classname.find('Conv') != -1:
nn.init.kaiming_normal_(m.weight.data, nonlinearity='relu')
elif classname.find('BatchNorm') != -1:
m.weight.data.fill_(1.)
m.bias.data.fill_(1e-4)
#elif classname.find('Linear') != -1:
# m.weight.data.normal_(0.0, 0.0001)
def build_encoder(self, arch='resnet50_dilated8', fc_dim=512, weights=''):
pretrained = True if len(weights) == 0 else False
if arch == 'resnet50':
orig_resnet = resnet.__dict__['resnet50'](pretrained=pretrained)
net_encoder = Resnet(orig_resnet)
elif arch == 'resnet101':
orig_resnet = resnet.__dict__['resnet101'](pretrained=pretrained)
net_encoder = Resnet(orig_resnet)
elif arch == 'resnext101':
orig_resnext = resnext.__dict__['resnext101'](pretrained=pretrained)
net_encoder = Resnet(orig_resnext) # we can still use class Resnet
else:
raise Exception('Architecture undefined!')
# net_encoder.apply(self.weights_init)
if len(weights) > 0:
# print('Loading weights for net_encoder')
net_encoder.load_state_dict(
torch.load(weights, map_location=lambda storage, loc: storage), strict=False)
return net_encoder
def build_decoder(self, nr_classes,
arch='ppm_bilinear_deepsup', fc_dim=512,
weights='', use_softmax=False):
if arch == 'upernet_lite':
net_decoder = UPerNet(
nr_classes=nr_classes,
fc_dim=fc_dim,
use_softmax=use_softmax,
fpn_dim=256)
elif arch == 'upernet':
net_decoder = UPerNet(
nr_classes=nr_classes,
fc_dim=fc_dim,
use_softmax=use_softmax,
fpn_dim=512)
else:
raise Exception('Architecture undefined!')
net_decoder.apply(self.weights_init)
if len(weights) > 0:
# print('Loading weights for net_decoder')
net_decoder.load_state_dict(
torch.load(weights, map_location=lambda storage, loc: storage), strict=False)
return net_decoder
class Resnet(nn.Module):
def __init__(self, orig_resnet):
super(Resnet, self).__init__()
# take pretrained resnet, except AvgPool and FC
self.conv1 = orig_resnet.conv1
self.bn1 = orig_resnet.bn1
self.relu1 = orig_resnet.relu1
self.conv2 = orig_resnet.conv2
self.bn2 = orig_resnet.bn2
self.relu2 = orig_resnet.relu2
self.conv3 = orig_resnet.conv3
self.bn3 = orig_resnet.bn3
self.relu3 = orig_resnet.relu3
self.maxpool = orig_resnet.maxpool
self.layer1 = orig_resnet.layer1
self.layer2 = orig_resnet.layer2
self.layer3 = orig_resnet.layer3
self.layer4 = orig_resnet.layer4
def forward(self, x, return_feature_maps=False):
conv_out = []
x = self.relu1(self.bn1(self.conv1(x)))
x = self.relu2(self.bn2(self.conv2(x)))
x = self.relu3(self.bn3(self.conv3(x)))
x = self.maxpool(x)
x = self.layer1(x); conv_out.append(x);
x = self.layer2(x); conv_out.append(x);
x = self.layer3(x); conv_out.append(x);
x = self.layer4(x); conv_out.append(x);
if return_feature_maps:
return conv_out
return [x]
# upernet
class UPerNet(nn.Module):
def __init__(self, nr_classes, fc_dim=4096,
use_softmax=False, pool_scales=(1, 2, 3, 6),
fpn_inplanes=(256,512,1024,2048), fpn_dim=256):
# Lazy import so that compilation isn't needed if not being used.
from .prroi_pool import PrRoIPool2D
super(UPerNet, self).__init__()
self.use_softmax = use_softmax
# PPM Module
self.ppm_pooling = []
self.ppm_conv = []
for scale in pool_scales:
# we use the feature map size instead of input image size, so down_scale = 1.0
self.ppm_pooling.append(PrRoIPool2D(scale, scale, 1.))
self.ppm_conv.append(nn.Sequential(
nn.Conv2d(fc_dim, 512, kernel_size=1, bias=False),
SynchronizedBatchNorm2d(512),
nn.ReLU(inplace=True)
))
self.ppm_pooling = nn.ModuleList(self.ppm_pooling)
self.ppm_conv = nn.ModuleList(self.ppm_conv)
self.ppm_last_conv = conv3x3_bn_relu(fc_dim + len(pool_scales)*512, fpn_dim, 1)
# FPN Module
self.fpn_in = []
for fpn_inplane in fpn_inplanes[:-1]: # skip the top layer
self.fpn_in.append(nn.Sequential(
nn.Conv2d(fpn_inplane, fpn_dim, kernel_size=1, bias=False),
SynchronizedBatchNorm2d(fpn_dim),
nn.ReLU(inplace=True)
))
self.fpn_in = nn.ModuleList(self.fpn_in)
self.fpn_out = []
for i in range(len(fpn_inplanes) - 1): # skip the top layer
self.fpn_out.append(nn.Sequential(
conv3x3_bn_relu(fpn_dim, fpn_dim, 1),
))
self.fpn_out = nn.ModuleList(self.fpn_out)
self.conv_fusion = conv3x3_bn_relu(len(fpn_inplanes) * fpn_dim, fpn_dim, 1)
# background included. if ignore in loss, output channel 0 will not be trained.
self.nr_scene_class, self.nr_object_class, self.nr_part_class, self.nr_material_class = \
nr_classes['scene'], nr_classes['object'], nr_classes['part'], nr_classes['material']
# input: PPM out, input_dim: fpn_dim
self.scene_head = nn.Sequential(
conv3x3_bn_relu(fpn_dim, fpn_dim, 1),
nn.AdaptiveAvgPool2d(1),
nn.Conv2d(fpn_dim, self.nr_scene_class, kernel_size=1, bias=True)
)
# input: Fusion out, input_dim: fpn_dim
self.object_head = nn.Sequential(
conv3x3_bn_relu(fpn_dim, fpn_dim, 1),
nn.Conv2d(fpn_dim, self.nr_object_class, kernel_size=1, bias=True)
)
# input: Fusion out, input_dim: fpn_dim
self.part_head = nn.Sequential(
conv3x3_bn_relu(fpn_dim, fpn_dim, 1),
nn.Conv2d(fpn_dim, self.nr_part_class, kernel_size=1, bias=True)
)
# input: FPN_2 (P2), input_dim: fpn_dim
self.material_head = nn.Sequential(
conv3x3_bn_relu(fpn_dim, fpn_dim, 1),
nn.Conv2d(fpn_dim, self.nr_material_class, kernel_size=1, bias=True)
)
def forward(self, conv_out, output_switch=None, seg_size=None):
output_dict = {k: None for k in output_switch.keys()}
conv5 = conv_out[-1]
input_size = conv5.size()
ppm_out = [conv5]
roi = [] # fake rois, just used for pooling
for i in range(input_size[0]): # batch size
roi.append(torch.Tensor([i, 0, 0, input_size[3], input_size[2]]).view(1, -1)) # b, x0, y0, x1, y1
roi = torch.cat(roi, dim=0).type_as(conv5)
ppm_out = [conv5]
for pool_scale, pool_conv in zip(self.ppm_pooling, self.ppm_conv):
ppm_out.append(pool_conv(F.interpolate(
pool_scale(conv5, roi.detach()),
(input_size[2], input_size[3]),
mode='bilinear', align_corners=False)))
ppm_out = torch.cat(ppm_out, 1)
f = self.ppm_last_conv(ppm_out)
if output_switch['scene']: # scene
output_dict['scene'] = self.scene_head(f)
if output_switch['object'] or output_switch['part'] or output_switch['material']:
fpn_feature_list = [f]
for i in reversed(range(len(conv_out) - 1)):
conv_x = conv_out[i]
conv_x = self.fpn_in[i](conv_x) # lateral branch
f = F.interpolate(
f, size=conv_x.size()[2:], mode='bilinear', align_corners=False) # top-down branch
f = conv_x + f
fpn_feature_list.append(self.fpn_out[i](f))
fpn_feature_list.reverse() # [P2 - P5]
# material
if output_switch['material']:
output_dict['material'] = self.material_head(fpn_feature_list[0])
if output_switch['object'] or output_switch['part']:
output_size = fpn_feature_list[0].size()[2:]
fusion_list = [fpn_feature_list[0]]
for i in range(1, len(fpn_feature_list)):
fusion_list.append(F.interpolate(
fpn_feature_list[i],
output_size,
mode='bilinear', align_corners=False))
fusion_out = torch.cat(fusion_list, 1)
x = self.conv_fusion(fusion_out)
if output_switch['object']: # object
output_dict['object'] = self.object_head(x)
if output_switch['part']:
output_dict['part'] = self.part_head(x)
if self.use_softmax: # is True during inference
# inference scene
x = output_dict['scene']
x = x.squeeze(3).squeeze(2)
x = F.softmax(x, dim=1)
output_dict['scene'] = x
# inference object, material
for k in ['object', 'material']:
x = output_dict[k]
x = F.interpolate(x, size=seg_size, mode='bilinear', align_corners=False)
x = F.softmax(x, dim=1)
output_dict[k] = x
# inference part
x = output_dict['part']
x = F.interpolate(x, size=seg_size, mode='bilinear', align_corners=False)
part_pred_list, head = [], 0
for idx_part, object_label in enumerate(self.object_with_part):
n_part = len(self.object_part[object_label])
_x = F.interpolate(x[:, head: head + n_part], size=seg_size, mode='bilinear', align_corners=False)
_x = F.softmax(_x, dim=1)
part_pred_list.append(_x)
head += n_part
output_dict['part'] = part_pred_list
else: # Training
# object, scene, material
for k in ['object', 'scene', 'material']:
if output_dict[k] is None:
continue
x = output_dict[k]
x = F.log_softmax(x, dim=1)
if k == "scene": # for scene
x = x.squeeze(3).squeeze(2)
output_dict[k] = x
if output_dict['part'] is not None:
part_pred_list, head = [], 0
for idx_part, object_label in enumerate(self.object_with_part):
n_part = len(self.object_part[object_label])
x = output_dict['part'][:, head: head + n_part]
x = F.log_softmax(x, dim=1)
part_pred_list.append(x)
head += n_part
output_dict['part'] = part_pred_list
return output_dict