# Provenance: Hugging Face repo page residue — user "oguzakif", commit d4b77ac ("init repo"), file size 7.17 kB.
import random
import pickle
import logging
import torch
import cv2
import os
from torch.utils.data.dataset import Dataset
import numpy as np
import cvbase
from .util.STTN_mask import create_random_shape_with_random_motion
import imageio
from .util.flow_utils import region_fill as rf
logger = logging.getLogger('base')
class VideoBasedDataset(Dataset):
    """Video dataset yielding frames, random free-form masks and optical flows.

    Each item is a dict with keys 'frames', 'masks' and — depending on
    ``opt['flow_direction']`` — 'forward_flo' / 'backward_flo', each stacked
    into a [t, c, h, w] float tensor. Frames are normalised to [-1, 1].
    """

    def __init__(self, opt, dataInfo):
        """
        Args:
            opt: option dict; must provide 'sample', 'input_resolution',
                'num_frames', 'flow2rgb' and 'flow_direction'.
            dataInfo: dataset description dict with 'frame_path', 'flow_path'
                and 'name2len' (path to a pickled {video_name: frame_count} map).
        """
        self.opt = opt
        self.sampleMethod = opt['sample']  # 'random' or 'seq'
        self.dataInfo = dataInfo
        self.height, self.width = self.opt['input_resolution']
        self.frame_path = dataInfo['frame_path']
        self.flow_path = dataInfo['flow_path']  # The path of the optical flows
        self.train_list = os.listdir(self.frame_path)
        # 'name2len' is a path; replace it with the unpickled
        # {video_name: frame_count} mapping it points to.
        self.name2length = self.dataInfo['name2len']
        with open(self.name2length, 'rb') as f:
            self.name2length = pickle.load(f)
        self.sequenceLen = self.opt['num_frames']
        self.flow2rgb = opt['flow2rgb']  # whether to change flow to rgb domain
        # The direction must be in ['for', 'back', 'bi'], indicating forward,
        # backward and bidirectional flows.
        self.flow_direction = opt['flow_direction']

    def __len__(self):
        """Number of videos in the dataset."""
        return len(self.train_list)

    def __getitem__(self, idx):
        """Return one training sample; fall back to index 0 on failure."""
        try:
            item = self.load_item(idx)
        except Exception:
            # BUGFIX: was a bare `except:` with a print — it also swallowed
            # KeyboardInterrupt/SystemExit and lost the traceback. A corrupt
            # video should not abort training; log it and substitute sample 0.
            logger.exception('Loading error: %s', self.train_list[idx])
            item = self.load_item(0)
        return item

    def frameSample(self, frameLen, sequenceLen):
        """Pick ``sequenceLen`` frame indices out of ``frameLen`` frames.

        'random' draws without replacement; 'seq' returns a contiguous run
        starting at a uniformly random pivot.

        Raises:
            ValueError: if the sample method is not 'random' or 'seq'.
        """
        if self.sampleMethod == 'random':
            sampleIndices = random.sample(range(frameLen), sequenceLen)
        elif self.sampleMethod == 'seq':
            # BUGFIX: operands were swapped — randint(0, sequenceLen - 1 - frameLen)
            # has a negative upper bound whenever frameLen > sequenceLen (which
            # load_item asserts), so it always raised ValueError; and the run
            # length was frameLen instead of sequenceLen.
            pivot = random.randint(0, frameLen - sequenceLen)
            sampleIndices = list(range(pivot, pivot + sequenceLen))
        else:
            raise ValueError('Cannot determine the sample method {}'.format(self.sampleMethod))
        return sampleIndices

    def load_item(self, idx):
        """Read frames, masks and flows for the idx-th video, packed as tensors."""
        video = self.train_list[idx]
        frame_dir = os.path.join(self.frame_path, video)
        forward_flow_dir = os.path.join(self.flow_path, video, 'forward_flo')
        backward_flow_dir = os.path.join(self.flow_path, video, 'backward_flo')
        frameLen = self.name2length[video]
        flowLen = frameLen - 1  # N frames -> N-1 flow fields per direction
        assert frameLen > self.sequenceLen, \
            'Frame length {} is not greater than sequence length {}'.format(frameLen, self.sequenceLen)
        # Validate the mode once up front instead of once per sampled frame.
        if self.flow_direction not in ('for', 'back', 'bi'):
            raise ValueError('Unknown flow direction mode: {}'.format(self.flow_direction))
        sampledIndices = self.frameSample(frameLen, self.sequenceLen)
        # generate random masks for these sampled frames
        candidateMasks = create_random_shape_with_random_motion(frameLen, 0.9, 1.1, 1, 10)
        # read the frames and masks
        frames, masks, forward_flows, backward_flows = [], [], [], []
        for index in sampledIndices:
            frame = self.read_frame(os.path.join(frame_dir, '{:05d}.jpg'.format(index)),
                                    self.height, self.width)
            mask = self.read_mask(candidateMasks[index], self.height, self.width)
            frames.append(frame)
            masks.append(mask)
            # 'bi' takes both branches; 'for'/'back' take exactly one.
            if self.flow_direction in ('for', 'bi'):
                forward_flow = self.read_forward_flow(forward_flow_dir, index, flowLen)
                forward_flows.append(self.diffusion_flow(forward_flow, mask))
            if self.flow_direction in ('back', 'bi'):
                backward_flow = self.read_backward_flow(backward_flow_dir, index)
                backward_flows.append(self.diffusion_flow(backward_flow, mask))
        inputs = {'frames': frames, 'masks': masks,
                  'forward_flo': forward_flows, 'backward_flo': backward_flows}
        inputs = self.to_tensor(inputs)
        inputs['frames'] = (inputs['frames'] / 255.) * 2 - 1  # [0, 255] -> [-1, 1]
        return inputs

    def diffusion_flow(self, flow, mask):
        """Fill the masked (mask==1) region of a 2-channel flow by region filling."""
        flow_filled = np.zeros(flow.shape)
        # Zero the hole first, then diffuse values inward from its border.
        flow_filled[:, :, 0] = rf.regionfill(flow[:, :, 0] * (1 - mask), mask)
        flow_filled[:, :, 1] = rf.regionfill(flow[:, :, 1] * (1 - mask), mask)
        return flow_filled

    def read_frame(self, path, height, width):
        """Read an image from ``path`` and resize it to (height, width)."""
        frame = imageio.imread(path)
        # BUGFIX: the flag was passed positionally into cv2.resize's `dst`
        # parameter, so the interpolation was never set; use the keyword.
        frame = cv2.resize(frame, (width, height), interpolation=cv2.INTER_LINEAR)
        return frame

    def read_mask(self, mask, height, width):
        """Binarise a candidate mask and resize it to (height, width)."""
        mask = np.array(mask)
        mask = mask / 255.
        raw_mask = (mask > 0.5).astype(np.uint8)
        # Nearest-neighbour keeps the mask strictly binary after resizing.
        raw_mask = cv2.resize(raw_mask, dsize=(width, height), interpolation=cv2.INTER_NEAREST)
        return raw_mask

    def read_forward_flow(self, forward_flow_dir, sampledIndex, flowLen):
        """Read the forward flow for a frame, clamped to the last valid flow.

        The last frame has no forward flow, so indices >= flowLen reuse the
        final flow field. Flow vectors are rescaled to the target resolution.
        """
        sampledIndex = min(sampledIndex, flowLen - 1)
        flow = cvbase.read_flow(os.path.join(forward_flow_dir, '{:05d}.flo'.format(sampledIndex)))
        height, width = flow.shape[:2]
        # BUGFIX: interpolation flag must be passed by keyword (see read_frame).
        flow = cv2.resize(flow, (self.width, self.height), interpolation=cv2.INTER_LINEAR)
        # Scale the flow magnitudes to match the spatial resize.
        flow[:, :, 0] = flow[:, :, 0] / width * self.width
        flow[:, :, 1] = flow[:, :, 1] / height * self.height
        return flow

    def read_backward_flow(self, backward_flow_dir, sampledIndex):
        """Read the backward flow for a frame, clamped to the first valid flow.

        Frame i uses backward flow i-1; frame 0 has none and reuses flow 0.
        Flow vectors are rescaled to the target resolution.
        """
        sampledIndex = max(sampledIndex - 1, 0)
        flow = cvbase.read_flow(os.path.join(backward_flow_dir, '{:05d}.flo'.format(sampledIndex)))
        height, width = flow.shape[:2]
        # BUGFIX: interpolation flag must be passed by keyword (see read_frame).
        flow = cv2.resize(flow, (self.width, self.height), interpolation=cv2.INTER_LINEAR)
        flow[:, :, 0] = flow[:, :, 0] / width * self.width
        flow[:, :, 1] = flow[:, :, 1] / height * self.height
        return flow

    def to_tensor(self, data_list):
        """Stack each entry of a dict of numpy arrays into a float tensor.

        Args:
            data_list: dict mapping key -> list of [h, w(, c)] arrays (or a
                single [h, w, c] array). Empty/None entries are dropped.
        Returns:
            The same dict with values converted to [t, c, h, w] (or [c, h, w])
            float tensors.
        """
        keys = list(data_list.keys())  # snapshot: we pop while iterating
        for key in keys:
            if data_list[key] is None or data_list[key] == []:
                data_list.pop(key)
            else:
                item = data_list[key]
                if not isinstance(item, list):
                    item = torch.from_numpy(np.transpose(item, (2, 0, 1))).float()  # [c, h, w]
                else:
                    item = np.stack(item, axis=0)
                    if len(item.shape) == 3:  # [t, h, w] -> add a channel axis
                        item = item[:, :, :, np.newaxis]
                    item = torch.from_numpy(np.transpose(item, (0, 3, 1, 2))).float()  # [t, c, h, w]
                data_list[key] = item
        return data_list