Spaces:
Sleeping
Sleeping
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from .deform_conv import DeformConv2d | |
def add_conv(in_ch, out_ch, ksize, stride, leaky=True): | |
""" | |
Add a conv2d / batchnorm / leaky ReLU block. | |
Args: | |
in_ch (int): number of input channels of the convolution layer. | |
out_ch (int): number of output channels of the convolution layer. | |
ksize (int): kernel size of the convolution layer. | |
stride (int): stride of the convolution layer. | |
Returns: | |
stage (Sequential) : Sequential layers composing a convolution block. | |
""" | |
stage = nn.Sequential() | |
pad = (ksize - 1) // 2 | |
stage.add_module( | |
"conv", | |
nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=ksize, stride=stride, padding=pad, bias=False), | |
) | |
stage.add_module("batch_norm", nn.BatchNorm2d(out_ch)) | |
if leaky: | |
stage.add_module("leaky", nn.LeakyReLU(0.1)) | |
else: | |
stage.add_module("relu6", nn.ReLU6(inplace=True)) | |
return stage | |
class upsample(nn.Module): | |
__constants__ = ["size", "scale_factor", "mode", "align_corners", "name"] | |
def __init__(self, size=None, scale_factor=None, mode="nearest", align_corners=None): | |
super(upsample, self).__init__() | |
self.name = type(self).__name__ | |
self.size = size | |
self.scale_factor = scale_factor | |
self.mode = mode | |
self.align_corners = align_corners | |
def forward(self, input): | |
return F.interpolate(input, self.size, self.scale_factor, self.mode, self.align_corners) | |
def extra_repr(self): | |
if self.scale_factor is not None: | |
info = "scale_factor=" + str(self.scale_factor) | |
else: | |
info = "size=" + str(self.size) | |
info += ", mode=" + self.mode | |
return info | |
class SPPLayer(nn.Module): | |
def __init__(self): | |
super(SPPLayer, self).__init__() | |
def forward(self, x): | |
x_1 = x | |
x_2 = F.max_pool2d(x, 5, stride=1, padding=2) | |
x_3 = F.max_pool2d(x, 9, stride=1, padding=4) | |
x_4 = F.max_pool2d(x, 13, stride=1, padding=6) | |
out = torch.cat((x_1, x_2, x_3, x_4), dim=1) | |
return out | |
class DropBlock(nn.Module): | |
def __init__(self, block_size=7, keep_prob=0.9): | |
super(DropBlock, self).__init__() | |
self.block_size = block_size | |
self.keep_prob = keep_prob | |
self.gamma = None | |
self.kernel_size = (block_size, block_size) | |
self.stride = (1, 1) | |
self.padding = (block_size // 2, block_size // 2) | |
def reset(self, block_size, keep_prob): | |
self.block_size = block_size | |
self.keep_prob = keep_prob | |
self.gamma = None | |
self.kernel_size = (block_size, block_size) | |
self.stride = (1, 1) | |
self.padding = (block_size // 2, block_size // 2) | |
def calculate_gamma(self, x): | |
return ( | |
(1 - self.keep_prob) * x.shape[-1] ** 2 / (self.block_size**2 * (x.shape[-1] - self.block_size + 1) ** 2) | |
) | |
def forward(self, x): | |
if not self.training or self.keep_prob == 1: # set keep_prob=1 to turn off dropblock | |
return x | |
if self.gamma is None: | |
self.gamma = self.calculate_gamma(x) | |
if x.type() == "torch.cuda.HalfTensor": # TODO: not fully support for FP16 now | |
FP16 = True | |
x = x.float() | |
else: | |
FP16 = False | |
p = torch.ones_like(x) * (self.gamma) | |
mask = 1 - torch.nn.functional.max_pool2d(torch.bernoulli(p), self.kernel_size, self.stride, self.padding) | |
out = mask * x * (mask.numel() / mask.sum()) | |
if FP16: | |
out = out.half() | |
return out | |
class resblock(nn.Module): | |
""" | |
Sequential residual blocks each of which consists of \ | |
two convolution layers. | |
Args: | |
ch (int): number of input and output channels. | |
nblocks (int): number of residual blocks. | |
shortcut (bool): if True, residual tensor addition is enabled. | |
""" | |
def __init__(self, ch, nblocks=1, shortcut=True): | |
super().__init__() | |
self.shortcut = shortcut | |
self.module_list = nn.ModuleList() | |
for i in range(nblocks): | |
resblock_one = nn.ModuleList() | |
resblock_one.append(add_conv(ch, ch // 2, 1, 1)) | |
resblock_one.append(add_conv(ch // 2, ch, 3, 1)) | |
self.module_list.append(resblock_one) | |
def forward(self, x): | |
for module in self.module_list: | |
h = x | |
for res in module: | |
h = res(h) | |
x = x + h if self.shortcut else h | |
return x | |
class RFBblock(nn.Module): | |
def __init__(self, in_ch, residual=False): | |
super(RFBblock, self).__init__() | |
inter_c = in_ch // 4 | |
self.branch_0 = nn.Sequential( | |
nn.Conv2d(in_channels=in_ch, out_channels=inter_c, kernel_size=1, stride=1, padding=0), | |
) | |
self.branch_1 = nn.Sequential( | |
nn.Conv2d(in_channels=in_ch, out_channels=inter_c, kernel_size=1, stride=1, padding=0), | |
nn.Conv2d(in_channels=inter_c, out_channels=inter_c, kernel_size=3, stride=1, padding=1), | |
) | |
self.branch_2 = nn.Sequential( | |
nn.Conv2d(in_channels=in_ch, out_channels=inter_c, kernel_size=1, stride=1, padding=0), | |
nn.Conv2d(in_channels=inter_c, out_channels=inter_c, kernel_size=3, stride=1, padding=1), | |
nn.Conv2d(in_channels=inter_c, out_channels=inter_c, kernel_size=3, stride=1, dilation=2, padding=2), | |
) | |
self.branch_3 = nn.Sequential( | |
nn.Conv2d(in_channels=in_ch, out_channels=inter_c, kernel_size=1, stride=1, padding=0), | |
nn.Conv2d(in_channels=inter_c, out_channels=inter_c, kernel_size=5, stride=1, padding=2), | |
nn.Conv2d(in_channels=inter_c, out_channels=inter_c, kernel_size=3, stride=1, dilation=3, padding=3), | |
) | |
self.residual = residual | |
def forward(self, x): | |
x_0 = self.branch_0(x) | |
x_1 = self.branch_1(x) | |
x_2 = self.branch_2(x) | |
x_3 = self.branch_3(x) | |
out = torch.cat((x_0, x_1, x_2, x_3), 1) | |
if self.residual: | |
out += x | |
return out | |
class FeatureAdaption(nn.Module): | |
def __init__(self, in_ch, out_ch, n_anchors, rfb=False, sep=False): | |
super(FeatureAdaption, self).__init__() | |
if sep: | |
self.sep = True | |
else: | |
self.sep = False | |
self.conv_offset = nn.Conv2d( | |
in_channels=2 * n_anchors, | |
out_channels=2 * 9 * n_anchors, | |
groups=n_anchors, | |
kernel_size=1, | |
stride=1, | |
padding=0, | |
) | |
self.dconv = DeformConv2d( | |
in_channels=in_ch, out_channels=out_ch, kernel_size=3, stride=1, padding=1, deformable_groups=n_anchors | |
) | |
self.rfb = None | |
if rfb: | |
self.rfb = RFBblock(out_ch) | |
def forward(self, input, wh_pred): | |
# The RFB block is added behind FeatureAdaption | |
# For mobilenet, we currently don't support rfb and FeatureAdaption | |
if self.sep: | |
return input | |
if self.rfb is not None: | |
input = self.rfb(input) | |
wh_pred_new = wh_pred.detach() | |
offset = self.conv_offset(wh_pred_new) | |
out = self.dconv(input, offset) | |
return out | |
class ASFFmobile(nn.Module): | |
def __init__(self, level, rfb=False, vis=False): | |
super(ASFFmobile, self).__init__() | |
self.level = level | |
self.dim = [512, 256, 128] | |
self.inter_dim = self.dim[self.level] | |
if level == 0: | |
self.stride_level_1 = add_conv(256, self.inter_dim, 3, 2, leaky=False) | |
self.stride_level_2 = add_conv(128, self.inter_dim, 3, 2, leaky=False) | |
self.expand = add_conv(self.inter_dim, 1024, 3, 1, leaky=False) | |
elif level == 1: | |
self.compress_level_0 = add_conv(512, self.inter_dim, 1, 1, leaky=False) | |
self.stride_level_2 = add_conv(128, self.inter_dim, 3, 2, leaky=False) | |
self.expand = add_conv(self.inter_dim, 512, 3, 1, leaky=False) | |
elif level == 2: | |
self.compress_level_0 = add_conv(512, self.inter_dim, 1, 1, leaky=False) | |
self.compress_level_1 = add_conv(256, self.inter_dim, 1, 1, leaky=False) | |
self.expand = add_conv(self.inter_dim, 256, 3, 1, leaky=False) | |
compress_c = 8 if rfb else 16 # when adding rfb, we use half number of channels to save memory | |
self.weight_level_0 = add_conv(self.inter_dim, compress_c, 1, 1, leaky=False) | |
self.weight_level_1 = add_conv(self.inter_dim, compress_c, 1, 1, leaky=False) | |
self.weight_level_2 = add_conv(self.inter_dim, compress_c, 1, 1, leaky=False) | |
self.weight_levels = nn.Conv2d(compress_c * 3, 3, kernel_size=1, stride=1, padding=0) | |
self.vis = vis | |
def forward(self, x_level_0, x_level_1, x_level_2): | |
if self.level == 0: | |
level_0_resized = x_level_0 | |
level_1_resized = self.stride_level_1(x_level_1) | |
level_2_downsampled_inter = F.max_pool2d(x_level_2, 3, stride=2, padding=1) | |
level_2_resized = self.stride_level_2(level_2_downsampled_inter) | |
elif self.level == 1: | |
level_0_compressed = self.compress_level_0(x_level_0) | |
level_0_resized = F.interpolate(level_0_compressed, scale_factor=2, mode="nearest") | |
level_1_resized = x_level_1 | |
level_2_resized = self.stride_level_2(x_level_2) | |
elif self.level == 2: | |
level_0_compressed = self.compress_level_0(x_level_0) | |
level_0_resized = F.interpolate(level_0_compressed, scale_factor=4, mode="nearest") | |
level_1_compressed = self.compress_level_1(x_level_1) | |
level_1_resized = F.interpolate(level_1_compressed, scale_factor=2, mode="nearest") | |
level_2_resized = x_level_2 | |
level_0_weight_v = self.weight_level_0(level_0_resized) | |
level_1_weight_v = self.weight_level_1(level_1_resized) | |
level_2_weight_v = self.weight_level_2(level_2_resized) | |
levels_weight_v = torch.cat((level_0_weight_v, level_1_weight_v, level_2_weight_v), 1) | |
levels_weight = self.weight_levels(levels_weight_v) | |
levels_weight = F.softmax(levels_weight, dim=1) | |
fused_out_reduced = ( | |
level_0_resized * levels_weight[:, 0:1, :, :] | |
+ level_1_resized * levels_weight[:, 1:2, :, :] | |
+ level_2_resized * levels_weight[:, 2:, :, :] | |
) | |
out = self.expand(fused_out_reduced) | |
if self.vis: | |
return out, levels_weight, fused_out_reduced.sum(dim=1) | |
else: | |
return out | |
class ASFF(nn.Module): | |
def __init__(self, level, rfb=False, vis=False): | |
super(ASFF, self).__init__() | |
self.level = level | |
self.dim = [512, 256, 256] | |
self.inter_dim = self.dim[self.level] | |
if level == 0: | |
self.stride_level_1 = add_conv(256, self.inter_dim, 3, 2) | |
self.stride_level_2 = add_conv(256, self.inter_dim, 3, 2) | |
self.expand = add_conv(self.inter_dim, 1024, 3, 1) | |
elif level == 1: | |
self.compress_level_0 = add_conv(512, self.inter_dim, 1, 1) | |
self.stride_level_2 = add_conv(256, self.inter_dim, 3, 2) | |
self.expand = add_conv(self.inter_dim, 512, 3, 1) | |
elif level == 2: | |
self.compress_level_0 = add_conv(512, self.inter_dim, 1, 1) | |
self.expand = add_conv(self.inter_dim, 256, 3, 1) | |
compress_c = 8 if rfb else 16 # when adding rfb, we use half number of channels to save memory | |
self.weight_level_0 = add_conv(self.inter_dim, compress_c, 1, 1) | |
self.weight_level_1 = add_conv(self.inter_dim, compress_c, 1, 1) | |
self.weight_level_2 = add_conv(self.inter_dim, compress_c, 1, 1) | |
self.weight_levels = nn.Conv2d(compress_c * 3, 3, kernel_size=1, stride=1, padding=0) | |
self.vis = vis | |
def forward(self, x_level_0, x_level_1, x_level_2): | |
if self.level == 0: | |
level_0_resized = x_level_0 | |
level_1_resized = self.stride_level_1(x_level_1) | |
level_2_downsampled_inter = F.max_pool2d(x_level_2, 3, stride=2, padding=1) | |
level_2_resized = self.stride_level_2(level_2_downsampled_inter) | |
elif self.level == 1: | |
level_0_compressed = self.compress_level_0(x_level_0) | |
level_0_resized = F.interpolate(level_0_compressed, scale_factor=2, mode="nearest") | |
level_1_resized = x_level_1 | |
level_2_resized = self.stride_level_2(x_level_2) | |
elif self.level == 2: | |
level_0_compressed = self.compress_level_0(x_level_0) | |
level_0_resized = F.interpolate(level_0_compressed, scale_factor=4, mode="nearest") | |
level_1_resized = F.interpolate(x_level_1, scale_factor=2, mode="nearest") | |
level_2_resized = x_level_2 | |
level_0_weight_v = self.weight_level_0(level_0_resized) | |
level_1_weight_v = self.weight_level_1(level_1_resized) | |
level_2_weight_v = self.weight_level_2(level_2_resized) | |
levels_weight_v = torch.cat((level_0_weight_v, level_1_weight_v, level_2_weight_v), 1) | |
levels_weight = self.weight_levels(levels_weight_v) | |
levels_weight = F.softmax(levels_weight, dim=1) | |
fused_out_reduced = ( | |
level_0_resized * levels_weight[:, 0:1, :, :] | |
+ level_1_resized * levels_weight[:, 1:2, :, :] | |
+ level_2_resized * levels_weight[:, 2:, :, :] | |
) | |
out = self.expand(fused_out_reduced) | |
if self.vis: | |
return out, levels_weight, fused_out_reduced.sum(dim=1) | |
else: | |
return out | |
def make_divisible(v, divisor, min_value=None): | |
""" | |
This function is taken from the original tf repo. | |
It ensures that all layers have a channel number that is divisible by 8 | |
It can be seen here: | |
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py | |
:param v: | |
:param divisor: | |
:param min_value: | |
:return: | |
""" | |
if min_value is None: | |
min_value = divisor | |
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor) | |
# Make sure that round down does not go down by more than 10%. | |
if new_v < 0.9 * v: | |
new_v += divisor | |
return new_v | |
class ConvBNReLU(nn.Sequential): | |
def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1): | |
padding = (kernel_size - 1) // 2 | |
super(ConvBNReLU, self).__init__( | |
nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, groups=groups, bias=False), | |
nn.BatchNorm2d(out_planes), | |
nn.ReLU6(inplace=True), | |
) | |
def add_sepconv(in_ch, out_ch, ksize, stride): | |
stage = nn.Sequential() | |
pad = (ksize - 1) // 2 | |
stage.add_module( | |
"sepconv", | |
nn.Conv2d( | |
in_channels=in_ch, | |
out_channels=in_ch, | |
kernel_size=ksize, | |
stride=stride, | |
padding=pad, | |
groups=in_ch, | |
bias=False, | |
), | |
) | |
stage.add_module("sepbn", nn.BatchNorm2d(in_ch)) | |
stage.add_module("seprelu6", nn.ReLU6(inplace=True)) | |
stage.add_module("ptconv", nn.Conv2d(in_ch, out_ch, 1, 1, 0, bias=False)) | |
stage.add_module("ptbn", nn.BatchNorm2d(out_ch)) | |
stage.add_module("ptrelu6", nn.ReLU6(inplace=True)) | |
return stage | |
class InvertedResidual(nn.Module): | |
def __init__(self, inp, oup, stride, expand_ratio): | |
super(InvertedResidual, self).__init__() | |
self.stride = stride | |
assert stride in [1, 2] | |
hidden_dim = int(round(inp * expand_ratio)) | |
self.use_res_connect = self.stride == 1 and inp == oup | |
layers = [] | |
if expand_ratio != 1: | |
# pw | |
layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1)) | |
layers.extend( | |
[ | |
# dw | |
ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim), | |
# pw-linear | |
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False), | |
nn.BatchNorm2d(oup), | |
] | |
) | |
self.conv = nn.Sequential(*layers) | |
def forward(self, x): | |
if self.use_res_connect: | |
return x + self.conv(x) | |
else: | |
return self.conv(x) | |
class ressepblock(nn.Module): | |
def __init__(self, ch, out_ch, in_ch=None, shortcut=True): | |
super().__init__() | |
self.shortcut = shortcut | |
self.module_list = nn.ModuleList() | |
in_ch = ch // 2 if in_ch == None else in_ch | |
resblock_one = nn.ModuleList() | |
resblock_one.append(add_conv(ch, in_ch, 1, 1, leaky=False)) | |
resblock_one.append(add_conv(in_ch, out_ch, 3, 1, leaky=False)) | |
self.module_list.append(resblock_one) | |
def forward(self, x): | |
for module in self.module_list: | |
h = x | |
for res in module: | |
h = res(h) | |
x = x + h if self.shortcut else h | |
return x | |