Spaces:
Running
on
T4
Running
on
T4
import os | |
import sys | |
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # NOQA | |
import argparse | |
from math import ceil | |
from glob import glob | |
import numpy as np | |
import cv2 | |
from PIL import Image, ImageDraw, ImageOps, ImageFont | |
from utils.logging_config import logger | |
from utils.util import make_dirs, bbox_offset | |
DEFAULT_FPS = 6 | |
MAX_LENGTH = 60 | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
'-fps', '--fps', | |
type=int, default=DEFAULT_FPS, | |
help="Output video FPS" | |
) | |
parser.add_argument( | |
'-v', '--video_dir', | |
type=str, | |
help="Video directory name" | |
) | |
parser.add_argument( | |
'-vs', '--video_dirs', | |
nargs='+', | |
type=str, | |
help="Video directory names" | |
) | |
parser.add_argument( | |
'-v2', '--video_dir2', | |
type=str, | |
help="Video directory name" | |
) | |
parser.add_argument( | |
'-sd', '--segms_dir', | |
type=str, | |
help="Segmentation directory name" | |
) | |
parser.add_argument( | |
'-fgd', '--fg_dir', | |
type=str, | |
help="Foreground directory name" | |
) | |
parser.add_argument( | |
'-fgfd', '--fg_frames_dir', | |
type=str, | |
help="Foreground frames directory name" | |
) | |
parser.add_argument( | |
'-fgsd', '--fg_segms_dir', | |
type=str, | |
help="Foreground segmentations directory name" | |
) | |
parser.add_argument( | |
'-syfd', '--syn_frames_dir', | |
type=str, | |
help="Synthesized frames directory name" | |
) | |
parser.add_argument( | |
'-bgfd', '--bg_frames_dir', | |
type=str, | |
help="Background frames directory name" | |
) | |
parser.add_argument( | |
'-rt', '--reader_type', | |
type=str, | |
help="Type of reader" | |
) | |
parser.add_argument( | |
'-od', '--output_dir', | |
type=str, | |
help="Output directory name" | |
) | |
parser.add_argument( | |
'-o', '--output_filename', | |
type=str, required=True, | |
help="Output output filename" | |
) | |
args = parser.parse_args() | |
return args | |
class Reader: | |
def __init__(self, dir_name, read=True, max_length=None, sample_period=1): | |
self.dir_name = dir_name | |
self.count = 0 | |
self.max_length = max_length | |
self.filenames = [] | |
self.sample_period = sample_period | |
if read: | |
if os.path.exists(dir_name): | |
# self.filenames = read_filenames_from_dir(dir_name, self.__class__.__name__) | |
# ^^^^^ yield None when reading some videos of face forensics data | |
# (related to 'Too many levels of symbolic links'?) | |
self.filenames = sorted(glob(os.path.join(dir_name, '*'))) | |
self.filenames = [f for f in self.filenames if os.path.isfile(f)] | |
self.filenames = self.filenames[::sample_period][:max_length] | |
self.files = self.read_files(self.filenames) | |
else: | |
self.files = [] | |
logger.warning(f"Directory {dir_name} not exists!") | |
else: | |
self.files = [] | |
self.current_index = 0 | |
def append(self, file_): | |
self.files.append(file_) | |
def set_files(self, files): | |
self.files = files | |
def read_files(self, filenames): | |
assert type(filenames) == list, f'filenames is not a list; dirname: {self.dir_name}' | |
filenames.sort() | |
frames = [] | |
for filename in filenames: | |
file_ = self.read_file(filename) | |
frames.append(file_) | |
return frames | |
def save_files(self, output_dir=None): | |
make_dirs(output_dir) | |
logger.info(f"Saving {self.__class__.__name__} files to {output_dir}") | |
for i, file_ in enumerate(self.files): | |
self._save_file(output_dir, i, file_) | |
def _save_file(self, output_dir, i, file_): | |
raise NotImplementedError("This is an abstract function") | |
def read_file(self, filename): | |
raise NotImplementedError("This is an abstract function") | |
def __iter__(self): | |
return self | |
def __next__(self): | |
if self.current_index < len(self.files): | |
file_ = self.files[self.current_index] | |
self.current_index += 1 | |
return file_ | |
else: | |
self.current_index = 0 | |
raise StopIteration | |
def __getitem__(self, key): | |
return self.files[key] | |
def __len__(self): | |
return len(self.files) | |
class FrameReader(Reader): | |
def __init__( | |
self, dir_name, resize=None, read=True, max_length=MAX_LENGTH, | |
scale=1, sample_period=1 | |
): | |
self.resize = resize | |
self.scale = scale | |
self.sample_period = sample_period | |
super().__init__(dir_name, read, max_length, sample_period) | |
def read_file(self, filename): | |
origin_frame = Image.open(filename) | |
size = self.resize if self.resize is not None else origin_frame.size | |
origin_frame_resized = origin_frame.resize( | |
(int(size[0] * self.scale), int(size[1] * self.scale)) | |
) | |
return origin_frame_resized | |
def _save_file(self, output_dir, i, file_): | |
if len(self.filenames) == len(self.files): | |
name = sorted(self.filenames)[i].split('/')[-1] | |
else: | |
name = f"frame_{i:04}.png" | |
filename = os.path.join( | |
output_dir, name | |
) | |
file_.save(filename, "PNG") | |
def write_files_to_video(self, output_filename, fps=DEFAULT_FPS, frame_num_when_repeat_list=[1]): | |
logger.info( | |
f"Writeing frames to video {output_filename} with FPS={fps}") | |
video_writer = cv2.VideoWriter( | |
output_filename, | |
cv2.VideoWriter_fourcc(*"MJPG"), | |
fps, | |
self.files[0].size | |
) | |
for frame_num_when_repeat in frame_num_when_repeat_list: | |
for frame in self.files: | |
frame = frame.convert("RGB") | |
frame_cv = np.array(frame) | |
frame_cv = cv2.cvtColor(frame_cv, cv2.COLOR_RGB2BGR) | |
for i in range(frame_num_when_repeat): | |
video_writer.write(frame_cv) | |
video_writer.release() | |
class SynthesizedFrameReader(FrameReader): | |
def __init__( | |
self, bg_frames_dir, fg_frames_dir, | |
fg_segms_dir, segm_bbox_mask_dir, fg_dir, dir_name, | |
bboxes_list_dir, | |
fg_scale=0.7, fg_location=(48, 27), mask_only=False | |
): | |
self.bg_reader = FrameReader(bg_frames_dir) | |
self.size = self.bg_reader[0].size | |
# TODO: add different location and change scale to var | |
self.fg_reader = ForegroundReader( | |
fg_frames_dir, fg_segms_dir, fg_dir, | |
resize=self.size, | |
scale=fg_scale | |
) | |
self.fg_location = fg_location | |
# self.masks = self.fg_reader.masks | |
# self.bbox_masks = self.fg_reader.bbox_masks | |
super().__init__(dir_name, read=False) | |
self.files = self.synthesize_frames( | |
self.bg_reader, self.fg_reader, mask_only) | |
self.bbox_masks = MaskGenerator( | |
segm_bbox_mask_dir, self.size, self.get_bboxeses() | |
) | |
self.bboxes_list_dir = bboxes_list_dir | |
self.bboxes_list = self.get_bboxeses() | |
self.save_bboxes() | |
def save_bboxes(self): | |
make_dirs(self.bboxes_list_dir) | |
logger.info(f"Saving bboxes to {self.bboxes_list_dir}") | |
for i, bboxes in enumerate(self.bboxes_list): | |
save_path = os.path.join(self.bboxes_list_dir, f"bboxes_{i:04}.txt") | |
if len(bboxes) > 0: | |
np.savetxt(save_path, bboxes[0], fmt='%4u') | |
def get_bboxeses(self): | |
bboxeses = self.fg_reader.segms.bboxeses | |
new_bboxeses = [] | |
for bboxes in bboxeses: | |
new_bboxes = [] | |
for bbox in bboxes: | |
offset_bbox = bbox_offset(bbox, self.fg_location) | |
new_bboxes.append(offset_bbox) | |
new_bboxeses.append(new_bboxes) | |
return new_bboxeses | |
def synthesize_frames(self, bg_reader, fg_reader, mask_only=False): | |
logger.info( | |
f"Synthesizing {bg_reader.dir_name} and {fg_reader.dir_name}" | |
) | |
synthesized_frames = [] | |
for i, bg in enumerate(bg_reader): | |
if i == len(fg_reader): | |
break | |
fg = fg_reader[i] | |
mask = fg_reader.get_mask(i) | |
synthesized_frame = bg.copy() | |
if mask_only: | |
synthesized_frame.paste(mask, self.fg_location, mask) | |
else: | |
synthesized_frame.paste(fg, self.fg_location, mask) | |
synthesized_frames.append(synthesized_frame) | |
return synthesized_frames | |
class WarpedFrameReader(FrameReader): | |
def __init__(self, dir_name, i, ks): | |
self.i = i | |
self.ks = ks | |
super().__init__(dir_name) | |
def _save_file(self, output_dir, i, file_): | |
filename = os.path.join( | |
output_dir, | |
f"warped_frame_{self.i:04}_k{self.ks[i]:02}.png" | |
) | |
file_.save(filename) | |
class SegmentationReader(FrameReader): | |
def __init__( | |
self, dir_name, | |
resize=None, scale=1 | |
): | |
super().__init__( | |
dir_name, resize=resize, scale=scale | |
) | |
def read_file(self, filename): | |
origin_frame = Image.open(filename) | |
mask = ImageOps.invert(origin_frame.convert("L")) | |
mask = mask.point(lambda x: 0 if x < 255 else 255, '1') | |
size = self.resize if self.resize is not None else origin_frame.size | |
mask_resized = mask.resize( | |
(int(size[0] * self.scale), int(size[1] * self.scale)) | |
) | |
return mask_resized | |
class MaskReader(Reader): | |
def __init__(self, dir_name, read=True): | |
super().__init__(dir_name, read=read) | |
def read_file(self, filename): | |
mask = Image.open(filename) | |
return mask | |
def _save_file(self, output_dir, i, file_): | |
filename = os.path.join( | |
output_dir, | |
f"mask_{i:04}.png" | |
) | |
file_.save(filename) | |
def get_bboxes(self, i): | |
# TODO: save bbox instead of looking for one | |
mask = self.files[i] | |
mask = ImageOps.invert(mask.convert("L")).convert("1") | |
mask = np.array(mask) | |
image, contours, hier = cv2.findContours( | |
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | |
bboxes = [] | |
for c in contours: | |
# get the bounding rect | |
x, y, w, h = cv2.boundingRect(c) | |
bbox = ((x, y), (x + w - 1, y + h - 1)) | |
bboxes.append(bbox) | |
return bboxes | |
def get_bbox(self, i): | |
# TODO: save bbox instead of looking for one | |
mask = self.files[i] | |
mask = ImageOps.invert(mask.convert("L")) | |
mask = np.array(mask) | |
image, contours, hier = cv2.findContours( | |
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | |
for c in contours: | |
# get the bounding rect | |
x, y, w, h = cv2.boundingRect(c) | |
bbox = ((x, y), (x + w - 1, y + h - 1)) | |
return bbox | |
class MaskGenerator(Reader): | |
def __init__( | |
self, mask_output_dir, size, bboxeses, save_masks=True | |
): | |
self.bboxeses = bboxeses | |
self.size = size | |
super().__init__(mask_output_dir, read=False) | |
self.files = self.generate_masks() | |
if save_masks: | |
make_dirs(mask_output_dir) | |
self.save_files(mask_output_dir) | |
def _save_file(self, output_dir, i, file_): | |
filename = os.path.join( | |
output_dir, | |
f"mask_{i:04}.png" | |
) | |
file_.save(filename) | |
def get_bboxes(self, i): | |
return self.bboxeses[i] | |
def generate_masks(self): | |
masks = [] | |
for i in range(len(self.bboxeses)): | |
mask = self.generate_mask(i) | |
masks.append(mask) | |
return masks | |
def generate_mask(self, i): | |
bboxes = self.bboxeses[i] | |
mask = Image.new("1", self.size, 1) | |
draw = ImageDraw.Draw(mask) | |
for bbox in bboxes: | |
draw.rectangle( | |
bbox, fill=0 | |
) | |
return mask | |
class ForegroundReader(FrameReader): | |
def __init__( | |
self, frames_dir, segms_dir, dir_name, | |
resize=None, scale=1 | |
): | |
self.frames_dir = frames_dir | |
self.segms_dir = segms_dir | |
self.frames = FrameReader( | |
frames_dir, | |
resize=resize, scale=scale | |
) | |
self.segms = SegmentationReader( | |
segms_dir, resize=resize, scale=scale | |
) | |
super().__init__(dir_name, read=False) | |
self.masks = self.segms.masks | |
# self.bbox_masks = self.segms.bbox_masks | |
self.files = self.generate_fg_frames(self.frames, self.segms) | |
def get_mask(self, i): | |
return self.masks[i] | |
def generate_fg_frames(self, frames, segms): | |
logger.info( | |
f"Generating fg frames from {self.frames_dir} and {self.segms_dir}" | |
) | |
fg_frames = [] | |
for i, frame in enumerate(frames): | |
mask = self.masks[i] | |
fg_frame = Image.new("RGB", frame.size, (0, 0, 0)) | |
fg_frame.paste( | |
frame, (0, 0), | |
mask | |
) | |
fg_frames.append(fg_frame) | |
return fg_frames | |
class CompareFramesReader(FrameReader): | |
def __init__(self, dir_names, col=2, names=[], mask_dir=None): | |
self.videos = [] | |
for dir_name in dir_names: | |
# If a method fails on this video, use None to indicate the situation | |
try: | |
self.videos.append(FrameReader(dir_name)) | |
except AssertionError: | |
self.videos.append(None) | |
if mask_dir is not None: | |
self.masks = MaskReader(mask_dir) | |
self.names = names | |
self.files = self.combine_videos(self.videos, col) | |
def combine_videos(self, videos, col=2, edge_offset=35, h_start_offset=35): | |
combined_frames = [] | |
w, h = videos[0][0].size | |
# Prevent the first method fails and have a "None" as its video | |
i = 0 | |
while videos[i] is None: | |
i += 1 | |
length = len(videos[i]) | |
video_num = len(videos) | |
row = ceil(video_num / col) | |
for frame_idx in range(length): | |
width = col * w + (col - 1) * edge_offset | |
height = row * h + (row - 1) * edge_offset + h_start_offset | |
combined_frame = Image.new("RGBA", (width, height)) | |
draw = ImageDraw.Draw(combined_frame) | |
for i, video in enumerate(videos): | |
# Give the failed method a black output | |
if video is None or frame_idx >= len(video): | |
failed = True | |
frame = Image.new("RGBA", (w, h)) | |
else: | |
frame = video[frame_idx].convert("RGBA") | |
failed = False | |
f_x = (i % col) * (w + edge_offset) | |
f_y = (i // col) * (h + edge_offset) + h_start_offset | |
combined_frame.paste(frame, (f_x, f_y)) | |
# Draw name | |
font = ImageFont.truetype("DejaVuSans.ttf", 12) | |
# font = ImageFont.truetype("DejaVuSans-Bold.ttf", 13) | |
# font = ImageFont.truetype("timesbd.ttf", 14) | |
name = self.names[i] if not failed else f'{self.names[i]} (failed)' | |
draw.text( | |
(f_x + 10, f_y - 20), | |
name, (255, 255, 255), font=font | |
) | |
combined_frames.append(combined_frame) | |
return combined_frames | |
class BoundingBoxesListReader(Reader): | |
def __init__( | |
self, dir_name, resize=None, read=True, max_length=MAX_LENGTH, | |
scale=1 | |
): | |
self.resize = resize | |
self.scale = scale | |
super().__init__(dir_name, read, max_length) | |
def read_file(self, filename): | |
bboxes = np.loadtxt(filename, dtype=int) | |
bboxes = [bboxes.tolist()] | |
return bboxes | |
def save_frames_to_dir(frames, dirname): | |
reader = FrameReader(dirname, read=False) | |
reader.set_files(frames) | |
reader.save_files(dirname) | |
if __name__ == "__main__": | |
args = parse_args() | |
if args.reader_type is None: | |
reader = FrameReader(args.video_dir) | |
elif args.reader_type == 'fg': | |
reader = ForegroundReader( | |
args.video_dir, args.segms_dir, args.fg_dir) | |
elif args.reader_type == 'sy': | |
reader = SynthesizedFrameReader( | |
args.bg_frames_dir, args.fg_frames_dir, | |
args.fg_segms_dir, args.fg_dir, args.syn_frames_dir | |
) | |
elif args.reader_type == 'com': | |
reader = CompareFramesReader( | |
args.video_dirs | |
) | |
reader.write_files_to_video( | |
os.path.join(args.output_dir, args.output_filename), | |
fps=args.fps | |
) | |