# Face Fusion Space (author: aastha-malik, commit 337462b)
# Upload-only build: YouTube download removed (DNS is blocked on the HF free
# tier), so the UI shows a clear "upload your video" message instead.
import os, sys, shutil, types, subprocess
import numpy as np
import cv2
import gradio as gr
# ── Paths ────────────────────────────────────────────────────────────
# Everything lives under /tmp so the Space's ephemeral disk is used.
MODEL_DIR = "/tmp/models"
WORK_DIR = "/tmp/workspace"

# Create the model cache plus scratch/output folders up front
# (os.makedirs also creates WORK_DIR itself as a parent).
for _dir in (MODEL_DIR, f"{WORK_DIR}/temp", f"{WORK_DIR}/outputs"):
    os.makedirs(_dir, exist_ok=True)

# ── Model download ───────────────────────────────────────────────────
# Local path where the face-swap ONNX model is cached.
INSWAPPER_PATH = f"{MODEL_DIR}/inswapper_128.onnx"
def download_models():
    """Fetch the inswapper ONNX weights into MODEL_DIR (idempotent)."""
    # Imported lazily so the module can load even before the hub client is needed.
    from huggingface_hub import hf_hub_download

    if not os.path.exists(INSWAPPER_PATH):
        print("Downloading inswapper_128.onnx ...")
        hf_hub_download(
            repo_id="ezioruan/inswapper_128.onnx",
            filename="inswapper_128.onnx",
            local_dir=MODEL_DIR,
        )
    print("inswapper ready.")


download_models()
# ── Load models ──────────────────────────────────────────────────────
import insightface
from insightface.app import FaceAnalysis
import onnxruntime as ort

# Prefer CUDA when onnxruntime reports it available; otherwise CPU only.
if "CUDAExecutionProvider" in ort.get_available_providers():
    PROVIDERS = ["CUDAExecutionProvider", "CPUExecutionProvider"]
else:
    PROVIDERS = ["CPUExecutionProvider"]
print(f"Using providers: {PROVIDERS}")

# Face detector/recognizer bundle (buffalo_l) and the swapping model.
face_app = FaceAnalysis(name="buffalo_l", providers=PROVIDERS)
face_app.prepare(ctx_id=0, det_size=(640, 640))
swapper = insightface.model_zoo.get_model(INSWAPPER_PATH, providers=PROVIDERS)
print("Models loaded.")
def to_h264(src: str, dst: str) -> None:
    """Re-encode *src* into an H.264/AAC MP4 at *dst* via ffmpeg.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero (check=True).
    """
    # FIX: "-loglevel error" must appear before the output file. ffmpeg
    # ignores options placed after the last output ("Trailing option(s)
    # found in the command"), so the original placement had no effect
    # and ffmpeg spammed the log at default verbosity.
    subprocess.run(
        ["ffmpeg", "-y", "-loglevel", "error",
         "-i", src,
         "-vcodec", "libx264", "-acodec", "aac", "-preset", "fast",
         dst],
        check=True,
    )
