#!/usr/bin/env python3 | |
# zoom_video_composer.py v0.2.1 | |
# https://github.com/mwydmuch/ZoomVideoComposer | |
# https://github.com/miwaniza/ZoomVideoComposer | |
# Copyright (c) 2023 Marek Wydmuch, Dmytro Yemelianov | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
import os | |
import shutil | |
from concurrent.futures import ThreadPoolExecutor | |
from hashlib import md5 | |
from math import ceil, pow, sin, cos, pi | |
import cv2 | |
import gradio as gr | |
from moviepy.editor import AudioFileClip, VideoFileClip | |
# Easing curves: each maps normalized time x in [0, 1] to eased progress
# in [0, 1].  Defined as named private functions for readable tracebacks.
def _linear(x):
    return x


def _ease_in_sine(x):
    return 1 - cos((x * pi) / 2)


def _ease_out_sine(x):
    return sin((x * pi) / 2)


def _ease_in_out_sine(x):
    return -(cos(pi * x) - 1) / 2


def _ease_in_quad(x):
    return x * x


def _ease_out_quad(x):
    u = 1 - x
    return 1 - u * u


def _ease_in_out_quad(x):
    if x < 0.5:
        return 2 * x * x
    return 1 - pow(-2 * x + 2, 2) / 2


def _ease_in_cubic(x):
    return x * x * x


def _ease_out_cubic(x):
    return 1 - pow(1 - x, 3)


def _ease_in_out_cubic(x):
    if x < 0.5:
        return 4 * x * x * x
    return 1 - pow(-2 * x + 2, 3) / 2


# Registry keyed by the names offered in the UI dropdown.
EASING_FUNCTIONS = {
    "linear": _linear,
    "easeInSine": _ease_in_sine,
    "easeOutSine": _ease_out_sine,
    "easeInOutSine": _ease_in_out_sine,
    "easeInQuad": _ease_in_quad,
    "easeOutQuad": _ease_out_quad,
    "easeInOutQuad": _ease_in_out_quad,
    "easeInCubic": _ease_in_cubic,
    "easeOutCubic": _ease_out_cubic,
    "easeInOutCubic": _ease_in_out_cubic,
}
DEFAULT_EASING_KEY = "easeInOutSine"
DEFAULT_EASING_FUNCTION = EASING_FUNCTIONS[DEFAULT_EASING_KEY]
def zoom_crop_cv2(image, zoom):
    """Scale *image* up by *zoom*, then center-crop back to the original size.

    Expects a 3-channel (H, W, C) BGR image; returns an image with the same
    height and width as the input.
    """
    h, w, _ = image.shape
    target = (int(w * zoom), int(h * zoom))
    # Center crop bounds inside the scaled image (floor division via int()
    # matches the original arithmetic; the crop is always exactly w x h).
    x0 = int((target[0] - w) / 2)
    y0 = int((target[1] - h) / 2)
    x1 = int((target[0] + w) / 2)
    y1 = int((target[1] + h) / 2)
    scaled = cv2.resize(image, target, interpolation=cv2.INTER_LANCZOS4)
    return scaled[y0:y1, x0:x1]
def resize_scale(image, scale):
    """Resize *image* by multiplying both of its dimensions by *scale*."""
    h, w = image.shape[:2]
    new_size = (int(w * scale), int(h * scale))
    return cv2.resize(image, new_size, interpolation=cv2.INTER_LANCZOS4)
def zoom_in_log(easing_func, i, num_frames, num_images):
    """Eased progress in [0, num_images] for frame *i* of a zoom-in.

    *easing_func* maps normalized time [0, 1] -> [0, 1]; the result is the
    (possibly fractional) image index the zoom has reached at this frame.
    """
    t = i / (num_frames - 1)
    return easing_func(t) * num_images
def zoom_out_log(easing_func, i, num_frames, num_images):
    """Eased progress for frame *i* of a zoom-out (mirror of zoom_in_log)."""
    t = i / (num_frames - 1)
    remaining = 1 - easing_func(t)
    return remaining * num_images
def zoom_in(zoom, easing_func, i, num_frames, num_images):
    """Actual zoom factor for frame *i* when zooming in."""
    exponent = zoom_in_log(easing_func, i, num_frames, num_images)
    return zoom ** exponent
def zoom_out(zoom, easing_func, i, num_frames, num_images):
    """Actual zoom factor for frame *i* when zooming out."""
    exponent = zoom_out_log(easing_func, i, num_frames, num_images)
    return zoom ** exponent
def get_px_or_fraction(value, reference_value):
    """Resolve *value* to integer pixels.

    Values <= 1 are interpreted as a fraction of *reference_value*
    (so 1 means the full reference size); larger values are taken as an
    absolute pixel count.
    """
    resolved = reference_value * value if value <= 1 else value
    return int(resolved)
