import os

# Environment fixes: set before third-party imports that read these variables
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'
os.environ['NO_ALBUMENTATIONS_UPDATE'] = '1'

import shutil
import threading
from typing import Any, Callable, List

import cv2
import numpy as np

import SwitcherAI.globals
import SwitcherAI.processors.frame.core as frame_processors
from SwitcherAI import wording
from SwitcherAI.core import update_status
from SwitcherAI.face_analyser import get_many_faces
from SwitcherAI.typing import Face, Frame
from SwitcherAI.utilities import conditional_download, is_image, is_video, resolve_relative_path

# Module-level state matching FaceFusion's frame-processor pattern
FRAME_PROCESSOR = None
THREAD_SEMAPHORE = threading.Semaphore()
THREAD_LOCK = threading.Lock()
NAME = 'FACEFUSION.FRAME_PROCESSOR.LIP_SYNCER'

def get_frame_processor() -> Any:
    """Get the lip sync processor, using ONNX Runtime like FaceFusion"""
    global FRAME_PROCESSOR

    with THREAD_LOCK:
        if FRAME_PROCESSOR is None:
            try:
                # Get the model name from globals
                model_name = getattr(SwitcherAI.globals, 'lip_syncer_model', 'wav2lip_gan_96')
                model_path = resolve_relative_path(f'../.assets/models/{model_name}.onnx')
                print(f"[{NAME}] Loading model: {model_path}")
                if os.path.exists(model_path):
                    # Load the ONNX model the way FaceFusion does
                    import onnxruntime
                    providers = getattr(SwitcherAI.globals, 'execution_providers', ['CPUExecutionProvider'])
                    FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers=providers)
                    print(f"[{NAME}] ONNX model loaded successfully")
                else:
                    print(f"[{NAME}] Model file not found: {model_path}")
                    FRAME_PROCESSOR = None
            except ImportError:
                print(f"[{NAME}] onnxruntime not available, using passthrough mode")
                FRAME_PROCESSOR = None
            except Exception as e:
                print(f"[{NAME}] Error loading ONNX model: {e}")
                FRAME_PROCESSOR = None
    return FRAME_PROCESSOR
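
# Usage note: get_frame_processor() hands back an onnxruntime.InferenceSession
# when the model file is present, otherwise None; forward() treats None as
# passthrough mode, so the pipeline keeps running without a model.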

def clear_frame_processor() -> None:
    """Clear the cached frame processor"""
    global FRAME_PROCESSOR
    FRAME_PROCESSOR = None

def pre_check() -> bool:
    """Pre-check for lip syncer requirements"""
    print(f"[{NAME}] Pre-check starting...")
    try:
        # Check whether the model needs to be downloaded
        download_directory_path = resolve_relative_path('../.assets/models')
        model_name = getattr(SwitcherAI.globals, 'lip_syncer_model', 'wav2lip_gan_96')
        model_path = os.path.join(download_directory_path, f'{model_name}.onnx')
        if not os.path.exists(model_path):
            print(f"[{NAME}] Model not found: {model_path}")
            # Model download URLs (assumed to live in the FaceFusion asset releases)
            model_urls = {
                'wav2lip_96': ['https://github.com/facefusion/facefusion-assets/releases/download/models/wav2lip_96.onnx'],
                'wav2lip_gan_96': ['https://github.com/facefusion/facefusion-assets/releases/download/models/wav2lip_gan_96.onnx']
            }
            if model_name in model_urls:
                print(f"[{NAME}] Attempting to download {model_name}")
                conditional_download(download_directory_path, model_urls[model_name])
        print(f"[{NAME}] Pre-check passed")
        return True
    except Exception as e:
        print(f"[{NAME}] Pre-check error: {e}")
        # Stay permissive: let processing continue in passthrough mode
        return True

def pre_process() -> bool:
    """Pre-process initialization"""
    print(f"[{NAME}] Pre-processing...")
    # Validate the target type like FaceFusion does
    if not is_image(SwitcherAI.globals.target_path) and not is_video(SwitcherAI.globals.target_path):
        update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
        return False
    print(f"[{NAME}] Pre-processing completed")
    return True

def post_process() -> None:
    """Post-process cleanup"""
    clear_frame_processor()
    print(f"[{NAME}] Post-processing completed")

def prepare_audio_frame(audio_frame: np.ndarray) -> np.ndarray:
    """Prepare a mel spectrogram frame like FaceFusion"""
    # Clamp the spectrogram away from zero so the log below is defined,
    # then rescale the log-mel values into the model's expected [-4, 4] range
    audio_frame = np.maximum(np.exp(-5 * np.log(10)), audio_frame)
    audio_frame = np.log10(audio_frame) * 1.6 + 3.2
    audio_frame = audio_frame.clip(-4, 4).astype(np.float32)
    # Add batch and channel dimensions: (80, 16) -> (1, 1, 80, 16)
    audio_frame = np.expand_dims(audio_frame, axis=(0, 1))
    return audio_frame
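
# Shape sketch for the 80-mel x 16-step window used by the dummy frames
# built in sync_lip / process_frame below:
#   prepare_audio_frame(np.zeros((80, 16), dtype=np.float32)).shape -> (1, 1, 80, 16)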

def prepare_crop_frame(crop_vision_frame: np.ndarray) -> np.ndarray:
    """Prepare the cropped face frame like FaceFusion"""
    crop_vision_frame = np.expand_dims(crop_vision_frame, axis=0)
    prepare_vision_frame = crop_vision_frame.copy()
    # Zero out the lower half of the face so the model must inpaint the mouth
    prepare_vision_frame[:, 48:] = 0
    # Stack the masked frame and the reference frame along the channel axis
    crop_vision_frame = np.concatenate((prepare_vision_frame, crop_vision_frame), axis=3)
    crop_vision_frame = crop_vision_frame.transpose(0, 3, 1, 2).astype('float32') / 255.0
    return crop_vision_frame
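
# Shape sketch: a (96, 96, 3) BGR crop becomes (1, 96, 96, 6), where the first
# three channels carry the mouth-masked frame and the last three the original
# reference, then transposes to the NCHW layout (1, 6, 96, 96) the model expects.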

def normalize_close_frame(crop_vision_frame: np.ndarray) -> np.ndarray:
    """Normalize the model output back to an image like FaceFusion"""
    crop_vision_frame = crop_vision_frame[0].transpose(1, 2, 0)
    crop_vision_frame = crop_vision_frame.clip(0, 1) * 255
    crop_vision_frame = crop_vision_frame.astype(np.uint8)
    return crop_vision_frame
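
# Inverse of prepare_crop_frame's layout: a (1, 3, 96, 96) float tensor in
# [0, 1] comes back as a (96, 96, 3) uint8 image ready for cv2.resize and pasting.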

def forward(temp_audio_frame: np.ndarray, close_vision_frame: np.ndarray) -> np.ndarray:
    """Forward pass through the model like FaceFusion"""
    lip_syncer = get_frame_processor()
    if lip_syncer is None:
        # Passthrough: return the unmasked reference channels so the caller's
        # normalize_close_frame() still receives a 3-channel tensor
        return close_vision_frame[:, 3:]
    try:
        with THREAD_SEMAPHORE:
            # Map inputs by name; FaceFusion uses 'source' (audio) and 'target' (frame)
            inputs = {}
            for model_input in lip_syncer.get_inputs():
                name = model_input.name.lower()
                if 'source' in name or 'audio' in name or 'mel' in name:
                    inputs[model_input.name] = temp_audio_frame
                elif 'target' in name or 'video' in name or 'frame' in name:
                    inputs[model_input.name] = close_vision_frame
            # Run inference
            close_vision_frame = lip_syncer.run(None, inputs)[0]
        return close_vision_frame
    except Exception as e:
        print(f"[{NAME}] Forward pass error: {e}")
        # Fall back to the reference channels of the still-6-channel input
        return close_vision_frame[:, 3:]
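
# Note: FaceFusion's own lip syncer feeds two inputs named 'source' (the mel
# window) and 'target' (the 6-channel crop); the substring matching above is a
# defensive superset of that naming contract for models with different names.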

def sync_lip(target_face: Face, temp_audio_frame: np.ndarray, temp_vision_frame: Frame) -> Frame:
    """Main lip sync function following FaceFusion's approach"""
    try:
        # For now, create a silent audio frame if none was provided
        if temp_audio_frame is None:
            # Empty mel spectrogram (80 mel bands x 16 steps)
            temp_audio_frame = np.zeros((80, 16), dtype=np.float32)
        temp_audio_frame = prepare_audio_frame(temp_audio_frame)
        # Extract the face region from the detected bounding box
        if not hasattr(target_face, 'bbox'):
            return temp_vision_frame
        x1, y1, x2, y2 = map(int, target_face.bbox)
        # Clamp coordinates to the frame bounds
        h, w = temp_vision_frame.shape[:2]
        x1 = max(0, min(x1, w - 1))
        y1 = max(0, min(y1, h - 1))
        x2 = max(0, min(x2, w - 1))
        y2 = max(0, min(y2, h - 1))
        if x2 <= x1 or y2 <= y1:
            return temp_vision_frame
        # Crop and resize the face region to the model's 96x96 input size
        face_region = temp_vision_frame[y1:y2, x1:x2]
        close_vision_frame = cv2.resize(face_region, (96, 96))
        # Preprocess, run the model, and postprocess
        close_vision_frame = prepare_crop_frame(close_vision_frame)
        close_vision_frame = forward(temp_audio_frame, close_vision_frame)
        close_vision_frame = normalize_close_frame(close_vision_frame)
        # Resize back and paste the synced mouth region into the frame
        close_vision_frame = cv2.resize(close_vision_frame, (x2 - x1, y2 - y1))
        result_frame = temp_vision_frame.copy()
        result_frame[y1:y2, x1:x2] = close_vision_frame
        return result_frame
    except Exception as e:
        print(f"[{NAME}] Lip sync error: {e}")
        return temp_vision_frame

def process_frame(source_face: Face, reference_face: Face, temp_frame: Frame) -> Frame:
    """Process a single frame"""
    try:
        # Detect all faces in the frame
        many_faces = get_many_faces(temp_frame)
        if not many_faces:
            return temp_frame
        # Lip-sync each detected face
        result_frame = temp_frame
        for target_face in many_faces:
            # Dummy (silent) audio frame until real audio extraction is wired in
            temp_audio_frame = np.zeros((80, 16), dtype=np.float32)
            result_frame = sync_lip(target_face, temp_audio_frame, result_frame)
        return result_frame
    except Exception as e:
        print(f"[{NAME}] Error processing frame: {e}")
        return temp_frame

def process_frames(source_path: str, temp_frame_paths: List[str], update: Callable[[], None]) -> None:
    """Process multiple frames"""
    total_frames = len(temp_frame_paths)
    print(f"[{NAME}] Processing {total_frames} frames")
    for i, temp_frame_path in enumerate(temp_frame_paths):
        try:
            # Read, process, and write the frame back in place
            temp_frame = cv2.imread(temp_frame_path)
            if temp_frame is None:
                continue
            result_frame = process_frame(None, None, temp_frame)
            cv2.imwrite(temp_frame_path, result_frame)
            # Report progress to the caller
            if update:
                update()
            if i % 100 == 0:
                print(f"[{NAME}] Progress: {i}/{total_frames} frames")
        except Exception as e:
            print(f"[{NAME}] Error processing frame {i}: {e}")
            continue
    print(f"[{NAME}] Frame processing completed")

def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """Process a single image"""
    try:
        print(f"[{NAME}] Processing image: {os.path.basename(target_path)}")
        target_frame = cv2.imread(target_path)
        if target_frame is None:
            # Unreadable image: copy the original through unchanged
            shutil.copy2(target_path, output_path)
            return
        result_frame = process_frame(None, None, target_frame)
        cv2.imwrite(output_path, result_frame)
        print(f"[{NAME}] Image processing completed")
    except Exception as e:
        print(f"[{NAME}] Error processing image: {e}")
        # Fallback: copy the original
        shutil.copy2(target_path, output_path)

def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """Process video frames via the frame processor core"""
    frame_processors.process_video(source_path, temp_frame_paths, process_frames)
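

if __name__ == '__main__':
    # Minimal smoke test: a sketch, not part of the FaceFusion pattern.
    # pre_check() may attempt a model download; with no model available the
    # pipeline falls back to passthrough and returns the frame unchanged.
    if pre_check():
        dummy_frame = np.zeros((256, 256, 3), dtype=np.uint8)
        result_frame = process_frame(None, None, dummy_frame)
        print(f"[{NAME}] Smoke test output shape: {result_frame.shape}")
        post_process()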