# NOTE: scraped page header, not part of the program: "Spaces: Paused / Paused"
"""
🎭 Advanced Face Swap Studio - HuggingFace Spaces Optimized
=========================================================
✅ FEATURES:
- Professional face swapping with GPU acceleration
- Batch processing for multiple videos
- Real-time processing monitor
- Lip sync integration (beta)
- Enhanced face detection and analysis
🚀 Optimized exclusively for HuggingFace Spaces environment
"""
import os | |
import sys | |
import tempfile | |
import time | |
import shutil | |
import subprocess as sp | |
import uuid | |
import zipfile | |
import gc | |
from pathlib import Path | |
# Process-wide environment configuration for HuggingFace Spaces.
# These are applied before the heavy third-party imports that follow.
os.environ.update(
    {
        "KMP_DUPLICATE_LIB_OK": "TRUE",
        "TF_FORCE_GPU_ALLOW_GROWTH": "TRUE",
        "PYTHONPATH": ".",
    }
)
# Core imports | |
import gradio as gr | |
import torch | |
# Optional imports with graceful degradation.
# Each optional dependency is probed once at import time; on failure the name
# is bound to None so later references fail predictably instead of raising
# NameError.
try:
    import onnxruntime as ort
    print("✅ ONNX Runtime loaded successfully")
except ImportError as e:
    ort = None
    print(f"⚠️ ONNX Runtime not available: {e}")

try:
    from moviepy.editor import VideoFileClip
    MOVIEPY_AVAILABLE = True
    print("✅ MoviePy loaded successfully")
except ImportError as e:
    VideoFileClip = None
    print(f"⚠️ MoviePy not available: {e}")
    MOVIEPY_AVAILABLE = False

# Try to import enhancement modules - make this more robust
ENHANCEMENT_AVAILABLE = False
try:
    import importlib.util

    # Anchor the lookup on this script's directory rather than the current
    # working directory, so the check works no matter where the app is
    # launched from.
    _ENHANCER_MODULES_DIR = (
        Path(__file__).parent.resolve()
        / "SwitcherAI" / "processors" / "frame" / "modules"
    )
    face_enhancer_path = _ENHANCER_MODULES_DIR / "face_enhancer.py"
    frame_enhancer_path = _ENHANCER_MODULES_DIR / "frame_enhancer.py"

    if face_enhancer_path.exists() and frame_enhancer_path.exists():
        # The modules are imported as top-level names, so their directory has
        # to be on sys.path first.
        sys.path.insert(0, str(_ENHANCER_MODULES_DIR))
        import face_enhancer
        import frame_enhancer
        ENHANCEMENT_AVAILABLE = True
        print("✅ Enhancement modules loaded successfully")
    else:
        print("⚠️ Enhancement module files not found")
except Exception as e:
    # Any failure here (including errors raised while importing the modules)
    # only disables the enhancement processors.
    print(f"⚠️ Enhancement modules not available: {e}")
# Directory setup for HuggingFace Spaces.
# All working directories live next to this script.
BASE_DIR = Path(__file__).parent.resolve()
TEMP_DIR = BASE_DIR / "temp_workspace"
OUTPUT_DIR = BASE_DIR / "outputs"
CONVERT_DIR = BASE_DIR / "Convert"
ASSETS_DIR = BASE_DIR / ".assets" / "models"

# Create directories with better error handling: a failure is reported but
# does not abort startup.
for directory in [TEMP_DIR, OUTPUT_DIR, CONVERT_DIR, ASSETS_DIR]:
    try:
        directory.mkdir(parents=True, exist_ok=True)
        print(f"📁 Directory ready: {directory}")
    except Exception as e:
        print(f"⚠️ Failed to create directory {directory}: {e}")

print(f"📁 Base directory: {BASE_DIR}")
print(f"📂 Temp directory: {TEMP_DIR}")
print(f"📤 Output directory: {OUTPUT_DIR}")
print(f"🎯 Assets directory: {ASSETS_DIR}")
print(f"📁 Convert directory: {CONVERT_DIR}")

# Try to set up SwitcherAI temp directory
try:
    sys.path.insert(0, str(BASE_DIR))
    from SwitcherAI.utilities import conditional_download

    # Set up temp directory for SwitcherAI. parents=True keeps this working
    # even when creating TEMP_DIR failed in the loop above.
    temp_switcher_dir = TEMP_DIR / "switcher_temp"
    temp_switcher_dir.mkdir(parents=True, exist_ok=True)

    # Set environment variable for temp directory
    os.environ['SWITCHER_TEMP_DIR'] = str(temp_switcher_dir)
    print("🔧 SwitcherAI utilities loaded successfully")
except ImportError as e:
    print(f"⚠️ Could not import SwitcherAI utilities: {e}")
    print("🔄 Using default temp directory behavior")
except OSError as e:
    # A filesystem problem must not crash startup; it is reported the same
    # way as the directory-creation failures above.
    print(f"⚠️ Could not prepare SwitcherAI temp directory: {e}")
    print("🔄 Using default temp directory behavior")
