| """ | |
| Face Swap Engine Module | |
| This module contains the core face-swapping algorithms. | |
| It's responsible for actually replacing faces from one image with faces from another. | |
| For Non-Technical Developers: | |
| - Uses AI models to swap faces between source (face to copy) and target (face to replace) | |
| - Has multiple algorithms for different situations: | |
| * Standard: Fast and simple | |
| * Multiscale: Tries different detection levels for harder cases | |
| * Landmark-based: Uses facial feature points for precision | |
| * Preprocessing: Enhances images before detection for difficult videos | |
| - Includes face enhancement (making swapped faces look cleaner with GFPGAN) | |
| - Handles memory efficiently for long videos | |
| """ | |
import gc
import os
import threading
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

# Suppress non-critical deprecation warnings before the heavy imports below,
# so the filters are already in place when those libraries first emit them
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', message='.*CUDAExecutionProvider.*')
warnings.filterwarnings('ignore', message='.*pretrained.*')

import cv2
import numpy as np
import requests
import torch
import onnxruntime as ort
from gfpgan import GFPGANer
from insightface.model_zoo.inswapper import INSwapper

from src.config import (
    DEVICE, INSWAPPER_MODEL_PATH, DEFAULT_HEADERS, DOWNLOAD_TIMEOUT,
    PREPROCESS_STRENGTHS, MAX_WORKERS, TORCH_NUM_THREADS,
    ONNX_INTRA_OP_THREADS, NO_FACE_LIMIT, MIN_LANDMARK_CONFIDENCE,
    DEBUG_MODE, HF_TOKEN, INSWAPPER_HF_URL,
    INSIGHTFACE_MODELS_DIR, GFPGAN_MODELS_DIR, GFPGAN_MODEL_URL
)
from src.face_detection import (
    get_best_face_in_image, detect_faces_with_multiscale,
    is_face_landmark_valid, get_face_embedding_from_url
)
from src.logger import debug_log, log_error, log_warning
from src.media_processor import encode_frames_to_gif, encode_frames_to_mp4_base64

# ==================== MODEL INITIALIZATION ====================

def _check_and_download_model(url: str, path: str, description: str, token: Optional[str] = None) -> bool:
    """
    Check if a model exists on disk; download it if it doesn't.

    If the model file already exists, the download is skipped entirely.

    Args:
        url: Where to download from
        path: Where to save the file
        description: Human-readable name of the model
        token: Optional authentication token (for Hugging Face)

    Returns:
        True if the model exists or was downloaded successfully, False otherwise
    """
    # Check if the file already exists
    if os.path.exists(path):
        debug_log(f"{description} already exists at {path}")
        return True

    # Create the parent directory, then stream the download to disk in 1 MB chunks
    os.makedirs(os.path.dirname(path), exist_ok=True)
    debug_log(f"Downloading {description}...")
    headers = {'User-Agent': 'Mozilla/5.0'}
    if token:
        headers['Authorization'] = f'Bearer {token}'
    try:
        with requests.get(url, stream=True, headers=headers, timeout=DOWNLOAD_TIMEOUT) as r:
            r.raise_for_status()
            with open(path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
        debug_log(f"Downloaded {description} successfully")
        return True
    except Exception as e:
        log_error(f"Failed to download {description}", detail=f"URL={url} Path={path}", exc=e)
        return False

# Try to download the face-swapping model.
# If models were pre-downloaded during the Docker build, this finds them
# on disk and skips the download.
if not _check_and_download_model(
    INSWAPPER_HF_URL,
    INSWAPPER_MODEL_PATH,
    "InSwapper face-swapping model",
    token=HF_TOKEN
):
    log_warning("InSwapper model download failed or model not found")

# Configure PyTorch and ONNX Runtime (the AI libraries we use)
torch.set_num_threads(TORCH_NUM_THREADS)

# ONNX Runtime: configuration for the inference engine
onnx_sess_options = ort.SessionOptions()
onnx_sess_options.intra_op_num_threads = ONNX_INTRA_OP_THREADS
onnx_sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

# Initialize the face-swapping model
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if DEVICE == 'cuda' else ['CPUExecutionProvider']
    swapper_session = ort.InferenceSession(INSWAPPER_MODEL_PATH, onnx_sess_options, providers=providers)
    face_swapper = INSwapper(model_file=INSWAPPER_MODEL_PATH, session=swapper_session)
    debug_log("Face swapper model loaded successfully")
except Exception as e:
    log_error("Failed to load face swapper model", exc=e)
    face_swapper = None

# Initialize the face enhancement model (makes swapped faces look cleaner)
try:
    # Use GFPGAN_MODEL_URL from config to ensure version consistency
    face_enhancer = GFPGANer(
        model_path=GFPGAN_MODEL_URL,
        upscale=1,              # Don't upscale (1x = keep the same size)
        arch='clean',           # Use the clean architecture
        channel_multiplier=2,
        bg_upsampler=None       # Don't upsample the background
    )
    FACE_ENHANCEMENT_AVAILABLE = True
    debug_log("Face enhancement model loaded successfully (v1.4)")
except Exception as e:
    log_error("Face enhancement unavailable", exc=e)
    face_enhancer = None
    FACE_ENHANCEMENT_AVAILABLE = False

# Threading lock and thread pool.
# These prevent multiple threads from accessing the models at the same time.
model_lock = threading.Lock()  # Lock for the face swapper
swap_thread_pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# ==================== HELPER FUNCTIONS ====================
def _clear_memory() -> None:
    """
    Free up memory to prevent running out of RAM.

    This is important for long videos - we need to clean up
    processed frames from memory as we go.
    """
    gc.collect()  # Python garbage collection
    if torch.cuda.is_available():
        torch.cuda.empty_cache()  # Clear cached GPU memory

def _enhance_face(bgr_frame: np.ndarray) -> np.ndarray:
    """
    Apply GFPGAN face enhancement to make the swapped face look better.

    GFPGAN makes faces clearer and removes artifacts from the swap.

    Args:
        bgr_frame: The frame with the swapped face

    Returns:
        Enhanced frame with a cleaner face, or the original frame if
        enhancement is unavailable or fails
    """
    if not FACE_ENHANCEMENT_AVAILABLE or face_enhancer is None:
        return bgr_frame
    try:
        _, _, enhanced_frame = face_enhancer.enhance(
            bgr_frame,
            has_aligned=False,
            only_center_face=False,
            paste_back=True
        )
        return enhanced_frame
    except Exception as e:
        debug_log(f"Face enhancement failed: {e}")
        return bgr_frame

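# A quick illustrative sketch of the enhancement helper ('frame.png' is a
# hypothetical path):
#
#   frame = cv2.imread('frame.png')    # OpenCV loads images as BGR
#   cleaned = _enhance_face(frame)     # returns the input unchanged if GFPGAN is unavailable
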
def _swap_face_in_frame(bgr_frame: np.ndarray, target_face: object, source_face: object) -> np.ndarray:
    """
    Perform the actual face swap on a single frame.

    This is the core operation - it replaces the target face with the source face.

    Args:
        bgr_frame: The frame to process
        target_face: The face to REPLACE (destination)
        source_face: The face to COPY FROM (source)

    Returns:
        Frame with the face swapped

    Raises:
        ValueError: If the model is not loaded or the swap fails
    """
    if face_swapper is None:
        raise ValueError("Face swapper model not loaded")
    try:
        with model_lock:
            swapped_frame = face_swapper.get(
                bgr_frame,
                target_face,
                source_face,
                paste_back=True  # Blend the swapped face back into the frame
            )
        return swapped_frame
    except Exception as e:
        raise ValueError(f"Face swap operation failed: {e}") from e

def _is_valid_swap_face(face: object, frame_shape: tuple) -> bool:
    """
    Validate that a detected face is suitable for swapping.

    This helps avoid swapping on poorly detected faces or invalid landmarks,
    which can cause artifacts in animated output.
    """
    if face is None:
        return False
    return is_face_landmark_valid(face, frame_shape, MIN_LANDMARK_CONFIDENCE)

def _preprocess_frame_for_detection(bgr_frame: np.ndarray, strength: str = 'medium') -> np.ndarray:
    """
    Enhance a frame to make face detection easier.

    This is useful when video quality is poor or faces are hard to detect.

    Args:
        bgr_frame: The frame to preprocess
        strength: 'light', 'medium', or 'heavy' - how much to enhance

    Returns:
        Enhanced frame that's easier to detect faces in
    """
    if bgr_frame is None or bgr_frame.size == 0:
        return bgr_frame
    settings = PREPROCESS_STRENGTHS.get(strength, PREPROCESS_STRENGTHS['medium'])

    # Step 1: CLAHE (Contrast Limited Adaptive Histogram Equalization)
    # boosts local contrast; it is applied to the lightness channel only,
    # so colors are preserved
    lab = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(
        clipLimit=settings['clahe_limit'],
        tileGridSize=(8, 8)
    )
    l = clahe.apply(l)
    enhanced = cv2.cvtColor(cv2.merge([l, a, b]), cv2.COLOR_LAB2BGR)

    # Step 2: Sharpening (makes edges clearer). The kernel sums to 1,
    # so overall brightness is preserved.
    sharpen_strength = settings['sharpen_strength']
    sharpening_kernel = np.array([
        [0, -sharpen_strength, 0],
        [-sharpen_strength, 1 + 4 * sharpen_strength, -sharpen_strength],
        [0, -sharpen_strength, 0]
    ])
    sharpened = cv2.filter2D(enhanced, -1, sharpening_kernel)

    # Step 3: Gamma correction (optional, for heavy preprocessing) -
    # a gamma of 1.2 lifts shadows via a precomputed lookup table
    if settings['apply_gamma']:
        gamma_lut = np.array([
            ((i / 255.0) ** (1.0 / 1.2)) * 255
            for i in range(256)
        ], dtype=np.uint8)
        sharpened = cv2.LUT(sharpened, gamma_lut)
    return sharpened

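# An illustrative sketch ('dark_frame.png' is a hypothetical path; the preset
# names come from PREPROCESS_STRENGTHS in src.config):
#
#   frame = cv2.imread('dark_frame.png')
#   boosted = _preprocess_frame_for_detection(frame, strength='heavy')
#   face = get_best_face_in_image(boosted)  # detect on the boosted frame,
#                                           # but swap on the original
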
def _durations_to_fps(durations: list) -> float:
    """
    Convert per-frame durations (in milliseconds) to a frame rate, clamped
    to the 1-60 fps range. Falls back to 30 fps when the durations are
    missing or invalid.
    """
    if not durations:
        return 30.0
    avg_ms = sum(durations) / len(durations)
    if avg_ms <= 0:
        return 30.0
    fps = 1000.0 / avg_ms
    return max(1.0, min(60.0, fps))

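# Worked example: durations of [40, 40, 40] average 40 ms per frame, giving
# 1000 / 40 = 25.0 fps; an average of 5 ms (200 fps) would be clamped to 60.0.
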
def _build_output_result(frames: list, durations: list, target_url: str, target_media_type: str, output_format: str = 'gif'):
    """
    Encode processed frames into the requested output format.

    Always produces a base64 GIF. For 'mp4' output, additionally encodes an
    MP4 (reusing the target's audio track when the target is a video) and
    returns both in a dict.
    """
    gif_base64 = encode_frames_to_gif(frames, durations)
    if output_format != 'mp4':
        return gif_base64
    fps = _durations_to_fps(durations)
    video_base64 = None
    try:
        audio_url = target_url if target_media_type == 'video' else None
        video_base64 = encode_frames_to_mp4_base64(frames, fps, audio_url=audio_url)
    except Exception as e:
        debug_log(f"MP4 output generation failed: {e}")
    return {
        'gif_base64': gif_base64,
        'video_base64': video_base64,
        'video_mime_type': 'video/mp4'
    }

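# With output_format='mp4' the returned dict looks like the sketch below;
# video_base64 is None when MP4 encoding failed, so callers should fall back
# to the GIF:
#
#   {
#       'gif_base64': '<base64-encoded GIF>',
#       'video_base64': '<base64-encoded MP4 or None>',
#       'video_mime_type': 'video/mp4'
#   }
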
# ==================== SWAPPING ALGORITHMS ====================
def process_swap_standard(source_url: str, target_url: str, use_enhancement: bool = False, max_frames: Optional[int] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, output_format: str = 'gif'):
    """
    Standard face-swapping algorithm.

    This is the simple, reliable method - good for clear videos.

    Args:
        source_url: URL of the image with the face to COPY FROM
        target_url: URL of the video/image with faces to REPLACE
        use_enhancement: Whether to enhance faces after swapping
        max_frames: Maximum number of frames to process
        start_time: Optional start of the segment to process (seconds)
        end_time: Optional end of the segment to process (seconds)
        output_format: 'gif' or 'mp4'

    Returns:
        Base64-encoded GIF of the result, or a dict with both GIF and MP4 output

    Raises:
        ValueError: If processing fails
    """
    from src.media_processor import extract_frames
    from src.media_handler import get_media_handler

    # Get the source face and the target frames
    source_face = get_face_embedding_from_url(source_url)
    target_info = get_media_handler(target_url)
    target_frames, target_durations = extract_frames(
        target_url,
        target_info['media_type'],
        max_frames,
        start_time=start_time,
        end_time=end_time
    )

    output_frames = []
    locked_face = None  # Remember the last valid detected face
    last_swapped_frame = None
    consecutive_no_face = 0
    for i, bgr_frame in enumerate(target_frames):
        if bgr_frame is None:
            continue
        current_face = get_best_face_in_image(bgr_frame)
        if not _is_valid_swap_face(current_face, bgr_frame.shape):
            if current_face is not None:
                debug_log(f"Frame {i}: detected face rejected for swap due to poor landmarks/pose")
            current_face = None
            consecutive_no_face += 1
        else:
            locked_face = current_face
            consecutive_no_face = 0
        # Fall back to the locked face during short detection gaps
        face_to_swap = current_face or (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
        if face_to_swap:
            try:
                swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
                if use_enhancement:
                    swapped = _enhance_face(swapped)
                output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
                last_swapped_frame = output_frame.copy()
            except Exception as e:
                debug_log(f"Swap failed at frame {i}: {e}")
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        else:
            if last_swapped_frame is not None:
                debug_log(f"No valid face to swap at frame {i}; preserving last swapped frame")
                output_frame = last_swapped_frame.copy()
            else:
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        output_frames.append(output_frame)
        if i % 30 == 0:
            _clear_memory()
    return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)

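# A usage sketch with hypothetical URLs (the variants below share the same
# core calling convention):
#
#   result = process_swap_standard(
#       source_url='https://example.com/face.jpg',
#       target_url='https://example.com/clip.mp4',
#       use_enhancement=True,
#       max_frames=60,
#       output_format='mp4',
#   )
#   # result['gif_base64'] / result['video_base64'] hold the encoded outputs
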
def process_swap_multiscale(source_url: str, target_url: str, use_enhancement: bool = False, max_frames: Optional[int] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, output_format: str = 'gif'):
    """
    Multiscale face-swapping algorithm.

    This tries multiple confidence levels to find difficult-to-detect faces.
    Good for poor-quality videos or small faces.

    Args:
        source_url: URL of the image with the source face
        target_url: URL of the video to process
        use_enhancement: Whether to enhance faces after swapping
        max_frames: Maximum number of frames to process
        start_time: Optional start of the segment to process (seconds)
        end_time: Optional end of the segment to process (seconds)
        output_format: 'gif' or 'mp4'

    Returns:
        Base64-encoded GIF, or a dict with MP4 output
    """
    from src.media_processor import extract_frames
    from src.media_handler import get_media_handler

    source_face = get_face_embedding_from_url(source_url)
    target_info = get_media_handler(target_url)
    target_frames, target_durations = extract_frames(
        target_url,
        target_info['media_type'],
        max_frames,
        start_time=start_time,
        end_time=end_time
    )

    output_frames = []
    locked_face = None
    last_swapped_frame = None
    consecutive_no_face = 0
    for i, bgr_frame in enumerate(target_frames):
        if bgr_frame is None:
            continue
        current_face = detect_faces_with_multiscale(bgr_frame)
        if not _is_valid_swap_face(current_face, bgr_frame.shape):
            if current_face is not None:
                debug_log(f"Frame {i}: multiscale detected face rejected for swap due to poor landmarks/pose")
            current_face = None
            consecutive_no_face += 1
        else:
            locked_face = current_face
            consecutive_no_face = 0
        face_to_swap = current_face or (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
        if face_to_swap:
            try:
                swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
                if use_enhancement:
                    swapped = _enhance_face(swapped)
                output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
                last_swapped_frame = output_frame.copy()
            except Exception as e:
                debug_log(f"Multiscale swap failed at frame {i}: {e}")
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        else:
            if last_swapped_frame is not None:
                debug_log(f"No valid face to swap at frame {i}; preserving last swapped frame")
                output_frame = last_swapped_frame.copy()
            else:
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        output_frames.append(output_frame)
        if i % 30 == 0:
            _clear_memory()
    return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)

def process_swap_landmark(source_url: str, target_url: str, use_enhancement: bool = False,
                          max_frames: Optional[int] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, min_landmark_confidence: float = 0.2, output_format: str = 'gif'):
    """
    Landmark-based face-swapping algorithm.

    This validates that facial features (landmarks) are detected correctly.
    Most accurate but slower - good for high-quality output.

    Args:
        source_url: URL of the source face
        target_url: URL of the target
        use_enhancement: Whether to enhance faces after swapping
        max_frames: Maximum number of frames to process
        start_time: Optional start of the segment to process (seconds)
        end_time: Optional end of the segment to process (seconds)
        min_landmark_confidence: Minimum confidence for landmarks
        output_format: 'gif' or 'mp4'

    Returns:
        Base64-encoded GIF, or a dict with MP4 output
    """
    from src.media_processor import extract_frames
    from src.media_handler import get_media_handler

    source_face = get_face_embedding_from_url(source_url)
    target_info = get_media_handler(target_url)
    target_frames, target_durations = extract_frames(
        target_url,
        target_info['media_type'],
        max_frames,
        start_time=start_time,
        end_time=end_time
    )

    output_frames = []
    locked_face = None
    last_swapped_frame = None
    consecutive_no_face = 0
    for i, bgr_frame in enumerate(target_frames):
        if bgr_frame is None:
            continue
        current_face = detect_faces_with_multiscale(bgr_frame)
        # Validate that the landmarks are good
        is_valid = is_face_landmark_valid(current_face, bgr_frame.shape, min_landmark_confidence)
        if is_valid:
            locked_face = current_face
            consecutive_no_face = 0
        else:
            consecutive_no_face += 1
        face_to_swap = current_face if is_valid else (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
        if face_to_swap:
            try:
                swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
                if use_enhancement:
                    swapped = _enhance_face(swapped)
                output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
                last_swapped_frame = output_frame.copy()
            except Exception as e:
                debug_log(f"Landmark swap failed at frame {i}: {e}")
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        else:
            if last_swapped_frame is not None:
                debug_log(f"No valid landmarks at frame {i}; preserving last swapped frame")
                output_frame = last_swapped_frame.copy()
            else:
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        output_frames.append(output_frame)
        if i % 30 == 0:
            _clear_memory()
    return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)

def process_swap_preprocess(source_url: str, target_url: str, use_enhancement: bool = False,
                            max_frames: Optional[int] = None, start_time: Optional[float] = None, end_time: Optional[float] = None, preprocess_strength: str = 'medium', output_format: str = 'gif'):
    """
    Preprocessing-based face-swapping algorithm.

    This preprocesses frames to improve face detection on low-quality videos.

    Args:
        source_url: URL of the source face
        target_url: URL of the target
        use_enhancement: Whether to enhance faces after swapping
        max_frames: Maximum number of frames to process
        start_time: Optional start of the segment to process (seconds)
        end_time: Optional end of the segment to process (seconds)
        preprocess_strength: 'light', 'medium', or 'heavy'
        output_format: 'gif' or 'mp4'

    Returns:
        Base64-encoded GIF, or a dict with MP4 output
    """
    from src.media_processor import extract_frames
    from src.media_handler import get_media_handler

    source_face = get_face_embedding_from_url(source_url)
    target_info = get_media_handler(target_url)
    target_frames, target_durations = extract_frames(
        target_url,
        target_info['media_type'],
        max_frames,
        start_time=start_time,
        end_time=end_time
    )

    output_frames = []
    locked_face = None
    last_swapped_frame = None
    consecutive_no_face = 0
    for i, bgr_frame in enumerate(target_frames):
        if bgr_frame is None:
            continue
        # Detect on the preprocessed frame first; fall back to the original
        # frame if that detection is rejected. The swap itself always runs
        # on the original frame.
        preprocessed = _preprocess_frame_for_detection(bgr_frame, strength=preprocess_strength)
        current_face = get_best_face_in_image(preprocessed)
        if not _is_valid_swap_face(current_face, bgr_frame.shape):
            current_face = get_best_face_in_image(bgr_frame)
        if not _is_valid_swap_face(current_face, bgr_frame.shape):
            if current_face is not None:
                debug_log(f"Frame {i}: preprocess detected face rejected for swap due to poor landmarks/pose")
            current_face = None
            consecutive_no_face += 1
        else:
            locked_face = current_face
            consecutive_no_face = 0
        face_to_swap = current_face or (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
        if face_to_swap:
            try:
                swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
                if use_enhancement:
                    swapped = _enhance_face(swapped)
                output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
                last_swapped_frame = output_frame.copy()
            except Exception as e:
                debug_log(f"Preprocess swap failed at frame {i}: {e}")
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        else:
            if last_swapped_frame is not None:
                debug_log(f"No valid face to swap at frame {i}; preserving last swapped frame")
                output_frame = last_swapped_frame.copy()
            else:
                output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        output_frames.append(output_frame)
        if i % 30 == 0:
            _clear_memory()
    return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)

def clear_swap_cache() -> None:
    """
    Clear all caches and free memory.
    """
    from src.face_detection import clear_face_embedding_cache
    from src.media_processor import clear_frame_cache
    clear_face_embedding_cache()
    clear_frame_cache()
    _clear_memory()
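
# A minimal manual smoke test (a sketch; the module path 'src.face_swap_engine'
# is an assumption, and the two URLs must point at reachable media):
#
#   python -m src.face_swap_engine <source_image_url> <target_media_url>
if __name__ == '__main__':
    import sys
    if len(sys.argv) == 3:
        gif_b64 = process_swap_standard(sys.argv[1], sys.argv[2], max_frames=30)
        print(f"Swap succeeded: {len(gif_b64)} base64 characters of GIF output")
        clear_swap_cache()
    else:
        print("Usage: python -m src.face_swap_engine <source_url> <target_url>")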