def zoom_video_composer(
    image_paths,
    audio_path,
    zoom,
    duration,
    easing,
    direction,
    fps,
    reverse_images,
    progress=gr.Progress()
):
    """Compose a zoom video from multiple provided images.

    Args:
        image_paths: Uploaded image files (gradio file objects exposing a
            ``.name`` filesystem path); at least two are required.
        audio_path: Optional uploaded audio file (same gradio file shape),
            or None for a silent video.
        zoom: Zoom factor between two consecutive images.
        duration: Output video length in seconds.
        easing: Key into EASING_FUNCTIONS selecting the easing curve.
        direction: One of "in", "out", "inout", "outin".
        fps: Frames per second of the output video.
        reverse_images: If True, reverse the image order before composing.
        progress: Gradio progress tracker; the gr.Progress() default is the
            documented gradio injection pattern (gradio replaces it per call).

    Returns:
        Path to the written MP4 ("output.mp4", or "output_audio.mp4" when
        an audio track was provided).

    Raises:
        gr.Error: On fewer than two images, an unknown easing key, or an
            unsupported direction.
    """
    output = "output.mp4"
    tmp_dir = "tmp"
    # width/height of 1 are fractional values: resolve to the full size of
    # the first image via get_px_or_fraction below.
    width = 1
    height = 1
    # Fraction of the smaller image dimension trimmed off each inner image
    # before it is pasted into its zoomed-out neighbour.
    margin = 0.05
    keep_frames = False
    skip_video_generation = False
    # Read images from image_paths
    images_cv2 = list(cv2.imread(image_path.name) for image_path in image_paths)
    if len(images_cv2) < 2:
        raise gr.Error("At least two images are required to create a zoom video")
    progress(0, desc="Images loaded")
    # Setup some additional variables
    easing_func = EASING_FUNCTIONS.get(easing, None)
    if easing_func is None:
        raise gr.Error(f"Unsupported easing function: {easing}")
    # num_images counts zoom transitions (pairs), not pictures.
    num_images = len(images_cv2) - 1
    num_frames = int(duration * fps)
    num_frames_half = int(num_frames / 2)
    # Frames go to a subdirectory named by the md5 of the output file name.
    # NOTE(review): output is a constant here, so concurrent runs share this
    # directory — confirm whether that is intended.
    tmp_dir_hash = os.path.join(tmp_dir, md5(output.encode("utf-8")).hexdigest())
    width = get_px_or_fraction(width, images_cv2[0].shape[1])
    height = get_px_or_fraction(height, images_cv2[0].shape[0])
    margin = get_px_or_fraction(margin, min(images_cv2[0].shape[1], images_cv2[0].shape[0]))
    # Create tmp dir
    if not os.path.exists(tmp_dir_hash):
        progress(0, desc="Creating temporary directory for frames")
        os.makedirs(tmp_dir_hash, exist_ok=True)
    # "out"/"outin" render the sequence from the last image backwards; a
    # second reverse (reverse_images) cancels or combines with it.
    if direction in ["out", "outin"]:
        images_cv2.reverse()
    if reverse_images:
        images_cv2.reverse()
    # Blend images (take care of margins): paste each image, trimmed by
    # margin px on every side, into the center of its zoomed predecessor so
    # consecutive zoom levels line up seamlessly.
    progress(0, desc=f"Blending {len(images_cv2)} images")
    for i in progress.tqdm(range(1, num_images + 1), desc="Blending images"):
        inner_image = images_cv2[i]
        outer_image = images_cv2[i - 1]
        inner_image = inner_image[
            margin:inner_image.shape[0] - margin,
            margin:inner_image.shape[1] - margin
        ]
        image = zoom_crop_cv2(outer_image, zoom)
        image[
            margin:margin + inner_image.shape[0],
            margin:margin + inner_image.shape[1]
        ] = inner_image
        images_cv2[i] = image
    # Second pass at the upscaled resolution, walking from the deepest image
    # outwards so each blend uses the already-processed inner image.
    # (i is an image here, not an index.)
    images_resized = [resize_scale(i, zoom) for i in images_cv2]
    for i in progress.tqdm(range(num_images, 0, -1), desc="Resizing images"):
        inner_image = images_resized[i]
        image = images_resized[i - 1]
        inner_image = resize_scale(inner_image, 1.0 / zoom)
        h, w = image.shape[:2]
        ih, iw = inner_image.shape[:2]
        # Center the shrunk inner image inside the outer one.
        x = int((w - iw) / 2)
        y = int((h - ih) / 2)
        image[y:y + ih, x:x + iw] = inner_image
        images_resized[i] = image
    images_cv2 = images_resized
    # Create frames
    def process_frame(i):
        """Render frame *i* to a PNG in tmp_dir_hash (runs on a worker thread;
        only reads the shared image list, so no locking is needed)."""
        # current_zoom_log is the eased, fractional image index for frame i.
        if direction == "in":
            current_zoom_log = zoom_in_log(easing_func, i, num_frames, num_images)
        elif direction == "out":
            current_zoom_log = zoom_out_log(easing_func, i, num_frames, num_images)
        elif direction == "inout":
            # First half zooms in, second half zooms back out.
            if i < num_frames_half:
                current_zoom_log = zoom_in_log(
                    easing_func, i, num_frames_half, num_images
                )
            else:
                current_zoom_log = zoom_out_log(
                    easing_func, i - num_frames_half, num_frames_half, num_images
                )
        elif direction == "outin":
            if i < num_frames_half:
                current_zoom_log = zoom_out_log(
                    easing_func, i, num_frames_half, num_images
                )
            else:
                current_zoom_log = zoom_in_log(
                    easing_func, i - num_frames_half, num_frames_half, num_images
                )
        else:
            raise gr.Error(f"Unsupported direction: {direction}")
        # Pick the image that covers this zoom depth and the residual zoom
        # to apply within it (local_zoom is in (1, zoom]).
        current_image_idx = ceil(current_zoom_log)
        local_zoom = zoom ** (current_zoom_log - current_image_idx + 1)
        if current_zoom_log == 0.0:
            frame_image = images_cv2[0]
        else:
            frame_image = images_cv2[current_image_idx]
        frame_image = zoom_crop_cv2(frame_image, local_zoom)
        frame_image = cv2.resize(frame_image, (width, height), interpolation=cv2.INTER_LANCZOS4)
        frame_path = os.path.join(tmp_dir_hash, f"{i:06d}.png")
        cv2.imwrite(frame_path, frame_image)
    progress(0, desc=f"Creating {num_frames} frames")
    with ThreadPoolExecutor(8) as executor:
        # list() drains the lazy map so all frames are written before we
        # start encoding the video.
        list(progress.tqdm(executor.map(process_frame, range(num_frames)), total=num_frames, desc="Creating frames"))
    # Write video
    progress(0, desc=f"Writing video to: {output}")
    image_files = [
        os.path.join(tmp_dir_hash, f"{i:06d}.png") for i in range(num_frames)
    ]
    # Create video clip using images in tmp dir and audio if provided
    frame_size = (width, height)
    out = cv2.VideoWriter(output, cv2.VideoWriter_fourcc(*'mp4v'), fps, frame_size)
    for i in progress.tqdm(range(num_frames), desc="Writing video"):
        frame = cv2.imread(image_files[i])
        out.write(frame)
    out.release()
    if audio_path is not None:
        # Mux the audio in with moviepy, trimming it to the video length,
        # and return the new file instead of the silent one.
        audio = AudioFileClip(audio_path.name)
        video = VideoFileClip(output)
        audio = audio.subclip(0, video.end)
        video = video.set_audio(audio)
        video_write_kwargs = {"audio_codec": "aac"}
        output_audio = os.path.splitext(output)[0] + "_audio.mp4"
        video.write_videofile(output_audio, **video_write_kwargs)
        output = output_audio
    # Remove tmp dir
    if not keep_frames and not skip_video_generation:
        shutil.rmtree(tmp_dir_hash, ignore_errors=False, onerror=None)
        if not os.listdir(tmp_dir):
            progress(0, desc=f"Removing empty temporary directory for frames: {tmp_dir} ...")
            os.rmdir(tmp_dir)
    return output
