| | |
| | |
| | |
| | |
| | |
| | import numpy as np |
| | import matplotlib.cm as cm |
| | import imageio |
| | try: |
| | from decord import VideoReader, cpu |
| | DECORD_AVAILABLE = True |
| | except: |
| | import cv2 |
| | DECORD_AVAILABLE = False |
| |
|
| | def ensure_even(value): |
| | return value if value % 2 == 0 else value + 1 |
| |
|
| | def read_video_frames(video_path, process_length, target_fps=-1, max_res=-1): |
| | if DECORD_AVAILABLE: |
| | vid = VideoReader(video_path, ctx=cpu(0)) |
| | original_height, original_width = vid.get_batch([0]).shape[1:3] |
| | height = original_height |
| | width = original_width |
| | print(f'==> original video size: {original_height} x {original_width}') |
| | if max_res > 0 and max(height, width) > max_res: |
| | scale = max_res / max(original_height, original_width) |
| | height = ensure_even(round(original_height * scale)) |
| | width = ensure_even(round(original_width * scale)) |
| | print(f'==> downsample video size: {height} x {width}') |
| |
|
| | vid = VideoReader(video_path, ctx=cpu(0), width=width, height=height) |
| |
|
| | fps = vid.get_avg_fps() if target_fps == -1 else target_fps |
| | stride = round(vid.get_avg_fps() / fps) |
| | stride = max(stride, 1) |
| | frames_idx = list(range(0, len(vid), stride)) |
| | if process_length != -1 and process_length < len(frames_idx): |
| | frames_idx = frames_idx[:process_length] |
| | frames = vid.get_batch(frames_idx).asnumpy() |
| | else: |
| | cap = cv2.VideoCapture(video_path) |
| | original_fps = cap.get(cv2.CAP_PROP_FPS) |
| | original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| | original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| |
|
| | if max_res > 0 and max(original_height, original_width) > max_res: |
| | scale = max_res / max(original_height, original_width) |
| | height = round(original_height * scale) |
| | width = round(original_width * scale) |
| |
|
| | fps = original_fps if target_fps < 0 else target_fps |
| |
|
| | stride = max(round(original_fps / fps), 1) |
| |
|
| | frames = [] |
| | frame_count = 0 |
| | while cap.isOpened(): |
| | ret, frame = cap.read() |
| | if not ret or (process_length > 0 and frame_count >= process_length): |
| | break |
| | if frame_count % stride == 0: |
| | frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
| | if max_res > 0 and max(original_height, original_width) > max_res: |
| | frame = cv2.resize(frame, (width, height)) |
| | frames.append(frame) |
| | frame_count += 1 |
| | cap.release() |
| | frames = np.stack(frames, axis=0) |
| |
|
| | return frames, fps |
| |
|
| |
|
| | def save_video(frames, output_video_path, fps=10, is_depths=False, grayscale=False): |
| | writer = imageio.get_writer(output_video_path, fps=fps, macro_block_size=1, codec='libx264', ffmpeg_params=['-crf', '18']) |
| | if is_depths: |
| | colormap = np.array(cm.get_cmap("inferno").colors) |
| | d_min, d_max = frames.min(), frames.max() |
| | for i in range(frames.shape[0]): |
| | depth = frames[i] |
| | depth_norm = ((depth - d_min) / (d_max - d_min) * 255).astype(np.uint8) |
| | depth_vis = (colormap[depth_norm] * 255).astype(np.uint8) if not grayscale else depth_norm |
| | writer.append_data(depth_vis) |
| | else: |
| | for i in range(frames.shape[0]): |
| | writer.append_data(frames[i]) |
| |
|
| | writer.close() |
| |
|