# Download required model files with better error handling | |
def download_required_models():
    """Download required model files if not present.

    Each model is streamed to a ``<name>.part`` file next to its final
    location and only renamed into place once the download is complete.
    An interrupted download therefore never leaves a truncated file that a
    later run would mistake for a valid model.

    Failures are printed and never raised, so one bad download does not stop
    the remaining models from being fetched.
    """
    import urllib.request
    import urllib.error

    min_valid_size = 1024            # anything smaller is treated as a failed download
    chunk_size = 8192
    progress_step = chunk_size * 100  # report roughly every 800KB
    network_timeout = 60             # seconds per blocking socket operation

    models_to_download = [
        {
            'name': 'inswapper_128_fp16.onnx',
            'url': 'https://huggingface.co/ninjawick/webui-faceswap-unlocked/resolve/main/inswapper_128_fp16.onnx',
            'path': ASSETS_DIR / 'inswapper_128_fp16.onnx',
            'description': 'InSwapper FP16 face swap model'
        },
        {
            'name': 'inswapper_128.onnx',
            'url': 'https://huggingface.co/xingren23/comfyflow-models/resolve/main/insightface/inswapper_128.onnx',
            'path': ASSETS_DIR / 'inswapper_128.onnx',
            'description': 'InSwapper face swap model'
        },
        {
            'name': 'GFPGANv1.4.pth',
            'url': 'https://huggingface.co/gmk123/GFPGAN/resolve/main/GFPGANv1.4.pth',
            'path': ASSETS_DIR / 'GFPGANv1.4.pth',
            'description': 'GFPGAN face enhancement model'
        }
    ]

    def discard(path):
        """Best-effort removal of a partial download."""
        try:
            if path.exists():
                path.unlink()
        except OSError as cleanup_error:
            print(f"⚠️ Could not remove partial download {path.name}: {cleanup_error}")

    for model in models_to_download:
        model_path = model['path']
        model_url = model['url']
        model_name = model['name']

        try:
            if model_path.exists() and model_path.stat().st_size > min_valid_size:
                file_size = model_path.stat().st_size / (1024 * 1024)  # MB
                print(f"✅ {model_name} already exists ({file_size:.1f}MB)")
                continue
        except Exception as e:
            print(f"⚠️ Error checking {model_name}: {e}")

        partial_path = model_path.with_name(model_path.name + ".part")
        try:
            print(f"📥 Downloading {model_name}...")
            print(f" Description: {model['description']}")
            print(f" URL: {model_url}")
            print(f" Path: {model_path}")

            # Ensure parent directory exists
            model_path.parent.mkdir(parents=True, exist_ok=True)

            # Download with progress and proper headers for HuggingFace
            req = urllib.request.Request(model_url)
            req.add_header('User-Agent', 'Mozilla/5.0 (compatible; FaceSwapStudio/1.0)')

            with urllib.request.urlopen(req, timeout=network_timeout) as response:
                total_size = int(response.headers.get('Content-Length', 0) or 0)
                downloaded = 0
                next_report = progress_step
                with open(partial_path, 'wb') as f:
                    while True:
                        chunk = response.read(chunk_size)
                        if not chunk:
                            break
                        f.write(chunk)
                        downloaded += len(chunk)
                        # A threshold comparison keeps reporting even when
                        # reads return fewer bytes than requested.
                        if total_size > 0 and downloaded >= next_report:
                            percent = (downloaded * 100) / total_size
                            print(f" Progress: {percent:.1f}%")
                            while next_report <= downloaded:
                                next_report += progress_step

            # Verify download: large enough, and complete when the server
            # announced a length.
            if total_size > 0 and downloaded != total_size:
                print(f"❌ {model_name} download incomplete ({downloaded} of {total_size} bytes)")
                discard(partial_path)
            elif downloaded > min_valid_size:
                partial_path.replace(model_path)
                file_size = model_path.stat().st_size / (1024 * 1024)  # MB
                print(f"✅ {model_name} downloaded successfully ({file_size:.1f}MB)")
            else:
                print(f"❌ {model_name} download failed - file not created or too small")
                discard(partial_path)
        except urllib.error.URLError as e:
            print(f"❌ Network error downloading {model_name}: {e}")
            discard(partial_path)
        except Exception as e:
            print(f"❌ Error downloading {model_name}: {e}")
            discard(partial_path)
# Fetch model weights at import time, before the web interface is built.
# A failure is reported but does not stop the application from starting.
print("\n🔄 Checking required model files...")
try:
    download_required_models()
    print("✅ Model check complete\n")
except Exception as download_error:
    print(f"⚠️ Model download failed: {download_error}\n")

# Module-level state shared by the processing and cancel handlers.
current_process = None    # subprocess handle of the running job, if any
last_output_path = None   # path of the most recent result
last_batch_mode = False   # whether the most recent run was a batch run
def get_available_gpus():
    """Get list of available CUDA devices for HuggingFace Spaces.

    Returns:
        A non-empty list of dropdown labels. Each usable device is reported
        as ``"GPU <i>: <name> (<mem>GB)"``, a device that fails the probe as
        ``"GPU <i>: Error"``, and ``"CPU Only"`` is always the last entry so
        a CPU fallback can be selected.
    """
    print("🔍 Detecting GPU devices...")

    if not torch.cuda.is_available():
        print("❌ CUDA not available")
        return ["CPU Only"]

    available_gpus = []
    try:
        device_count = torch.cuda.device_count()
        print(f"🔢 CUDA devices detected: {device_count}")

        for i in range(device_count):
            try:
                props = torch.cuda.get_device_properties(i)
                gpu_name = props.name
                gpu_memory = props.total_memory / (1024**3)  # GB

                # Test device accessibility inside a device context so the
                # process-wide current device is restored afterwards.
                with torch.cuda.device(i):
                    test_tensor = torch.tensor([1.0], device=f'cuda:{i}')
                    del test_tensor
                    torch.cuda.empty_cache()

                gpu_entry = f"GPU {i}: {gpu_name} ({gpu_memory:.1f}GB)"
                available_gpus.append(gpu_entry)
                print(f"✅ {gpu_entry}")
            except Exception as e:
                print(f"❌ Error with GPU {i}: {e}")
                available_gpus.append(f"GPU {i}: Error")
    except Exception as e:
        print(f"❌ GPU detection failed: {e}")

    # Guarantee a non-empty result and an explicit CPU choice.
    if "CPU Only" not in available_gpus:
        available_gpus.append("CPU Only")
    return available_gpus
def set_gpu_device(gpu_selection):
    """Point CUDA_VISIBLE_DEVICES at the device named by a dropdown label.

    A label that starts with "GPU" and does not contain "Error" selects the
    device index found between the first space and the first colon. Anything
    else, including any error while parsing, selects CPU mode by setting the
    variable to an empty string.

    Returns:
        The GPU index as a string, or "cpu".
    """
    try:
        wants_gpu = gpu_selection.startswith("GPU") and "Error" not in gpu_selection
        if not wants_gpu:
            os.environ["CUDA_VISIBLE_DEVICES"] = ""
            print("🖥️ Using CPU mode")
            return "cpu"

        # Label looks like "GPU <index>: <name> (<mem>GB)".
        device_label, _, _ = gpu_selection.partition(":")
        gpu_id = device_label.split(" ")[1]
        os.environ["CUDA_VISIBLE_DEVICES"] = gpu_id
        print(f"🖥️ Using GPU {gpu_id}")
        return gpu_id
    except Exception as e:
        print(f"⚠️ Error setting GPU device: {e}")
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        return "cpu"
def _coerce_to_path(value):
    """Turn a str, path-like, or object exposing a ``.name`` path into a Path."""
    if isinstance(value, Path):
        return value
    if isinstance(value, (str, os.PathLike)):
        return Path(value)
    # Upload wrappers and open file objects carry their location in ``.name``.
    name = getattr(value, "name", None)
    if isinstance(name, (str, os.PathLike)):
        return Path(name)
    raise TypeError(f"cannot interpret {type(value).__name__} as a file path")


