# Provenance (page header residue): crash10155's picture - "Upload 166 files" - revision 289fb74, verified
from typing import Any, List, Callable
import cv2
import threading
import numpy as np
import os
# Environment fixes
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'
os.environ['NO_ALBUMENTATIONS_UPDATE'] = '1'
import SwitcherAI.globals
import SwitcherAI.processors.frame.core as frame_processors
from SwitcherAI import wording
from SwitcherAI.core import update_status
from SwitcherAI.face_analyser import get_many_faces, get_one_face
from SwitcherAI.typing import Frame, Face
from SwitcherAI.utilities import conditional_download, resolve_relative_path, is_image, is_video
# Module-level state shared by the functions of this processor.
# Cached ONNX Runtime session: assigned in get_frame_processor(), reset to None in clear_frame_processor().
FRAME_PROCESSOR = None
# Default-constructed semaphore (initial value 1); forward() holds it around each inference call.
THREAD_SEMAPHORE = threading.Semaphore()
# Held by get_frame_processor() while it checks for and creates the cached session.
THREAD_LOCK = threading.Lock()
# Tag passed to update_status() and used as the prefix of this module's log lines.
NAME = 'FACEFUSION.FRAME_PROCESSOR.LIP_SYNCER'
def get_frame_processor() -> Any:
    """Return the shared lip sync ONNX Runtime session, creating it on first use.

    The model name comes from ``SwitcherAI.globals.lip_syncer_model``
    (default ``'wav2lip_gan_96'``) and the ``.onnx`` path is obtained through
    ``resolve_relative_path``. The whole lookup runs while ``THREAD_LOCK``
    is held.

    Returns:
        The cached ``onnxruntime.InferenceSession``, or ``None`` when the
        model file does not exist, onnxruntime cannot be imported, or the
        session fails to load. ``None`` is not a terminal state: the next
        call attempts the load again.
    """
    global FRAME_PROCESSOR

    with THREAD_LOCK:
        # Fast path: a session was already created by an earlier call.
        if FRAME_PROCESSOR is not None:
            return FRAME_PROCESSOR

        try:
            selected_model = getattr(SwitcherAI.globals, 'lip_syncer_model', 'wav2lip_gan_96')
            model_path = resolve_relative_path(f'../.assets/models/{selected_model}.onnx')
            print(f"[{NAME}] Loading model: {model_path}")

            if not os.path.exists(model_path):
                print(f"[{NAME}] Model file not found: {model_path}")
                FRAME_PROCESSOR = None
            else:
                # Imported lazily so a missing onnxruntime only disables this processor.
                import onnxruntime
                execution_providers = getattr(
                    SwitcherAI.globals, 'execution_providers', ['CPUExecutionProvider']
                )
                FRAME_PROCESSOR = onnxruntime.InferenceSession(
                    model_path, providers=execution_providers
                )
                print(f"[{NAME}] ONNX model loaded successfully")
        except ImportError:
            print(f"[{NAME}] onnxruntime not available, using passthrough mode")
            FRAME_PROCESSOR = None
        except Exception as error:
            print(f"[{NAME}] Error loading ONNX model: {error}")
            FRAME_PROCESSOR = None

        return FRAME_PROCESSOR
def clear_frame_processor() -> None:
    """Forget the cached session by resetting ``FRAME_PROCESSOR`` to ``None``.

    The next ``get_frame_processor()`` call will therefore load the model again.
    """
    global FRAME_PROCESSOR
    FRAME_PROCESSOR = None
