import torch
import torch.nn as nn
import torch.nn.functional as F
# Define the model
class FloweR(nn.Module):
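    """Encoder/decoder with additive skip connections (U-Net-like) over a short frame window.

    Given `window_size` consecutive RGB frames, the model predicts, per pixel,
    a 2-channel optical flow, a 1-channel occlusion mask and a 3-channel
    estimate of the next frame (6 output channels in total).

    Expected input: (batch, window_size, height, width, 3) with (height, width)
    equal to `input_size`. Output: (batch, height, width, 6).
    """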
def __init__(self, input_size = (384, 384), window_size = 4):
super(FloweR, self).__init__()
self.input_size = input_size
self.window_size = window_size
# 2 channels for optical flow
# 1 channel for occlusion mask
# 3 channels for next frame prediction
self.out_channels = 6
#INPUT: 384 x 384 x 4 * 3
### DOWNSCALE ###
self.conv_block_1 = nn.Sequential(
nn.Conv2d(3 * self.window_size, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 384 x 384 x 128
self.conv_block_2 = nn.Sequential(
nn.AvgPool2d(2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 192 x 192 x 128
self.conv_block_3 = nn.Sequential(
nn.AvgPool2d(2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 96 x 96 x 128
self.conv_block_4 = nn.Sequential(
nn.AvgPool2d(2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 48 x 48 x 128
self.conv_block_5 = nn.Sequential(
nn.AvgPool2d(2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 24 x 24 x 128
self.conv_block_6 = nn.Sequential(
nn.AvgPool2d(2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 12 x 12 x 128
self.conv_block_7 = nn.Sequential(
nn.AvgPool2d(2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 6 x 6 x 128
self.conv_block_8 = nn.Sequential(
nn.AvgPool2d(2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 3 x 3 x 128 - 9 input tokens
        ### Transformer part ###
        # To be done (an illustrative, non-wired sketch of a token-based bottleneck is given after this class)
### UPSCALE ###
self.conv_block_9 = nn.Sequential(
nn.Upsample(scale_factor=2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 6 x 6 x 128
self.conv_block_10 = nn.Sequential(
nn.Upsample(scale_factor=2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 12 x 12 x 128
self.conv_block_11 = nn.Sequential(
nn.Upsample(scale_factor=2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 24 x 24 x 128
self.conv_block_12 = nn.Sequential(
nn.Upsample(scale_factor=2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 48 x 48 x 128
self.conv_block_13 = nn.Sequential(
nn.Upsample(scale_factor=2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 96 x 96 x 128
self.conv_block_14 = nn.Sequential(
nn.Upsample(scale_factor=2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 192 x 192 x 128
self.conv_block_15 = nn.Sequential(
nn.Upsample(scale_factor=2),
nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
nn.ReLU(),
) # 384 x 384 x 128
self.conv_block_16 = nn.Conv2d(128, self.out_channels, kernel_size=3, stride=1, padding='same')
def forward(self, input_frames):
        if input_frames.size(1) != self.window_size:
            raise ValueError(
                f'Input shape is not compatible: there must be exactly {self.window_size} frames in the input video.'
            )
h, w = self.input_size
# batch, frames, height, width, colors
input_frames_permuted = input_frames.permute((0, 1, 4, 2, 3))
# batch, frames, colors, height, width
in_x = input_frames_permuted.reshape(-1, self.window_size * 3, self.input_size[0], self.input_size[1])
### DOWNSCALE ###
block_1_out = self.conv_block_1(in_x) # 384 x 384 x 128
block_2_out = self.conv_block_2(block_1_out) # 192 x 192 x 128
block_3_out = self.conv_block_3(block_2_out) # 96 x 96 x 128
block_4_out = self.conv_block_4(block_3_out) # 48 x 48 x 128
block_5_out = self.conv_block_5(block_4_out) # 24 x 24 x 128
block_6_out = self.conv_block_6(block_5_out) # 12 x 12 x 128
block_7_out = self.conv_block_7(block_6_out) # 6 x 6 x 128
block_8_out = self.conv_block_8(block_7_out) # 3 x 3 x 128
### UPSCALE ###
block_9_out = block_7_out + self.conv_block_9(block_8_out) # 6 x 6 x 128
block_10_out = block_6_out + self.conv_block_10(block_9_out) # 12 x 12 x 128
block_11_out = block_5_out + self.conv_block_11(block_10_out) # 24 x 24 x 128
block_12_out = block_4_out + self.conv_block_12(block_11_out) # 48 x 48 x 128
block_13_out = block_3_out + self.conv_block_13(block_12_out) # 96 x 96 x 128
block_14_out = block_2_out + self.conv_block_14(block_13_out) # 192 x 192 x 128
block_15_out = block_1_out + self.conv_block_15(block_14_out) # 384 x 384 x 128
block_16_out = self.conv_block_16(block_15_out) # 384 x 384 x (2 + 1 + 3)
out = block_16_out.reshape(-1, self.out_channels, self.input_size[0], self.input_size[1])
### for future model training ###
        device = out.device  # robust on CPU as well (Tensor.get_device() returns -1 for CPU tensors)
pred_flow = out[:,:2,:,:] * 255 # (-255, 255)
pred_occl = (out[:,2:3,:,:] + 1) / 2 # [0, 1]
pred_next = out[:,3:6,:,:]
        # Generate sampling grids: absolute pixel coordinates offset by the predicted flow
        grid_y, grid_x = torch.meshgrid(torch.arange(0, h), torch.arange(0, w), indexing='ij')
flow_grid = torch.stack((grid_x, grid_y), dim=0).float()
flow_grid = flow_grid.unsqueeze(0).to(device=device)
flow_grid = flow_grid + pred_flow
flow_grid[:, 0, :, :] = 2 * flow_grid[:, 0, :, :] / (w - 1) - 1
flow_grid[:, 1, :, :] = 2 * flow_grid[:, 1, :, :] / (h - 1) - 1
# batch, flow_chanels, height, width
flow_grid = flow_grid.permute(0, 2, 3, 1)
# batch, height, width, flow_chanels
previous_frame = input_frames_permuted[:, -1, :, :, :]
sampling_mode = "bilinear" if self.training else "nearest"
warped_frame = torch.nn.functional.grid_sample(previous_frame, flow_grid, mode=sampling_mode, padding_mode="reflection", align_corners=False)
alpha_mask = torch.clip(pred_occl * 10, 0, 1) * 0.04
pred_next = torch.clip(pred_next, -1, 1)
warped_frame = torch.clip(warped_frame, -1, 1)
next_frame = pred_next * alpha_mask + warped_frame * (1 - alpha_mask)
res = torch.cat((pred_flow / 255, pred_occl * 2 - 1, next_frame), dim=1)
# batch, channels, height, width
res = res.permute((0, 2, 3, 1))
# batch, height, width, channels
return res
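

# --- Illustrative sketch only: not part of the original model ----------------
# The "Transformer part" above is marked as "to be done". One possible (assumed)
# design is to treat the 3 x 3 x 128 bottleneck as 9 tokens of dimension 128 and
# run them through a standard encoder. The module name, depth and head count
# below are placeholders for illustration; nothing here is wired into FloweR.
class BottleneckTransformer(nn.Module):
    def __init__(self, dim=128, num_heads=8, num_layers=2):
        super().__init__()
        layer = nn.TransformerEncoderLayer(
            d_model=dim, nhead=num_heads, dim_feedforward=4 * dim, batch_first=True
        )
        self.encoder = nn.TransformerEncoder(layer, num_layers=num_layers)

    def forward(self, x):
        # x: (batch, 128, 3, 3) -> token sequence (batch, 9, 128)
        b, c, h, w = x.shape
        tokens = x.flatten(2).permute(0, 2, 1)
        tokens = self.encoder(tokens)
        # back to the spatial layout expected by the upscale path
        return tokens.permute(0, 2, 1).reshape(b, c, h, w)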
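

if __name__ == "__main__":
    # Minimal smoke test (illustrative): shapes follow the comments in FloweR.
    # Pixel values are assumed to lie in [-1, 1], matching the clipping in forward().
    model = FloweR(input_size=(384, 384), window_size=4)
    model.eval()
    # batch, frames, height, width, colors
    frames = torch.zeros(1, 4, 384, 384, 3)
    with torch.no_grad():
        out = model(frames)
    print(out.shape)  # expected: torch.Size([1, 384, 384, 6])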