Spaces:
Paused
Paused
File size: 8,419 Bytes
d49f7bc |
|
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
""" Video Render Controller Class Module """
from __future__ import annotations
import time
import logging
from typing import List
from pathlib import Path
from abc import abstractmethod
import numpy as np
import numpy.typing as npt
import cv2
from OpenGL import GL
from tqdm import tqdm
from animated_drawings.controller.controller import Controller
from animated_drawings.model.scene import Scene
from animated_drawings.model.animated_drawing import AnimatedDrawing
from animated_drawings.view.view import View
from animated_drawings.config import ControllerConfig
# Alias for the type of the None singleton, for use in isinstance()-style type checks.
NoneType = type(None)
class VideoRenderController(Controller):
    """ Video Render Controller is used to non-interactively generate a video file.

    Every run-loop iteration renders the scene, reads the pixels back with
    glReadPixels and passes them to a VideoWriter. The number of iterations is
    the largest frame count among the BVHs of the scene's AnimatedDrawings.
    """

    def __init__(self, cfg: ControllerConfig, scene: Scene, view: View) -> None:
        """
        Store the view and scene, derive the frame count and time step from the scene,
        then create the video writer, the pixel read-back buffer and the progress bar.

        Raises AssertionError if the scene contains no AnimatedDrawing or if no
        video writer can be created for the configured output path.
        """
        super().__init__(cfg, scene)

        self.view: View = view
        self.scene: Scene = scene

        self.frames_left_to_render: int  # when this reaches zero, stop rendering
        self.delta_t: float  # amount of time to progress scene between renders
        self._set_frames_left_to_render_and_delta_t()

        self.run_loop_start_time: float  # assigned in _prep_for_run_loop (for performance stats)
        self.frames_rendered: int = 0  # track how many frames we've rendered

        self.video_width: int
        self.video_height: int
        self.video_width, self.video_height = self.view.get_framebuffer_size()

        self.video_writer: VideoWriter = VideoWriter.create_video_writer(self)

        # reusable read-back buffer: rows x columns x 4 channels, one byte per channel
        self.frame_data = np.empty([self.video_height, self.video_width, 4], dtype='uint8')

        self.progress_bar = tqdm(total=self.frames_left_to_render)

    def _set_frames_left_to_render_and_delta_t(self) -> None:
        """
        Based upon the animated drawings within the scene, computes maximum number of frames in a BVH.
        Checks that all frame times within BVHs are equal, logs a warning if not.
        Uses results to determine number of frames and frame time for output video.

        Raises AssertionError if the scene has no AnimatedDrawing children, since
        neither value can be determined in that case.
        """
        max_frames = 0
        frame_times: List[float] = []
        for child in self.scene.get_children():
            if not isinstance(child, AnimatedDrawing):
                continue
            max_frames = max(max_frames, child.retargeter.bvh.frame_max_num)
            frame_times.append(child.retargeter.bvh.frame_time)

        # without this guard the frame_times[0] lookups below fail with a bare IndexError
        if not frame_times:
            msg = 'Scene contains no AnimatedDrawing; cannot determine frame count or frame time for video.'
            logging.critical(msg)
            raise AssertionError(msg)

        first_frame_time = frame_times[0]
        if any(x != first_frame_time for x in frame_times):
            msg = f'frame time of BVH files don\'t match. Using first value: {first_frame_time}'
            logging.warning(msg)

        self.frames_left_to_render = max_frames
        self.delta_t = first_frame_time

    def _prep_for_run_loop(self) -> None:
        """ Record when the run loop started so cleanup can report the render duration. """
        self.run_loop_start_time = time.time()

    def _is_run_over(self) -> bool:
        """ The run is over once no frames are left to render. """
        # <= rather than == so a counter that ever drops below zero still ends the loop
        return self.frames_left_to_render <= 0

    def _start_run_loop_iteration(self) -> None:
        """ Clear the window before the next frame is drawn. """
        self.view.clear_window()

    def _update(self) -> None:
        """ Update the transforms of the scene. """
        self.scene.update_transforms()

    def _render(self) -> None:
        """ Render the scene with the view. """
        self.view.render(self.scene)

    def _tick(self) -> None:
        """ Advance scene time by one video frame's worth of time. """
        self.scene.progress_time(self.delta_t)

    def _handle_user_input(self) -> None:
        """ ignore all user input when rendering video file """

    def _finish_run_loop_iteration(self) -> None:
        """ Read the rendered frame back, pass it to the video writer and update the counters. """
        # get pixel values from the frame buffer, send them to the video writer
        GL.glBindFramebuffer(GL.GL_READ_FRAMEBUFFER, 0)
        GL.glReadPixels(0, 0, self.video_width, self.video_height, GL.GL_BGRA, GL.GL_UNSIGNED_BYTE, self.frame_data)

        # reverse the row order and copy, because frame_data is overwritten on the next iteration
        self.video_writer.process_frame(self.frame_data[::-1, :, :].copy())

        # update our counts and progress_bar
        self.frames_left_to_render -= 1
        self.frames_rendered += 1
        self.progress_bar.update(1)

    def _cleanup_after_run_loop(self) -> None:
        """ Log render statistics, clean up the view and let the video writer finish its output. """
        self.progress_bar.close()  # release the progress bar before emitting the final log lines
        logging.info(f'Rendered {self.frames_rendered} frames in {time.time()-self.run_loop_start_time} seconds.')
        self.view.cleanup()

        _time = time.time()
        self.video_writer.cleanup()
        logging.info(f'Wrote video to file in {time.time()-_time} seconds.')