def pre_check() -> bool:
    """Make sure the selected lip sync model is present, downloading it if a URL is known.

    The check is best effort: it reports problems on stdout but never blocks
    the pipeline.

    Returns:
        Always ``True``, including when the model is missing or an error
        occurred while checking or downloading.
    """
    print(f"[{NAME}] Pre-check starting...")
    try:
        models_directory = resolve_relative_path('../.assets/models')
        selected_model = getattr(SwitcherAI.globals, 'lip_syncer_model', 'wav2lip_gan_96')
        model_file = os.path.join(models_directory, f'{selected_model}.onnx')

        if not os.path.exists(model_file):
            print(f"[{NAME}] Model not found: {model_file}")
            # Known download locations, keyed by model name.
            known_urls = {
                'wav2lip_96': ['Awwfuck.com'],
                'wav2lip_gan_96': ['Awwfuck.com'],
            }
            urls = known_urls.get(selected_model)
            if urls is not None:
                print(f"[{NAME}] Attempting to download {selected_model}")
                conditional_download(models_directory, urls)

        print(f"[{NAME}] Pre-check passed")
        return True
    except Exception as error:
        print(f"[{NAME}] Pre-check error: {error}")
        return True
def pre_process() -> bool:
    """Validate that the configured target is an image or a video.

    Returns:
        ``True`` when ``SwitcherAI.globals.target_path`` passes ``is_image``
        or ``is_video``; otherwise a status message is published through
        ``update_status`` and ``False`` is returned.
    """
    print(f"[{NAME}] Pre-processing...")

    target_path = SwitcherAI.globals.target_path
    if is_image(target_path) or is_video(target_path):
        print(f"[{NAME}] Pre-processing completed")
        return True

    # Neither an image nor a video: tell the user and refuse to run.
    update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
    return False
def post_process() -> None:
    """Release the cached frame processor once processing is over and log it."""
    clear_frame_processor()
    print(f"[{NAME}] Post-processing completed")
def prepare_audio_frame(audio_frame: np.ndarray) -> np.ndarray:
    """Convert an audio frame into the normalised, batched array fed to the model.

    Values are floored at ``exp(-5 * ln 10)`` (i.e. 1e-5), mapped through
    ``log10(x) * 1.6 + 3.2``, clipped to ``[-4, 4]`` and cast to float32.

    Args:
        audio_frame: Array of audio features.

    Returns:
        A float32 array with two singleton axes prepended to the input shape.
    """
    # Lower bound applied before the logarithm so zeros do not become -inf.
    floor = np.exp(-5 * np.log(10))
    floored = np.maximum(floor, audio_frame)
    scaled = np.log10(floored) * 1.6 + 3.2
    clipped = np.clip(scaled, -4, 4).astype(np.float32)
    return np.expand_dims(clipped, axis=(0, 1))
def prepare_crop_frame(crop_vision_frame: np.ndarray) -> np.ndarray:
    """Build the model's vision input from a face crop.

    A copy of the crop with everything from row 48 downwards zeroed is stacked
    in front of the untouched crop along the channel axis. The result is
    reordered to (batch, channel, height, width) and scaled by 1/255.

    Args:
        crop_vision_frame: Height x width x channel image; the caller in this
            file passes a 96x96 crop, so row 48 is its vertical midpoint.

    Returns:
        A float32 array of shape (1, 2 * channels, height, width).
    """
    batched = np.expand_dims(crop_vision_frame, axis=0)

    # Work on a copy so the caller's crop keeps its lower half.
    masked = batched.copy()
    masked[:, 48:] = 0

    stacked = np.concatenate((masked, batched), axis=3)
    channels_first = np.transpose(stacked, (0, 3, 1, 2))
    return channels_first.astype('float32') / 255.0
def normalize_close_frame(crop_vision_frame: np.ndarray) -> np.ndarray:
    """Turn a batched channel-first array back into an 8-bit image.

    The first batch entry is reordered to height x width x channel, clipped to
    ``[0, 1]``, multiplied by 255 and cast to uint8 (fractions are truncated
    by the cast).

    Args:
        crop_vision_frame: Array indexed as (batch, channel, height, width).

    Returns:
        A uint8 array of shape (height, width, channel).
    """
    channels_last = np.transpose(crop_vision_frame[0], (1, 2, 0))
    scaled = np.clip(channels_last, 0, 1) * 255
    return scaled.astype(np.uint8)
