import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Function
from torch.nn.modules.module import Module

from ..cnn import UPSAMPLE_LAYERS, normal_init, xavier_init
from ..utils import ext_loader

ext_module = ext_loader.load_ext('_ext', [
    'carafe_naive_forward', 'carafe_naive_backward', 'carafe_forward',
    'carafe_backward'
])


class CARAFENaiveFunction(Function):

    @staticmethod
    def symbolic(g, features, masks, kernel_size, group_size, scale_factor):
        return g.op(
            'mmcv::MMCVCARAFENaive',
            features,
            masks,
            kernel_size_i=kernel_size,
            group_size_i=group_size,
            scale_factor_f=scale_factor)

    @staticmethod
    def forward(ctx, features, masks, kernel_size, group_size, scale_factor):
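        # Shape contract (checked by the assertions below): features is
        # (n, c, h, w) and masks is (n, k * k * g, h * s, w * s), i.e. the
        # masks are already laid out at the upsampled resolution.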
        assert scale_factor >= 1
        assert masks.size(1) == kernel_size * kernel_size * group_size
        assert masks.size(-1) == features.size(-1) * scale_factor
        assert masks.size(-2) == features.size(-2) * scale_factor
        assert features.size(1) % group_size == 0
        assert (kernel_size - 1) % 2 == 0 and kernel_size >= 1
        ctx.kernel_size = kernel_size
        ctx.group_size = group_size
        ctx.scale_factor = scale_factor
        ctx.feature_size = features.size()
        ctx.mask_size = masks.size()

        n, c, h, w = features.size()
        output = features.new_zeros((n, c, h * scale_factor,
                                     w * scale_factor))
        ext_module.carafe_naive_forward(
            features,
            masks,
            output,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)

        if features.requires_grad or masks.requires_grad:
            ctx.save_for_backward(features, masks)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        assert grad_output.is_cuda

        features, masks = ctx.saved_tensors
        kernel_size = ctx.kernel_size
        group_size = ctx.group_size
        scale_factor = ctx.scale_factor

        grad_input = torch.zeros_like(features)
        grad_masks = torch.zeros_like(masks)
        ext_module.carafe_naive_backward(
            grad_output.contiguous(),
            features,
            masks,
            grad_input,
            grad_masks,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)

        # One gradient per forward input; the three int arguments get None.
        return grad_input, grad_masks, None, None, None


carafe_naive = CARAFENaiveFunction.apply


class CARAFENaive(Module):
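    """Naive CARAFE upsampler.

    A plain reference implementation of the CARAFE op, without the
    intermediate buffers used by the optimized :class:`CARAFE`. Please refer
    to https://arxiv.org/abs/1905.02188 for more details.
    """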

    def __init__(self, kernel_size, group_size, scale_factor):
        super(CARAFENaive, self).__init__()

        assert isinstance(kernel_size, int) and isinstance(
            group_size, int) and isinstance(scale_factor, int)
        self.kernel_size = kernel_size
        self.group_size = group_size
        self.scale_factor = scale_factor

    def forward(self, features, masks):
        return carafe_naive(features, masks, self.kernel_size,
                            self.group_size, self.scale_factor)


class CARAFEFunction(Function):

    @staticmethod
    def symbolic(g, features, masks, kernel_size, group_size, scale_factor):
        return g.op(
            'mmcv::MMCVCARAFE',
            features,
            masks,
            kernel_size_i=kernel_size,
            group_size_i=group_size,
            scale_factor_f=scale_factor)

    @staticmethod
    def forward(ctx, features, masks, kernel_size, group_size, scale_factor):
        assert scale_factor >= 1
        assert masks.size(1) == kernel_size * kernel_size * group_size
        assert masks.size(-1) == features.size(-1) * scale_factor
        assert masks.size(-2) == features.size(-2) * scale_factor
        assert features.size(1) % group_size == 0
        assert (kernel_size - 1) % 2 == 0 and kernel_size >= 1
        ctx.kernel_size = kernel_size
        ctx.group_size = group_size
        ctx.scale_factor = scale_factor
        ctx.feature_size = features.size()
        ctx.mask_size = masks.size()

        n, c, h, w = features.size()
        output = features.new_zeros((n, c, h * scale_factor,
                                     w * scale_factor))
        # Intermediate buffers used by the CUDA kernel; they are not part
        # of the autograd graph.
        routput = features.new_zeros(output.size(), requires_grad=False)
        rfeatures = features.new_zeros(features.size(), requires_grad=False)
        rmasks = masks.new_zeros(masks.size(), requires_grad=False)
        ext_module.carafe_forward(
            features,
            masks,
            rfeatures,
            routput,
            rmasks,
            output,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)

        if features.requires_grad or masks.requires_grad:
            ctx.save_for_backward(features, masks, rfeatures)
        return output

    @staticmethod
    def backward(ctx, grad_output):
        assert grad_output.is_cuda

        features, masks, rfeatures = ctx.saved_tensors
        kernel_size = ctx.kernel_size
        group_size = ctx.group_size
        scale_factor = ctx.scale_factor

        rgrad_output = torch.zeros_like(grad_output, requires_grad=False)
        rgrad_input_hs = torch.zeros_like(grad_output, requires_grad=False)
        rgrad_input = torch.zeros_like(features, requires_grad=False)
        rgrad_masks = torch.zeros_like(masks, requires_grad=False)
        grad_input = torch.zeros_like(features, requires_grad=False)
        grad_masks = torch.zeros_like(masks, requires_grad=False)
        ext_module.carafe_backward(
            grad_output.contiguous(),
            rfeatures,
            masks,
            rgrad_output,
            rgrad_input_hs,
            rgrad_input,
            rgrad_masks,
            grad_input,
            grad_masks,
            kernel_size=kernel_size,
            group_size=group_size,
            scale_factor=scale_factor)
        return grad_input, grad_masks, None, None, None


carafe = CARAFEFunction.apply


class CARAFE(Module):
    """CARAFE: Content-Aware ReAssembly of FEatures

    Please refer to https://arxiv.org/abs/1905.02188 for more details.

    Args:
        kernel_size (int): reassemble kernel size
        group_size (int): reassemble group size
        scale_factor (int): upsample ratio

    Returns:
        upsampled feature map
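
    Example:
        >>> # Illustrative shapes only: masks must have
        >>> # kernel_size**2 * group_size channels and already live at the
        >>> # upsampled resolution. Requires the compiled CUDA extension.
        >>> features = torch.rand(1, 64, 16, 16).cuda()
        >>> masks = torch.rand(1, 25, 32, 32).cuda()
        >>> upsampler = CARAFE(kernel_size=5, group_size=1, scale_factor=2)
        >>> upsampler(features, masks).shape
        torch.Size([1, 64, 32, 32])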
    """

    def __init__(self, kernel_size, group_size, scale_factor):
        super(CARAFE, self).__init__()

        assert isinstance(kernel_size, int) and isinstance(
            group_size, int) and isinstance(scale_factor, int)
        self.kernel_size = kernel_size
        self.group_size = group_size
        self.scale_factor = scale_factor

    def forward(self, features, masks):
        return carafe(features, masks, self.kernel_size, self.group_size,
                      self.scale_factor)


@UPSAMPLE_LAYERS.register_module(name='carafe')
class CARAFEPack(nn.Module):
    """A unified package of the CARAFE upsampler that contains:
    1) channel compressor, 2) content encoder, 3) CARAFE op.

    Official implementation of the ICCV 2019 paper
    `CARAFE: Content-Aware ReAssembly of FEatures`.
    Please refer to https://arxiv.org/abs/1905.02188 for more details.

    Args:
        channels (int): input feature channels
        scale_factor (int): upsample ratio
        up_kernel (int): kernel size of the CARAFE op
        up_group (int): group size of the CARAFE op
        encoder_kernel (int): kernel size of the content encoder
        encoder_dilation (int): dilation of the content encoder
        compressed_channels (int): output channels of the channel compressor

    Returns:
        upsampled feature map
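
    Example:
        >>> # Illustrative usage: CARAFEPack predicts the reassembly masks
        >>> # itself, so only the feature map is passed in. Requires the
        >>> # compiled CUDA extension.
        >>> x = torch.rand(1, 256, 16, 16).cuda()
        >>> upsampler = CARAFEPack(channels=256, scale_factor=2).cuda()
        >>> upsampler(x).shape
        torch.Size([1, 256, 32, 32])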
    """

    def __init__(self,
                 channels,
                 scale_factor,
                 up_kernel=5,
                 up_group=1,
                 encoder_kernel=3,
                 encoder_dilation=1,
                 compressed_channels=64):
        super(CARAFEPack, self).__init__()
        self.channels = channels
        self.scale_factor = scale_factor
        self.up_kernel = up_kernel
        self.up_group = up_group
        self.encoder_kernel = encoder_kernel
        self.encoder_dilation = encoder_dilation
        self.compressed_channels = compressed_channels
        self.channel_compressor = nn.Conv2d(channels, self.compressed_channels,
                                            1)
        self.content_encoder = nn.Conv2d(
            self.compressed_channels,
            self.up_kernel * self.up_kernel * self.up_group *
            self.scale_factor * self.scale_factor,
            self.encoder_kernel,
            padding=int((self.encoder_kernel - 1) * self.encoder_dilation / 2),
            dilation=self.encoder_dilation,
            groups=1)
        self.init_weights()

    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                xavier_init(m, distribution='uniform')
        normal_init(self.content_encoder, std=0.001)

    def kernel_normalizer(self, mask):
        # Rearrange the encoder output from (n, k*k*g*s*s, h, w) to
        # (n, k*k*g, h*s, w*s) so each output pixel owns its own kernel.
        mask = F.pixel_shuffle(mask, self.scale_factor)
        n, mask_c, h, w = mask.size()

        # Softmax over each k*k reassembly kernel so its weights sum to one.
        mask_channel = int(mask_c / float(self.up_kernel**2))
        mask = mask.view(n, mask_channel, -1, h, w)
        mask = F.softmax(mask, dim=2)
        mask = mask.view(n, mask_c, h, w).contiguous()

        return mask

    def feature_reassemble(self, x, mask):
        x = carafe(x, mask, self.up_kernel, self.up_group, self.scale_factor)
        return x

    def forward(self, x):
        compressed_x = self.channel_compressor(x)
        mask = self.content_encoder(compressed_x)
        mask = self.kernel_normalizer(mask)

        x = self.feature_reassemble(x, mask)
        return x