# Spaces: Paused (Hugging Face page banner captured with the source; not part of the program)
import gradio as gr | |
import os | |
import subprocess | |
import cv2 | |
import numpy as np | |
from moviepy.editor import VideoFileClip, concatenate_videoclips | |
import math | |
from huggingface_hub import snapshot_download | |
# Hugging Face Hub repositories fetched at start-up: the Stable Diffusion v1.5
# base model and three ControlNet checkpoints (depth, canny, openpose).
model_ids = [
    'runwayml/stable-diffusion-v1-5',
    'lllyasviel/sd-controlnet-depth',
    'lllyasviel/sd-controlnet-canny',
    'lllyasviel/sd-controlnet-openpose',
]

# Download each repository into checkpoints/<last path component of its id>.
for model_id in model_ids:
    model_name = model_id.rsplit('/', 1)[-1]
    snapshot_download(model_id, local_dir=f'checkpoints/{model_name}')
def get_frame_count(filepath):
    """Return a Gradio update that bounds the "Video length" slider.

    Args:
        filepath: Path of the uploaded video, or ``None`` when the video
            input has been cleared.

    Returns:
        The result of ``gr.update(...)``. With a video, ``maximum`` is the
        frame count reported by OpenCV clamped to the range 1..24. Without
        one, the slider is reset to ``value=1, maximum=12``.
    """
    if filepath is None:
        return gr.update(value=1, maximum=12)

    video = cv2.VideoCapture(filepath)
    try:
        frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release the capture even if reading the property fails.
        video.release()

    # LIMITS: cap at 24 frames to avoid cuDNN errors. The lower bound of 1
    # keeps the maximum from dropping under the slider's minimum when OpenCV
    # reports 0 frames (e.g. a file it could not open).
    frame_count = max(1, min(frame_count, 24))
    return gr.update(maximum=frame_count)
def get_video_dimension(filepath):
    """Read the basic properties of the video at ``filepath`` with OpenCV.

    Returns:
        A ``(width, height, fps, frame_count)`` tuple. Each entry is the
        value reported by OpenCV truncated to ``int``.
    """
    capture = cv2.VideoCapture(filepath)
    wanted = (
        cv2.CAP_PROP_FRAME_WIDTH,
        cv2.CAP_PROP_FRAME_HEIGHT,
        cv2.CAP_PROP_FPS,
        cv2.CAP_PROP_FRAME_COUNT,
    )
    # Query the properties in a fixed order and unpack them by name.
    width, height, fps, frame_count = [int(capture.get(prop)) for prop in wanted]
    capture.release()
    return width, height, fps, frame_count
def resize_video(input_vid, output_vid, width, height, fps):
    """Re-encode ``input_vid`` into ``output_vid`` with every frame resized.

    Args:
        input_vid: Path of the video to read.
        output_vid: Path of the video to write (encoded with the 'mp4v'
            fourcc).
        width: Frame width of the output, in pixels.
        height: Frame height of the output, in pixels.
        fps: Frame rate handed to the writer.

    Returns:
        ``output_vid``, unchanged.
    """
    print("RESIZING ...")

    video = cv2.VideoCapture(input_vid)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for the output video
    output_video = cv2.VideoWriter(output_vid, fourcc, fps, (width, height))

    try:
        # Read until the capture stops returning frames.
        while True:
            ret, frame = video.read()
            if not ret:
                break
            output_video.write(cv2.resize(frame, (width, height)))
    finally:
        # Always release both handles, even if resizing or writing raises,
        # so the files are closed.
        video.release()
        output_video.release()

    print("RESIZE VIDEO DONE!")
    return output_vid
def normalize_and_save_video(input_video_path, output_video_path):
    """Copy a video, passing each frame through a [0, 1] float round trip.

    Each frame is converted to float32, scaled to [0, 1], scaled back to
    [0, 255] and written with the 'mp4v' fourcc at the source's reported
    size and frame rate.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the video to write.

    Returns:
        ``output_video_path``, unchanged.
    """
    print("NORMALIZING ...")
    cap = cv2.VideoCapture(input_video_path)

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Create VideoWriter object to save the normalized video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # e.g. 'mp4v', 'XVID', 'MPEG'
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    try:
        # Read until the capture is exhausted rather than trusting the
        # reported frame count, matching how resize_video iterates.
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Normalize pixel values to the range [0, 1].
            normalized = frame.astype(np.float32) / 255.0

            # Round before casting back: the float round trip can land a hair
            # below the original integer, and a bare uint8 cast truncates,
            # which would darken such pixels by one level.
            restored = np.clip(np.rint(normalized * 255.0), 0, 255).astype(np.uint8)

            out.write(restored)
    finally:
        # Release both handles even when a frame fails to process.
        cap.release()
        out.release()

    print("NORMALIZE DONE!")
    return output_video_path
def make_nearest_multiple_of_32(number):
    """Snap ``number`` to a neighbouring multiple of 32.

    A remainder of 16 or less rounds down (an exact tie goes to the lower
    multiple); a larger remainder rounds up.
    """
    leftover = number % 32
    if leftover > 16:
        return number + (32 - leftover)
    return number - leftover
