File size: 11,854 Bytes
289fb74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
from typing import Any, List, Callable
import cv2
import threading
import numpy as np
import os

# Environment fixes
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'
os.environ['NO_ALBUMENTATIONS_UPDATE'] = '1'

import SwitcherAI.globals
import SwitcherAI.processors.frame.core as frame_processors
from SwitcherAI import wording
from SwitcherAI.core import update_status
from SwitcherAI.face_analyser import get_many_faces, get_one_face
from SwitcherAI.typing import Frame, Face
from SwitcherAI.utilities import conditional_download, resolve_relative_path, is_image, is_video

# Module-level state shared by all processor calls.
FRAME_PROCESSOR = None  # cached onnxruntime.InferenceSession, or None in passthrough mode
THREAD_SEMAPHORE = threading.Semaphore()  # serializes model inference in forward()
THREAD_LOCK = threading.Lock()  # guards lazy model loading in get_frame_processor()
NAME = 'FACEFUSION.FRAME_PROCESSOR.LIP_SYNCER'  # prefix used in log/status messages

def get_frame_processor() -> Any:
    """Lazily create and cache the ONNX lip-sync session.

    Thread-safe via THREAD_LOCK. Returns the cached
    ``onnxruntime.InferenceSession``, or ``None`` when the model file is
    missing or onnxruntime is not installed (passthrough mode).
    """
    global FRAME_PROCESSOR

    with THREAD_LOCK:
        if FRAME_PROCESSOR is not None:
            return FRAME_PROCESSOR
        try:
            # Model name comes from globals, with FaceFusion's default.
            model_name = getattr(SwitcherAI.globals, 'lip_syncer_model', 'wav2lip_gan_96')
            model_path = resolve_relative_path(f'../.assets/models/{model_name}.onnx')

            print(f"[{NAME}] Loading model: {model_path}")

            if os.path.exists(model_path):
                # Imported lazily so the module works without onnxruntime.
                import onnxruntime

                providers = getattr(SwitcherAI.globals, 'execution_providers', ['CPUExecutionProvider'])
                FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers=providers)
                print(f"[{NAME}] ONNX model loaded successfully")
            else:
                print(f"[{NAME}] Model file not found: {model_path}")
                FRAME_PROCESSOR = None
        except ImportError:
            print(f"[{NAME}] onnxruntime not available, using passthrough mode")
            FRAME_PROCESSOR = None
        except Exception as e:
            print(f"[{NAME}] Error loading ONNX model: {e}")
            FRAME_PROCESSOR = None

    return FRAME_PROCESSOR

def clear_frame_processor() -> None:
    """Drop the cached ONNX session; the next get_frame_processor() call reloads it."""
    global FRAME_PROCESSOR
    FRAME_PROCESSOR = None

def pre_check() -> bool:
    """Ensure the lip-sync model file exists, attempting a download if not.

    Always returns True so the pipeline continues even when the model is
    unavailable; the processor then falls back to passthrough mode.
    """
    print(f"[{NAME}] Pre-check starting...")

    try:
        download_directory_path = resolve_relative_path('../.assets/models')
        model_name = getattr(SwitcherAI.globals, 'lip_syncer_model', 'wav2lip_gan_96')
        model_path = os.path.join(download_directory_path, f'{model_name}.onnx')

        if not os.path.exists(model_path):
            print(f"[{NAME}] Model not found: {model_path}")

            # NOTE(review): these URL entries look like placeholders, not real
            # download locations — confirm before relying on auto-download.
            model_urls = {
                'wav2lip_96': ['Awwfuck.com'],
                'wav2lip_gan_96': ['Awwfuck.com']
            }

            urls = model_urls.get(model_name)
            if urls is not None:
                print(f"[{NAME}] Attempting to download {model_name}")
                conditional_download(download_directory_path, urls)

        print(f"[{NAME}] Pre-check passed")
        return True
    except Exception as e:
        # Best-effort: never block the pipeline on a failed pre-check.
        print(f"[{NAME}] Pre-check error: {e}")
        return True