# ── Core processing ──────────────────────────────────────────────────
def process(face_image, video_file, trim_seconds, progress=gr.Progress(track_tqdm=True)):
    """Swap the face from *face_image* onto every face in *video_file*.

    Args:
        face_image: Filepath of the source face photo (gr.Image type="filepath"),
            or None if the user uploaded nothing.
        video_file: Filepath of the uploaded video, or None.
        trim_seconds: If > 0, only the first N seconds are processed.
        progress: Gradio progress reporter (tqdm-tracked).

    Returns:
        (output_video_path or None, status_message) — on any failure the
        path is None and the message explains the problem.
    """
    if face_image is None:
        return None, "Please upload a source face image."
    if video_file is None:
        return None, "Please upload a video file."
    try:
        progress(0.0, desc="Detecting source face...")
        # Source face.  cv2.imread returns None (it does not raise) on
        # unreadable files, so guard before handing it to the detector.
        source_img = cv2.imread(face_image)
        if source_img is None:
            return None, "Could not read the source image — try a different file."
        source_faces = face_app.get(source_img)
        if not source_faces:
            # Retry at the detector's native size — detection sometimes
            # succeeds after resizing very large or very small photos.
            source_faces = face_app.get(cv2.resize(source_img, (640, 640)))
        if not source_faces:
            return None, "No face detected — use a clear, front-facing photo."
        # Keep the largest detected face (by bbox area) as the identity source.
        source_face = max(
            source_faces,
            key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
        )
        source_face.embedding /= np.linalg.norm(source_face.embedding)

        # Prepare video: normalize container/codec so OpenCV can decode it.
        progress(0.05, desc="Preparing video...")
        raw_video = f"{WORK_DIR}/temp/input.mp4"
        converted = f"{WORK_DIR}/temp/input_h264.mp4"
        shutil.copy(video_file, raw_video)
        to_h264(raw_video, converted)

        # Verify OpenCV can actually read at least one frame.
        cap_check = cv2.VideoCapture(converted)
        ok, _ = cap_check.read()
        cap_check.release()
        if not ok:
            return None, "Could not read the video — try a different file format."

        # Optional trim to the first N seconds.
        input_video = converted
        if trim_seconds and int(trim_seconds) > 0:
            trimmed = f"{WORK_DIR}/temp/input_trimmed.mp4"
            # FIX: "-loglevel error" must precede the output file; ffmpeg
            # ignores trailing options placed after the last output.
            subprocess.run(
                ["ffmpeg", "-y", "-loglevel", "error",
                 "-i", converted,
                 "-t", str(int(trim_seconds)),
                 "-c:v", "libx264", "-c:a", "aac",
                 trimmed],
                check=True,
            )
            input_video = trimmed

        # Video info.  Some files report fps=0 — fall back to 25 so the
        # VideoWriter is not constructed with an invalid rate.
        cap = cv2.VideoCapture(input_video)
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if total <= 0 or w <= 0 or h <= 0:
            cap.release()
            return None, "Could not read the video — try a different file format."

        # Frame pipeline: swap every detected face in every frame.
        temp_out = f"{WORK_DIR}/temp/no_audio.mp4"
        final_out = f"{WORK_DIR}/outputs/face_swapped.mp4"
        writer = cv2.VideoWriter(
            temp_out, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h)
        )
        try:
            for i in range(total):
                ret, frame = cap.read()
                if not ret:
                    break
                progress(0.1 + 0.8 * (i / total), desc=f"Frame {i+1}/{total}")
                result = frame.copy()
                for face in face_app.get(frame):
                    result = swapper.get(result, face, source_face, paste_back=True)
                writer.write(result)
        finally:
            # Always release so the partial output is flushed even on error.
            cap.release()
            writer.release()

        # Merge audio from the (possibly trimmed) input.  Deliberately no
        # check=True: ffmpeg fails when the source has no audio track, and
        # we then fall back to the silent video below.  Remove any stale
        # output from a previous run first, so a failed merge cannot be
        # masked by an old file.
        progress(0.92, desc="Merging audio...")
        if os.path.exists(final_out):
            os.remove(final_out)
        subprocess.run(
            ["ffmpeg", "-y", "-loglevel", "error",
             "-i", temp_out, "-i", input_video,
             "-map", "0:v:0", "-map", "1:a:0",
             "-c:v", "copy", "-c:a", "aac", "-shortest",
             final_out],
        )
        if not os.path.exists(final_out):
            shutil.copy(temp_out, final_out)

        progress(1.0, desc="Done!")
        size = os.path.getsize(final_out) / (1024 * 1024)
        return final_out, f"Done! {total} frames | {size:.1f} MB"
    except Exception as e:
        # Top-level boundary: surface the error in the UI rather than crash.
        return None, f"Error: {e}"
# ── Gradio UI ────────────────────────────────────────────────────────
# FIX: the user-visible markdown contained mojibake ("β€”" for an em dash,
# "–" for an en dash) from a bad encoding round-trip; restored real dashes.
with gr.Blocks(title="Face Fusion") as demo:
    # Header text is kept flush-left inside the string: indented lines
    # would render as Markdown code blocks.
    gr.Markdown("""
# 🎭 Face Fusion — AI Video Face Swap
Swap any face into a video using **InsightFace + inswapper_128**.
> **Note:** Runs on CPU — ~1–3 min per 10 seconds of video. For GPU speed, run the notebook on Kaggle.
""")
    with gr.Row():
        with gr.Column():
            face_input = gr.Image(
                label="Source Face Photo",
                type="filepath",
                height=220,
            )
            gr.Markdown("> ⚠️ **YouTube URLs don't work on HF free Spaces** (DNS blocked). Download your video locally first, then upload it below.")
            video_input = gr.Video(label="Upload Video File")
            trim_input = gr.Slider(
                label="Trim to first N seconds (0 = full video)",
                minimum=0, maximum=60, step=5, value=10,
            )
            run_btn = gr.Button("Run Face Swap", variant="primary", size="lg")
        with gr.Column():
            status_box = gr.Textbox(label="Status", interactive=False, lines=2)
            video_out = gr.Video(label="Output Video", height=400)
    gr.Markdown("""
---
**Tips for best results**
- Clear, front-facing photo — no sunglasses or heavy shadows
- Keep video under 15 seconds for reasonable CPU processing time
- Single-face videos give the cleanest swap
""")
    run_btn.click(
        fn=process,
        inputs=[face_input, video_input, trim_input],
        outputs=[video_out, status_box],
    )

demo.launch()