from model import common
# from model import attention
import torch.nn as nn
from lambda_networks import LambdaLayer


def make_model(args, parent=False):
    if args.dilation:
        from model import dilated
        return LAMBDAEDSR(args, dilated.dilated_conv)
    else:
        return LAMBDAEDSR(args)


class MCheck(nn.Module):
    """Debug helper: records the shape seen on the first forward pass and
    raises on the second pass so both shapes can be inspected."""

    def __init__(self):
        super(MCheck, self).__init__()
        self.kill = False
        self.shape = None

    def forward(self, x):
        if self.kill:
            raise NotImplementedError("Shape checked here", self.shape, x.shape)
        self.kill = True
        self.shape = x.shape
        return x


class LAMBDAEDSR(nn.Module):
    """EDSR with a LambdaLayer inserted at the mid-point of the residual trunk."""

    def __init__(self, args, conv=common.default_conv):
        super(LAMBDAEDSR, self).__init__()

        n_resblock = args.n_resblocks
        n_feats = args.n_feats
        kernel_size = 3
        scale = args.scale[0]
        act = nn.ReLU(True)

        rgb_mean = (0.4488, 0.4371, 0.4040)
        rgb_std = (1.0, 1.0, 1.0)
        self.sub_mean = common.MeanShift(args.rgb_range, rgb_mean, rgb_std)

        # self.msa = attention.PyramidAttention(channel=256, reduction=8, res_scale=args.res_scale)

        # Lambda layer over the n_feats-channel feature map with a 25x25 local context.
        self.lambda_layer = LambdaLayer(
            dim=n_feats, dim_out=n_feats, r=25, dim_k=16, heads=4, dim_u=4
        )

        # define head module
        m_head = [conv(args.n_colors, n_feats, kernel_size)]

        # define body module: first half of the residual blocks,
        # the lambda layer, then the second half
        m_body = [
            common.ResBlock(
                conv, n_feats, kernel_size, act=act, res_scale=args.res_scale
            ) for _ in range(n_resblock // 2)
        ]
        # mcheck = MCheck()
        # m_body.append(mcheck)
        m_body.append(self.lambda_layer)
        # m_body.append(mcheck)
        for _ in range(n_resblock // 2):
            m_body.append(common.ResBlock(
                conv, n_feats, kernel_size, act=act, res_scale=args.res_scale
            ))
        m_body.append(conv(n_feats, n_feats, kernel_size))

        # define tail module
        m_tail = [
            common.Upsampler(conv, scale, n_feats, act=False),
            nn.Conv2d(
                n_feats, args.n_colors, kernel_size,
                padding=(kernel_size // 2)
            )
        ]

        self.add_mean = common.MeanShift(args.rgb_range, rgb_mean, rgb_std, 1)

        self.head = nn.Sequential(*m_head)
        self.body = nn.Sequential(*m_body)
        self.tail = nn.Sequential(*m_tail)

    def forward(self, x):
        x = self.sub_mean(x)
        x = self.head(x)

        res = self.body(x)
        res += x

        x = self.tail(res)
        x = self.add_mean(x)

        return x

    def load_state_dict(self, state_dict, strict=True):
        own_state = self.state_dict()
        for name, param in state_dict.items():
            if name in own_state:
                if isinstance(param, nn.Parameter):
                    param = param.data
                try:
                    own_state[name].copy_(param)
                except Exception:
                    if name.find('tail') == -1:
                        raise RuntimeError(
                            'While copying the parameter named {}, '
                            'whose dimensions in the model are {} and '
                            'whose dimensions in the checkpoint are {}.'
                            .format(name, own_state[name].size(), param.size()))
            elif strict:
                if name.find('tail') == -1:
                    raise KeyError('unexpected key "{}" in state_dict'
                                   .format(name))
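

# ----------------------------------------------------------------------------
# Minimal smoke test (illustrative sketch, not part of the original module).
# It assumes the usual EDSR-style argument names consumed above
# (n_resblocks, n_feats, scale, rgb_range, n_colors, res_scale, dilation)
# and that `model.common` and `lambda_networks` are importable from the
# working directory; the chosen values mirror the standard EDSR x2 setup.
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    import argparse
    import torch

    args = argparse.Namespace(
        n_resblocks=32, n_feats=256, scale=[2], rgb_range=255,
        n_colors=3, res_scale=0.1, dilation=False,
    )
    model = make_model(args).eval()

    # A 48x48 LR patch should come out upscaled by the chosen scale factor.
    x = torch.randn(1, 3, 48, 48)
    with torch.no_grad():
        y = model(x)
    print(y.shape)  # expected: torch.Size([1, 3, 96, 96])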