# Input components for the Gradio interface, in the same order as the
# parameters of zoom_video_composer.
# Fix: the gr.inputs.* namespace was removed in Gradio 3.x, while this file
# already relies on Gradio 3.x-only APIs (gr.Progress, gr.File(file_count=...)),
# so the old-style components raised AttributeError at import time.  Ported to
# the top-level components, whose initial value is `value=` (not `default=`).
grInputs = [
    gr.File(file_count="multiple", label="Upload images as folder", file_types=["image"]),
    gr.File(file_count="single", label="Upload audio", file_types=["audio"]),
    gr.Slider(label="Zoom factor/ratio between images", minimum=1.0, maximum=5.0, step=0.1, value=2.0),
    gr.Slider(label="Duration of the video in seconds", minimum=1.0, maximum=60.0, step=1.0, value=10.0),
    # Offer exactly the easing curves the composer supports, so the dropdown
    # can never drift out of sync with EASING_FUNCTIONS.
    gr.Dropdown(label="Easing function used for zooming",
                choices=list(EASING_FUNCTIONS.keys()),
                value=DEFAULT_EASING_KEY),
    gr.Dropdown(label="Zoom direction. Inout and outin combine both directions",
                choices=["in", "out", "inout", "outin"], value="out"),
    gr.Slider(label="Frames per second of the output video", minimum=1, maximum=60, step=1, value=30),
    gr.Checkbox(label="Reverse images", value=False)
]
# Build and launch the Gradio app.
# Fixes for the Gradio 3.x API this file already targets (gr.Progress etc.):
# - gr.outputs.Video was removed; use the top-level gr.Video component.
# - allow_flagging takes the strings "never"/"manual"/"auto", not False.
# - allow_screenshot / allow_embedding / allow_download were Gradio 2
#   Interface kwargs that no longer exist and raised TypeError.
iface = gr.Interface(
    fn=zoom_video_composer,
    inputs=grInputs,
    outputs=[gr.Video(label="Video")],
    title="Zoom Video Composer",
    description="Compose a zoom video from multiple provided images.",
    allow_flagging="never",
)
# Queue requests so long renders don't block each other; up to 10 workers.
iface.queue(concurrency_count=10).launch()