File size: 7,045 Bytes
289fb74
 
 
a3d5e88
289fb74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a3d5e88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289fb74
 
 
 
 
 
 
 
 
a3d5e88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289fb74
 
 
a3d5e88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289fb74
 
 
 
 
 
 
a3d5e88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289fb74
 
 
a3d5e88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289fb74
 
 
a3d5e88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
from typing import Any, List, Callable
import cv2
import threading
from pathlib import Path

import SwitcherAI.globals
import SwitcherAI.processors.frame.core as frame_processors
from SwitcherAI import wording
from SwitcherAI.core import update_status
from SwitcherAI.face_analyser import get_many_faces
from SwitcherAI.typing import Frame, Face
from SwitcherAI.utilities import conditional_download, resolve_relative_path, is_image, is_video

# Module-wide cache for the GFPGAN restorer; stays None until
# get_frame_processor() builds it and is reset by clear_frame_processor().
FRAME_PROCESSOR = None
# Held around processor.enhance() in enhance_face(). Semaphore() defaults to a
# value of 1, so only one enhancement call runs at a time.
THREAD_SEMAPHORE = threading.Semaphore()
# Guards the lazy creation of FRAME_PROCESSOR in get_frame_processor().
THREAD_LOCK = threading.Lock()
# Label passed as the second argument to update_status() in pre_process().
# NOTE(review): the prefix reads FACEFUSION although the package is SwitcherAI —
# confirm whether that is intentional.
NAME = 'FACEFUSION.FRAME_PROCESSOR.FACE_ENHANCER'


def get_frame_processor() -> Any:
	"""Return the shared GFPGAN restorer, building it on first use.

	The instance is cached in the module-level FRAME_PROCESSOR and creation is
	serialised with THREAD_LOCK. If the weights file is missing, pre_check()
	is asked to download it first.

	Returns:
		The cached GFPGANer instance, or None when gfpgan cannot be imported,
		the weights cannot be obtained, or construction fails. Failures are
		printed, never raised.
	"""
	global FRAME_PROCESSOR

	with THREAD_LOCK:
		# Fast path: an earlier call already built the restorer.
		if FRAME_PROCESSOR is not None:
			return FRAME_PROCESSOR

		try:
			# Imported lazily so a missing gfpgan package is reported here
			# instead of breaking the import of this module.
			from gfpgan.utils import GFPGANer

			weights_path = resolve_relative_path('../.assets/models/GFPGANv1.4.pth')
			# The helper may hand back a plain string; normalise to Path.
			if isinstance(weights_path, str):
				weights_path = Path(weights_path)

			if not weights_path.exists():
				print(f"⚠️ GFPGAN model not found at: {weights_path}")
				print("🔄 Attempting to download model...")
				downloaded = pre_check()
				if not downloaded:
					print("❌ Failed to download GFPGAN model")
					return None

			FRAME_PROCESSOR = GFPGANer(
				model_path = str(weights_path),
				upscale = 1,
				device = frame_processors.get_device()
			)
			print("✅ GFPGAN frame processor initialized")
		except ImportError as import_error:
			print(f"⚠️ GFPGAN not available: {import_error}")
			print("💡 Install with: pip install gfpgan")
			FRAME_PROCESSOR = None
		except Exception as init_error:
			print(f"⚠️ Failed to initialize GFPGAN: {init_error}")
			FRAME_PROCESSOR = None

		return FRAME_PROCESSOR


def clear_frame_processor() -> None:
	"""Drop the cached restorer so the next get_frame_processor() call rebuilds it."""
	global FRAME_PROCESSOR
	FRAME_PROCESSOR = None


