Spaces:
Paused
Paused
File size: 8,419 Bytes
d49f7bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
""" Video Render Controller Class Module """
from __future__ import annotations
import time
import logging
from typing import List
from pathlib import Path
from abc import abstractmethod
import numpy as np
import numpy.typing as npt
import cv2
from OpenGL import GL
from tqdm import tqdm
from animated_drawings.controller.controller import Controller
from animated_drawings.model.scene import Scene
from animated_drawings.model.animated_drawing import AnimatedDrawing
from animated_drawings.view.view import View
from animated_drawings.config import ControllerConfig
NoneType = type(None) # alias for the type of None, usable in isinstance() checks
class VideoRenderController(Controller):
    """ Video Render Controller is used to non-interactively generate a video file """

    def __init__(self, cfg: ControllerConfig, scene: Scene, view: View) -> None:
        """
        Derive the frame count and timestep from the scene, size the output from the
        view's framebuffer, and create the video writer for the configured output path.
        """
        super().__init__(cfg, scene)

        self.view: View = view
        self.scene: Scene = scene

        self.frames_left_to_render: int  # when this becomes zero, stop rendering
        self.delta_t: float  # amount of time to progress scene between renders
        self._set_frames_left_to_render_and_delta_t()

        # set in _prep_for_run_loop, read in _cleanup_after_run_loop (for performance stats)
        self.run_loop_start_time: float
        self.frames_rendered: int = 0  # track how many frames we've rendered

        self.video_width: int
        self.video_height: int
        self.video_width, self.video_height = self.view.get_framebuffer_size()

        self.video_writer: VideoWriter = VideoWriter.create_video_writer(self)

        # reusable buffer that glReadPixels fills each frame
        self.frame_data = np.empty([self.video_height, self.video_width, 4], dtype='uint8')  # 4 for RGBA

        self.progress_bar = tqdm(total=self.frames_left_to_render)

    def _set_frames_left_to_render_and_delta_t(self) -> None:
        """
        Based upon the animated drawings within the scene, computes maximum number of frames in a BVH.
        Checks that all frame times within BVHs are equal, logs a warning if not.
        Uses results to determine number of frames and frame time for output video.

        Raises:
            AssertionError: if none of the scene's children is an AnimatedDrawing, since
                there is then no BVH to take a frame count or frame time from.
        """
        max_frames = 0
        frame_time: List[float] = []
        for child in self.scene.get_children():
            if not isinstance(child, AnimatedDrawing):
                continue
            max_frames = max(max_frames, child.retargeter.bvh.frame_max_num)
            frame_time.append(child.retargeter.bvh.frame_time)

        # without this guard an empty list would surface as an opaque IndexError below
        if not frame_time:
            msg = 'no AnimatedDrawing found among scene children, cannot determine frame count or frame time'
            logging.critical(msg)
            raise AssertionError(msg)

        if not all(x == frame_time[0] for x in frame_time):
            msg = f'frame time of BVH files don\'t match. Using first value: {frame_time[0]}'
            logging.warning(msg)

        self.frames_left_to_render = max_frames
        self.delta_t = frame_time[0]

    def _prep_for_run_loop(self) -> None:
        """ Record when the run loop started so the total render time can be reported. """
        self.run_loop_start_time = time.time()

    def _is_run_over(self) -> bool:
        """ The run is over once every frame has been rendered. """
        # <= rather than == so the loop still terminates if the counter ever overshoots
        return self.frames_left_to_render <= 0

    def _start_run_loop_iteration(self) -> None:
        """ Clear the window before drawing the next frame. """
        self.view.clear_window()

    def _update(self) -> None:
        """ Bring the scene's transforms up to date. """
        self.scene.update_transforms()

    def _render(self) -> None:
        """ Draw the scene through the view. """
        self.view.render(self.scene)

    def _tick(self) -> None:
        """ Advance the scene by one video frame's worth of time. """
        self.scene.progress_time(self.delta_t)

    def _handle_user_input(self) -> None:
        """ ignore all user input when rendering video file """

    def _finish_run_loop_iteration(self) -> None:
        """ Read the rendered frame back, hand it to the video writer, and update the counters. """
        # get pixel values from the frame buffer, send them to the video writer
        GL.glBindFramebuffer(GL.GL_READ_FRAMEBUFFER, 0)
        GL.glReadPixels(0, 0, self.video_width, self.video_height, GL.GL_BGRA, GL.GL_UNSIGNED_BYTE, self.frame_data)

        # reverse the row order (glReadPixels returns rows bottom-up); copy because
        # frame_data is overwritten on the next iteration
        self.video_writer.process_frame(self.frame_data[::-1, :, :].copy())

        # update our counts and progress_bar
        self.frames_left_to_render -= 1
        self.frames_rendered += 1
        self.progress_bar.update(1)

    def _cleanup_after_run_loop(self) -> None:
        """ Report render statistics, release the view, and let the video writer finalize the file. """
        # close the bar first so it releases its output stream before the log lines below
        self.progress_bar.close()

        logging.info(f'Rendered {self.frames_rendered} frames in {time.time()-self.run_loop_start_time} seconds.')
        self.view.cleanup()

        _time = time.time()
        self.video_writer.cleanup()
        logging.info(f'Wrote video to file in {time.time()-_time} seconds.')