def pre_process() -> bool:
    """Validate that the configured target path is an image or a video."""
    print(f"[{NAME}] Pre-processing...")

    target = SwitcherAI.globals.target_path
    if is_image(target) or is_video(target):
        print(f"[{NAME}] Pre-processing completed")
        return True

    # Neither image nor video: report and abort, like FaceFusion does.
    update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
    return False

def post_process() -> None:
    """Release the cached ONNX session after processing finishes."""
    clear_frame_processor()
    print(f"[{NAME}] Post-processing completed")

def prepare_audio_frame(audio_frame: np.ndarray) -> np.ndarray:
    """Convert a raw mel spectrogram into the model's expected input tensor.

    Applies a log10 rescaling with a 1e-5 floor (avoids log of zero),
    clips to [-4, 4], and prepends batch and channel axes, producing
    shape ``(1, 1, *audio_frame.shape)`` as float32.
    """
    floor = np.exp(-5 * np.log(10))  # == 1e-5
    scaled = np.log10(np.maximum(floor, audio_frame)) * 1.6 + 3.2
    scaled = np.clip(scaled, -4, 4).astype(np.float32)
    return scaled[np.newaxis, np.newaxis, ...]

def prepare_crop_frame(crop_vision_frame: np.ndarray) -> np.ndarray:
    """Build the 6-channel NCHW float input the lip-sync model expects.

    Channels 0-2 are the face crop with its lower half (rows 48+) zeroed
    so the model must re-generate the mouth region; channels 3-5 are the
    untouched crop. Pixels are scaled from [0, 255] to [0, 1].
    """
    batched = crop_vision_frame[np.newaxis, ...]
    masked = batched.copy()
    masked[:, 48:] = 0  # hide the mouth half of the 96x96 crop
    stacked = np.concatenate((masked, batched), axis=3)
    return stacked.transpose(0, 3, 1, 2).astype('float32') / 255.0

def normalize_close_frame(crop_vision_frame: np.ndarray) -> np.ndarray:
    """Convert model output (1, C, H, W in [0, 1]) back to an HWC uint8 image."""
    image = np.transpose(crop_vision_frame[0], (1, 2, 0))
    image = np.clip(image, 0, 1) * 255
    return image.astype(np.uint8)

def forward(temp_audio_frame: np.ndarray, close_vision_frame: np.ndarray) -> np.ndarray:
    """Run one lip-sync inference pass through the ONNX model.

    Maps model inputs to the audio/vision tensors by name heuristics
    (FaceFusion uses 'source' and 'target'). Returns the unmodified
    ``close_vision_frame`` when no model is loaded or inference fails.
    """
    lip_syncer = get_frame_processor()
    if lip_syncer is None:
        return close_vision_frame

    try:
        with THREAD_SEMAPHORE:
            feeds = {}
            for model_input in lip_syncer.get_inputs():
                lowered = model_input.name.lower()
                if any(tag in lowered for tag in ('source', 'audio', 'mel')):
                    feeds[model_input.name] = temp_audio_frame
                elif any(tag in lowered for tag in ('target', 'video', 'frame')):
                    feeds[model_input.name] = close_vision_frame

            close_vision_frame = lip_syncer.run(None, feeds)[0]

        return close_vision_frame
    except Exception as e:
        print(f"[{NAME}] Forward pass error: {e}")
        return close_vision_frame

