# NOTE(review): removed stray page-scrape artifacts ("Spaces: Running on Zero",
# repeated) that were not valid Python and would have broken import of this module.
# Densenet decoder encoder with intermediate fully connected layers and dropout | |
import torch | |
import torch.backends.cudnn as cudnn | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import functools | |
from torch.autograd import gradcheck | |
from torch.autograd import Function | |
from torch.autograd import Variable | |
from torch.autograd import gradcheck | |
from torch.autograd import Function | |
import numpy as np | |
def add_coordConv_channels(t):
    """Append two CoordConv channels (normalized coordinates) to *t*.

    Args:
        t: 4-D tensor of shape (n, c, h, w), on CPU or GPU.

    Returns:
        Tensor of shape (n, c + 2, h, w). Channel ``c`` holds the row (height)
        index and channel ``c + 1`` the column (width) index, each scaled
        linearly to [-1, 1].

    Fixes over the original implementation (which produced identical values
    for square inputs):
      * both grids were built from ``h`` and normalized by ``h - 1``, so any
        non-square input crashed in ``torch.cat`` (shape mismatch after the
        transpose) or was mis-normalized; columns now use ``w``.
      * ``h == 1`` / ``w == 1`` no longer divide by zero.
    """
    n, c, h, w = t.size()
    device = t.device  # build directly on t's device instead of numpy + .cuda()

    # Row-index grid: every position in row i carries i, scaled to [-1, 1].
    row_coord = torch.arange(h, dtype=torch.float32, device=device)
    row_coord = (row_coord / max(h - 1, 1)) * 2 - 1
    row_coord = row_coord.view(1, 1, h, 1).expand(n, 1, h, w)

    # Column-index grid: every position in column j carries j, scaled to [-1, 1].
    col_coord = torch.arange(w, dtype=torch.float32, device=device)
    col_coord = (col_coord / max(w - 1, 1)) * 2 - 1
    col_coord = col_coord.view(1, 1, 1, w).expand(n, 1, h, w)

    return torch.cat((t, row_coord, col_coord), dim=1)
class DenseBlockEncoder(nn.Module):
    """Dense block for the encoder: ``n_convs`` BN -> activation -> 3x3 conv
    stages with dense (additive) connectivity. Channel count and spatial size
    are preserved.

    Args:
        n_channels: number of input/output channels of every stage.
        n_convs:    number of stages (must be > 0).
        activation: activation class, instantiated as ``activation(*args)``.
        args:       positional args for the activation constructor.
    """
    def __init__(self, n_channels, n_convs, activation=nn.ReLU, args=[False]):
        super(DenseBlockEncoder, self).__init__()
        assert(n_convs > 0)

        self.n_channels = n_channels
        self.n_convs    = n_convs
        self.layers     = nn.ModuleList()
        for i in range(n_convs):
            self.layers.append(nn.Sequential(
                    nn.BatchNorm2d(n_channels),
                    activation(*args),
                    nn.Conv2d(n_channels, n_channels, 3, stride=1, padding=1, bias=False),))

    def forward(self, inputs):
        outputs = []
        for i, layer in enumerate(self.layers):
            if i > 0:
                # Dense connectivity: each stage consumes the sum of all
                # previous stage outputs.
                next_output = 0
                for no in outputs:
                    next_output = next_output + no
                # BUGFIX: the accumulated sum must be fed through the current
                # layer. Previously ``layers[1:]`` were never applied — their
                # parameters were dead weight and the block merely doubled the
                # first stage's output on every iteration.
                # NOTE(review): checkpoints trained with the old code will not
                # reproduce their outputs after this fix — confirm before
                # loading pretrained weights.
                outputs.append(layer(next_output))
            else:
                outputs.append(layer(inputs))
        return outputs[-1]
# Dense block used in the decoder (transposed-conv counterpart of DenseBlockEncoder).
class DenseBlockDecoder(nn.Module):
    """Dense block for the decoder: ``n_convs`` BN -> activation -> 3x3
    transposed-conv stages with dense (additive) connectivity. Channel count
    and spatial size are preserved (stride 1, padding 1).

    Args:
        n_channels: number of input/output channels of every stage.
        n_convs:    number of stages (must be > 0).
        activation: activation class, instantiated as ``activation(*args)``.
        args:       positional args for the activation constructor.
    """
    def __init__(self, n_channels, n_convs, activation=nn.ReLU, args=[False]):
        super(DenseBlockDecoder, self).__init__()
        assert(n_convs > 0)

        self.n_channels = n_channels
        self.n_convs    = n_convs
        self.layers     = nn.ModuleList()
        for i in range(n_convs):
            self.layers.append(nn.Sequential(
                    nn.BatchNorm2d(n_channels),
                    activation(*args),
                    nn.ConvTranspose2d(n_channels, n_channels, 3, stride=1, padding=1, bias=False),))

    def forward(self, inputs):
        outputs = []
        for i, layer in enumerate(self.layers):
            if i > 0:
                # Dense connectivity: each stage consumes the sum of all
                # previous stage outputs.
                next_output = 0
                for no in outputs:
                    next_output = next_output + no
                # BUGFIX: feed the sum through the current layer (same defect
                # as DenseBlockEncoder: ``layers[1:]`` were never applied).
                # NOTE(review): checkpoints trained with the old code will not
                # reproduce their outputs after this fix.
                outputs.append(layer(next_output))
            else:
                outputs.append(layer(inputs))
        return outputs[-1]