def run_inference(prompt, video_path, condition, video_length, seed, steps):
    """Resize the input video and run ``inference.py`` on it in a subprocess.

    Args:
        prompt: Text prompt forwarded as ``--prompt``.
        video_path: Path of the uploaded input video (``None`` if missing).
        condition: Value forwarded as ``--condition``.
        video_length: Number of frames to process; values above 12 add the
            ``--is_long_video`` flag.
        seed: Seed value; floored to an integer before use.
        steps: Value forwarded as ``--inference_steps``.

    Returns:
        A ``(status, video)`` pair: ``("done", "output/result.mp4")`` on
        success, or ``(<error message>, None)`` when the input is unusable
        or the subprocess fails.
    """
    if video_path is None:
        return "Runtime Error: no input video provided", None

    seed = math.floor(seed)
    video_length = int(video_length)
    steps = int(steps)

    # Probe the input once instead of reopening it for every property.
    o_width, o_height, target_fps, _ = get_video_dimension(video_path)
    if o_width <= 0 or o_height <= 0:
        return "Runtime Error: could not read the input video", None

    # Prepare dimensions: shrink to 512 px wide keeping the aspect ratio;
    # inputs that are already 512 px wide or narrower keep their own size.
    if o_width > 512:
        n_width = 512
        n_height = int(o_height / o_width * 512)
    else:
        n_width, n_height = o_width, o_height

    # Cap the frame rate at 12 fps.
    if target_fps > 12:
        print("FPS is too high")
        target_fps = 12
    print(f"INPUT FPS: {target_fps}")

    # Snap both sides to a multiple of 32. The floor of 32 prevents a
    # zero-sized frame when a side is 16 px or less.
    r_width = max(32, make_nearest_multiple_of_32(n_width))
    r_height = max(32, make_nearest_multiple_of_32(n_height))
    print(f"multiple of 32 sizes : {r_width}x{r_height}")

    # Drop any resized clip left over from a previous run.
    if os.path.exists('resized.mp4'):
        os.remove('resized.mp4')
    resized = resize_video(video_path, 'resized.mp4', r_width, r_height, target_fps)

    output_path = 'output/'
    os.makedirs(output_path, exist_ok=True)
    video_path_output = os.path.join(output_path, "result.mp4")

    # Drop any stale result so a failed run cannot return an old video.
    if os.path.exists(video_path_output):
        os.remove(video_path_output)

    print("RUNNING INFERENCE ...")

    # Pass an argument list (no shell): the prompt comes from the user and
    # must never be interpreted by a shell.
    command = [
        "python", "inference.py",
        "--prompt", prompt,
        "--inference_steps", str(steps),
        "--condition", condition,
        "--video_path", resized,
        "--output_path", output_path,
        "--temp_chunk_path", "result",
        "--width", str(r_width),
        "--height", str(r_height),
        "--fps", str(target_fps),
        "--seed", str(seed),
        "--video_length", str(video_length),
        "--smoother_steps", "19", "20",
    ]
    if video_length > 12:
        command.append("--is_long_video")

    try:
        completed = subprocess.run(command)
    except OSError as e:
        # Raised when the interpreter itself cannot be started.
        return f"Runtime Error: {e}", None

    # Errors inside the child process (CUDA failures included) only show up
    # as a non-zero exit code, never as exceptions in this process.
    if completed.returncode != 0:
        return f"Runtime Error: inference.py exited with code {completed.returncode}", None

    if not os.path.exists(video_path_output):
        return f"Runtime Error: {video_path_output} was not produced", None

    print("FINISHED !")
    return "done", video_path_output
# Page-level CSS: centre the main column and cap its width.
css = """
#col-container {max-width: 810px; margin-left: auto; margin-right: auto;}
"""

# NOTE(review): the layout nesting below was reconstructed from a source
# whose indentation had been flattened — confirm against the running app.
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("""
<h1 style="text-align: center;">ControlVideo</h1>
""")

        with gr.Row():
            # Inputs: source video, prompt and generation settings.
            with gr.Column():
                video_path = gr.Video(
                    source="upload",
                    type="filepath",
                    visible=True,
                )
                prompt = gr.Textbox(label="prompt")
                with gr.Column():
                    video_length = gr.Slider(
                        label="Video length",
                        info="How many frames do you want to process ? For demo purpose, max is set to 24",
                        minimum=1,
                        maximum=12,
                        step=1,
                        value=2,
                    )
                    with gr.Row():
                        condition = gr.Dropdown(
                            label="Condition",
                            choices=["depth", "canny", "pose"],
                            value="depth",
                        )
                        seed = gr.Number(label="seed", value=42)
                    inference_steps = gr.Slider(
                        label="Inference steps",
                        minimum=25,
                        maximum=50,
                        step=1,
                        value=25,
                    )
                submit_btn = gr.Button("Submit")

            # Outputs: generated video and a status message.
            with gr.Column():
                video_res = gr.Video(label="result")
                status = gr.Textbox(label="result")

        # Re-bound the frame slider whenever the input video changes.
        video_path.change(
            fn=get_frame_count,
            inputs=[video_path],
            outputs=[video_length],
            queue=False,
        )

        # Run the pipeline; the outputs follow run_inference's
        # (status, video) return order.
        submit_btn.click(
            fn=run_inference,
            inputs=[
                prompt,
                video_path,
                condition,
                video_length,
                seed,
                inference_steps,
            ],
            outputs=[status, video_res],
        )

# Queue up to 12 pending requests, then start the app.
demo.queue(max_size=12).launch()