def sync_lip(target_face: Face, temp_audio_frame: np.ndarray, temp_vision_frame: Frame) -> Frame:
    """Apply lip sync to one face region, following FaceFusion's approach.

    Crops the face bounding box, resizes it to the model's 96x96 input,
    runs the forward pass, and pastes the result back into a copy of the
    frame. Returns the input frame unchanged when the face has no usable
    bounding box or on any failure.

    :param target_face: detected face; must expose ``bbox`` as (x1, y1, x2, y2)
    :param temp_audio_frame: mel spectrogram chunk, or None for a silent
        all-zero 80x16 placeholder
    :param temp_vision_frame: frame to modify (BGR, per cv2.imread usage elsewhere)
    """
    try:
        if temp_audio_frame is None:
            # Silent placeholder: 80 mel bins x 16 time steps.
            temp_audio_frame = np.zeros((80, 16), dtype=np.float32)

        temp_audio_frame = prepare_audio_frame(temp_audio_frame)

        if not hasattr(target_face, 'bbox'):
            return temp_vision_frame

        x1, y1, x2, y2 = map(int, target_face.bbox)
        h, w = temp_vision_frame.shape[:2]

        # Clamp to frame bounds. x2/y2 are exclusive slice ends, so they may
        # legally equal w/h — the previous clamp to w-1/h-1 silently dropped
        # the last row/column for faces touching the frame edge.
        x1 = max(0, min(x1, w - 1))
        y1 = max(0, min(y1, h - 1))
        x2 = max(0, min(x2, w))
        y2 = max(0, min(y2, h))

        if x2 <= x1 or y2 <= y1:
            return temp_vision_frame

        # Crop and resize the face region to the model's 96x96 input size.
        face_region = temp_vision_frame[y1:y2, x1:x2]
        close_vision_frame = cv2.resize(face_region, (96, 96))

        close_vision_frame = prepare_crop_frame(close_vision_frame)
        close_vision_frame = forward(temp_audio_frame, close_vision_frame)
        close_vision_frame = normalize_close_frame(close_vision_frame)

        # Resize back to the original box and paste into a copy of the frame.
        close_vision_frame = cv2.resize(close_vision_frame, (x2 - x1, y2 - y1))
        result_frame = temp_vision_frame.copy()
        result_frame[y1:y2, x1:x2] = close_vision_frame

        return result_frame

    except Exception as e:
        print(f"[{NAME}] Lip sync error: {e}")
        return temp_vision_frame

def process_frame(source_face: Face, reference_face: Face, temp_frame: Frame) -> Frame:
    """Lip-sync every detected face in the frame.

    ``source_face`` and ``reference_face`` are accepted for interface
    compatibility with the frame-processor core but are not used here.
    Returns the frame unchanged when no faces are found or on error.
    """
    try:
        faces = get_many_faces(temp_frame)
        if not faces:
            return temp_frame

        result_frame = temp_frame
        for face in faces:
            # No audio pipeline is wired up yet: feed a silent mel chunk.
            silent_audio = np.zeros((80, 16), dtype=np.float32)
            result_frame = sync_lip(face, silent_audio, result_frame)

        return result_frame
    except Exception as e:
        print(f"[{NAME}] Error processing frame: {e}")
        return temp_frame

def process_frames(source_path: str, temp_frame_paths: List[str], update: Callable[[], None]) -> None:
    """Lip-sync each frame image in place, invoking ``update`` after each one.

    Unreadable frames and per-frame failures are skipped so one bad frame
    cannot abort the whole video.
    """
    total_frames = len(temp_frame_paths)
    print(f"[{NAME}] Processing {total_frames} frames")

    for index, frame_path in enumerate(temp_frame_paths):
        try:
            frame = cv2.imread(frame_path)
            if frame is None:
                continue  # unreadable frame: leave the file untouched

            cv2.imwrite(frame_path, process_frame(None, None, frame))

            if update:
                update()

            # Coarse console progress every 100 frames.
            if index % 100 == 0:
                print(f"[{NAME}] Progress: {index}/{total_frames} frames")
        except Exception as e:
            print(f"[{NAME}] Error processing frame {index}: {e}")
            continue

    print(f"[{NAME}] Frame processing completed")

def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """Lip-sync a single image, copying the original to output on any failure."""
    try:
        print(f"[{NAME}] Processing image: {os.path.basename(target_path)}")

        target_frame = cv2.imread(target_path)
        if target_frame is None:
            # Unreadable input (e.g. unsupported format): pass it through.
            import shutil
            shutil.copy2(target_path, output_path)
            return

        cv2.imwrite(output_path, process_frame(None, None, target_frame))
        print(f"[{NAME}] Image processing completed")
    except Exception as e:
        print(f"[{NAME}] Error processing image: {e}")
        # Fallback: deliver the unmodified original rather than nothing.
        import shutil
        shutil.copy2(target_path, output_path)

def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """Process a video by delegating batching/threading to the frame-processor core.

    The core calls ``process_frames`` for each chunk of extracted frame paths.
    """
    frame_processors.process_video(source_path, temp_frame_paths, process_frames)