def pre_check() -> bool:
	"""Make sure the GFPGAN v1.4 weights are present in the models directory.

	Creates the models directory if needed, hands the download URL to
	conditional_download(), then checks that the weights file exists and is
	non-empty.

	Returns:
		True when the weights file exists with a size above zero; False when
		it is missing or empty, or when any step raises (the error is printed).
	"""
	try:
		models_dir = resolve_relative_path('../.assets/models')

		# The helper may hand back a plain string; normalise to Path.
		if isinstance(models_dir, str):
			models_dir = Path(models_dir)
		models_dir.mkdir(parents=True, exist_ok=True)

		conditional_download(
			str(models_dir),
			['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth']
		)

		# A zero-byte file counts as a failed download.
		weights_file = models_dir / 'GFPGANv1.4.pth'
		if not weights_file.exists() or weights_file.stat().st_size <= 0:
			print("❌ GFPGAN model download failed or file is empty")
			return False

		print(f"✅ GFPGAN model verified: {weights_file.stat().st_size / (1024*1024):.1f}MB")
		return True

	except Exception as check_error:
		print(f"❌ GFPGAN pre-check failed: {check_error}")
		return False


def pre_process() -> bool:
	"""Decide whether face enhancement can run for the configured target.

	Returns:
		False when SwitcherAI.globals.target_path is neither an image nor a
		video (a status message is emitted), when the GFPGAN restorer cannot
		be obtained, or when any check raises; True otherwise.
	"""
	try:
		target = SwitcherAI.globals.target_path

		# The target has to be an image or a video.
		if not (is_image(target) or is_video(target)):
			update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
			return False

		# Without a restorer there is nothing this processor can do.
		if get_frame_processor() is None:
			print("⚠️ GFPGAN not available, face enhancement will be skipped")
			return False

	except Exception as check_error:
		print(f"⚠️ Face enhancer pre-process failed: {check_error}")
		return False

	return True


def post_process() -> None:
	"""Release the cached GFPGAN restorer by delegating to clear_frame_processor()."""
	clear_frame_processor()


def enhance_face(target_face: Face, temp_frame: Frame) -> Frame:
	"""Enhance one detected face inside ``temp_frame`` and return the frame.

	The face's ``bbox`` is grown by half its width and height on each side,
	clamped to the frame, and that crop is passed to the restorer. The result
	is written back into ``temp_frame`` in place.

	Args:
		target_face: Face record; only its ``'bbox'`` entry (four values,
			start_x, start_y, end_x, end_y) is read.
		temp_frame: Image array that is modified in place.

	Returns:
		``temp_frame``. It is returned unchanged when the restorer is
		unavailable, the crop is empty, or any step fails (errors are printed).
	"""
	try:
		restorer = get_frame_processor()
		if restorer is None:
			print("⚠️ GFPGAN processor not available, returning original frame")
			return temp_frame

		left, top, right, bottom = (int(value) for value in target_face['bbox'])
		pad_x = int((right - left) * 0.5)
		pad_y = int((bottom - top) * 0.5)

		# Grow the box by the padding, then keep it inside the frame.
		frame_height, frame_width = temp_frame.shape[:2]
		left = max(0, left - pad_x)
		top = max(0, top - pad_y)
		right = min(max(0, right + pad_x), frame_width)
		bottom = min(max(0, bottom + pad_y), frame_height)

		face_region = temp_frame[top:bottom, left:right]
		if face_region.size <= 0:
			return temp_frame

		# The semaphore keeps restorer.enhance() calls from overlapping.
		with THREAD_SEMAPHORE:
			try:
				_cropped, _restored, restored_region = restorer.enhance(
					face_region,
					paste_back = True
				)
				temp_frame[top:bottom, left:right] = restored_region
			except Exception as enhance_error:
				# Best effort: the frame keeps its original pixels.
				print(f"⚠️ Face enhancement failed: {enhance_error}")

	except Exception as outer_error:
		print(f"⚠️ Error in enhance_face: {outer_error}")

	return temp_frame


