Spaces:
Running
on
Zero
Running
on
Zero
File size: 9,175 Bytes
135075d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
import glob
import torch
import os
from os import path as osp
from torch.utils import data as data
from torchvision.transforms.functional import normalize
from collections import defaultdict
import numpy as np
import cv2
from basicsr.data.data_util import duf_downsample, generate_frame_indices, read_img_seq
from basicsr.utils import get_root_logger, scandir
from basicsr.utils.registry import DATASET_REGISTRY
from basicsr.utils.img_util import img2tensor, tensor2img
from facelib.utils.face_restoration_helper import FaceAligner
@DATASET_REGISTRY.register()
class VideoTestDataset(data.Dataset):
"""Video test dataset.
Supported datasets: Vid4, REDS4, REDSofficial.
More generally, it supports testing dataset with following structures:
::
dataroot
βββ subfolder1
βββ frame000
βββ frame001
βββ ...
βββ subfolder2
βββ frame000
βββ frame001
βββ ...
βββ ...
For testing datasets, there is no need to prepare LMDB files.
Args:
opt (dict): Config for train dataset. It contains the following keys:
dataroot_gt (str): Data root path for gt.
dataroot_lq (str): Data root path for lq.
io_backend (dict): IO backend type and other kwarg.
cache_data (bool): Whether to cache testing datasets.
name (str): Dataset name.
global_meta_info_file (str): The path to the file storing the list of test folders. If not provided, all the folders
in the dataroot will be used.
num_frame (int): Window size for input frames.
padding (str): Padding mode.
"""
def __init__(self, opt):
super(VideoTestDataset, self).__init__()
self.opt = opt
self.cache_data = opt['cache_data']
self.interval = opt['interval']
self.gt_root, self.lq_root = opt['dataroot_gt'], opt['dataroot_lq']
self.data_info = {'lq_path': [], 'gt_path': [],
'folder': [], 'idx': [], 'border': []}
# file client (io backend)
self.file_client = None
self.io_backend_opt = opt['io_backend']
assert self.io_backend_opt['type'] != 'lmdb', 'No need to use lmdb during validation/test.'
logger = get_root_logger()
logger.info(f'Generate data info for VideoTestDataset - {opt["name"]}')
self.imgs_lq, self.imgs_gt = {}, {}
if 'global_meta_info_file' in opt:
with open(opt['global_meta_info_file'], 'r') as fin:
subfolders = [line.split('/')[0] for line in fin]
subfolders_lq = [osp.join(self.lq_root, key)
for key in subfolders]
subfolders_gt = [osp.join(self.gt_root, key)
for key in subfolders]
else:
subfolders_lq = sorted(glob.glob(osp.join(self.lq_root, '*')))
subfolders_gt = sorted(glob.glob(osp.join(self.gt_root, '*')))
for subfolder_lq, subfolder_gt in zip(subfolders_lq, subfolders_gt):
# get frame list for lq and gt
subfolder_name = osp.basename(subfolder_lq)
img_paths_lq = sorted(list(scandir(subfolder_lq, full_path=True)))[
::self.interval]
img_paths_gt = sorted(list(scandir(subfolder_gt, full_path=True)))[
::self.interval]
max_idx = len(img_paths_lq)
assert max_idx == len(img_paths_gt), (f'Different number of images in lq ({max_idx})'
f' and gt folders ({len(img_paths_gt)})')
self.data_info['lq_path'].extend(img_paths_lq)
self.data_info['gt_path'].extend(img_paths_gt)
self.data_info['folder'].extend([subfolder_name] * max_idx)
for i in range(max_idx):
self.data_info['idx'].append(f'{i}/{max_idx}')
border_l = [0] * max_idx
for i in range(self.opt['num_frame'] // 2):
border_l[i] = 1
border_l[max_idx - i - 1] = 1
self.data_info['border'].extend(border_l)
# cache data or save the frame list
if self.cache_data:
logger.info(
f'Cache {subfolder_name} for VideoTestDataset...')
self.imgs_lq[subfolder_name] = read_img_seq(img_paths_lq)
self.imgs_gt[subfolder_name] = read_img_seq(img_paths_gt)
else:
self.imgs_lq[subfolder_name] = img_paths_lq
self.imgs_gt[subfolder_name] = img_paths_gt
self.normalize = opt.get('normalize', False)
def __getitem__(self, index):
folder = self.data_info['folder'][index]
idx, max_idx = self.data_info['idx'][index].split('/')
idx, max_idx = int(idx), int(max_idx)
border = self.data_info['border'][index]
lq_path = self.data_info['lq_path'][index]
select_idx = generate_frame_indices(
idx, max_idx, self.opt['num_frame'], padding=self.opt['padding'])
if self.cache_data:
imgs_lq = self.imgs_lq[folder].index_select(
0, torch.LongTensor(select_idx))
img_gt = self.imgs_gt[folder][idx]
else:
img_paths_lq = [self.imgs_lq[folder][i] for i in select_idx]
imgs_lq = read_img_seq(img_paths_lq)
img_gt = read_img_seq([self.imgs_gt[folder][idx]])
img_gt.squeeze_(0)
if self.normalize:
normalize(imgs_lq, [0.5, 0.5, 0.5], [0.5, 0.5, 0.5], inplace=True)
normalize(img_gt, [0.5, 0.5, 0.5], [0.5, 0.5, 0.5], inplace=True)
return {
'lq': imgs_lq, # (t, c, h, w)
'gt': img_gt, # (c, h, w)
'folder': folder, # folder name
'idx': self.data_info['idx'][index], # e.g., 0/99
'border': border, # 1 for border, 0 for non-border
'lq_path': lq_path # center frame
}
def __len__(self):
return len(self.data_info['gt_path'])
@DATASET_REGISTRY.register()
class VideoRecurrentTestDataset(VideoTestDataset):
"""Video test dataset for recurrent architectures, which takes LR video
frames as input and output corresponding HR video frames.
Args:
opt (dict): Same as VideoTestDataset. Unused opt:
padding (str): Padding mode.
"""
def __init__(self, opt):
super(VideoRecurrentTestDataset, self).__init__(opt)
# Find unique folder strings
self.folders = sorted(list(set(self.data_info['folder'])))
self.need_align = opt.get('need_align', False)
self.normalize = opt.get('normalize', False)
if self.need_align:
self.dataroot_meta_info = opt['dataroot_meta_info']
self.face_aligner = FaceAligner(
upscale_factor=1,
face_size=512,
crop_ratio=(1, 1),
det_model='retinaface_resnet50',
save_ext='png',
use_parse=True,)
def __getitem__(self, index):
folder = self.folders[index]
if self.cache_data:
imgs_lq = self.imgs_lq[folder]
imgs_gt = self.imgs_gt[folder]
else:
imgs_lq = read_img_seq(self.imgs_lq[folder])
imgs_gt = read_img_seq(self.imgs_gt[folder])
if self.need_align:
clip_info_path = os.path.join(
self.dataroot_meta_info, f'{folder}.txt')
clip_info = []
with open(clip_info_path, 'r', encoding='utf-8') as fin:
for line in fin:
line = line.strip()
if line.startswith('0'):
clip_info.append(line)
align_lqs, align_gts = [], []
for frame_idx, (img_lq, img_gt) in enumerate(zip(imgs_lq, imgs_gt)):
img_lq = tensor2img(img_lq) / 255.0
img_gt = tensor2img(img_gt) / 255.0
landmarks_str = clip_info[frame_idx].split(' ')[1:]
# print(clip_name, paths[neighbor], landmarks_str)
landmarks = np.array([float(x)
for x in landmarks_str]).reshape(5, 2)
self.face_aligner.clean_all()
# align and warp each face
img_lq, img_gt = self.face_aligner.align_pair_face(
img_lq, img_gt, landmarks)
align_lqs.append(img_lq)
align_gts.append(img_gt)
img_lqs, img_gts = align_lqs, align_gts
img_gts = img2tensor(img_gts)
img_lqs = img2tensor(img_lqs)
imgs_gt = torch.stack(img_gts, dim=0)
imgs_lq = torch.stack(img_lqs, dim=0)
if self.normalize:
normalize(imgs_lq, [0.5, 0.5, 0.5], [0.5, 0.5, 0.5], inplace=True)
normalize(imgs_gt, [0.5, 0.5, 0.5], [0.5, 0.5, 0.5], inplace=True)
return {
'lq': imgs_lq,
'gt': imgs_gt,
'folder': folder,
}
def __len__(self):
return len(self.folders)
|