nrtoya's picture
0135
ea56a4f
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Author : Peike Li
@Contact : peike.li@yahoo.com
@File : AugmentCE2P.py
@Time : 8/4/19 3:35 PM
@Desc :
@License : This source code is licensed under the license found in the
LICENSE file in the root directory of this source tree.
"""
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.nn import BatchNorm2d, LeakyReLU
affine_par = True
pretrained_settings = {
'resnet101': {
'imagenet': {
'input_space': 'BGR',
'input_size': [3, 224, 224],
'input_range': [0, 1],
'mean': [0.406, 0.456, 0.485],
'std': [0.225, 0.224, 0.229],
'num_classes': 1000
}
},
}
def conv3x3(in_planes, out_planes, stride=1):
"3x3 convolution with padding"
return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
padding=1, bias=False)
class Bottleneck(nn.Module):
expansion = 4
def __init__(self, inplanes, planes, stride=1, dilation=1, downsample=None, fist_dilation=1, multi_grid=1):
super(Bottleneck, self).__init__()
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
self.bn1 = BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride,
padding=dilation * multi_grid, dilation=dilation * multi_grid, bias=False)
self.bn2 = BatchNorm2d(planes)
self.conv3 = nn.Conv2d(planes, planes * 4, kernel_size=1, bias=False)
self.bn3 = BatchNorm2d(planes * 4)
self.relu = nn.ReLU(inplace=False)
self.relu_inplace = nn.ReLU(inplace=True)
self.downsample = downsample
self.dilation = dilation
self.stride = stride
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
residual = self.downsample(x)
out = out + residual
out = self.relu_inplace(out)
return out
class PSPModule(nn.Module):
"""
Reference:
Zhao, Hengshuang, et al. *"Pyramid scene parsing network."*
"""
def __init__(self, features, out_features=512, sizes=(1, 2, 3, 6)):
super(PSPModule, self).__init__()
self.stages = []
self.stages = nn.ModuleList([self._make_stage(features, out_features, size) for size in sizes])
self.bottleneck = nn.Sequential(
nn.Conv2d(features + len(sizes) * out_features, out_features, kernel_size=3, padding=1, dilation=1,
bias=False),
BatchNorm2d(out_features),
LeakyReLU(),
)
def _make_stage(self, features, out_features, size):
prior = nn.AdaptiveAvgPool2d(output_size=(size, size))
conv = nn.Conv2d(features, out_features, kernel_size=1, bias=False)
return nn.Sequential(
prior,
conv,
# bn
BatchNorm2d(out_features),
LeakyReLU(),
)
def forward(self, feats):
h, w = feats.size(2), feats.size(3)
priors = [F.interpolate(input=stage(feats), size=(h, w), mode='bilinear', align_corners=True) for stage in
self.stages] + [feats]
bottle = self.bottleneck(torch.cat(priors, 1))
return bottle
class ASPPModule(nn.Module):
"""
Reference:
Chen, Liang-Chieh, et al. *"Rethinking Atrous Convolution for Semantic Image Segmentation."*
"""
def __init__(self, features, inner_features=256, out_features=512, dilations=(12, 24, 36)):
super(ASPPModule, self).__init__()
self.conv1 = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)),
nn.Conv2d(features, inner_features, kernel_size=1, padding=0, dilation=1,
bias=False),
# InPlaceABNSync(inner_features)
BatchNorm2d(inner_features),
LeakyReLU(),
)
self.conv2 = nn.Sequential(
nn.Conv2d(features, inner_features, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(inner_features),
LeakyReLU(),
)
self.conv3 = nn.Sequential(
nn.Conv2d(features, inner_features, kernel_size=3, padding=dilations[0], dilation=dilations[0], bias=False),
BatchNorm2d(inner_features),
LeakyReLU(),
)
self.conv4 = nn.Sequential(
nn.Conv2d(features, inner_features, kernel_size=3, padding=dilations[1], dilation=dilations[1], bias=False),
BatchNorm2d(inner_features),
LeakyReLU(),
)
self.conv5 = nn.Sequential(
nn.Conv2d(features, inner_features, kernel_size=3, padding=dilations[2], dilation=dilations[2], bias=False),
BatchNorm2d(inner_features),
LeakyReLU(),
)
self.bottleneck = nn.Sequential(
nn.Conv2d(inner_features * 5, out_features, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(inner_features),
LeakyReLU(),
nn.Dropout2d(0.1)
)
def forward(self, x):
_, _, h, w = x.size()
feat1 = F.interpolate(self.conv1(x), size=(h, w), mode='bilinear', align_corners=True)
feat2 = self.conv2(x)
feat3 = self.conv3(x)
feat4 = self.conv4(x)
feat5 = self.conv5(x)
out = torch.cat((feat1, feat2, feat3, feat4, feat5), 1)
bottle = self.bottleneck(out)
return bottle
class Edge_Module(nn.Module):
"""
Edge Learning Branch
"""
def __init__(self, in_fea=[256, 512, 1024], mid_fea=256, out_fea=2):
super(Edge_Module, self).__init__()
self.conv1 = nn.Sequential(
nn.Conv2d(in_fea[0], mid_fea, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(mid_fea),
LeakyReLU(),
)
self.conv2 = nn.Sequential(
nn.Conv2d(in_fea[1], mid_fea, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(mid_fea),
LeakyReLU(),
)
self.conv3 = nn.Sequential(
nn.Conv2d(in_fea[2], mid_fea, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(mid_fea),
LeakyReLU(),
)
self.conv4 = nn.Conv2d(mid_fea, out_fea, kernel_size=3, padding=1, dilation=1, bias=True)
# self.conv5 = nn.Conv2d(out_fea * 3, out_fea, kernel_size=1, padding=0, dilation=1, bias=True)
def forward(self, x1, x2, x3):
_, _, h, w = x1.size()
edge1_fea = self.conv1(x1)
# edge1 = self.conv4(edge1_fea)
edge2_fea = self.conv2(x2)
edge2 = self.conv4(edge2_fea)
edge3_fea = self.conv3(x3)
edge3 = self.conv4(edge3_fea)
edge2_fea = F.interpolate(edge2_fea, size=(h, w), mode='bilinear', align_corners=True)
edge3_fea = F.interpolate(edge3_fea, size=(h, w), mode='bilinear', align_corners=True)
edge2 = F.interpolate(edge2, size=(h, w), mode='bilinear', align_corners=True)
edge3 = F.interpolate(edge3, size=(h, w), mode='bilinear', align_corners=True)
# edge = torch.cat([edge1, edge2, edge3], dim=1)
edge_fea = torch.cat([edge1_fea, edge2_fea, edge3_fea], dim=1)
# edge = self.conv5(edge)
# return edge, edge_fea
return edge_fea
class Decoder_Module(nn.Module):
"""
Parsing Branch Decoder Module.
"""
def __init__(self, num_classes):
super(Decoder_Module, self).__init__()
self.conv1 = nn.Sequential(
nn.Conv2d(512, 256, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(256),
LeakyReLU(),
)
self.conv2 = nn.Sequential(
nn.Conv2d(256, 48, kernel_size=1, stride=1, padding=0, dilation=1, bias=False),
BatchNorm2d(48),
LeakyReLU(),
)
self.conv3 = nn.Sequential(
nn.Conv2d(304, 256, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(256),
LeakyReLU(),
nn.Conv2d(256, 256, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(256),
LeakyReLU(),
)
# self.conv4 = nn.Conv2d(256, num_classes, kernel_size=1, padding=0, dilation=1, bias=True)
def forward(self, xt, xl):
_, _, h, w = xl.size()
xt = F.interpolate(self.conv1(xt), size=(h, w), mode='bilinear', align_corners=True)
xl = self.conv2(xl)
x = torch.cat([xt, xl], dim=1)
x = self.conv3(x)
# seg = self.conv4(x)
# return seg, x
return x
class ResNet(nn.Module):
def __init__(self, block, layers, num_classes):
self.inplanes = 128
super(ResNet, self).__init__()
self.conv1 = conv3x3(3, 64, stride=2)
self.bn1 = BatchNorm2d(64)
self.relu1 = nn.ReLU(inplace=False)
self.conv2 = conv3x3(64, 64)
self.bn2 = BatchNorm2d(64)
self.relu2 = nn.ReLU(inplace=False)
self.conv3 = conv3x3(64, 128)
self.bn3 = BatchNorm2d(128)
self.relu3 = nn.ReLU(inplace=False)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 64, layers[0])
self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
self.layer4 = self._make_layer(block, 512, layers[3], stride=1, dilation=2, multi_grid=(1, 1, 1))
self.context_encoding = PSPModule(2048, 512)
self.edge = Edge_Module()
self.decoder = Decoder_Module(num_classes)
self.fushion = nn.Sequential(
nn.Conv2d(1024, 256, kernel_size=1, padding=0, dilation=1, bias=False),
BatchNorm2d(256),
LeakyReLU(),
nn.Dropout2d(0.1),
nn.Conv2d(256, num_classes, kernel_size=1, padding=0, dilation=1, bias=True)
)
def _make_layer(self, block, planes, blocks, stride=1, dilation=1, multi_grid=1):
downsample = None
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.inplanes, planes * block.expansion,
kernel_size=1, stride=stride, bias=False),
BatchNorm2d(planes * block.expansion, affine=affine_par))
layers = []
generate_multi_grid = lambda index, grids: grids[index % len(grids)] if isinstance(grids, tuple) else 1
layers.append(block(self.inplanes, planes, stride, dilation=dilation, downsample=downsample,
multi_grid=generate_multi_grid(0, multi_grid)))
self.inplanes = planes * block.expansion
for i in range(1, blocks):
layers.append(
block(self.inplanes, planes, dilation=dilation, multi_grid=generate_multi_grid(i, multi_grid)))
return nn.Sequential(*layers)
def forward(self, x):
x = self.relu1(self.bn1(self.conv1(x)))
x = self.relu2(self.bn2(self.conv2(x)))
x = self.relu3(self.bn3(self.conv3(x)))
x = self.maxpool(x)
x2 = self.layer1(x)
x3 = self.layer2(x2)
x4 = self.layer3(x3)
x5 = self.layer4(x4)
x = self.context_encoding(x5)
# parsing_result, parsing_fea = self.decoder(x, x2)
parsing_fea = self.decoder(x, x2)
# Edge Branch
# edge_result, edge_fea = self.edge(x2, x3, x4)
edge_fea = self.edge(x2, x3, x4)
# Fusion Branch
x = torch.cat([parsing_fea, edge_fea], dim=1)
fusion_result = self.fushion(x)
# return [[parsing_result, fusion_result], [edge_result]]
return fusion_result
def initialize_pretrained_model(model, settings, pretrained='./models/resnet101-imagenet.pth'):
model.input_space = settings['input_space']
model.input_size = settings['input_size']
model.input_range = settings['input_range']
model.mean = settings['mean']
model.std = settings['std']
if pretrained is not None:
saved_state_dict = torch.load(pretrained)
new_params = model.state_dict().copy()
for i in saved_state_dict:
i_parts = i.split('.')
if not i_parts[0] == 'fc':
new_params['.'.join(i_parts[0:])] = saved_state_dict[i]
model.load_state_dict(new_params)
def resnet101(num_classes=20, pretrained='./models/resnet101-imagenet.pth'):
model = ResNet(Bottleneck, [3, 4, 23, 3], num_classes)
settings = pretrained_settings['resnet101']['imagenet']
initialize_pretrained_model(model, settings, pretrained)
return model