def process_frame(source_face: Face, reference_face: Face, temp_frame: Frame) -> Frame:
	"""Enhance every face found in ``temp_frame``.

	Args:
		source_face: Not used by this processor.
		reference_face: Not used by this processor.
		temp_frame: Frame to scan with get_many_faces() and enhance.

	Returns:
		The frame after each detected face has been passed through
		enhance_face(). The input frame is returned as-is when the restorer
		is unavailable; errors are printed, not raised.
	"""
	try:
		if get_frame_processor() is None:
			print("⚠️ Face enhancer not available, skipping enhancement")
			return temp_frame

		# A falsy detection result (None or empty) means nothing to enhance.
		for detected_face in get_many_faces(temp_frame) or ():
			temp_frame = enhance_face(detected_face, temp_frame)

	except Exception as frame_error:
		print(f"⚠️ Error in process_frame: {frame_error}")

	return temp_frame


def process_frames(source_path: str, temp_frame_paths: List[str], update: Callable[[], None]) -> None:
	"""Enhance each frame file in place and report progress once per frame.

	Args:
		source_path: Not used by this processor.
		temp_frame_paths: Paths of the frame images; each one is read, passed
			through process_frame(), and written back to the same path.
		update: Progress callback invoked once per entry of
			``temp_frame_paths``; a falsy value disables progress reporting.

	Errors are printed and never raised. A frame that cannot be read, processed
	or written is left as it is on disk and still counts as one progress step.
	"""
	try:
		processor = get_frame_processor()
		if processor is None:
			print("⚠️ Face enhancer not available, skipping frame enhancement")
			# Advance progress once per skipped frame, matching the normal
			# path, so the caller's progress total is still reached.
			if update:
				for _ in temp_frame_paths:
					update()
			return

		for temp_frame_path in temp_frame_paths:
			try:
				temp_frame = cv2.imread(temp_frame_path)
				if temp_frame is None:
					print(f"⚠️ Failed to read frame: {temp_frame_path}")
				else:
					result_frame = process_frame(None, None, temp_frame)
					# cv2.imwrite reports failure through its return value.
					if not cv2.imwrite(temp_frame_path, result_frame):
						print(f"⚠️ Failed to write frame: {temp_frame_path}")

			except Exception as e:
				print(f"⚠️ Error processing frame {temp_frame_path}: {e}")

			if update:
				update()

	except Exception as e:
		print(f"⚠️ Error in process_frames: {e}")

def process_image(source_path: str, target_path: str, output_path: str) -> None:
	"""Enhance the faces of a single image file.

	Args:
		source_path: Not used by this processor.
		target_path: Image to read.
		output_path: Where the enhanced image is written; may be the same
			file as ``target_path``.

	When the restorer is unavailable the target is copied to ``output_path``
	unchanged. Errors are printed and never raised.
	"""
	try:
		processor = get_frame_processor()
		if processor is None:
			print("⚠️ Face enhancer not available, copying original image")
			import shutil
			try:
				shutil.copy2(target_path, output_path)
			except shutil.SameFileError:
				# Target and output are already the same file, so the
				# "copy" is complete and there is no error to report.
				pass
			return

		target_frame = cv2.imread(target_path)
		if target_frame is None:
			print(f"⚠️ Failed to read image: {target_path}")
			return

		result_frame = process_frame(None, None, target_frame)
		# cv2.imwrite reports failure through its return value.
		if not cv2.imwrite(output_path, result_frame):
			print(f"⚠️ Failed to write image: {output_path}")

	except Exception as e:
		print(f"⚠️ Error in process_image: {e}")

def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
	"""Run process_frames over a video's extracted frames via the core frame runner.

	Args:
		source_path: Accepted for interface parity; ``None`` is forwarded to
			the core runner in its place.
		temp_frame_paths: Paths of the extracted frame images.

	Errors are printed and never raised.
	"""
	try:
		run_video = SwitcherAI.processors.frame.core.process_video
		run_video(None, temp_frame_paths, process_frames)
	except Exception as video_error:
		print(f"⚠️ Error in process_video: {video_error}")