import torch.nn as nn import torch.utils.checkpoint as cp from mmcv.cnn import build_conv_layer, build_norm_layer, constant_init from ..builder import BACKBONES from .resnet import Bottleneck as _Bottleneck from .resnet import ResNet class Bottleneck(_Bottleneck): r"""Bottleneck for the ResNet backbone in `DetectoRS `_. This bottleneck allows the users to specify whether to use SAC (Switchable Atrous Convolution) and RFP (Recursive Feature Pyramid). Args: inplanes (int): The number of input channels. planes (int): The number of output channels before expansion. rfp_inplanes (int, optional): The number of channels from RFP. Default: None. If specified, an additional conv layer will be added for ``rfp_feat``. Otherwise, the structure is the same as base class. sac (dict, optional): Dictionary to construct SAC. Default: None. """ expansion = 4 def __init__(self, inplanes, planes, rfp_inplanes=None, sac=None, **kwargs): super(Bottleneck, self).__init__(inplanes, planes, **kwargs) assert sac is None or isinstance(sac, dict) self.sac = sac self.with_sac = sac is not None if self.with_sac: self.conv2 = build_conv_layer( self.sac, planes, planes, kernel_size=3, stride=self.conv2_stride, padding=self.dilation, dilation=self.dilation, bias=False) self.rfp_inplanes = rfp_inplanes if self.rfp_inplanes: self.rfp_conv = build_conv_layer( None, self.rfp_inplanes, planes * self.expansion, 1, stride=1, bias=True) self.init_weights() def init_weights(self): """Initialize the weights.""" if self.rfp_inplanes: constant_init(self.rfp_conv, 0) def rfp_forward(self, x, rfp_feat): """The forward function that also takes the RFP features as input.""" def _inner_forward(x): identity = x out = self.conv1(x) out = self.norm1(out) out = self.relu(out) if self.with_plugins: out = self.forward_plugin(out, self.after_conv1_plugin_names) out = self.conv2(out) out = self.norm2(out) out = self.relu(out) if self.with_plugins: out = self.forward_plugin(out, self.after_conv2_plugin_names) out = self.conv3(out) out = self.norm3(out) if self.with_plugins: out = self.forward_plugin(out, self.after_conv3_plugin_names) if self.downsample is not None: identity = self.downsample(x) out += identity return out if self.with_cp and x.requires_grad: out = cp.checkpoint(_inner_forward, x) else: out = _inner_forward(x) if self.rfp_inplanes: rfp_feat = self.rfp_conv(rfp_feat) out = out + rfp_feat out = self.relu(out) return out class ResLayer(nn.Sequential): """ResLayer to build ResNet style backbone for RPF in detectoRS. The difference between this module and base class is that we pass ``rfp_inplanes`` to the first block. Args: block (nn.Module): block used to build ResLayer. inplanes (int): inplanes of block. planes (int): planes of block. num_blocks (int): number of blocks. stride (int): stride of the first block. Default: 1 avg_down (bool): Use AvgPool instead of stride conv when downsampling in the bottleneck. Default: False conv_cfg (dict): dictionary to construct and config conv layer. Default: None norm_cfg (dict): dictionary to construct and config norm layer. Default: dict(type='BN') downsample_first (bool): Downsample at the first block or last block. False for Hourglass, True for ResNet. Default: True rfp_inplanes (int, optional): The number of channels from RFP. Default: None. If specified, an additional conv layer will be added for ``rfp_feat``. Otherwise, the structure is the same as base class. """ def __init__(self, block, inplanes, planes, num_blocks, stride=1, avg_down=False, conv_cfg=None, norm_cfg=dict(type='BN'), downsample_first=True, rfp_inplanes=None, **kwargs): self.block = block assert downsample_first, f'downsample_first={downsample_first} is ' \ 'not supported in DetectoRS' downsample = None if stride != 1 or inplanes != planes * block.expansion: downsample = [] conv_stride = stride if avg_down and stride != 1: conv_stride = 1 downsample.append( nn.AvgPool2d( kernel_size=stride, stride=stride, ceil_mode=True, count_include_pad=False)) downsample.extend([ build_conv_layer( conv_cfg, inplanes, planes * block.expansion, kernel_size=1, stride=conv_stride, bias=False), build_norm_layer(norm_cfg, planes * block.expansion)[1] ]) downsample = nn.Sequential(*downsample) layers = [] layers.append( block( inplanes=inplanes, planes=planes, stride=stride, downsample=downsample, conv_cfg=conv_cfg, norm_cfg=norm_cfg, rfp_inplanes=rfp_inplanes, **kwargs)) inplanes = planes * block.expansion for _ in range(1, num_blocks): layers.append( block( inplanes=inplanes, planes=planes, stride=1, conv_cfg=conv_cfg, norm_cfg=norm_cfg, **kwargs)) super(ResLayer, self).__init__(*layers) @BACKBONES.register_module() class DetectoRS_ResNet(ResNet): """ResNet backbone for DetectoRS. Args: sac (dict, optional): Dictionary to construct SAC (Switchable Atrous Convolution). Default: None. stage_with_sac (list): Which stage to use sac. Default: (False, False, False, False). rfp_inplanes (int, optional): The number of channels from RFP. Default: None. If specified, an additional conv layer will be added for ``rfp_feat``. Otherwise, the structure is the same as base class. output_img (bool): If ``True``, the input image will be inserted into the starting position of output. Default: False. pretrained (str, optional): The pretrained model to load. """ arch_settings = { 50: (Bottleneck, (3, 4, 6, 3)), 101: (Bottleneck, (3, 4, 23, 3)), 152: (Bottleneck, (3, 8, 36, 3)) } def __init__(self, sac=None, stage_with_sac=(False, False, False, False), rfp_inplanes=None, output_img=False, pretrained=None, **kwargs): self.sac = sac self.stage_with_sac = stage_with_sac self.rfp_inplanes = rfp_inplanes self.output_img = output_img self.pretrained = pretrained super(DetectoRS_ResNet, self).__init__(**kwargs) self.inplanes = self.stem_channels self.res_layers = [] for i, num_blocks in enumerate(self.stage_blocks): stride = self.strides[i] dilation = self.dilations[i] dcn = self.dcn if self.stage_with_dcn[i] else None sac = self.sac if self.stage_with_sac[i] else None if self.plugins is not None: stage_plugins = self.make_stage_plugins(self.plugins, i) else: stage_plugins = None planes = self.base_channels * 2**i res_layer = self.make_res_layer( block=self.block, inplanes=self.inplanes, planes=planes, num_blocks=num_blocks, stride=stride, dilation=dilation, style=self.style, avg_down=self.avg_down, with_cp=self.with_cp, conv_cfg=self.conv_cfg, norm_cfg=self.norm_cfg, dcn=dcn, sac=sac, rfp_inplanes=rfp_inplanes if i > 0 else None, plugins=stage_plugins) self.inplanes = planes * self.block.expansion layer_name = f'layer{i + 1}' self.add_module(layer_name, res_layer) self.res_layers.append(layer_name) self._freeze_stages() def make_res_layer(self, **kwargs): """Pack all blocks in a stage into a ``ResLayer`` for DetectoRS.""" return ResLayer(**kwargs) def forward(self, x): """Forward function.""" outs = list(super(DetectoRS_ResNet, self).forward(x)) if self.output_img: outs.insert(0, x) return tuple(outs) def rfp_forward(self, x, rfp_feats): """Forward function for RFP.""" if self.deep_stem: x = self.stem(x) else: x = self.conv1(x) x = self.norm1(x) x = self.relu(x) x = self.maxpool(x) outs = [] for i, layer_name in enumerate(self.res_layers): res_layer = getattr(self, layer_name) rfp_feat = rfp_feats[i] if i > 0 else None for layer in res_layer: x = layer.rfp_forward(x, rfp_feat) if i in self.out_indices: outs.append(x) return tuple(outs)