def forward(temp_audio_frame: np.ndarray, close_vision_frame: np.ndarray) -> np.ndarray:
    """Run the lip sync model on one prepared audio/vision pair.

    Model inputs are matched by name: names containing ``source``, ``audio``
    or ``mel`` receive the audio frame; otherwise names containing
    ``target``, ``video`` or ``frame`` receive the vision frame. Inference
    runs while ``THREAD_SEMAPHORE`` is held.

    Args:
        temp_audio_frame: Prepared audio input.
        close_vision_frame: Prepared vision input.

    Returns:
        The first model output, or ``close_vision_frame`` itself (unchanged)
        when no model is loaded or inference raises.
    """
    lip_syncer = get_frame_processor()
    if lip_syncer is None:
        return close_vision_frame

    audio_keywords = ('source', 'audio', 'mel')
    vision_keywords = ('target', 'video', 'frame')

    try:
        with THREAD_SEMAPHORE:
            feed = {}
            for model_input in lip_syncer.get_inputs():
                input_name = model_input.name
                lowered = input_name.lower()
                # Audio keywords take precedence when a name matches both groups.
                if any(keyword in lowered for keyword in audio_keywords):
                    feed[input_name] = temp_audio_frame
                elif any(keyword in lowered for keyword in vision_keywords):
                    feed[input_name] = close_vision_frame
            return lip_syncer.run(None, feed)[0]
    except Exception as error:
        print(f"[{NAME}] Forward pass error: {error}")
        return close_vision_frame
def sync_lip(target_face: Face, temp_audio_frame: np.ndarray, temp_vision_frame: Frame) -> Frame:
    """Render lip-synced pixels for one face and paste them back into the frame.

    The face bounding box is cropped, resized to 96x96, run through the model
    together with the prepared audio frame, resized back and written into a
    copy of the frame.

    Args:
        target_face: Detected face; only its ``bbox`` attribute
            (x1, y1, x2, y2) is read. Faces without ``bbox`` are skipped.
        temp_audio_frame: Audio features for this frame, or ``None`` to use
            an all-zero 80x16 array.
        temp_vision_frame: Frame containing the face; it is never modified.

    Returns:
        A new frame with the face region replaced, or ``temp_vision_frame``
        itself when the face cannot be processed (no bbox, empty region, no
        model available, inference failure or any other error).
    """
    try:
        if not hasattr(target_face, 'bbox'):
            return temp_vision_frame

        # Passthrough mode: without a model there is nothing to render, so
        # skip the crop/resize work instead of failing later on paste-back.
        if get_frame_processor() is None:
            return temp_vision_frame

        if temp_audio_frame is None:
            # Placeholder audio features (80 x 16) when none are supplied.
            temp_audio_frame = np.zeros((80, 16), dtype=np.float32)
        temp_audio_frame = prepare_audio_frame(temp_audio_frame)

        frame_height, frame_width = temp_vision_frame.shape[:2]
        x1, y1, x2, y2 = map(int, target_face.bbox)
        x1 = max(0, min(x1, frame_width - 1))
        y1 = max(0, min(y1, frame_height - 1))
        # Slice ends are exclusive, so the far edges may equal the frame size;
        # clamping them to size - 1 would drop the last column/row of faces
        # that touch the right or bottom border.
        x2 = max(0, min(x2, frame_width))
        y2 = max(0, min(y2, frame_height))
        if x2 <= x1 or y2 <= y1:
            return temp_vision_frame

        face_region = temp_vision_frame[y1:y2, x1:x2]
        model_input = prepare_crop_frame(cv2.resize(face_region, (96, 96)))
        model_output = forward(temp_audio_frame, model_input)

        # forward() hands back the very array it was given when the model is
        # unavailable or inference failed. That array has twice the frame's
        # channels and cannot be pasted, so leave the frame untouched.
        if model_output is model_input:
            return temp_vision_frame

        synced_face = normalize_close_frame(model_output)
        synced_face = cv2.resize(synced_face, (x2 - x1, y2 - y1))

        result_frame = temp_vision_frame.copy()
        result_frame[y1:y2, x1:x2] = synced_face
        return result_frame
    except Exception as e:
        print(f"[{NAME}] Lip sync error: {e}")
        return temp_vision_frame
