"""
Face Swap Engine Module
This module contains the core face-swapping algorithms.
It's responsible for actually replacing faces from one image with faces from another.
For Non-Technical Developers:
- Uses AI models to swap faces between source (face to copy) and target (face to replace)
- Has multiple algorithms for different situations:
* Standard: Fast and simple
* Multiscale: Tries different detection levels for harder cases
* Landmark-based: Uses facial feature points for precision
* Preprocessing: Enhances images before detection for difficult videos
- Includes face enhancement (making swapped faces look cleaner with GFPGAN)
- Handles memory efficiently for long videos
"""
import gc
import cv2
import numpy as np
import torch
import threading
import traceback
import onnxruntime as ort
import warnings
from concurrent.futures import ThreadPoolExecutor
from gfpgan import GFPGANer
from insightface.model_zoo.inswapper import INSwapper
# Suppress non-critical deprecation warnings at startup
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', message='.*CUDAExecutionProvider.*')
warnings.filterwarnings('ignore', message='.*pretrained.*')
from src.config import (
DEVICE, INSWAPPER_MODEL_PATH, DEFAULT_HEADERS, DOWNLOAD_TIMEOUT,
PREPROCESS_STRENGTHS, MAX_WORKERS, TORCH_NUM_THREADS,
ONNX_INTRA_OP_THREADS, NO_FACE_LIMIT, MIN_LANDMARK_CONFIDENCE,
DEBUG_MODE, HF_TOKEN, INSWAPPER_HF_URL,
INSIGHTFACE_MODELS_DIR, GFPGAN_MODELS_DIR, GFPGAN_MODEL_URL
)
from src.face_detection import (
get_best_face_in_image, detect_faces_with_multiscale,
is_face_landmark_valid, get_face_embedding_from_url
)
from src.media_processor import encode_frames_to_gif, encode_frames_to_mp4_base64
import requests
import os
from src.logger import debug_log, log_error, log_warning
# ==================== MODEL INITIALIZATION ====================
def _check_and_download_model(url: str, path: str, description: str, token: str = None) -> bool:
"""
Check if a model exists, download if it doesn't.
This first checks if the model file exists. If it does, we skip the download.
If it doesn't exist and we're allowed to download, we download it.
Args:
url: Where to download from
path: Where to save the file
description: Human-readable name of the model
token: Optional authentication token (for Hugging Face)
Returns:
True if model exists or was downloaded successfully, False otherwise
"""
# Check if file already exists
if os.path.exists(path):
debug_log(f"{description} already exists at {path}")
return True
# Create parent directory
os.makedirs(os.path.dirname(path), exist_ok=True)
debug_log(f"Downloading {description}...")
    # Start from the shared request headers defined in config (DEFAULT_HEADERS
    # is imported above but was previously unused here)
    headers = dict(DEFAULT_HEADERS)
if token:
headers['Authorization'] = f'Bearer {token}'
try:
with requests.get(url, stream=True, headers=headers, timeout=DOWNLOAD_TIMEOUT) as r:
r.raise_for_status()
with open(path, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024*1024):
if chunk:
f.write(chunk)
debug_log(f"Downloaded {description} successfully")
return True
    except Exception as e:
        log_error(f"Failed to download {description}", detail=f"URL={url} Path={path}", exc=e)
        # Remove any partially written file so the next attempt retries cleanly
        if os.path.exists(path):
            os.remove(path)
        return False
# Try to download the face-swapping model
# If models were pre-downloaded during Docker build, this won't be necessary
if not _check_and_download_model(
INSWAPPER_HF_URL,
INSWAPPER_MODEL_PATH,
"InSwapper face-swapping model",
token=HF_TOKEN
):
log_warning("InSwapper model download failed or model not found")
# Configure PyTorch and ONNX Runtime (the AI libraries we use)
torch.set_num_threads(TORCH_NUM_THREADS)
# ONNX Runtime: Configuration for the inference engine
onnx_sess_options = ort.SessionOptions()
onnx_sess_options.intra_op_num_threads = ONNX_INTRA_OP_THREADS
onnx_sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
# Initialize the face-swapping model
try:
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if DEVICE == 'cuda' else ['CPUExecutionProvider']
swapper_session = ort.InferenceSession(INSWAPPER_MODEL_PATH, onnx_sess_options, providers=providers)
face_swapper = INSwapper(model_file=INSWAPPER_MODEL_PATH, session=swapper_session)
debug_log("Face swapper model loaded successfully")
except Exception as e:
log_error("Failed to load face swapper model", exc=e)
face_swapper = None
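
# Note: ONNX Runtime falls back to CPUExecutionProvider if CUDA is requested
# but unusable. A quick way to confirm which provider is actually active
# (illustrative; get_providers() is standard ONNX Runtime API):
#
#   if face_swapper is not None:
#       debug_log(f"Active ONNX providers: {swapper_session.get_providers()}")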
# Initialize the face enhancement model (makes swapped faces look cleaner)
try:
# Use GFPGAN_MODEL_URL from config to ensure version consistency
face_enhancer = GFPGANer(
model_path=GFPGAN_MODEL_URL,
upscale=1, # Don't upscale (1x = keep same size)
arch='clean', # Use the clean architecture
channel_multiplier=2,
        bg_upsampler=None  # Don't enhance/upsample the background (faces only)
)
FACE_ENHANCEMENT_AVAILABLE = True
debug_log("Face enhancement model loaded successfully (v1.4)")
except Exception as e:
log_error("Face enhancement unavailable", exc=e)
face_enhancer = None
FACE_ENHANCEMENT_AVAILABLE = False
# Threading locks and thread pool
# These prevent multiple threads from accessing models at the same time
model_lock = threading.Lock() # Lock for face swapper
swap_thread_pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# ==================== HELPER FUNCTIONS ====================
def _clear_memory() -> None:
"""
Free up memory to prevent running out of RAM.
This is important for long videos - we need to clean up
processed frames from memory as we go.
"""
gc.collect() # Python garbage collection
if torch.cuda.is_available():
torch.cuda.empty_cache() # Clear GPU memory
def _enhance_face(bgr_frame: np.ndarray) -> np.ndarray:
"""
Apply GFPGAN face enhancement to make the swapped face look better.
GFPGAN makes faces clearer and removes artifacts from the swap.
Args:
bgr_frame: The frame with the swapped face
Returns:
Enhanced frame with cleaner face
"""
if not FACE_ENHANCEMENT_AVAILABLE or face_enhancer is None:
return bgr_frame
try:
_, _, enhanced_frame = face_enhancer.enhance(
bgr_frame,
has_aligned=False,
only_center_face=False,
paste_back=True
)
return enhanced_frame
except Exception as e:
debug_log(f"Face enhancement failed: {e}")
return bgr_frame
def _swap_face_in_frame(bgr_frame: np.ndarray, target_face: object, source_face: object) -> np.ndarray:
"""
Perform the actual face swap on a single frame.
This is the core operation - it replaces the target face with the source face.
Args:
bgr_frame: The frame to process
target_face: The face to REPLACE (destination)
source_face: The face to COPY FROM (source)
Returns:
Frame with face swapped
Raises:
ValueError: If swap fails
"""
if face_swapper is None:
raise ValueError("Face swapper model not loaded")
try:
with model_lock:
swapped_frame = face_swapper.get(
bgr_frame,
target_face,
source_face,
paste_back=True # Blend swapped face back into frame
)
return swapped_frame
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise ValueError(f"Face swap operation failed: {e}") from e
def _is_valid_swap_face(face: object, frame_shape: tuple) -> bool:
"""
Validate that a detected face is suitable for swapping.
This helps avoid swapping on poorly detected faces or invalid landmarks,
which can cause artifacts in animated output.
"""
if face is None:
return False
return is_face_landmark_valid(face, frame_shape, MIN_LANDMARK_CONFIDENCE)
def _preprocess_frame_for_detection(bgr_frame: np.ndarray, strength: str = 'medium') -> np.ndarray:
"""
Enhance a frame to make face detection easier.
This is useful when video quality is poor or faces are hard to detect.
Args:
bgr_frame: The frame to preprocess
strength: 'light', 'medium', or 'heavy' - how much to enhance
Returns:
Enhanced frame that's easier to detect faces in
"""
if bgr_frame is None or bgr_frame.size == 0:
return bgr_frame
settings = PREPROCESS_STRENGTHS.get(strength, PREPROCESS_STRENGTHS['medium'])
    # Step 1: CLAHE (Contrast Limited Adaptive Histogram Equalization)
    # Boosts local contrast tile by tile, with a clip limit to avoid
    # amplifying noise, so faces stand out in murky or washed-out frames
lab = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(
clipLimit=settings['clahe_limit'],
tileGridSize=(8, 8)
)
l = clahe.apply(l)
enhanced = cv2.cvtColor(cv2.merge([l, a, b]), cv2.COLOR_LAB2BGR)
# Step 2: Sharpening (makes edges clearer)
sharpen_strength = settings['sharpen_strength']
sharpening_kernel = np.array([
[0, -sharpen_strength, 0],
[-sharpen_strength, 1 + 4*sharpen_strength, -sharpen_strength],
[0, -sharpen_strength, 0]
])
sharpened = cv2.filter2D(enhanced, -1, sharpening_kernel)
# Step 3: Gamma correction (optional, for heavy preprocessing)
if settings['apply_gamma']:
gamma_lut = np.array([
((i / 255.0) ** (1.0 / 1.2)) * 255
for i in range(256)
], dtype=np.uint8)
sharpened = cv2.LUT(sharpened, gamma_lut)
return sharpened
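
# Preprocessing sanity check (illustrative sketch; the synthetic frame stands
# in for a real decoded video frame):
#
#   dark = np.full((480, 640, 3), 40, dtype=np.uint8)
#   boosted = _preprocess_frame_for_detection(dark, strength='heavy')
#
# With 'heavy' settings the gamma step maps each channel value v through
# ((v / 255) ** (1 / 1.2)) * 255, so e.g. 128 -> ~143, brightening midtones.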
def _durations_to_fps(durations: list) -> float:
    """
    Convert per-frame durations in milliseconds to an average frame rate,
    clamped to the 1-60 fps range. Empty or degenerate inputs fall back
    to 30 fps.
    """
if not durations:
return 30.0
avg_ms = sum(durations) / len(durations)
if avg_ms <= 0:
return 30.0
fps = 1000.0 / avg_ms
return max(1.0, min(60.0, fps))
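
# Worked example: frame durations of [33, 34, 33] ms average to ~33.3 ms, so
# _durations_to_fps([33, 34, 33]) returns 1000 / 33.33 ≈ 30.0 fps; the 1-60
# clamp is a no-op here but catches corrupt duration metadata.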
def _build_output_result(frames: list, durations: list, target_url: str, target_media_type: str, output_format: str = 'gif'):
    """
    Encode processed frames into the requested output format.

    Always builds a base64 GIF. When output_format is 'mp4', additionally
    attempt an MP4 encode (reusing the target's audio track for video inputs)
    and return a dict carrying both encodings; otherwise return the GIF string.
    """
gif_base64 = encode_frames_to_gif(frames, durations)
if output_format != 'mp4':
return gif_base64
fps = _durations_to_fps(durations)
video_base64 = None
try:
audio_url = target_url if target_media_type == 'video' else None
video_base64 = encode_frames_to_mp4_base64(frames, fps, audio_url=audio_url)
except Exception as e:
debug_log(f"MP4 output generation failed: {e}")
return {
'gif_base64': gif_base64,
'video_base64': video_base64,
'video_mime_type': 'video/mp4'
}
# ==================== SWAPPING ALGORITHMS ====================
def process_swap_standard(source_url: str, target_url: str, use_enhancement: bool = False, max_frames: int = None, start_time: float = None, end_time: float = None, output_format: str = 'gif'):
"""
Standard face-swapping algorithm.
This is the simple, reliable method - good for clear videos.
Args:
source_url: URL of image with the face to COPY FROM
target_url: URL of video/image with faces to REPLACE
use_enhancement: Whether to enhance faces after swapping
max_frames: Maximum frames to process
output_format: 'gif' or 'mp4'
Returns:
Base64-encoded GIF of result or a dict with both GIF and MP4 output
Raises:
ValueError: If processing fails
"""
from src.media_processor import extract_frames
from src.media_handler import get_media_handler
# Get source face
source_face = get_face_embedding_from_url(source_url)
# Get target frames
target_info = get_media_handler(target_url)
target_frames, target_durations = extract_frames(
target_url,
target_info['media_type'],
max_frames,
start_time=start_time,
end_time=end_time
)
output_frames = []
locked_face = None # Remember last valid detected face
last_swapped_frame = None
consecutive_no_face = 0
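    # Face-locking strategy (shared by all four swap algorithms in this
    # module): when detection misses, reuse the last good face for up to
    # NO_FACE_LIMIT consecutive frames so the swap doesn't flicker; past that
    # limit, repeat the last swapped frame instead of showing an unswapped
    # original mid-animation.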
for i, bgr_frame in enumerate(target_frames):
if bgr_frame is None:
continue
current_face = get_best_face_in_image(bgr_frame)
if not _is_valid_swap_face(current_face, bgr_frame.shape):
if current_face is not None:
debug_log(f"Frame {i}: detected face rejected for swap due to poor landmarks/pose")
current_face = None
consecutive_no_face += 1
else:
locked_face = current_face
consecutive_no_face = 0
face_to_swap = current_face or (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
if face_to_swap:
try:
swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
if use_enhancement:
swapped = _enhance_face(swapped)
output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
last_swapped_frame = output_frame.copy()
except Exception as e:
debug_log(f"Swap failed at frame {i}: {e}")
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
else:
if last_swapped_frame is not None:
debug_log(f"No valid face to swap at frame {i}; preserving last swapped frame")
output_frame = last_swapped_frame.copy()
else:
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
output_frames.append(output_frame)
if i % 30 == 0:
_clear_memory()
return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)
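
# Handling the two possible return shapes (hedged sketch; the variable names
# are illustrative):
#
#   result = process_swap_standard(src_url, tgt_url, output_format='mp4')
#   if isinstance(result, dict):
#       gif_b64 = result['gif_base64']
#       mp4_b64 = result['video_base64']   # may be None if MP4 encoding failed
#   else:
#       gif_b64 = result                   # plain base64-encoded GIF string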
def process_swap_multiscale(source_url: str, target_url: str, use_enhancement: bool = False, max_frames: int = None, start_time: float = None, end_time: float = None, output_format: str = 'gif'):
"""
Multiscale face-swapping algorithm.
This tries multiple confidence levels to find difficult-to-detect faces.
Good for poor-quality videos or small faces.
Args:
source_url: URL of image with source face
target_url: URL of video to process
use_enhancement: Whether to enhance faces
max_frames: Maximum frames
output_format: 'gif' or 'mp4'
Returns:
Base64-encoded GIF or dict with mp4 output
"""
from src.media_processor import extract_frames
from src.media_handler import get_media_handler
source_face = get_face_embedding_from_url(source_url)
target_info = get_media_handler(target_url)
target_frames, target_durations = extract_frames(
target_url,
target_info['media_type'],
max_frames,
start_time=start_time,
end_time=end_time
)
output_frames = []
locked_face = None
last_swapped_frame = None
consecutive_no_face = 0
for i, bgr_frame in enumerate(target_frames):
if bgr_frame is None:
continue
current_face = detect_faces_with_multiscale(bgr_frame)
if not _is_valid_swap_face(current_face, bgr_frame.shape):
if current_face is not None:
debug_log(f"Frame {i}: multiscale detected face rejected for swap due to poor landmarks/pose")
current_face = None
consecutive_no_face += 1
else:
locked_face = current_face
consecutive_no_face = 0
face_to_swap = current_face or (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
if face_to_swap:
try:
swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
if use_enhancement:
swapped = _enhance_face(swapped)
output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
last_swapped_frame = output_frame.copy()
except Exception as e:
debug_log(f"Multiscale swap failed at frame {i}: {e}")
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
else:
if last_swapped_frame is not None:
debug_log(f"No valid face to swap at frame {i}; preserving last swapped frame")
output_frame = last_swapped_frame.copy()
else:
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
output_frames.append(output_frame)
if i % 30 == 0:
_clear_memory()
return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)
def process_swap_landmark(source_url: str, target_url: str, use_enhancement: bool = False,
max_frames: int = None, start_time: float = None, end_time: float = None, min_landmark_confidence: float = 0.2, output_format: str = 'gif'):
"""
Landmark-based face-swapping algorithm.
This validates that facial features (landmarks) are detected correctly.
Most accurate but slower - good for high-quality output.
Args:
source_url: URL of source face
target_url: URL of target
use_enhancement: Whether to enhance
max_frames: Max frames
min_landmark_confidence: Minimum confidence for landmarks
Returns:
Base64-encoded GIF
"""
from src.media_processor import extract_frames
from src.media_handler import get_media_handler
source_face = get_face_embedding_from_url(source_url)
target_info = get_media_handler(target_url)
target_frames, target_durations = extract_frames(
target_url,
target_info['media_type'],
max_frames,
start_time=start_time,
end_time=end_time
)
output_frames = []
locked_face = None
last_swapped_frame = None
consecutive_no_face = 0
for i, bgr_frame in enumerate(target_frames):
if bgr_frame is None:
continue
current_face = detect_faces_with_multiscale(bgr_frame)
# Validate that landmarks are good
is_valid = is_face_landmark_valid(current_face, bgr_frame.shape, min_landmark_confidence)
if is_valid:
locked_face = current_face
consecutive_no_face = 0
else:
consecutive_no_face += 1
face_to_swap = current_face if is_valid else (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
if face_to_swap:
try:
swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
if use_enhancement:
swapped = _enhance_face(swapped)
output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
last_swapped_frame = output_frame.copy()
except Exception as e:
debug_log(f"Landmark swap failed at frame {i}: {e}")
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
else:
if last_swapped_frame is not None:
debug_log(f"No valid landmarks at frame {i}; preserving last swapped frame")
output_frame = last_swapped_frame.copy()
else:
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
output_frames.append(output_frame)
if i % 30 == 0:
_clear_memory()
    return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)
def process_swap_preprocess(source_url: str, target_url: str, use_enhancement: bool = False,
max_frames: int = None, start_time: float = None, end_time: float = None, preprocess_strength: str = 'medium', output_format: str = 'gif'):
"""
Preprocessing-based face-swapping algorithm.
This preprocesses frames to improve face detection on low-quality videos.
Args:
source_url: URL of source face
target_url: URL of target
use_enhancement: Whether to enhance
max_frames: Max frames
preprocess_strength: 'light', 'medium', or 'heavy'
output_format: 'gif' or 'mp4'
Returns:
Base64-encoded GIF or dict with mp4 output
"""
from src.media_processor import extract_frames
from src.media_handler import get_media_handler
source_face = get_face_embedding_from_url(source_url)
target_info = get_media_handler(target_url)
target_frames, target_durations = extract_frames(
target_url,
target_info['media_type'],
max_frames,
start_time=start_time,
end_time=end_time
)
output_frames = []
locked_face = None
last_swapped_frame = None
consecutive_no_face = 0
for i, bgr_frame in enumerate(target_frames):
if bgr_frame is None:
continue
preprocessed = _preprocess_frame_for_detection(bgr_frame, strength=preprocess_strength)
current_face = get_best_face_in_image(preprocessed)
if not _is_valid_swap_face(current_face, bgr_frame.shape):
current_face = get_best_face_in_image(bgr_frame)
if not _is_valid_swap_face(current_face, bgr_frame.shape):
if current_face is not None:
debug_log(f"Frame {i}: preprocess detected face rejected for swap due to poor landmarks/pose")
current_face = None
consecutive_no_face += 1
else:
locked_face = current_face
consecutive_no_face = 0
face_to_swap = current_face or (locked_face if consecutive_no_face <= NO_FACE_LIMIT else None)
if face_to_swap:
try:
swapped = _swap_face_in_frame(bgr_frame, face_to_swap, source_face)
if use_enhancement:
swapped = _enhance_face(swapped)
output_frame = cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB)
last_swapped_frame = output_frame.copy()
except Exception as e:
debug_log(f"Preprocess swap failed at frame {i}: {e}")
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
else:
if last_swapped_frame is not None:
debug_log(f"No valid face to swap at frame {i}; preserving last swapped frame")
output_frame = last_swapped_frame.copy()
else:
output_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
output_frames.append(output_frame)
if i % 30 == 0:
_clear_memory()
return _build_output_result(output_frames, target_durations, target_url, target_info['media_type'], output_format)
def clear_swap_cache() -> None:
"""
Clear all caches and free memory.
"""
from src.face_detection import clear_face_embedding_cache
from src.media_processor import clear_frame_cache
clear_face_embedding_cache()
clear_frame_cache()
_clear_memory()