class DenseTransitionBlockEncoder(nn.Module):
    """Encoder transition stage: BN -> activation -> 1x1 conv (changes the
    channel count) -> ``mp`` x ``mp`` max-pool (downsamples by ``mp``)."""
    def __init__(self, n_channels_in, n_channels_out, mp, activation=nn.ReLU, args=[False]):
        super(DenseTransitionBlockEncoder, self).__init__()
        self.n_channels_in  = n_channels_in
        self.n_channels_out = n_channels_out
        self.mp             = mp
        stages = [
            nn.BatchNorm2d(n_channels_in),
            activation(*args),
            nn.Conv2d(n_channels_in, n_channels_out, 1, stride=1, padding=0, bias=False),
            nn.MaxPool2d(mp),
        ]
        self.main = nn.Sequential(*stages)

    def forward(self, inputs):
        return self.main(inputs)
class DenseTransitionBlockDecoder(nn.Module):
    """Decoder transition stage: BN -> activation -> 4x4 transposed conv with
    stride 2, padding 1 (changes the channel count and doubles H and W)."""
    def __init__(self, n_channels_in, n_channels_out, activation=nn.ReLU, args=[False]):
        super(DenseTransitionBlockDecoder, self).__init__()
        self.n_channels_in  = n_channels_in
        self.n_channels_out = n_channels_out
        stages = [
            nn.BatchNorm2d(n_channels_in),
            activation(*args),
            nn.ConvTranspose2d(n_channels_in, n_channels_out, 4, stride=2, padding=1, bias=False),
        ]
        self.main = nn.Sequential(*stages)

    def forward(self, inputs):
        return self.main(inputs)
## Dense encoder and decoder for images of size 128 x 128
class waspDenseEncoder128(nn.Module):
    """Dense encoder for 128x128 inputs: (nc) x 128 x 128 -> flat ``ndim`` code.

    NOTE(review): ``forward`` appends two CoordConv channels before the first
    conv, so ``nc`` must already include them (see ``dnetccnl``, which passes
    ``in_channels + 2``) — confirm when reusing this class directly.
    """
    def __init__(self, nc=1, ndf = 32, ndim = 128, activation=nn.LeakyReLU, args=[0.2, False], f_activation=nn.Tanh, f_args=[]):
        super(waspDenseEncoder128, self).__init__()
        self.ndim = ndim

        stages = [
            # (nc) x 128 x 128 -> (ndf) x 64 x 64
            nn.BatchNorm2d(nc),
            nn.ReLU(True),
            nn.Conv2d(nc, ndf, 4, stride=2, padding=1),
            # -> (ndf*2) x 32 x 32
            DenseBlockEncoder(ndf, 6),
            DenseTransitionBlockEncoder(ndf, ndf*2, 2, activation=activation, args=args),
            # -> (ndf*4) x 16 x 16
            DenseBlockEncoder(ndf*2, 12),
            DenseTransitionBlockEncoder(ndf*2, ndf*4, 2, activation=activation, args=args),
            # -> (ndf*8) x 8 x 8
            DenseBlockEncoder(ndf*4, 16),
            DenseTransitionBlockEncoder(ndf*4, ndf*8, 2, activation=activation, args=args),
            # -> (ndf*8) x 4 x 4
            DenseBlockEncoder(ndf*8, 16),
            DenseTransitionBlockEncoder(ndf*8, ndf*8, 2, activation=activation, args=args),
            # -> (ndim) x 1 x 1
            DenseBlockEncoder(ndf*8, 16),
            DenseTransitionBlockEncoder(ndf*8, ndim, 4, activation=activation, args=args),
            # squash the final code
            f_activation(*f_args),
        ]
        self.main = nn.Sequential(*stages)

    def forward(self, input):
        # Add row/col coordinate channels, run the stack, flatten (n, ndim, 1, 1).
        augmented = add_coordConv_channels(input)
        return self.main(augmented).view(-1, self.ndim)