class VideoWriter():
    """ Wrapper to abstract the different backends necessary for writing different video filetypes """

    # NOTE(review): this class does not use an ABCMeta metaclass, so the @abstractmethod
    # decorators below document intent but are not enforced at instantiation time.

    def __init__(self) -> None:
        pass

    @abstractmethod
    def process_frame(self, frame: npt.NDArray[np.uint8]) -> None:
        """ Subclass must specify how to handle each frame of data received. """

    @abstractmethod
    def cleanup(self) -> None:
        """ Subclass must specify how to finish up after all frames have been received. """

    @staticmethod
    def create_video_writer(controller: VideoRenderController) -> VideoWriter:
        """
        Create the writer matching the extension of controller.cfg.output_video_path.

        Creates the output file's parent directory if needed and logs/prints the resolved path.

        Returns:
            A GIFWriter for '.gif' paths, an MP4Writer for '.mp4' paths.

        Raises:
            AssertionError: if the extension is neither '.gif' nor '.mp4'.
        """
        assert isinstance(controller.cfg.output_video_path, str)  # for static analysis

        output_p = Path(controller.cfg.output_video_path)
        output_p.parent.mkdir(exist_ok=True, parents=True)

        msg = f' Writing video to: {output_p.resolve()}'
        logging.info(msg)
        print(msg)

        if output_p.suffix == '.gif':
            return GIFWriter(controller)
        if output_p.suffix == '.mp4':
            return MP4Writer(controller)

        msg = f'Unsupported output video file extension ({output_p.suffix}). Only .gif and .mp4 are supported.'
        logging.critical(msg)
        # raise explicitly: `assert False` is stripped under `python -O`, which would make
        # this factory silently return None for an unsupported extension
        raise AssertionError(msg)
class GIFWriter(VideoWriter):
    """ Video writer for creating transparent, animated GIFs with Pillow """

    def __init__(self, controller: VideoRenderController) -> None:
        """
        Store the output path and compute the per-frame duration in milliseconds
        from controller.delta_t, clamping it to a minimum of 20.
        """
        assert isinstance(controller.cfg.output_video_path, str)  # for static analysis
        self.output_p = Path(controller.cfg.output_video_path)

        # delta_t is multiplied by 1000 and truncated to an int, used as the frame duration
        self.duration = int(controller.delta_t*1000)
        if self.duration < 20:
            msg = f'Specified duration of .gif is too low, replacing with 20: {self.duration}'
            logging.warning(msg)  # logging.warn is a deprecated alias
            self.duration = 20

        # frames are buffered in memory and written all at once in cleanup()
        self.frames: List[npt.NDArray[np.uint8]] = []

    def process_frame(self, frame: npt.NDArray[np.uint8]) -> None:
        """ Reorder channels and save frames as they arrive"""
        self.frames.append(cv2.cvtColor(frame, cv2.COLOR_BGRA2RGBA).astype(np.uint8))

    def cleanup(self) -> None:
        """
        Write all frames to output path specified.

        If no frames were received, logs an error and returns without writing a file.
        """
        from PIL import Image

        self.output_p.parent.mkdir(exist_ok=True, parents=True)
        logging.info(f'VideoWriter will write to {self.output_p.resolve()}')

        # an empty frame list would otherwise fail with an opaque IndexError on ims[0]
        if not self.frames:
            logging.error(f'No frames were received, nothing written to {self.output_p.resolve()}')
            return

        ims = [Image.fromarray(a_frame) for a_frame in self.frames]
        ims[0].save(self.output_p, save_all=True, append_images=ims[1:], duration=self.duration, disposal=2, loop=0)
class MP4Writer(VideoWriter):
    """ Video writer for creating mp4 videos with cv2.VideoWriter """

    def __init__(self, controller: VideoRenderController) -> None:
        """
        Validate the configured output path and codec, then open a cv2.VideoWriter sized to
        the controller's video dimensions at a frame rate of round(1 / controller.delta_t).

        Raises:
            AssertionError: if the output video path or the output video codec is not specified.
        """
        # validate and prep output path
        # (explicit raise instead of `assert False`, which is stripped under `python -O`)
        if controller.cfg.output_video_path is None:
            msg = 'output video path not specified for mp4 video writer'
            logging.critical(msg)
            raise AssertionError(msg)
        output_p = Path(controller.cfg.output_video_path)
        output_p.parent.mkdir(exist_ok=True, parents=True)
        logging.info(f'VideoWriter will write to {output_p.resolve()}')

        # validate and prep codec
        if controller.cfg.output_video_codec is None:
            msg = 'output video codec not specified for mp4 video writer'
            logging.critical(msg)
            raise AssertionError(msg)
        fourcc = cv2.VideoWriter_fourcc(*controller.cfg.output_video_codec)
        logging.info(f'Using codec {controller.cfg.output_video_codec}')

        # calculate video writer framerate
        frame_rate = round(1/controller.delta_t)

        # initialize the video writer
        self.video_writer = cv2.VideoWriter(str(output_p), fourcc, frame_rate, (controller.video_width, controller.video_height))

        # surface a writer that failed to open instead of letting it go unnoticed;
        # logged rather than raised so existing callers keep their control flow
        if not self.video_writer.isOpened():
            logging.error(f'cv2.VideoWriter could not be opened for {output_p.resolve()} with codec {controller.cfg.output_video_codec}')

    def process_frame(self, frame: npt.NDArray[np.uint8]) -> None:
        """ Remove the alpha channel and send to the video writer as it arrives. """
        self.video_writer.write(frame[:, :, :3])

    def cleanup(self) -> None:
        """ Release the underlying cv2.VideoWriter. """
        self.video_writer.release()
|