def safe_copy_file(source, destination):
    """Safely copy file with verification.

    Args:
        source: File to copy, given as a str, a path-like object, or an
            object whose ``.name`` attribute holds the path.
        destination: Target path in the same accepted forms. Missing parent
            directories are created.

    Returns:
        True when the copy exists and is non-empty afterwards. False when the
        source is missing or empty, or when any error occurs. Errors are
        printed, never raised.
    """
    try:
        source = _coerce_to_path(source)
        destination = _coerce_to_path(destination)

        # Validate the source before touching the filesystem so a bad request
        # does not leave empty directories behind.
        if not source.exists():
            print(f"❌ Source file does not exist: {source}")
            return False

        if source.stat().st_size == 0:
            print(f"❌ Source file is empty: {source}")
            return False

        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source, destination)

        # Verify copy
        if destination.exists() and destination.stat().st_size > 0:
            print(f"✅ File copied: {destination.name}")
            return True

        print(f"❌ Copy verification failed: {destination.name}")
        return False
    except Exception as e:
        print(f"❌ Copy error: {e}")
        return False
def handle_batch_file_upload(files):
    """Handle multiple file uploads for batch mode.

    Empties CONVERT_DIR, then copies every uploaded file into it under its
    original file name.

    Args:
        files: Iterable of uploads. Each item may be a path string, a Path,
            or an object whose ``.name`` attribute holds the path. ``None``
            items are skipped.

    Returns:
        A multi-line status message for the upload status box.
    """
    if not files:
        return "📁 No files uploaded"

    # Clear existing files in convert directory
    try:
        for existing_file in CONVERT_DIR.glob("*"):
            if existing_file.is_file():
                existing_file.unlink()
    except Exception as e:
        print(f"⚠️ Error cleaning convert directory: {e}")

    uploaded_count = 0
    failed_count = 0

    for file in files:
        original_name = "unknown file"
        try:
            if file is None:
                continue

            # Resolve the upload to a real path. Plain strings have no
            # ``.name`` attribute, so they are handled explicitly.
            if isinstance(file, (str, Path)):
                source_path = Path(file)
            else:
                source_path = Path(file.name)

            # Get the original filename
            original_name = source_path.name or f"video_{uploaded_count}.mp4"

            # The directory was just emptied, so an existing destination can
            # only come from this batch. Pick a free name rather than
            # overwriting it.
            dest_path = CONVERT_DIR / original_name
            duplicate_index = 1
            while dest_path.exists():
                dest_path = CONVERT_DIR / (
                    f"{Path(original_name).stem}_{duplicate_index}{Path(original_name).suffix}"
                )
                duplicate_index += 1

            # Copy file to convert directory
            if safe_copy_file(source_path, dest_path):
                file_size = dest_path.stat().st_size / (1024 * 1024)  # MB
                print(f"✅ Uploaded: {dest_path.name} ({file_size:.1f}MB)")
                uploaded_count += 1
            else:
                print(f"❌ Failed to upload: {original_name}")
                failed_count += 1
        except Exception as e:
            print(f"❌ Error uploading file ({original_name}): {e}")
            failed_count += 1

    status_msg = f"📦 Batch Upload Complete:\n✅ Uploaded: {uploaded_count} files\n"
    if failed_count > 0:
        status_msg += f"❌ Failed: {failed_count} files\n"

    # List uploaded files. The extension set mirrors the one batch processing
    # searches for, so everything shown here will actually be processed.
    try:
        video_suffixes = {".mp4", ".avi", ".mov", ".mkv"}
        uploaded_files = sorted(
            f.name for f in CONVERT_DIR.glob("*")
            if f.is_file() and f.suffix in video_suffixes
        )
        if uploaded_files:
            status_msg += "📁 Files ready for processing:\n" + "\n".join(
                f" • {name}" for name in uploaded_files[:10]
            )
            if len(uploaded_files) > 10:
                status_msg += f"\n ... and {len(uploaded_files) - 10} more"
    except Exception as e:
        print(f"⚠️ Error listing files: {e}")

    return status_msg
def resize_video(input_path, output_path, fps=30):
    """Resize/process video with fallback.

    Re-encodes ``input_path`` to ``output_path`` at ``fps`` frames per second
    with AAC audio when MoviePy is available. Without MoviePy, or when the
    re-encode fails, the input is copied unchanged.

    Args:
        input_path: Source video (str or Path).
        output_path: Destination video (str or Path).
        fps: Frame rate passed to MoviePy's writer.

    Returns:
        True when an output was produced by re-encoding or copying, False
        when even the fallback copy failed.
    """
    try:
        if not MOVIEPY_AVAILABLE:
            print("⚠️ MoviePy not available - copying video directly")
            shutil.copy2(input_path, output_path)
            return True

        print(f"🎬 Processing video: {Path(input_path).name}")
        clip = VideoFileClip(str(input_path))
        try:
            clip.write_videofile(str(output_path), fps=fps, audio_codec='aac', verbose=False, logger=None)
        finally:
            # Always release the reader, including when the write fails.
            clip.close()
        print("✅ Video processed successfully")
        return True
    except Exception as e:
        print(f"❌ Video processing failed: {e}")
        try:
            shutil.copy2(input_path, output_path)
            return True
        except Exception as e2:
            print(f"❌ Fallback copy failed: {e2}")
            return False
def extract_audio(video_path, audio_path):
    """Extract audio from video.

    Args:
        video_path: Video to read the audio track from.
        audio_path: File the audio track is written to.

    Returns:
        True when an audio file was written. False when MoviePy is
        unavailable, the video has no audio track, or an error occurred.
        Errors are printed, never raised.
    """
    try:
        if not MOVIEPY_AVAILABLE:
            print("⚠️ MoviePy not available - cannot extract audio")
            return False

        clip = VideoFileClip(str(video_path))
        try:
            if clip.audio is None:
                return False
            clip.audio.write_audiofile(str(audio_path), logger=None, verbose=False)
            return True
        finally:
            # Release the reader on every path, including a failed write.
            clip.close()
    except Exception as e:
        print(f"❌ Audio extraction failed: {e}")
        return False