class waspDenseDecoder128(nn.Module):
    """Dense decoder for 128x128 outputs: (nz) x 1 x 1 code -> (nc) x 128 x 128.

    NOTE(review): ``lb`` and ``ub`` are accepted but never used — presumably
    leftover clamp bounds; kept so existing callers keep working.
    """
    def __init__(self, nz=128, nc=1, ngf=32, lb=0, ub=1, activation=nn.ReLU, args=[False], f_activation=nn.Hardtanh, f_args=[]):
        super(waspDenseDecoder128, self).__init__()
        stages = [
            # (nz) x 1 x 1 -> (ngf*8) x 4 x 4
            nn.BatchNorm2d(nz),
            activation(*args),
            nn.ConvTranspose2d(nz, ngf * 8, 4, 1, 0, bias=False),
            # -> (ngf*8) x 8 x 8
            DenseBlockDecoder(ngf*8, 16),
            DenseTransitionBlockDecoder(ngf*8, ngf*8),
            # -> (ngf*4) x 16 x 16
            DenseBlockDecoder(ngf*8, 16),
            DenseTransitionBlockDecoder(ngf*8, ngf*4),
            # -> (ngf*2) x 32 x 32
            DenseBlockDecoder(ngf*4, 12),
            DenseTransitionBlockDecoder(ngf*4, ngf*2),
            # -> (ngf) x 64 x 64
            DenseBlockDecoder(ngf*2, 6),
            DenseTransitionBlockDecoder(ngf*2, ngf),
            # -> (ngf) x 128 x 128
            DenseBlockDecoder(ngf, 6),
            DenseTransitionBlockDecoder(ngf, ngf),
            # -> (nc) x 128 x 128, squashed by the final activation
            nn.BatchNorm2d(ngf),
            activation(*args),
            nn.ConvTranspose2d(ngf, nc, 3, stride=1, padding=1, bias=False),
            f_activation(*f_args),
        ]
        self.main = nn.Sequential(*stages)

    def forward(self, inputs):
        return self.main(inputs)
## Dense encoder and decoder for images of size 512 x 512
class waspDenseEncoder512(nn.Module):
    """Dense encoder for 512x512 inputs: (nc) x 512 x 512 -> flat ``ndim`` code.

    NOTE(review): as in waspDenseEncoder128, ``nc`` must already include the
    two CoordConv channels appended in ``forward`` — confirm at call sites.
    """
    def __init__(self, nc=1, ndf = 32, ndim = 128, activation=nn.LeakyReLU, args=[0.2, False], f_activation=nn.Tanh, f_args=[]):
        super(waspDenseEncoder512, self).__init__()
        self.ndim = ndim

        stages = [
            # (nc) x 512 x 512 -> (ndf) x 256 x 256
            nn.BatchNorm2d(nc),
            nn.ReLU(True),
            nn.Conv2d(nc, ndf, 4, stride=2, padding=1),
            # -> (ndf*2) x 128 x 128
            DenseBlockEncoder(ndf, 6),
            DenseTransitionBlockEncoder(ndf, ndf*2, 2, activation=activation, args=args),
            # -> (ndf*4) x 64 x 64
            DenseBlockEncoder(ndf*2, 12),
            DenseTransitionBlockEncoder(ndf*2, ndf*4, 2, activation=activation, args=args),
            # -> (ndf*8) x 32 x 32
            DenseBlockEncoder(ndf*4, 16),
            DenseTransitionBlockEncoder(ndf*4, ndf*8, 2, activation=activation, args=args),
            # -> (ndf*8) x 16 x 16
            DenseBlockEncoder(ndf*8, 16),
            DenseTransitionBlockEncoder(ndf*8, ndf*8, 2, activation=activation, args=args),
            # -> (ndf*8) x 4 x 4
            DenseBlockEncoder(ndf*8, 16),
            DenseTransitionBlockEncoder(ndf*8, ndf*8, 4, activation=activation, args=args),
            # NOTE(review): this mid-stack f_activation does not appear in the
            # 128 variant and looks like a copy-paste of the final squash —
            # kept as-is to preserve behavior; confirm whether it is intended.
            f_activation(*f_args),
            # -> (ndim) x 1 x 1
            DenseBlockEncoder(ndf*8, 16),
            DenseTransitionBlockEncoder(ndf*8, ndim, 4, activation=activation, args=args),
            # squash the final code
            f_activation(*f_args),
        ]
        self.main = nn.Sequential(*stages)

    def forward(self, input):
        # Add row/col coordinate channels, run the stack, flatten (n, ndim, 1, 1).
        augmented = add_coordConv_channels(input)
        return self.main(augmented).view(-1, self.ndim)
