EPiC-fps / gradio_app.py
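# Gradio front-end for EPiC camera-controlled video generation. The app copies
# an uploaded video into the container, runs the anchor/point-cloud stage
# (inference/v2v_data/inference.py) to produce masked anchor videos, then runs
# the CogVideoX-5b-I2V + ControlNet stage (cli_demo_camera_i2v_pcd.py) to
# render the final video with the requested camera motion.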
import os
import subprocess
from datetime import datetime
from pathlib import Path
import gradio as gr
# -----------------------------
# Setup paths and env
# -----------------------------
HF_HOME = "/app/hf_cache"
os.environ["HF_HOME"] = HF_HOME
os.environ["TRANSFORMERS_CACHE"] = HF_HOME
os.makedirs(HF_HOME, exist_ok=True)
PRETRAINED_DIR = "/app/pretrained"
os.makedirs(PRETRAINED_DIR, exist_ok=True)
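# HF_HOME / TRANSFORMERS_CACHE redirect Hugging Face downloads into /app/hf_cache;
# /app/pretrained holds the pretrained weights (RAFT, CogVideoX-5b-I2V) that
# download_models() below checks for before triggering the download script.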
# -----------------------------
# Step 1: Optional Model Download
# -----------------------------
def download_models():
    expected_model = os.path.join(PRETRAINED_DIR, "RAFT/raft-things.pth")
    if not Path(expected_model).exists():
        print("⚙️ Downloading pretrained models...")
        try:
            subprocess.check_call(["bash", "download/download_models.sh"])
            print("✅ Models downloaded.")
        except subprocess.CalledProcessError as e:
            print(f"❌ Model download failed: {e}")
    else:
        print("✅ Pretrained models already exist.")
download_models()
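# Note: download_models() runs at import time, so the first start of the app
# can take a while if the pretrained weights are not yet present.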
# -----------------------------
# Step 2: Inference Logic
# -----------------------------
def run_epic_inference(video_path, num_frames, target_pose, mode):
    temp_input_path = "/app/temp_input.mp4"
    output_dir = "/app/output_anchor"
    video_output_path = f"{output_dir}/masked_videos/output.mp4"
    # Save the uploaded video to a fixed location inside the container
    if video_path:
        os.system(f"cp '{video_path}' {temp_input_path}")
    # Parse the target pose string: θ φ r x y
    try:
        theta, phi, r, x, y = target_pose.strip().split()
    except ValueError:
        return "❌ Invalid target pose format. Use: θ φ r x y", None
    logs = f"Running inference with target pose: θ={theta}, φ={phi}, r={r}, x={x}, y={y}\n"
    command = [
        "python", "/app/inference/v2v_data/inference.py",
        "--video_path", temp_input_path,
        "--stride", "1",
        "--out_dir", output_dir,
        "--radius_scale", "1",
        "--camera", "target",
        "--mask",
        "--target_pose", theta, phi, r, x, y,
        "--video_length", str(num_frames),
        "--save_name", "output",
        "--mode", mode,
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs += result.stdout
    except subprocess.CalledProcessError as e:
        logs += f"❌ Inference failed:\n{e.stderr}"
        return logs, None
    # Return the masked anchor video only if it was actually produced
    if os.path.exists(video_output_path):
        return logs, str(video_output_path)
    return logs, None
def print_output_directory(out_dir):
    result = ""
    for root, dirs, files in os.walk(out_dir):
        level = root.replace(out_dir, '').count(os.sep)
        indent = ' ' * 4 * level
        result += f"{indent}{os.path.basename(root)}/\n"
        sub_indent = ' ' * 4 * (level + 1)
        for f in files:
            result += f"{sub_indent}{f}\n"
    return result
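# For reference, print_output_directory() over /app/output after the second
# stage typically shows (with the fixed seed 43):
# output/
#     00000_43_out.mp4
#     00000_43_reference.mp4
#     00000_43_out_reference.mp4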
def inference(video_path, num_frames, fps, target_pose, mode):
    # Stage 1: build the masked anchor video from the requested camera pose.
    # (The FPS slider is currently not forwarded to the underlying scripts.)
    logs, video_masked = run_epic_inference(video_path, num_frames, target_pose, mode)
    MODEL_PATH = "/app/pretrained/CogVideoX-5b-I2V"
    ckpt_steps = 500
    ckpt_dir = "/app/out/EPiC_pretrained"
    ckpt_file = f"checkpoint-{ckpt_steps}.pt"
    ckpt_path = f"{ckpt_dir}/{ckpt_file}"
    video_root_dir = "/app/output_anchor"
    out_dir = "/app/output"
    # Stage 2: render the final camera-controlled video with the ControlNet model.
    command = [
        "python", "/app/inference/cli_demo_camera_i2v_pcd.py",
        "--video_root_dir", video_root_dir,
        "--base_model_path", MODEL_PATH,
        "--controlnet_model_path", ckpt_path,
        "--output_path", out_dir,
        "--start_camera_idx", "0",
        "--end_camera_idx", "8",
        "--controlnet_weights", "1.0",
        "--controlnet_guidance_start", "0.0",
        "--controlnet_guidance_end", "0.4",
        "--controlnet_input_channels", "3",
        "--controlnet_transformer_num_attn_heads", "4",
        "--controlnet_transformer_attention_head_dim", "64",
        "--controlnet_transformer_out_proj_dim_factor", "64",
        "--controlnet_transformer_out_proj_dim_zero_init",
        "--vae_channels", "16",
        "--num_frames", str(num_frames),
        "--controlnet_transformer_num_layers", "8",
        "--infer_with_mask",
        "--pool_style", "max",
        "--seed", "43",
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    logs += "\n" + result.stdout
    if result.returncode == 0:
        logs += "Inference completed successfully.\n"
    else:
        logs += f"Error occurred during inference: {result.stderr}\n"
    # Append a listing of the output directory to the logs.
    logs += print_output_directory(out_dir)
    # The output filename is determined by the camera index and the fixed seed (43).
    return logs, f"{out_dir}/00000_43_out.mp4"
# -----------------------------
# Step 3: Create Gradio UI
# -----------------------------
demo = gr.Interface(
    fn=inference,
    inputs=[
        gr.Video(label="Upload Video (MP4)"),
        gr.Slider(minimum=1, maximum=120, value=50, step=1, label="Number of Frames"),
        gr.Slider(minimum=1, maximum=90, value=10, step=1, label="FPS"),
        gr.Textbox(label="Target Pose (θ φ r x y)", placeholder="e.g., 0 30 -0.6 0 0"),
        gr.Dropdown(choices=["gradual", "direct", "bullet"], value="gradual", label="Camera Mode"),
    ],
    outputs=[
        gr.Textbox(label="Inference Logs"),
        gr.Video(label="Generated Video"),
    ],
    title="🎬 EPiC: Efficient Video Camera Control",
    description="Upload a video, set a target camera pose, and apply cinematic camera motion using pretrained EPiC models.",
)
# -----------------------------
# Step 4: Launch App
# -----------------------------
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)
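# The server binds to 0.0.0.0:7860; if the app runs inside a container, publish
# the port (e.g. `docker run -p 7860:7860 ...`) and open http://localhost:7860.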