oguzakif's picture
init repo
d4b77ac
raw
history blame
17 kB
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # NOQA
import argparse
from math import ceil
from glob import glob
import numpy as np
import cv2
from PIL import Image, ImageDraw, ImageOps, ImageFont
from utils.logging_config import logger
from utils.util import make_dirs, bbox_offset
DEFAULT_FPS = 6
MAX_LENGTH = 60
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'-fps', '--fps',
type=int, default=DEFAULT_FPS,
help="Output video FPS"
)
parser.add_argument(
'-v', '--video_dir',
type=str,
help="Video directory name"
)
parser.add_argument(
'-vs', '--video_dirs',
nargs='+',
type=str,
help="Video directory names"
)
parser.add_argument(
'-v2', '--video_dir2',
type=str,
help="Video directory name"
)
parser.add_argument(
'-sd', '--segms_dir',
type=str,
help="Segmentation directory name"
)
parser.add_argument(
'-fgd', '--fg_dir',
type=str,
help="Foreground directory name"
)
parser.add_argument(
'-fgfd', '--fg_frames_dir',
type=str,
help="Foreground frames directory name"
)
parser.add_argument(
'-fgsd', '--fg_segms_dir',
type=str,
help="Foreground segmentations directory name"
)
parser.add_argument(
'-syfd', '--syn_frames_dir',
type=str,
help="Synthesized frames directory name"
)
parser.add_argument(
'-bgfd', '--bg_frames_dir',
type=str,
help="Background frames directory name"
)
parser.add_argument(
'-rt', '--reader_type',
type=str,
help="Type of reader"
)
parser.add_argument(
'-od', '--output_dir',
type=str,
help="Output directory name"
)
parser.add_argument(
'-o', '--output_filename',
type=str, required=True,
help="Output output filename"
)
args = parser.parse_args()
return args
class Reader:
def __init__(self, dir_name, read=True, max_length=None, sample_period=1):
self.dir_name = dir_name
self.count = 0
self.max_length = max_length
self.filenames = []
self.sample_period = sample_period
if read:
if os.path.exists(dir_name):
# self.filenames = read_filenames_from_dir(dir_name, self.__class__.__name__)
# ^^^^^ yield None when reading some videos of face forensics data
# (related to 'Too many levels of symbolic links'?)
self.filenames = sorted(glob(os.path.join(dir_name, '*')))
self.filenames = [f for f in self.filenames if os.path.isfile(f)]
self.filenames = self.filenames[::sample_period][:max_length]
self.files = self.read_files(self.filenames)
else:
self.files = []
logger.warning(f"Directory {dir_name} not exists!")
else:
self.files = []
self.current_index = 0
def append(self, file_):
self.files.append(file_)
def set_files(self, files):
self.files = files
def read_files(self, filenames):
assert type(filenames) == list, f'filenames is not a list; dirname: {self.dir_name}'
filenames.sort()
frames = []
for filename in filenames:
file_ = self.read_file(filename)
frames.append(file_)
return frames
def save_files(self, output_dir=None):
make_dirs(output_dir)
logger.info(f"Saving {self.__class__.__name__} files to {output_dir}")
for i, file_ in enumerate(self.files):
self._save_file(output_dir, i, file_)
def _save_file(self, output_dir, i, file_):
raise NotImplementedError("This is an abstract function")
def read_file(self, filename):
raise NotImplementedError("This is an abstract function")
def __iter__(self):
return self
def __next__(self):
if self.current_index < len(self.files):
file_ = self.files[self.current_index]
self.current_index += 1
return file_
else:
self.current_index = 0
raise StopIteration
def __getitem__(self, key):
return self.files[key]
def __len__(self):
return len(self.files)
class FrameReader(Reader):
def __init__(
self, dir_name, resize=None, read=True, max_length=MAX_LENGTH,
scale=1, sample_period=1
):
self.resize = resize
self.scale = scale
self.sample_period = sample_period
super().__init__(dir_name, read, max_length, sample_period)
def read_file(self, filename):
origin_frame = Image.open(filename)
size = self.resize if self.resize is not None else origin_frame.size
origin_frame_resized = origin_frame.resize(
(int(size[0] * self.scale), int(size[1] * self.scale))
)
return origin_frame_resized
def _save_file(self, output_dir, i, file_):
if len(self.filenames) == len(self.files):
name = sorted(self.filenames)[i].split('/')[-1]
else:
name = f"frame_{i:04}.png"
filename = os.path.join(
output_dir, name
)
file_.save(filename, "PNG")
def write_files_to_video(self, output_filename, fps=DEFAULT_FPS, frame_num_when_repeat_list=[1]):
logger.info(
f"Writeing frames to video {output_filename} with FPS={fps}")
video_writer = cv2.VideoWriter(
output_filename,
cv2.VideoWriter_fourcc(*"MJPG"),
fps,
self.files[0].size
)
for frame_num_when_repeat in frame_num_when_repeat_list:
for frame in self.files:
frame = frame.convert("RGB")
frame_cv = np.array(frame)
frame_cv = cv2.cvtColor(frame_cv, cv2.COLOR_RGB2BGR)
for i in range(frame_num_when_repeat):
video_writer.write(frame_cv)
video_writer.release()
class SynthesizedFrameReader(FrameReader):
def __init__(
self, bg_frames_dir, fg_frames_dir,
fg_segms_dir, segm_bbox_mask_dir, fg_dir, dir_name,
bboxes_list_dir,
fg_scale=0.7, fg_location=(48, 27), mask_only=False
):
self.bg_reader = FrameReader(bg_frames_dir)
self.size = self.bg_reader[0].size
# TODO: add different location and change scale to var
self.fg_reader = ForegroundReader(
fg_frames_dir, fg_segms_dir, fg_dir,
resize=self.size,
scale=fg_scale
)
self.fg_location = fg_location
# self.masks = self.fg_reader.masks
# self.bbox_masks = self.fg_reader.bbox_masks
super().__init__(dir_name, read=False)
self.files = self.synthesize_frames(
self.bg_reader, self.fg_reader, mask_only)
self.bbox_masks = MaskGenerator(
segm_bbox_mask_dir, self.size, self.get_bboxeses()
)
self.bboxes_list_dir = bboxes_list_dir
self.bboxes_list = self.get_bboxeses()
self.save_bboxes()
def save_bboxes(self):
make_dirs(self.bboxes_list_dir)
logger.info(f"Saving bboxes to {self.bboxes_list_dir}")
for i, bboxes in enumerate(self.bboxes_list):
save_path = os.path.join(self.bboxes_list_dir, f"bboxes_{i:04}.txt")
if len(bboxes) > 0:
np.savetxt(save_path, bboxes[0], fmt='%4u')
def get_bboxeses(self):
bboxeses = self.fg_reader.segms.bboxeses
new_bboxeses = []
for bboxes in bboxeses:
new_bboxes = []
for bbox in bboxes:
offset_bbox = bbox_offset(bbox, self.fg_location)
new_bboxes.append(offset_bbox)
new_bboxeses.append(new_bboxes)
return new_bboxeses
def synthesize_frames(self, bg_reader, fg_reader, mask_only=False):
logger.info(
f"Synthesizing {bg_reader.dir_name} and {fg_reader.dir_name}"
)
synthesized_frames = []
for i, bg in enumerate(bg_reader):
if i == len(fg_reader):
break
fg = fg_reader[i]
mask = fg_reader.get_mask(i)
synthesized_frame = bg.copy()
if mask_only:
synthesized_frame.paste(mask, self.fg_location, mask)
else:
synthesized_frame.paste(fg, self.fg_location, mask)
synthesized_frames.append(synthesized_frame)
return synthesized_frames
class WarpedFrameReader(FrameReader):
def __init__(self, dir_name, i, ks):
self.i = i
self.ks = ks
super().__init__(dir_name)
def _save_file(self, output_dir, i, file_):
filename = os.path.join(
output_dir,
f"warped_frame_{self.i:04}_k{self.ks[i]:02}.png"
)
file_.save(filename)
class SegmentationReader(FrameReader):
def __init__(
self, dir_name,
resize=None, scale=1
):
super().__init__(
dir_name, resize=resize, scale=scale
)
def read_file(self, filename):
origin_frame = Image.open(filename)
mask = ImageOps.invert(origin_frame.convert("L"))
mask = mask.point(lambda x: 0 if x < 255 else 255, '1')
size = self.resize if self.resize is not None else origin_frame.size
mask_resized = mask.resize(
(int(size[0] * self.scale), int(size[1] * self.scale))
)
return mask_resized
class MaskReader(Reader):
def __init__(self, dir_name, read=True):
super().__init__(dir_name, read=read)
def read_file(self, filename):
mask = Image.open(filename)
return mask
def _save_file(self, output_dir, i, file_):
filename = os.path.join(
output_dir,
f"mask_{i:04}.png"
)
file_.save(filename)
def get_bboxes(self, i):
# TODO: save bbox instead of looking for one
mask = self.files[i]
mask = ImageOps.invert(mask.convert("L")).convert("1")
mask = np.array(mask)
image, contours, hier = cv2.findContours(
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
bboxes = []
for c in contours:
# get the bounding rect
x, y, w, h = cv2.boundingRect(c)
bbox = ((x, y), (x + w - 1, y + h - 1))
bboxes.append(bbox)
return bboxes
def get_bbox(self, i):
# TODO: save bbox instead of looking for one
mask = self.files[i]
mask = ImageOps.invert(mask.convert("L"))
mask = np.array(mask)
image, contours, hier = cv2.findContours(
mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
for c in contours:
# get the bounding rect
x, y, w, h = cv2.boundingRect(c)
bbox = ((x, y), (x + w - 1, y + h - 1))
return bbox
class MaskGenerator(Reader):
def __init__(
self, mask_output_dir, size, bboxeses, save_masks=True
):
self.bboxeses = bboxeses
self.size = size
super().__init__(mask_output_dir, read=False)
self.files = self.generate_masks()
if save_masks:
make_dirs(mask_output_dir)
self.save_files(mask_output_dir)
def _save_file(self, output_dir, i, file_):
filename = os.path.join(
output_dir,
f"mask_{i:04}.png"
)
file_.save(filename)
def get_bboxes(self, i):
return self.bboxeses[i]
def generate_masks(self):
masks = []
for i in range(len(self.bboxeses)):
mask = self.generate_mask(i)
masks.append(mask)
return masks
def generate_mask(self, i):
bboxes = self.bboxeses[i]
mask = Image.new("1", self.size, 1)
draw = ImageDraw.Draw(mask)
for bbox in bboxes:
draw.rectangle(
bbox, fill=0
)
return mask
class ForegroundReader(FrameReader):
def __init__(
self, frames_dir, segms_dir, dir_name,
resize=None, scale=1
):
self.frames_dir = frames_dir
self.segms_dir = segms_dir
self.frames = FrameReader(
frames_dir,
resize=resize, scale=scale
)
self.segms = SegmentationReader(
segms_dir, resize=resize, scale=scale
)
super().__init__(dir_name, read=False)
self.masks = self.segms.masks
# self.bbox_masks = self.segms.bbox_masks
self.files = self.generate_fg_frames(self.frames, self.segms)
def get_mask(self, i):
return self.masks[i]
def generate_fg_frames(self, frames, segms):
logger.info(
f"Generating fg frames from {self.frames_dir} and {self.segms_dir}"
)
fg_frames = []
for i, frame in enumerate(frames):
mask = self.masks[i]
fg_frame = Image.new("RGB", frame.size, (0, 0, 0))
fg_frame.paste(
frame, (0, 0),
mask
)
fg_frames.append(fg_frame)
return fg_frames
class CompareFramesReader(FrameReader):
def __init__(self, dir_names, col=2, names=[], mask_dir=None):
self.videos = []
for dir_name in dir_names:
# If a method fails on this video, use None to indicate the situation
try:
self.videos.append(FrameReader(dir_name))
except AssertionError:
self.videos.append(None)
if mask_dir is not None:
self.masks = MaskReader(mask_dir)
self.names = names
self.files = self.combine_videos(self.videos, col)
def combine_videos(self, videos, col=2, edge_offset=35, h_start_offset=35):
combined_frames = []
w, h = videos[0][0].size
# Prevent the first method fails and have a "None" as its video
i = 0
while videos[i] is None:
i += 1
length = len(videos[i])
video_num = len(videos)
row = ceil(video_num / col)
for frame_idx in range(length):
width = col * w + (col - 1) * edge_offset
height = row * h + (row - 1) * edge_offset + h_start_offset
combined_frame = Image.new("RGBA", (width, height))
draw = ImageDraw.Draw(combined_frame)
for i, video in enumerate(videos):
# Give the failed method a black output
if video is None or frame_idx >= len(video):
failed = True
frame = Image.new("RGBA", (w, h))
else:
frame = video[frame_idx].convert("RGBA")
failed = False
f_x = (i % col) * (w + edge_offset)
f_y = (i // col) * (h + edge_offset) + h_start_offset
combined_frame.paste(frame, (f_x, f_y))
# Draw name
font = ImageFont.truetype("DejaVuSans.ttf", 12)
# font = ImageFont.truetype("DejaVuSans-Bold.ttf", 13)
# font = ImageFont.truetype("timesbd.ttf", 14)
name = self.names[i] if not failed else f'{self.names[i]} (failed)'
draw.text(
(f_x + 10, f_y - 20),
name, (255, 255, 255), font=font
)
combined_frames.append(combined_frame)
return combined_frames
class BoundingBoxesListReader(Reader):
def __init__(
self, dir_name, resize=None, read=True, max_length=MAX_LENGTH,
scale=1
):
self.resize = resize
self.scale = scale
super().__init__(dir_name, read, max_length)
def read_file(self, filename):
bboxes = np.loadtxt(filename, dtype=int)
bboxes = [bboxes.tolist()]
return bboxes
def save_frames_to_dir(frames, dirname):
reader = FrameReader(dirname, read=False)
reader.set_files(frames)
reader.save_files(dirname)
if __name__ == "__main__":
args = parse_args()
if args.reader_type is None:
reader = FrameReader(args.video_dir)
elif args.reader_type == 'fg':
reader = ForegroundReader(
args.video_dir, args.segms_dir, args.fg_dir)
elif args.reader_type == 'sy':
reader = SynthesizedFrameReader(
args.bg_frames_dir, args.fg_frames_dir,
args.fg_segms_dir, args.fg_dir, args.syn_frames_dir
)
elif args.reader_type == 'com':
reader = CompareFramesReader(
args.video_dirs
)
reader.write_files_to_video(
os.path.join(args.output_dir, args.output_filename),
fps=args.fps
)