class waspDenseDecoder512(nn.Module):
    """Dense decoder for 512x512 outputs: (nz) x 1 x 1 code -> (nc) x 512 x 512.

    NOTE(review): ``lb`` and ``ub`` are accepted but never used — presumably
    leftover clamp bounds; kept so existing callers keep working.
    """
    def __init__(self, nz=128, nc=1, ngf=32, lb=0, ub=1, activation=nn.ReLU, args=[False], f_activation=nn.Tanh, f_args=[]):
        super(waspDenseDecoder512, self).__init__()
        stages = [
            # (nz) x 1 x 1 -> (ngf*8) x 4 x 4
            nn.BatchNorm2d(nz),
            activation(*args),
            nn.ConvTranspose2d(nz, ngf * 8, 4, 1, 0, bias=False),
            # -> (ngf*8) x 8 x 8
            DenseBlockDecoder(ngf*8, 16),
            DenseTransitionBlockDecoder(ngf*8, ngf*8),
            # -> (ngf*8) x 16 x 16
            DenseBlockDecoder(ngf*8, 16),
            DenseTransitionBlockDecoder(ngf*8, ngf*8),
            # -> (ngf*4) x 32 x 32
            DenseBlockDecoder(ngf*8, 16),
            DenseTransitionBlockDecoder(ngf*8, ngf*4),
            # -> (ngf*2) x 64 x 64
            DenseBlockDecoder(ngf*4, 12),
            DenseTransitionBlockDecoder(ngf*4, ngf*2),
            # -> (ngf) x 128 x 128
            DenseBlockDecoder(ngf*2, 6),
            DenseTransitionBlockDecoder(ngf*2, ngf),
            # -> (ngf) x 256 x 256
            DenseBlockDecoder(ngf, 6),
            DenseTransitionBlockDecoder(ngf, ngf),
            # -> (ngf) x 512 x 512
            DenseBlockDecoder(ngf, 6),
            DenseTransitionBlockDecoder(ngf, ngf),
            # -> (nc) x 512 x 512, squashed by the final activation
            nn.BatchNorm2d(ngf),
            activation(*args),
            nn.ConvTranspose2d(ngf, nc, 3, stride=1, padding=1, bias=False),
            f_activation(*f_args),
        ]
        self.main = nn.Sequential(*stages)

    def forward(self, inputs):
        return self.main(inputs)
class dnetccnl(nn.Module):
    """128x128 dense encoder-decoder with CoordConv input channels.

    Maps an (in_channels) x 128 x 128 image to an (out_channels) x 128 x 128
    map (e.g. a 2-channel optical-flow field).

    Constructor arguments (interface unchanged):
        img_size:     used as the latent code size ``ndim`` despite its name —
                      TODO confirm against callers.
        in_channels:  image channels; the encoder is built with
                      ``in_channels + 2`` because its forward pass appends two
                      CoordConv channels.
        out_channels: channels of the decoded map.
        filters:      base channel width of encoder and decoder.
        fc_units:     unused (kept for compatibility with the commented-out
                      fully-connected variant).
    """
    def __init__(self, img_size=448, in_channels=3, out_channels=2, filters=32,fc_units=100):
        super(dnetccnl, self).__init__()
        self.nc   = in_channels
        self.nf   = filters
        self.ndim = img_size
        self.oc   = out_channels
        self.fcu  = fc_units

        self.encoder = waspDenseEncoder128(nc=self.nc + 2, ndf=self.nf, ndim=self.ndim)
        self.decoder = waspDenseDecoder128(nz=self.ndim, nc=self.oc, ngf=self.nf)

    def forward(self, inputs):
        code = self.encoder(inputs)      # (n, ndim)
        code = code[:, :, None, None]    # -> (n, ndim, 1, 1) for the decoder
        return self.decoder(code)
class dnetccnl512(nn.Module):
    """512x512 dense encoder-decoder with CoordConv input channels.

    Maps an (in_channels) x 512 x 512 image to an (out_channels) x 512 x 512
    map (e.g. a 2-channel optical-flow field).

    Constructor arguments (interface unchanged):
        img_size:     used as the latent code size ``ndim`` despite its name —
                      TODO confirm against callers.
        in_channels:  image channels; the encoder is built with
                      ``in_channels + 2`` because its forward pass appends two
                      CoordConv channels.
        out_channels: channels of the decoded map.
        filters:      base channel width of encoder and decoder.
        fc_units:     unused (kept for compatibility with the commented-out
                      fully-connected variant).
    """
    def __init__(self, img_size=448, in_channels=3, out_channels=2, filters=32,fc_units=100):
        super(dnetccnl512, self).__init__()
        self.nc   = in_channels
        self.nf   = filters
        self.ndim = img_size
        self.oc   = out_channels
        self.fcu  = fc_units

        self.encoder = waspDenseEncoder512(nc=self.nc + 2, ndf=self.nf, ndim=self.ndim)
        self.decoder = waspDenseDecoder512(nz=self.ndim, nc=self.oc, ngf=self.nf)

    def forward(self, inputs):
        code = self.encoder(inputs)      # (n, ndim)
        code = code[:, :, None, None]    # -> (n, ndim, 1, 1) for the decoder
        return self.decoder(code)