class VideoWriter():
    """ Wrapper to abstract the different backends necessary for writing different video filetypes.

    Subclasses implement process_frame (called with each frame) and cleanup
    (called after the last frame). Use create_video_writer to obtain the writer
    matching the configured output file extension.

    Note: this class does not use an ABC metaclass, so the abstractmethod
    decorators document intent but are not enforced at instantiation time.
    """

    def __init__(self) -> None:
        pass

    @abstractmethod
    def process_frame(self, frame: npt.NDArray[np.uint8]) -> None:
        """ Subclass must specify how to handle each frame of data received. """

    @abstractmethod
    def cleanup(self) -> None:
        """ Subclass must specify how to finish up after all frames have been received. """

    @staticmethod
    def create_video_writer(controller: VideoRenderController) -> VideoWriter:
        """
        Create the output directory if needed and return the writer matching the
        extension of controller.cfg.output_video_path (.gif or .mp4, case-insensitive).

        Raises AssertionError if the path is not a str or the extension is unsupported.
        """
        assert isinstance(controller.cfg.output_video_path, str)  # for static analysis

        output_p = Path(controller.cfg.output_video_path)
        output_p.parent.mkdir(exist_ok=True, parents=True)

        msg = f' Writing video to: {output_p.resolve()}'
        logging.info(msg)
        print(msg)

        # compare case-insensitively so e.g. 'out.GIF' and 'out.MP4' are accepted too
        suffix = output_p.suffix.lower()
        if suffix == '.gif':
            return GIFWriter(controller)
        if suffix == '.mp4':
            return MP4Writer(controller)

        msg = f'Unsupported output video file extension ({output_p.suffix}). Only .gif and .mp4 are supported.'
        logging.critical(msg)
        # raised explicitly (instead of `assert False`) so the check also runs under `python -O`
        raise AssertionError(msg)
class GIFWriter(VideoWriter):
    """ Video writer for creating transparent, animated GIFs with Pillow.

    Frames are accumulated in memory by process_frame and written to disk in
    one go by cleanup.
    """

    def __init__(self, controller: VideoRenderController) -> None:
        """
        Read the output path and per-frame duration (controller.delta_t, converted
        to whole milliseconds) from the controller. Durations under 20 ms are
        replaced with 20 ms and a warning is logged.
        """
        super().__init__()
        assert isinstance(controller.cfg.output_video_path, str)  # for static analysis
        self.output_p = Path(controller.cfg.output_video_path)

        self.duration = int(controller.delta_t*1000)
        if self.duration < 20:
            msg = f'Specified duration of .gif is too low, replacing with 20: {self.duration}'
            # logging.warn is a deprecated alias that newer Python versions no longer provide
            logging.warning(msg)
            self.duration = 20

        self.frames: List[npt.NDArray[np.uint8]] = []

    def process_frame(self, frame: npt.NDArray[np.uint8]) -> None:
        """ Reorder channels and save frames as they arrive"""
        self.frames.append(cv2.cvtColor(frame, cv2.COLOR_BGRA2RGBA).astype(np.uint8))

    def cleanup(self) -> None:
        """ Write all frames to output path specified. Logs a warning and writes nothing if no frames were received."""
        from PIL import Image
        self.output_p.parent.mkdir(exist_ok=True, parents=True)
        logging.info(f'VideoWriter will write to {self.output_p.resolve()}')

        # ims[0] below would raise IndexError on an empty frame list
        if not self.frames:
            logging.warning(f'No frames were received; nothing written to {self.output_p.resolve()}')
            return

        ims = [Image.fromarray(a_frame) for a_frame in self.frames]
        ims[0].save(self.output_p, save_all=True, append_images=ims[1:], duration=self.duration, disposal=2, loop=0)
class MP4Writer(VideoWriter):
    """ Video writer for creating mp4 videos with cv2.VideoWriter """

    def __init__(self, controller: VideoRenderController) -> None:
        """
        Validate the output path and codec from controller.cfg, then open a
        cv2.VideoWriter using the controller's frame size and a frame rate of
        round(1 / controller.delta_t).

        Raises AssertionError if the output path or the codec is not specified.
        """
        super().__init__()

        # validate and prep output path
        # (errors are raised explicitly rather than via `assert False` so they survive `python -O`)
        if controller.cfg.output_video_path is None:
            msg = 'output video path not specified for mp4 video writer'
            logging.critical(msg)
            raise AssertionError(msg)
        output_p = Path(controller.cfg.output_video_path)
        output_p.parent.mkdir(exist_ok=True, parents=True)
        logging.info(f'VideoWriter will write to {output_p.resolve()}')

        # validate and prep codec
        if controller.cfg.output_video_codec is None:
            msg = 'output video codec not specified for mp4 video writer'
            logging.critical(msg)
            raise AssertionError(msg)
        fourcc = cv2.VideoWriter_fourcc(*controller.cfg.output_video_codec)
        logging.info(f'Using codec {controller.cfg.output_video_codec}')

        # calculate video writer framerate
        frame_rate = round(1/controller.delta_t)

        # initialize the video writer
        self.video_writer = cv2.VideoWriter(str(output_p), fourcc, frame_rate, (controller.video_width, controller.video_height))

        # surface a failed open in the log instead of letting it pass silently
        if not self.video_writer.isOpened():
            logging.error(f'cv2.VideoWriter could not be opened for {output_p.resolve()} with codec {controller.cfg.output_video_codec}; frames may not be written.')

    def process_frame(self, frame: npt.NDArray[np.uint8]) -> None:
        """ Remove the alpha channel and send to the video writer as it arrives. """
        self.video_writer.write(frame[:, :, :3])

    def cleanup(self) -> None:
        """ Release the underlying cv2.VideoWriter. """
        self.video_writer.release()
|