def _remove_files_in(directory):
    """Delete the regular files directly inside ``directory``.

    Sub-directories are left alone. A file that cannot be removed is reported
    and skipped so the remaining files are still cleaned up.

    Returns:
        The number of files that could not be removed.
    """
    failures = 0
    for entry in directory.glob("*"):
        try:
            if entry.is_file():
                entry.unlink()
        except OSError as e:
            failures += 1
            print(f"⚠️ Could not remove {entry.name}: {e}")
    return failures


def cleanup_temp_files():
    """Clean up temporary files (top-level files in TEMP_DIR)."""
    try:
        _remove_files_in(TEMP_DIR)
        print("🧹 Temp files cleaned")
    except Exception as e:
        print(f"⚠️ Cleanup error: {e}")


def cleanup_convert_files():
    """Clean up convert directory files (top-level files in CONVERT_DIR)."""
    try:
        _remove_files_in(CONVERT_DIR)
        print("🧹 Convert directory cleaned")
    except Exception as e:
        print(f"⚠️ Convert cleanup error: {e}")
def create_batch_zip():
    """Create zip file of all output files.

    Collects every ``*.mp4`` and ``*.avi`` file in OUTPUT_DIR into
    ``batch_results_<unix time>.zip``. The archive is built under a ``.part``
    name and renamed once complete, so a failed run never leaves a broken
    ``.zip`` in the output directory.

    Returns:
        Path of the finished archive, or None when there is nothing to
        archive or the archive could not be created.
    """
    partial_path = None
    try:
        output_files = list(OUTPUT_DIR.glob("*.mp4")) + list(OUTPUT_DIR.glob("*.avi"))
        if not output_files:
            return None

        zip_path = OUTPUT_DIR / f"batch_results_{int(time.time())}.zip"
        partial_path = zip_path.with_name(zip_path.name + ".part")

        with zipfile.ZipFile(partial_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file in output_files:
                zipf.write(file, file.name)
                print(f"📦 Added to zip: {file.name}")

        partial_path.replace(zip_path)
        print(f"✅ Batch zip created: {zip_path.name}")
        return zip_path
    except Exception as e:
        print(f"❌ Zip creation failed: {e}")
        # Remove the incomplete archive so it cannot be offered for download.
        try:
            if partial_path is not None and partial_path.exists():
                partial_path.unlink()
        except OSError as cleanup_error:
            print(f"⚠️ Could not remove partial zip: {cleanup_error}")
        return None
def get_download_file():
    """Locate the newest result in OUTPUT_DIR.

    Candidates are the ``*.mp4``, ``*.avi`` and ``*.zip`` files in the output
    directory; the one with the largest ``st_ctime`` wins.

    Returns:
        ``(path, status)`` where ``path`` is the file path as a string, or
        None when nothing was found or an error occurred, and ``status`` is a
        message for the UI.
    """
    try:
        candidates = []
        for pattern in ("*.mp4", "*.avi", "*.zip"):
            candidates.extend(OUTPUT_DIR.glob(pattern))

        if len(candidates) == 0:
            return None, "📁 No output files found"

        def changed_at(path):
            return path.stat().st_ctime

        newest = max(candidates, key=changed_at)
        size_mb = newest.stat().st_size / (1024 * 1024)  # MB
        return str(newest), f"📥 Ready: {newest.name} ({size_mb:.1f}MB)"
    except Exception as e:
        return None, f"❌ Error: {e}"
def run_single_video(source_image, target_video, frame_processor, face_analyser_direction,
                     face_recognition, face_analyser_gender, face_analyser_age, skip_audio,
                     keep_fps, lip_syncer_model, enable_lip_sync, gpu_selection):
    """Process single video.

    Runs ``run.py`` in a subprocess and streams its console output.

    This is a generator. Every item is a ``(video_result, cli_output)`` pair:

    * while the job runs, ``video_result`` is None and ``cli_output`` holds
      the most recent console lines;
    * the last item carries the outcome: ``video_result`` is the output path
      on success, or a message starting with "❌" on failure.

    The outcome is yielded rather than returned because a value returned from
    a generator is not visible to a plain ``for`` loop.
    """
    global last_output_path, last_batch_mode, current_process
    last_batch_mode = False

    # Reject missing inputs up front with a readable message.
    if not source_image or not target_video:
        message = "❌ Source image and target video are both required"
        yield message, message
        return

    try:
        set_gpu_device(gpu_selection)

        # Setup temp files
        temp_source = TEMP_DIR / 'source-image.jpg'
        temp_target = TEMP_DIR / 'resize-vid.mp4'

        # Copy and process files
        if not safe_copy_file(Path(source_image), temp_source):
            message = "❌ Failed to copy source image"
            yield message, message
            return

        if not resize_video(Path(target_video), temp_target):
            message = "❌ Video processing failed"
            yield message, message
            return

        # Handle lip sync before naming the output, so the "_lipsynced"
        # suffix reflects what will actually run.
        audio_path = None
        if enable_lip_sync:
            audio_path = TEMP_DIR / 'target-audio.wav'
            if not extract_audio(temp_target, audio_path):
                print("⚠️ Lip sync disabled - audio extraction failed")
                enable_lip_sync = False

        # Generate output filename
        source_name = Path(source_image).stem
        target_name = Path(target_video).stem
        suffix = "_lipsynced" if enable_lip_sync else ""
        output_filename = f"{source_name}_{target_name}{suffix}.mp4"
        output_path = OUTPUT_DIR / output_filename

        # Build command
        execution_provider = "cuda" if gpu_selection.startswith("GPU") and "Error" not in gpu_selection else "cpu"
        processors = list(frame_processor)
        if enable_lip_sync and audio_path and 'lip_syncer' not in processors:
            processors.insert(0, 'lip_syncer')

        cmd = [
            sys.executable, "run.py",
            "--execution-providers", execution_provider,
            "--execution-thread-count", "8",
            "--reference-face-distance", "1.5",
            "-s", str(temp_source),
            "-t", str(temp_target),
            "-o", str(output_path),
            "--frame-processors"] + processors + [
            "--face-analyser-direction", face_analyser_direction,
            "--face-analyser-age", face_analyser_age
        ]

        if enable_lip_sync and audio_path:
            cmd.extend(["--source-paths", str(audio_path)])
            cmd.extend(["--lip-syncer-model", lip_syncer_model])

        if face_recognition != 'none':
            cmd.extend(["--face-recognition", face_recognition])
        if face_analyser_gender != 'none':
            cmd.extend(["--face-analyser-gender", face_analyser_gender])
        if skip_audio and not enable_lip_sync:
            cmd.append("--skip-audio")
        if keep_fps:
            cmd.append("--keep-fps")

        print("🚀 Starting face swap processing...")
        print(f"📋 Command: {' '.join(cmd)}")

        start_time = time.time()
        current_process = sp.Popen(
            cmd,
            stdout=sp.PIPE,
            stderr=sp.STDOUT,
            text=True,
            bufsize=1,
            cwd=str(BASE_DIR)
        )

        # Stream the child's output, keeping only the most recent lines so
        # the monitor text stays manageable.
        max_lines = 50
        recent_lines = []
        cli_output = ""
        try:
            for raw_line in current_process.stdout:
                line = raw_line.strip()
                print(line)
                recent_lines.append(line)
                if len(recent_lines) > max_lines:
                    recent_lines = recent_lines[-max_lines:]
                cli_output = "\n".join(recent_lines) + "\n"
                yield None, cli_output
        finally:
            current_process.stdout.close()

        rc = current_process.wait()
        execution_time = time.time() - start_time

        # Cleanup runs for failed jobs too, so temp audio does not pile up.
        try:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
            if audio_path and audio_path.exists():
                audio_path.unlink()
        except Exception as e:
            print(f"⚠️ Cleanup error: {e}")

        if rc != 0:
            yield (
                "❌ Processing failed",
                cli_output + f"\n\n❌ Processing failed (exit code {rc})\n⏱️ Time: {execution_time:.2f}s",
            )
            return

        last_output_path = str(output_path)
        yield str(output_path), cli_output + f"\n\n✅ Completed in {execution_time:.2f}s"
    except Exception as e:
        message = f"❌ Error: {e}"
        yield message, message
def run_batch_processing(source_image, frame_processor, face_analyser_direction, face_recognition,
                         face_analyser_gender, skip_audio, keep_fps, lip_syncer_model, enable_lip_sync, gpu_selection):
    """Process all videos in Convert folder.

    Runs ``run.py`` once per ``*.mp4`` / ``*.avi`` / ``*.mov`` / ``*.mkv``
    file found in CONVERT_DIR and zips the results when at least one video
    succeeded.

    This is a generator yielding ``(None, cli_output)`` pairs, where
    ``cli_output`` is the accumulated progress log. A finished batch ends
    with a "=== BATCH COMPLETE ===" summary. Errors are reported through the
    log text, never raised.
    """
    global last_output_path, last_batch_mode, current_process
    last_batch_mode = True

    if not source_image:
        yield None, "❌ No source image provided. Please upload a source image first."
        return

    try:
        set_gpu_device(gpu_selection)

        video_extensions = ['*.mp4', '*.avi', '*.mov', '*.mkv']
        video_files = []
        for ext in video_extensions:
            video_files.extend(CONVERT_DIR.glob(ext))
        video_files.sort()  # deterministic processing order

        if not video_files:
            yield None, "📁 No video files found in Convert folder.\nPlease upload videos using the file input above."
            return

        temp_source = TEMP_DIR / 'source-image.jpg'
        if not safe_copy_file(Path(source_image), temp_source):
            yield None, "❌ Failed to copy source image"
            return

        source_name = Path(source_image).stem
        cli_output = f"📊 Processing {len(video_files)} videos in batch mode\n🎯 Source: {source_name}\n\n"
        yield None, cli_output

        execution_provider = "cuda" if gpu_selection.startswith("GPU") and "Error" not in gpu_selection else "cpu"
        successful = 0
        failed = 0

        for i, video_file in enumerate(video_files, 1):
            cli_output += f"[{i}/{len(video_files)}] 🎬 {video_file.name}\n"
            yield None, cli_output

            temp_target = TEMP_DIR / 'resize-vid.mp4'
            if not resize_video(video_file, temp_target):
                cli_output += "❌ Video resize failed\n"
                failed += 1
                yield None, cli_output
                continue

            # Handle lip sync. The decision is made per video: one clip
            # without usable audio must not disable lip sync for the rest.
            use_lip_sync = bool(enable_lip_sync)
            audio_path = None
            if use_lip_sync:
                audio_path = TEMP_DIR / 'target-audio.wav'
                if not extract_audio(temp_target, audio_path):
                    use_lip_sync = False
                    cli_output += "⚠️ Lip sync skipped for this video - audio extraction failed\n"

            suffix = "_lipsynced" if use_lip_sync else ""
            output_filename = f"{source_name}_{video_file.stem}{suffix}.mp4"
            output_path = OUTPUT_DIR / output_filename

            # Build command
            processors = list(frame_processor)
            if use_lip_sync and audio_path and 'lip_syncer' not in processors:
                processors.insert(0, 'lip_syncer')

            cmd = [
                sys.executable, "run.py",
                "--execution-providers", execution_provider,
                "--execution-thread-count", "8",
                "--reference-face-distance", "1.5",
                "-s", str(temp_source),
                "-t", str(temp_target),
                "-o", str(output_path),
                "--frame-processors"] + processors + [
                "--face-analyser-direction", face_analyser_direction
            ]

            if use_lip_sync and audio_path:
                cmd.extend(["--source-paths", str(audio_path)])
                cmd.extend(["--lip-syncer-model", lip_syncer_model])

            if face_recognition != 'none':
                cmd.extend(["--face-recognition", face_recognition])
            if face_analyser_gender != 'none':
                cmd.extend(["--face-analyser-gender", face_analyser_gender])
            if skip_audio and not use_lip_sync:
                cmd.append("--skip-audio")
            if keep_fps:
                cmd.append("--keep-fps")

            try:
                start_time = time.time()
                current_process = sp.Popen(
                    cmd,
                    stdout=sp.PIPE,
                    stderr=sp.STDOUT,
                    text=True,
                    bufsize=1,
                    cwd=str(BASE_DIR)
                )

                # Child output goes to the server console only; the UI log
                # shows one status line per video.
                try:
                    for raw_line in current_process.stdout:
                        print(raw_line.strip())
                finally:
                    current_process.stdout.close()

                rc = current_process.wait()
                execution_time = time.time() - start_time

                if rc == 0:
                    cli_output += f"✅ Completed in {execution_time:.2f}s\n\n"
                    successful += 1
                else:
                    cli_output += "❌ Processing failed\n\n"
                    failed += 1
                yield None, cli_output
            except Exception as e:
                cli_output += f"❌ Error: {e}\n\n"
                failed += 1
                yield None, cli_output
            finally:
                # Cleanup after every video, whether it succeeded or not.
                try:
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                    gc.collect()
                    if audio_path and audio_path.exists():
                        audio_path.unlink()
                except Exception as e:
                    print(f"⚠️ Cleanup error: {e}")

        # Final summary
        cli_output += f"\n=== BATCH COMPLETE ===\n✅ Successful: {successful}\n❌ Failed: {failed}\n"

        if successful > 0:
            zip_path = create_batch_zip()
            if zip_path is not None:
                last_output_path = str(zip_path)
            else:
                # Do not record the string "None" as an output path.
                cli_output += "⚠️ Could not create the results zip; individual files remain in the output folder\n"

        yield None, cli_output
    except Exception as e:
        yield None, f"❌ Batch processing error: {e}"
def handle_processing(source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
                      face_analyser_gender, face_analyser_age, skip_audio, keep_fps,
                      lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection):
    """Main processing handler.

    Dispatches to batch or single-video processing and relays progress to
    the UI.

    This is a generator yielding ``(monitor_text, action_button_label)``
    pairs. While work is running the label is "⏹️ CANCEL"; the last item
    switches it to "📥 DOWNLOAD" on success or "🔄 RESET" on failure.
    """
    try:
        if use_folder_mode:
            cli_output = ""
            for _, cli_output in run_batch_processing(
                source_image, frame_processor, face_analyser_direction, face_recognition,
                face_analyser_gender, skip_audio, keep_fps, lip_syncer_model, enable_lip_sync, gpu_selection
            ):
                yield cli_output, "⏹️ CANCEL"

            # Only a batch that reached its summary has something to
            # download; early exits are reported as failures.
            if "=== BATCH COMPLETE ===" in cli_output:
                yield cli_output + "\n🎉 Batch processing complete!", "📥 DOWNLOAD"
            else:
                yield cli_output or "❌ Batch processing produced no output", "🔄 RESET"
        else:
            video_result = None
            cli_output = ""
            job = run_single_video(
                source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
                face_analyser_gender, face_analyser_age, skip_audio, keep_fps,
                lip_syncer_model, enable_lip_sync, gpu_selection
            )

            # Drive the generator by hand so its outcome is captured whether
            # it is yielded as the last item or handed back through `return`
            # (which a `for` loop would discard).
            while True:
                try:
                    video_result, cli_output = next(job)
                except StopIteration as finished:
                    if finished.value is not None:
                        video_result, returned_text = finished.value
                        cli_output = returned_text or cli_output
                    break
                yield cli_output, "⏹️ CANCEL"

            succeeded = bool(video_result) and not str(video_result).startswith("❌")
            if succeeded:
                yield cli_output + "\n🎉 Processing complete!", "📥 DOWNLOAD"
            else:
                # Make sure the failure reason is visible in the monitor.
                if video_result and str(video_result) not in cli_output:
                    cli_output = f"{cli_output}\n{video_result}".strip()
                yield cli_output or "❌ Processing produced no output", "🔄 RESET"
    except Exception as e:
        yield f"❌ Processing error: {e}", "🔄 RESET"
def cancel_processing():
    """Cancel current processing.

    Asks the running subprocess to terminate and waits up to 10 seconds. If
    that fails or times out, the process is killed.

    Returns:
        A status message for the UI. Every path returns a string.
    """
    global current_process
    try:
        if current_process and current_process.poll() is None:
            current_process.terminate()
            current_process.wait(timeout=10)
            return "⏹️ Processing cancelled"
        return "⚠️ No active processing"
    except Exception as e:
        # Graceful termination failed or timed out: escalate to kill.
        try:
            if current_process:
                current_process.kill()
                current_process.wait()
                return f"⏹️ Processing force-cancelled: {e}"
        except Exception as kill_error:
            return f"❌ Cancel failed: {e} (kill also failed: {kill_error})"
        return f"❌ Cancel failed: {e}"
def reset_interface():
    """Clear working files and return the default value for every UI control.

    Returns:
        A 15-tuple in the order of the action button's outputs: source image,
        target video, frame processors, analysis direction, recognition mode,
        gender, age, skip-audio, keep-fps, lip-sync model, lip-sync toggle,
        batch toggle, compute device, monitor text, button label. If cleanup
        or default lookup fails, a fixed fallback tuple carrying the error
        message is returned instead.
    """
    try:
        cleanup_temp_files()
        cleanup_convert_files()

        default_processors = ['face_swapper']
        if ENHANCEMENT_AVAILABLE:
            default_processors = default_processors + ['face_enhancer']

        if AVAILABLE_GPUS:
            default_device = AVAILABLE_GPUS[0]
        else:
            default_device = "CPU Only"

        defaults = (
            None,                 # source_image
            None,                 # target_video
            default_processors,   # frame_processor
            'top-bottom',         # face_analyser_direction
            'reference',          # face_recognition
            'female',             # face_analyser_gender
            'adult',              # face_analyser_age
            False,                # skip_audio
            True,                 # keep_fps
            'wav2lip_gan_96',     # lip_syncer_model
            False,                # enable_lip_sync
            False,                # use_folder_mode
            default_device,       # gpu_selection
            "🔧 Interface reset. Ready for new session!",  # cli_output
            "🚀 START PROCESSING",                          # button text
        )
        return defaults
    except Exception as e:
        fallback_controls = (None, None, ['face_swapper'], 'top-bottom', 'reference', 'female', 'adult',
                             False, True, 'wav2lip_gan_96', False, False, "CPU Only")
        return fallback_controls + (f"⚠️ Reset error: {e}", "🚀 START PROCESSING")
def handle_download():
    """Look up the newest output file for the download button.

    Returns:
        A 4-tuple ``(file_path_or_None, status_text, file_update,
        button_update)``. When a file is found the first update makes its
        target visible and the second hides its target; otherwise the
        visibilities are swapped.
    """
    try:
        download_path, status = get_download_file()
        found = True if download_path else False
        file_value = download_path if found else None
        return file_value, status, gr.update(visible=found), gr.update(visible=not found)
    except Exception as e:
        return None, f"❌ Download error: {e}", gr.update(visible=False), gr.update(visible=True)
def handle_action_button(button_text, *inputs):
    """Dispatch a click on the multi-purpose action button by its label.

    * label contains "RESET": returns whatever ``reset_interface()`` returns;
    * label contains "CANCEL": cancels the job and returns the inputs
      unchanged plus the cancel message and a "🔄 RESET" label;
    * any other label: returns the inputs unchanged plus an empty monitor
      text and the same label.

    Any exception yields the inputs plus an error message and "🔄 RESET".
    """
    try:
        if "RESET" in button_text:
            return reset_interface()

        if "CANCEL" in button_text:
            monitor_text, next_label = cancel_processing(), "🔄 RESET"
        else:
            monitor_text, next_label = "", button_text
        return inputs + (monitor_text, next_label)
    except Exception as e:
        return inputs + (f"❌ Action error: {e}", "🔄 RESET")
def toggle_batch_mode(use_folder_mode):
    """Build the update for the target-video input when batch mode toggles.

    Batch mode switches the input to multiple files with a matching label;
    otherwise it is a single-file input. Both variants accept video files
    only.
    """
    try:
        if use_folder_mode:
            label_text = "📁 Target Videos (Drag multiple files here)"
            count_mode = "multiple"
        else:
            label_text = "Target Video (Video to modify)"
            count_mode = "single"
        return gr.update(
            label=label_text,
            file_count=count_mode,
            file_types=["video"]
        )
    except Exception as e:
        print(f"⚠️ Toggle batch mode error: {e}")
        return gr.update(label="Target Video")
def handle_file_upload(files, use_folder_mode):
    """Handle file uploads - single or multiple.

    Args:
        files: The upload value. In batch mode this is a list of uploads. In
            single mode it is one upload: a path string, a Path, or an object
            whose ``.name`` attribute holds the path. A list is tolerated in
            single mode and its first item is used.
        use_folder_mode: True when batch mode is active.

    Returns:
        A status message for the upload status box.
    """
    try:
        if not files:
            return "📁 No files uploaded"

        if use_folder_mode:
            # Handle batch upload
            return handle_batch_file_upload(files)

        # Single file mode - just return status
        upload = files[0] if isinstance(files, (list, tuple)) else files
        if isinstance(upload, (str, Path)):
            # Path strings have no ``.name`` attribute, so handle them first.
            display_name = Path(upload).name
        elif hasattr(upload, 'name'):
            display_name = Path(upload.name).name
        else:
            display_name = 'video file'
        return f"✅ Single video uploaded: {display_name}"
    except Exception as e:
        return f"❌ Upload error: {e}"
# Probe compute devices once at import time. The result feeds the device
# dropdown and the reset defaults. Any failure falls back to CPU only.
try:
    detected_devices = get_available_gpus()
    AVAILABLE_GPUS = detected_devices
    print(f"🖥️ Available GPUs: {AVAILABLE_GPUS}")
except Exception as detection_error:
    print(f"⚠️ GPU detection failed: {detection_error}")
    AVAILABLE_GPUS = ["CPU Only"]
# Gradio Interface | |
def create_interface():
    """Build the Gradio Blocks UI and wire up its event handlers.

    Layout: a header, then three columns: input files and controls,
    processing configuration, and the live monitor with device options.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    with gr.Blocks(
        theme=gr.themes.Monochrome(
            primary_hue=gr.themes.colors.teal,
            secondary_hue=gr.themes.colors.gray,
            font=gr.themes.GoogleFont('Inter')
        ).set(
            background_fill_primary="#1f1f1f",
            background_fill_secondary="#2d2d2d"
        ),
        css="""
        .gradio-container { max-width: 1400px !important; margin: 0 auto !important; }
        .main-header { text-align: center; padding: 1rem; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 12px; color: white; margin-bottom: 1rem; }
        .control-panel { background: rgba(102, 126, 234, 0.1); border-radius: 12px; padding: 1rem; margin-bottom: 1rem; border: 2px solid rgba(102, 126, 234, 0.2); }
        .section-header { font-weight: 600; color: #667eea; margin-bottom: 1rem; border-bottom: 2px solid #667eea; padding-bottom: 0.5rem; }
        """
    ) as interface:

        # Header
        with gr.Column(elem_classes="main-header"):
            gr.Markdown("# 🎭 Advanced Face Swap Studio\n**HuggingFace Spaces Optimized**")

        with gr.Row():
            # Left Column - Input & Controls
            with gr.Column(scale=2):
                with gr.Group(elem_classes="control-panel"):
                    gr.HTML('<div class="section-header">📸 Input Files</div>')

                    source_image = gr.File(
                        label="Source Image (Face to use)",
                        file_types=["image"],
                        file_count="single"
                    )

                    # Batch mode toggle
                    use_folder_mode = gr.Checkbox(
                        label="📁 Batch Mode (Process multiple videos)",
                        value=False
                    )

                    target_video = gr.File(
                        label="Target Video (Video to modify)",
                        file_types=["video"],
                        file_count="single"
                    )

                    # Upload status display
                    upload_status = gr.Textbox(
                        label="Upload Status",
                        value="Ready to upload files...",
                        interactive=False,
                        lines=3
                    )

                with gr.Group(elem_classes="control-panel"):
                    gr.HTML('<div class="section-header">🎮 Controls</div>')

                    start_button = gr.Button("🚀 START PROCESSING", variant="primary", size="lg")
                    action_button = gr.Button("🔄 RESET", variant="secondary", size="lg")
                    download_button = gr.Button("📥 DOWNLOAD", variant="secondary", size="lg")

                    download_status = gr.Textbox(
                        label="Download Status",
                        value="Ready for processing...",
                        interactive=False,
                        lines=2
                    )

                    download_file = gr.File(
                        label="Download File",
                        visible=False,
                        interactive=False
                    )

            # Middle Column - Configuration
            with gr.Column(scale=3):
                with gr.Group(elem_classes="control-panel"):
                    gr.HTML('<div class="section-header">⚙️ Processing Configuration</div>')

                    with gr.Row():
                        with gr.Column():
                            # Frame processing
                            available_processors = ['face_swapper']
                            if ENHANCEMENT_AVAILABLE:
                                available_processors.extend(['face_enhancer', 'frame_enhancer'])

                            frame_processor = gr.CheckboxGroup(
                                choices=available_processors,
                                label='Frame Processors',
                                value=['face_swapper'] + (['face_enhancer'] if ENHANCEMENT_AVAILABLE else [])
                            )

                            enable_lip_sync = gr.Checkbox(label="🎵 Enable Lip Sync (Beta)", value=False)

                            lip_syncer_model = gr.Dropdown(
                                label='Lip Sync Model',
                                choices=['wav2lip_96', 'wav2lip_gan_96'],
                                value='wav2lip_gan_96',
                                visible=False
                            )

                        with gr.Column():
                            # Face analysis
                            face_recognition = gr.Dropdown(
                                label='Recognition Mode',
                                choices=['none', 'reference', 'many'],
                                value='reference'
                            )

                            face_analyser_direction = gr.Dropdown(
                                label='Analysis Direction',
                                choices=['left-right', 'right-left', 'top-bottom', 'bottom-top', 'small-large', 'large-small'],
                                value='top-bottom'
                            )

                            face_analyser_gender = gr.Dropdown(
                                label='Target Gender',
                                choices=['none', 'male', 'female'],
                                value='female'
                            )

                            face_analyser_age = gr.Dropdown(
                                label='Target Age Group',
                                choices=['child', 'teen', 'adult', 'senior'],
                                value='adult'
                            )

            # Right Column - Monitor & Options
            with gr.Column(scale=3):
                with gr.Group(elem_classes="control-panel"):
                    gr.HTML('<div class="section-header">🖥️ Processing Monitor</div>')

                    cli_output = gr.Textbox(
                        label="Live Processing Output",
                        lines=15,
                        interactive=False,
                        show_copy_button=True,
                        placeholder="🔧 System ready. Configure settings and start processing..."
                    )

                with gr.Group(elem_classes="control-panel"):
                    gr.HTML('<div class="section-header">🛠️ Processing Options</div>')

                    with gr.Row():
                        with gr.Column():
                            gpu_selection = gr.Dropdown(
                                label="🖥️ Compute Device",
                                choices=AVAILABLE_GPUS,
                                value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only"
                            )
                            skip_audio = gr.Checkbox(label="🔇 Skip Audio", value=False)

                        with gr.Column():
                            keep_fps = gr.Checkbox(label="🎬 Keep Original FPS", value=True)

        def _fetch_latest_download():
            """Adapt handle_download() to one update per output component.

            The file's value and visibility are merged into a single update,
            so the download component appears only once in the outputs.
            """
            download_path, status = handle_download()[:2]
            return gr.update(value=download_path, visible=bool(download_path)), status

        # Event handlers with error handling
        try:
            enable_lip_sync.change(
                lambda x: gr.update(visible=x),
                inputs=[enable_lip_sync],
                outputs=[lip_syncer_model]
            )

            use_folder_mode.change(
                toggle_batch_mode,
                inputs=[use_folder_mode],
                outputs=[target_video]
            )

            target_video.upload(
                handle_file_upload,
                inputs=[target_video, use_folder_mode],
                outputs=[upload_status]
            )

            start_button.click(
                handle_processing,
                inputs=[
                    source_image, target_video, frame_processor, face_analyser_direction,
                    face_recognition, face_analyser_gender, face_analyser_age,
                    skip_audio, keep_fps, lip_syncer_model, enable_lip_sync,
                    use_folder_mode, gpu_selection
                ],
                outputs=[cli_output, action_button]
            )

            action_button.click(
                handle_action_button,
                inputs=[
                    action_button, source_image, target_video, frame_processor,
                    face_analyser_direction, face_recognition, face_analyser_gender,
                    face_analyser_age, skip_audio, keep_fps, lip_syncer_model,
                    enable_lip_sync, use_folder_mode, gpu_selection
                ],
                outputs=[
                    source_image, target_video, frame_processor, face_analyser_direction,
                    face_recognition, face_analyser_gender, face_analyser_age,
                    skip_audio, keep_fps, lip_syncer_model, enable_lip_sync,
                    use_folder_mode, gpu_selection, cli_output, action_button
                ]
            )

            # The download button stays visible so the latest result can be
            # fetched again. No `change` handler is attached to
            # `download_file`: it would also fire when this click handler
            # sets the value, and hide the file before it could be
            # downloaded.
            download_button.click(
                _fetch_latest_download,
                outputs=[download_file, download_status]
            )

        except Exception as e:
            print(f"⚠️ Error setting up event handlers: {e}")

    return interface
# Script entry point: print a startup summary, clear stale temp files, then
# build and launch the interface.
if __name__ == "__main__":
    banner = "=" * 60

    print("\n" + banner)
    print("🎭 Advanced Face Swap Studio - HuggingFace Spaces")
    print(banner)
    print(f"📁 Directories configured:")
    for label, location in (
        ("Base", BASE_DIR),
        ("Temp", TEMP_DIR),
        ("Output", OUTPUT_DIR),
        ("Convert", CONVERT_DIR),
    ):
        print(f" - {label}: {location}")
    print(f"🖥️ GPU Support: {torch.cuda.is_available()}")
    moviepy_mark = '✅' if MOVIEPY_AVAILABLE else '❌'
    enhancement_mark = '✅' if ENHANCEMENT_AVAILABLE else '❌'
    print(f"🎬 MoviePy: {moviepy_mark}")
    print(f"✨ Enhancement: {enhancement_mark}")
    print(banner)

    # Clean startup
    cleanup_temp_files()

    # Create and launch interface
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
        "show_error": True,
        "max_file_size": "1500mb",
    }
    try:
        app = create_interface()
        app.launch(**launch_options)
    except Exception as launch_error:
        print(f"❌ Failed to launch application: {launch_error}")
        print("🔄 Please check your dependencies and try again")