def process_frame(source_face: Face, reference_face: Face, temp_frame: Frame) -> Frame:
    """Apply lip sync to every face detected in a frame.

    Args:
        source_face: Unused by this processor.
        reference_face: Unused by this processor.
        temp_frame: Frame to process.

    Returns:
        The frame after each detected face has been passed through
        ``sync_lip``; ``temp_frame`` itself when no face is found or an
        error occurs.
    """
    try:
        output_frame = temp_frame
        # An empty/None detection result simply leaves the frame as it is.
        for detected_face in get_many_faces(temp_frame) or ():
            # Placeholder audio features until real audio is wired in.
            silent_audio_frame = np.zeros((80, 16), dtype=np.float32)
            output_frame = sync_lip(detected_face, silent_audio_frame, output_frame)
        return output_frame
    except Exception as error:
        print(f"[{NAME}] Error processing frame: {error}")
        return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], update: Callable[[], None]) -> None:
    """Lip-sync a list of frame image files in place.

    Each file is read, processed with ``process_frame`` and written back to
    the same path. Failures on one frame are logged and do not stop the run.

    Args:
        source_path: Unused by this processor.
        temp_frame_paths: Paths of the frame images to rewrite.
        update: Progress callback invoked once per frame; may be falsy to
            disable progress reporting.
    """
    total_frames = len(temp_frame_paths)
    print(f"[{NAME}] Processing {total_frames} frames")

    for i, temp_frame_path in enumerate(temp_frame_paths):
        try:
            try:
                temp_frame = cv2.imread(temp_frame_path)
                # Unreadable frames are left untouched on disk.
                if temp_frame is not None:
                    result_frame = process_frame(None, None, temp_frame)
                    cv2.imwrite(temp_frame_path, result_frame)
            finally:
                # Report progress for every frame, including skipped or failed
                # ones, so the caller's progress count can reach the total.
                if update:
                    update()

            if i % 100 == 0:
                print(f"[{NAME}] Progress: {i}/{total_frames} frames")
        except Exception as e:
            print(f"[{NAME}] Error processing frame {i}: {e}")

    print(f"[{NAME}] Frame processing completed")
def _copy_original_image(target_path: str, output_path: str) -> None:
    """Copy the untouched target image to the output path, tolerating an in-place run."""
    import shutil

    try:
        shutil.copy2(target_path, output_path)
    except shutil.SameFileError:
        # Target and output are the same file: the original is already in place.
        pass


def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """Lip-sync a single image file.

    Args:
        source_path: Unused by this processor.
        target_path: Image to read.
        output_path: Where the processed image is written; may be the same
            file as ``target_path``.

    If the image cannot be read, processed or written, the original file is
    copied to ``output_path`` instead. Errors raised by that fallback copy
    (other than the paths being the same file) propagate to the caller.
    """
    try:
        print(f"[{NAME}] Processing image: {os.path.basename(target_path)}")

        target_frame = cv2.imread(target_path)
        if target_frame is None:
            # Not decodable by OpenCV: pass the file through unchanged.
            _copy_original_image(target_path, output_path)
            return

        result_frame = process_frame(None, None, target_frame)

        # cv2.imwrite signals failure through its return value; route it to
        # the fallback below rather than reporting a success that wrote nothing.
        if not cv2.imwrite(output_path, result_frame):
            raise OSError(f"could not write {output_path}")

        print(f"[{NAME}] Image processing completed")
    except Exception as e:
        print(f"[{NAME}] Error processing image: {e}")
        # Fallback: copy original
        _copy_original_image(target_path, output_path)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """Hand the extracted frames to the shared frame-processor core.

    Delegates to ``frame_processors.process_video`` with this module's
    ``process_frames`` as the per-batch worker.
    """
    frame_processors.process_video(source_path, temp_frame_paths, process_frames)