import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))  # NOQA
import argparse
from math import ceil
from glob import glob

import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageOps, ImageFont

from utils.logging_config import logger
from utils.util import make_dirs, bbox_offset

DEFAULT_FPS = 6
MAX_LENGTH = 60


def parse_args():
    """Parse the command-line options of this script.

    Returns:
        The ``argparse.Namespace`` holding all options. Only
        ``--output_filename`` is required by the parser; which of the
        directory options are actually used depends on ``--reader_type``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-fps', '--fps', type=int, default=DEFAULT_FPS,
        help="Output video FPS"
    )
    parser.add_argument(
        '-v', '--video_dir', type=str,
        help="Video directory name"
    )
    parser.add_argument(
        '-vs', '--video_dirs', nargs='+', type=str,
        help="Video directory names"
    )
    parser.add_argument(
        '-v2', '--video_dir2', type=str,
        help="Video directory name"
    )
    parser.add_argument(
        '-sd', '--segms_dir', type=str,
        help="Segmentation directory name"
    )
    parser.add_argument(
        '-fgd', '--fg_dir', type=str,
        help="Foreground directory name"
    )
    parser.add_argument(
        '-fgfd', '--fg_frames_dir', type=str,
        help="Foreground frames directory name"
    )
    parser.add_argument(
        '-fgsd', '--fg_segms_dir', type=str,
        help="Foreground segmentations directory name"
    )
    parser.add_argument(
        '-syfd', '--syn_frames_dir', type=str,
        help="Synthesized frames directory name"
    )
    parser.add_argument(
        '-bgfd', '--bg_frames_dir', type=str,
        help="Background frames directory name"
    )
    # The two options below feed SynthesizedFrameReader, whose constructor
    # requires them; without them the 'sy' reader type cannot be built.
    parser.add_argument(
        '-sbmd', '--segm_bbox_mask_dir', type=str,
        help="Bounding-box mask output directory name (reader type 'sy')"
    )
    parser.add_argument(
        '-bld', '--bboxes_list_dir', type=str,
        help="Bounding-box list output directory name (reader type 'sy')"
    )
    parser.add_argument(
        '-rt', '--reader_type', type=str, choices=['fg', 'sy', 'com'],
        help="Type of reader"
    )
    parser.add_argument(
        '-od', '--output_dir', type=str,
        help="Output directory name"
    )
    parser.add_argument(
        '-o', '--output_filename', type=str, required=True,
        help="Output output filename"
    )
    args = parser.parse_args()
    return args


class Reader:
    """Base class that loads the files of one directory into a list.

    Subclasses implement ``read_file`` (decode one file) and ``_save_file``
    (write one item to disk). The loaded items live in ``self.files`` and the
    paths they came from in ``self.filenames``.
    """

    def __init__(self, dir_name, read=True, max_length=None, sample_period=1):
        """Optionally read every regular file directly inside ``dir_name``.

        Args:
            dir_name: Directory to read from.
            read: When False nothing is read and ``files`` starts empty.
            max_length: Maximum number of files kept (None keeps all).
            sample_period: Keep every ``sample_period``-th file of the
                sorted listing.
        """
        self.dir_name = dir_name
        self.count = 0
        self.max_length = max_length
        self.filenames = []
        self.sample_period = sample_period
        self.files = []
        self.current_index = 0

        if not read:
            return
        if not os.path.exists(dir_name):
            logger.warning(f"Directory {dir_name} not exists!")
            return

        # NOTE: read_filenames_from_dir(dir_name, self.__class__.__name__)
        # was used here before; it yielded None when reading some videos of
        # the face forensics data (related to 'Too many levels of symbolic
        # links'?), hence the plain glob.
        candidates = sorted(glob(os.path.join(dir_name, '*')))
        regular_files = [f for f in candidates if os.path.isfile(f)]
        self.filenames = regular_files[::sample_period][:max_length]
        self.files = self.read_files(self.filenames)

    def append(self, file_):
        """Append one already-loaded item to ``files``."""
        self.files.append(file_)

    def set_files(self, files):
        """Replace ``files`` with the given list."""
        self.files = files

    def read_files(self, filenames):
        """Sort ``filenames`` in place and load each one with ``read_file``.

        Raises:
            AssertionError: If ``filenames`` is not a list.
        """
        assert isinstance(filenames, list), \
            f'filenames is not a list; dirname: {self.dir_name}'
        filenames.sort()
        return [self.read_file(filename) for filename in filenames]

    def save_files(self, output_dir=None):
        """Create ``output_dir`` and write every item with ``_save_file``."""
        make_dirs(output_dir)
        logger.info(f"Saving {self.__class__.__name__} files to {output_dir}")
        for i, file_ in enumerate(self.files):
            self._save_file(output_dir, i, file_)

    def _save_file(self, output_dir, i, file_):
        raise NotImplementedError("This is an abstract function")

    def read_file(self, filename):
        raise NotImplementedError("This is an abstract function")

    def __iter__(self):
        # The reader is its own iterator with a single shared cursor. Reset
        # it here so that a loop abandoned with ``break`` does not make the
        # next loop resume from the middle of the sequence.
        self.current_index = 0
        return self

    def __next__(self):
        if self.current_index < len(self.files):
            file_ = self.files[self.current_index]
            self.current_index += 1
            return file_
        self.current_index = 0
        raise StopIteration

    def __getitem__(self, key):
        return self.files[key]

    def __len__(self):
        return len(self.files)


class FrameReader(Reader):
    """Reader whose items are PIL images, optionally resized and scaled."""

    def __init__(
        self, dir_name, resize=None, read=True, max_length=MAX_LENGTH,
        scale=1, sample_period=1
    ):
        """See ``Reader.__init__``.

        Args:
            resize: Target ``(width, height)`` before scaling; None keeps
                each image's own size.
            scale: Factor multiplied onto both dimensions of the target size.
        """
        # These must be set before Reader.__init__, which calls read_file.
        self.resize = resize
        self.scale = scale
        self.sample_period = sample_period
        super().__init__(dir_name, read, max_length, sample_period)

    def read_file(self, filename):
        """Open ``filename`` and return it resized to the scaled target size."""
        # ``resize`` returns a new image, so the source file can be closed
        # as soon as the resized copy exists.
        with Image.open(filename) as origin_frame:
            size = self.resize if self.resize is not None else origin_frame.size
            target_size = (int(size[0] * self.scale), int(size[1] * self.scale))
            return origin_frame.resize(target_size)

    def _save_file(self, output_dir, i, file_):
        """Save one frame as PNG.

        The source file's base name is reused when there is exactly one
        filename per frame; otherwise a ``frame_XXXX.png`` name is generated.
        """
        if len(self.filenames) == len(self.files):
            # ``filenames`` is already sorted by __init__ / read_files, so no
            # per-file re-sort is needed.
            name = os.path.basename(self.filenames[i])
        else:
            name = f"frame_{i:04}.png"
        file_.save(os.path.join(output_dir, name), "PNG")

    def write_files_to_video(self, output_filename, fps=DEFAULT_FPS,
                             frame_num_when_repeat_list=(1,)):
        """Encode ``files`` as an MJPG video.

        Args:
            output_filename: Path of the video to write.
            fps: Frames per second of the output.
            frame_num_when_repeat_list: The whole frame sequence is written
                once per entry; within that pass every frame is written
                ``entry`` times in a row.

        Raises:
            IndexError: If there are no frames (the video size is taken from
                the first frame).
        """
        logger.info(
            f"Writeing frames to video {output_filename} with FPS={fps}")
        video_writer = cv2.VideoWriter(
            output_filename,
            cv2.VideoWriter_fourcc(*"MJPG"),
            fps, self.files[0].size
        )
        try:
            for frame_num_when_repeat in frame_num_when_repeat_list:
                for frame in self.files:
                    # PIL images are RGB while OpenCV expects BGR.
                    frame_cv = cv2.cvtColor(
                        np.array(frame.convert("RGB")), cv2.COLOR_RGB2BGR)
                    for _ in range(frame_num_when_repeat):
                        video_writer.write(frame_cv)
        finally:
            # Release even if a frame fails to convert, so the output file
            # is not left open.
            video_writer.release()


class SynthesizedFrameReader(FrameReader):
    """Frames made by pasting masked foreground frames onto backgrounds.

    On construction it also generates and saves bounding-box masks and the
    bounding-box lists.
    """

    def __init__(
        self, bg_frames_dir, fg_frames_dir, fg_segms_dir,
        segm_bbox_mask_dir, fg_dir, dir_name, bboxes_list_dir,
        fg_scale=0.7, fg_location=(48, 27), mask_only=False
    ):
        """Build the synthesized frames and write the bbox artefacts.

        Args:
            bg_frames_dir: Directory of background frames.
            fg_frames_dir: Directory of foreground frames.
            fg_segms_dir: Directory of foreground segmentation images.
            segm_bbox_mask_dir: Output directory of the bounding-box masks.
            fg_dir: ``dir_name`` given to the ForegroundReader.
            dir_name: ``dir_name`` of this reader (nothing is read from it).
            bboxes_list_dir: Output directory of the bounding-box text files.
            fg_scale: Scale of the foreground relative to the background size.
            fg_location: Paste position of the foreground on the background.
            mask_only: Paste the mask itself instead of the foreground.
        """
        self.bg_reader = FrameReader(bg_frames_dir)
        self.size = self.bg_reader[0].size
        # TODO: add different location and change scale to var
        self.fg_reader = ForegroundReader(
            fg_frames_dir, fg_segms_dir, fg_dir,
            resize=self.size,
            scale=fg_scale
        )
        self.fg_location = fg_location
        super().__init__(dir_name, read=False)
        self.files = self.synthesize_frames(
            self.bg_reader, self.fg_reader, mask_only)

        # Compute the offset boxes once and reuse them for both the mask
        # generator and the saved lists.
        self.bboxes_list_dir = bboxes_list_dir
        self.bboxes_list = self.get_bboxeses()
        self.bbox_masks = MaskGenerator(
            segm_bbox_mask_dir, self.size, self.bboxes_list
        )
        self.save_bboxes()

    def save_bboxes(self):
        """Write the first bounding box of each frame to ``bboxes_XXXX.txt``.

        Frames without any bounding box get no file.
        """
        make_dirs(self.bboxes_list_dir)
        logger.info(f"Saving bboxes to {self.bboxes_list_dir}")
        for i, bboxes in enumerate(self.bboxes_list):
            save_path = os.path.join(self.bboxes_list_dir, f"bboxes_{i:04}.txt")
            if len(bboxes) > 0:
                np.savetxt(save_path, bboxes[0], fmt='%4u')

    def get_bboxeses(self):
        """Return the per-frame bounding boxes passed through ``bbox_offset``.

        NOTE(review): this reads ``self.fg_reader.segms.bboxeses``, but
        SegmentationReader does not define ``bboxeses`` anywhere in this
        file, so this raises AttributeError unless it is provided elsewhere.
        """
        # presumably bbox_offset shifts a box by fg_location - confirm in
        # utils.util
        return [
            [bbox_offset(bbox, self.fg_location) for bbox in bboxes]
            for bboxes in self.fg_reader.segms.bboxeses
        ]

    def synthesize_frames(self, bg_reader, fg_reader, mask_only=False):
        """Paste each foreground (or its mask) onto a copy of its background.

        Stops at the shorter of the two readers.
        """
        logger.info(
            f"Synthesizing {bg_reader.dir_name} and {fg_reader.dir_name}"
        )
        synthesized_frames = []
        for i, bg in enumerate(bg_reader):
            if i == len(fg_reader):
                break
            mask = fg_reader.get_mask(i)
            pasted = mask if mask_only else fg_reader[i]
            synthesized_frame = bg.copy()
            synthesized_frame.paste(pasted, self.fg_location, mask)
            synthesized_frames.append(synthesized_frame)
        return synthesized_frames


class WarpedFrameReader(FrameReader):
    """FrameReader that saves its frames as ``warped_frame_<i>_k<k>.png``."""

    def __init__(self, dir_name, i, ks):
        """
        Args:
            dir_name: Directory to read frames from.
            i: Index embedded in every saved file name.
            ks: Sequence indexed by frame position; its value is embedded in
                the saved file name.
        """
        self.i = i
        self.ks = ks
        super().__init__(dir_name)

    def _save_file(self, output_dir, i, file_):
        filename = os.path.join(
            output_dir,
            f"warped_frame_{self.i:04}_k{self.ks[i]:02}.png"
        )
        file_.save(filename)


class SegmentationReader(FrameReader):
    """FrameReader that turns segmentation images into 1-bit masks."""

    def __init__(
        self, dir_name,
        resize=None, scale=1
    ):
        super().__init__(
            dir_name, resize=resize, scale=scale
        )

    def read_file(self, filename):
        """Return a mode-"1" mask, resized like FrameReader frames.

        The image is converted to grayscale and inverted; only pixels that
        are 255 after inversion (0 in the grayscale input) stay set.
        """
        with Image.open(filename) as origin_frame:
            mask = ImageOps.invert(origin_frame.convert("L"))
            size = self.resize if self.resize is not None else origin_frame.size
        mask = mask.point(lambda x: 0 if x < 255 else 255, '1')
        return mask.resize(
            (int(size[0] * self.scale), int(size[1] * self.scale))
        )


class MaskReader(Reader):
    """Reader of mask images with helpers to extract bounding boxes."""

    def __init__(self, dir_name, read=True):
        super().__init__(dir_name, read=read)

    def read_file(self, filename):
        """Open ``filename`` and return the fully loaded image."""
        mask = Image.open(filename)
        # Image.open is lazy; force the pixel data in now so the file handle
        # is not kept open for every mask in the directory.
        mask.load()
        return mask

    def _save_file(self, output_dir, i, file_):
        filename = os.path.join(
            output_dir,
            f"mask_{i:04}.png"
        )
        file_.save(filename)

    @staticmethod
    def _contour_bboxes(mask_array):
        """Return ``((x0, y0), (x1, y1))`` boxes of the external contours."""
        # findContours returns (image, contours, hierarchy) in OpenCV 3 but
        # (contours, hierarchy) in OpenCV 4; index -2 is the contour list in
        # both.
        contours = cv2.findContours(
            mask_array, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2]
        bboxes = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            bboxes.append(((x, y), (x + w - 1, y + h - 1)))
        return bboxes

    def get_bboxes(self, i):
        """Return the bounding boxes of all external contours of mask ``i``.

        The mask is inverted first, so contours are those of its dark
        regions.
        """
        # TODO: save bbox instead of looking for one
        mask = ImageOps.invert(self.files[i].convert("L")).convert("1")
        # A mode-"1" image becomes a bool array, which OpenCV does not
        # accept; findContours needs 8-bit single-channel input.
        mask_array = np.array(mask).astype(np.uint8)
        return self._contour_bboxes(mask_array)

    def get_bbox(self, i):
        """Return the bounding box of the last external contour of mask ``i``.

        Returns None when the inverted mask contains no contour.
        """
        # TODO: save bbox instead of looking for one
        mask = ImageOps.invert(self.files[i].convert("L"))
        bboxes = self._contour_bboxes(np.array(mask))
        return bboxes[-1] if bboxes else None


class MaskGenerator(Reader):
    """Generates 1-bit masks in which the bounding boxes are zeroed out."""

    def __init__(
        self, mask_output_dir, size, bboxeses, save_masks=True
    ):
        """
        Args:
            mask_output_dir: Directory the masks are saved to.
            size: ``(width, height)`` of every generated mask.
            bboxeses: One list of bounding boxes per mask.
            save_masks: Save the masks to ``mask_output_dir`` right away.
        """
        self.bboxeses = bboxeses
        self.size = size
        super().__init__(mask_output_dir, read=False)
        self.files = self.generate_masks()
        if save_masks:
            make_dirs(mask_output_dir)
            self.save_files(mask_output_dir)

    def _save_file(self, output_dir, i, file_):
        filename = os.path.join(
            output_dir,
            f"mask_{i:04}.png"
        )
        file_.save(filename)

    def get_bboxes(self, i):
        """Return the bounding boxes of mask ``i``."""
        return self.bboxeses[i]

    def generate_masks(self):
        """Generate one mask per entry of ``bboxeses``."""
        return [self.generate_mask(i) for i in range(len(self.bboxeses))]

    def generate_mask(self, i):
        """Return an all-1 mask with the rectangles of entry ``i`` set to 0."""
        mask = Image.new("1", self.size, 1)
        draw = ImageDraw.Draw(mask)
        for bbox in self.bboxeses[i]:
            draw.rectangle(bbox, fill=0)
        return mask


class ForegroundReader(FrameReader):
    """Frames with everything outside their segmentation mask blacked out."""

    def __init__(
        self, frames_dir, segms_dir, dir_name,
        resize=None, scale=1
    ):
        """
        Args:
            frames_dir: Directory of the source frames.
            segms_dir: Directory of the segmentation images.
            dir_name: ``dir_name`` of this reader (nothing is read from it).
            resize: Passed to both the frame and the segmentation reader.
            scale: Passed to both the frame and the segmentation reader.
        """
        self.frames_dir = frames_dir
        self.segms_dir = segms_dir
        self.frames = FrameReader(
            frames_dir,
            resize=resize, scale=scale
        )
        self.segms = SegmentationReader(
            segms_dir, resize=resize, scale=scale
        )
        super().__init__(dir_name, read=False)
        # SegmentationReader.read_file returns the mask itself, so the masks
        # are the reader's loaded files (it has no ``masks`` attribute).
        self.masks = self.segms.files
        self.files = self.generate_fg_frames(self.frames, self.segms)

    def get_mask(self, i):
        """Return the mask of frame ``i``."""
        return self.masks[i]

    def generate_fg_frames(self, frames, segms):
        """Paste every frame through its mask onto a black RGB canvas."""
        logger.info(
            f"Generating fg frames from {self.frames_dir} and {self.segms_dir}"
        )
        fg_frames = []
        for i, frame in enumerate(frames):
            fg_frame = Image.new("RGB", frame.size, (0, 0, 0))
            fg_frame.paste(frame, (0, 0), self.masks[i])
            fg_frames.append(fg_frame)
        return fg_frames


class CompareFramesReader(FrameReader):
    """Tiles several videos into one labelled grid per frame."""

    def __init__(self, dir_names, col=2, names=None, mask_dir=None):
        """
        Args:
            dir_names: One frame directory per video to compare.
            col: Number of grid columns.
            names: Label drawn above each video; when omitted or empty the
                base name of each directory is used.
            mask_dir: Optional mask directory, loaded into ``self.masks``.
        """
        # Initialise the Reader state (filenames, cursor, ...) that
        # iteration and save_files rely on; nothing is read from disk here.
        super().__init__(None, read=False)
        dir_names = list(dir_names)
        self.videos = []
        for dir_name in dir_names:
            # If a method fails on this video, use None to indicate the
            # situation
            try:
                self.videos.append(FrameReader(dir_name))
            except AssertionError:
                self.videos.append(None)
        if mask_dir is not None:
            self.masks = MaskReader(mask_dir)
        if names:
            self.names = names
        else:
            self.names = [
                os.path.basename(os.path.normpath(d)) for d in dir_names
            ]
        self.files = self.combine_videos(self.videos, col)

    def combine_videos(self, videos, col=2, edge_offset=35, h_start_offset=35):
        """Return one RGBA grid image per frame index.

        Args:
            videos: Readers to tile; None marks a video that failed to load.
            col: Number of grid columns.
            edge_offset: Gap in pixels between neighbouring cells.
            h_start_offset: Extra space above the first row for the labels.

        Raises:
            ValueError: If no video has any frame.
        """
        # The frame size and frame count come from the first video that
        # actually has frames, so a failed first method cannot crash this.
        reference = next(
            (v for v in videos if v is not None and len(v) > 0), None)
        if reference is None:
            raise ValueError("No video with at least one frame to combine")
        w, h = reference[0].size
        length = len(reference)

        row = ceil(len(videos) / col)
        width = col * w + (col - 1) * edge_offset
        height = row * h + (row - 1) * edge_offset + h_start_offset
        font = ImageFont.truetype("DejaVuSans.ttf", 12)

        combined_frames = []
        for frame_idx in range(length):
            combined_frame = Image.new("RGBA", (width, height))
            draw = ImageDraw.Draw(combined_frame)
            for i, video in enumerate(videos):
                # Give the failed method a blank output
                failed = video is None or frame_idx >= len(video)
                if failed:
                    frame = Image.new("RGBA", (w, h))
                else:
                    frame = video[frame_idx].convert("RGBA")
                f_x = (i % col) * (w + edge_offset)
                f_y = (i // col) * (h + edge_offset) + h_start_offset
                combined_frame.paste(frame, (f_x, f_y))

                # Draw name
                name = self.names[i] if not failed else f'{self.names[i]} (failed)'
                draw.text(
                    (f_x + 10, f_y - 20), name, (255, 255, 255), font=font
                )
            combined_frames.append(combined_frame)
        return combined_frames


class BoundingBoxesListReader(Reader):
    """Reader of the bounding-box text files written with ``np.savetxt``."""

    def __init__(
        self, dir_name, resize=None, read=True, max_length=MAX_LENGTH,
        scale=1
    ):
        self.resize = resize
        self.scale = scale
        super().__init__(dir_name, read, max_length)

    def read_file(self, filename):
        """Load one text file as integers and wrap it in a one-item list."""
        bboxes = np.loadtxt(filename, dtype=int)
        return [bboxes.tolist()]


def save_frames_to_dir(frames, dirname):
    """Save the given frames into ``dirname`` through a FrameReader."""
    reader = FrameReader(dirname, read=False)
    reader.set_files(frames)
    reader.save_files(dirname)


def main():
    """Build the reader selected by ``--reader_type`` and write its video."""
    args = parse_args()
    if args.reader_type is None:
        reader = FrameReader(args.video_dir)
    elif args.reader_type == 'fg':
        reader = ForegroundReader(
            args.video_dir, args.segms_dir, args.fg_dir)
    elif args.reader_type == 'sy':
        # Keyword arguments keep every directory bound to the constructor
        # parameter of the same name.
        reader = SynthesizedFrameReader(
            bg_frames_dir=args.bg_frames_dir,
            fg_frames_dir=args.fg_frames_dir,
            fg_segms_dir=args.fg_segms_dir,
            segm_bbox_mask_dir=args.segm_bbox_mask_dir,
            fg_dir=args.fg_dir,
            dir_name=args.syn_frames_dir,
            bboxes_list_dir=args.bboxes_list_dir,
        )
    elif args.reader_type == 'com':
        reader = CompareFramesReader(
            args.video_dirs
        )
    # Without --output_dir the video is written relative to the current
    # working directory instead of failing on os.path.join(None, ...).
    reader.write_files_to_video(
        os.path.join(args.output_dir or '', args.output_filename),
        fps=args.fps
    )


if __name__ == "__main__":
    main()