Spaces:
Paused
Paused
File size: 7,045 Bytes
289fb74 a3d5e88 289fb74 a3d5e88 289fb74 a3d5e88 289fb74 a3d5e88 289fb74 a3d5e88 289fb74 a3d5e88 289fb74 a3d5e88 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
from typing import Any, List, Callable
import cv2
import threading
from pathlib import Path
import SwitcherAI.globals
import SwitcherAI.processors.frame.core as frame_processors
from SwitcherAI import wording
from SwitcherAI.core import update_status
from SwitcherAI.face_analyser import get_many_faces
from SwitcherAI.typing import Frame, Face
from SwitcherAI.utilities import conditional_download, resolve_relative_path, is_image, is_video
# Lazily created GFPGAN enhancer shared by every call in this module;
# populated by get_frame_processor() and reset by clear_frame_processor().
FRAME_PROCESSOR = None
# Held by enhance_face() around each processor.enhance() call.
THREAD_SEMAPHORE = threading.Semaphore()
# Held by get_frame_processor() while it reads or initialises FRAME_PROCESSOR.
THREAD_LOCK = threading.Lock()
# Identifier passed to update_status() for messages coming from this processor.
NAME = 'FACEFUSION.FRAME_PROCESSOR.FACE_ENHANCER'
def get_frame_processor() -> Any:
    """Return the shared GFPGAN enhancer, building it on first use.

    The whole lookup runs while holding ``THREAD_LOCK``. When nothing is
    cached yet, ``gfpgan`` is imported, the ``GFPGANv1.4.pth`` weights are
    located (``pre_check`` is asked to download them if the file is missing)
    and a ``GFPGANer`` is created and stored in ``FRAME_PROCESSOR``.

    Returns:
        The cached ``GFPGANer`` instance, or ``None`` when the package cannot
        be imported, the weights cannot be obtained, or construction fails.
        Those failures are printed rather than raised.
    """
    global FRAME_PROCESSOR

    with THREAD_LOCK:
        # Fast path: an enhancer was already built by an earlier call.
        if FRAME_PROCESSOR is not None:
            return FRAME_PROCESSOR

        try:
            # Imported here so a missing gfpgan package is reported below
            # instead of breaking the import of this module.
            from gfpgan.utils import GFPGANer

            located = resolve_relative_path('../.assets/models/GFPGANv1.4.pth')
            # Normalise plain string paths to Path objects.
            model_path = Path(located) if isinstance(located, str) else located

            if not model_path.exists():
                print(f"⚠️ GFPGAN model not found at: {model_path}")
                print("🔄 Attempting to download model...")
                if not pre_check():
                    print("❌ Failed to download GFPGAN model")
                    return None

            FRAME_PROCESSOR = GFPGANer(
                model_path=str(model_path),
                upscale=1,
                device=frame_processors.get_device(),
            )
            print("✅ GFPGAN frame processor initialized")
        except ImportError as e:
            print(f"⚠️ GFPGAN not available: {e}")
            print("💡 Install with: pip install gfpgan")
            FRAME_PROCESSOR = None
        except Exception as e:
            print(f"⚠️ Failed to initialize GFPGAN: {e}")
            FRAME_PROCESSOR = None

        return FRAME_PROCESSOR
def clear_frame_processor() -> None:
    """Drop the cached enhancer by resetting ``FRAME_PROCESSOR`` to ``None``.

    The next ``get_frame_processor()`` call will build a new instance.
    """
    global FRAME_PROCESSOR

    FRAME_PROCESSOR = None
def pre_check() -> bool:
    """Make sure the GFPGAN v1.4 weights are present in the models directory.

    Creates ``../.assets/models`` (resolved through ``resolve_relative_path``)
    if needed, hands the model URL to ``conditional_download`` and then
    checks that ``GFPGANv1.4.pth`` exists there and is not empty.

    Returns:
        ``True`` when the weights file exists with a non-zero size, ``False``
        otherwise, including when any step raises (the error is printed).
    """
    try:
        models_dir = resolve_relative_path('../.assets/models')
        # Normalise plain string paths to Path objects.
        if isinstance(models_dir, str):
            models_dir = Path(models_dir)
        models_dir.mkdir(parents=True, exist_ok=True)

        conditional_download(
            str(models_dir),
            ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth'],
        )

        # A zero-byte file counts as a failed download.
        model_path = models_dir / 'GFPGANv1.4.pth'
        if not (model_path.exists() and model_path.stat().st_size > 0):
            print("❌ GFPGAN model download failed or file is empty")
            return False

        print(f"✅ GFPGAN model verified: {model_path.stat().st_size / (1024*1024):.1f}MB")
        return True
    except Exception as e:
        print(f"❌ GFPGAN pre-check failed: {e}")
        return False
def pre_process() -> bool:
    """Check that enhancement can run for the configured target.

    Returns:
        ``False`` when ``SwitcherAI.globals.target_path`` is neither an image
        nor a video (a status message is emitted), when no enhancer can be
        obtained, or when the check itself raises; ``True`` otherwise.
    """
    try:
        target = SwitcherAI.globals.target_path
        if not (is_image(target) or is_video(target)):
            update_status(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
            return False

        # Obtaining the enhancer here also triggers its one-time setup.
        if get_frame_processor() is None:
            print("⚠️ GFPGAN not available, face enhancement will be skipped")
            return False

        return True
    except Exception as e:
        print(f"⚠️ Face enhancer pre-process failed: {e}")
        return False
def post_process() -> None:
    """Release the cached enhancer by delegating to ``clear_frame_processor``."""
    clear_frame_processor()
def enhance_face(target_face: Face, temp_frame: Frame) -> Frame:
    """Enhance one face region of ``temp_frame`` in place.

    The face's ``bbox`` is grown by half of its width and height on every
    side, clamped to the frame, and the resulting crop is passed to the
    enhancer while holding ``THREAD_SEMAPHORE``. The enhanced crop is written
    back over the same region.

    Args:
        target_face: Face record; only its ``'bbox'`` entry is read.
        temp_frame: Frame to modify.

    Returns:
        ``temp_frame`` itself. It is returned unchanged when no enhancer is
        available, the crop is empty, or any step fails (errors are printed).
    """
    try:
        enhancer = get_frame_processor()
        if enhancer is None:
            print("⚠️ GFPGAN processor not available, returning original frame")
            return temp_frame

        left, top, right, bottom = (int(value) for value in target_face['bbox'])
        margin_x = int((right - left) * 0.5)
        margin_y = int((bottom - top) * 0.5)
        frame_height, frame_width = temp_frame.shape[:2]

        # Grow the box by the margins, then keep it inside the frame.
        left = max(0, left - margin_x)
        top = max(0, top - margin_y)
        right = min(max(0, right + margin_x), frame_width)
        bottom = min(max(0, bottom + margin_y), frame_height)

        region = temp_frame[top:bottom, left:right]
        if region.size <= 0:
            return temp_frame

        with THREAD_SEMAPHORE:
            try:
                _, _, restored = enhancer.enhance(region, paste_back=True)
                temp_frame[top:bottom, left:right] = restored
            except Exception as e:
                # Best effort: keep the untouched pixels for this face.
                print(f"⚠️ Face enhancement failed: {e}")
    except Exception as e:
        print(f"⚠️ Error in enhance_face: {e}")
    return temp_frame
def process_frame(source_face: Face, reference_face: Face, temp_frame: Frame) -> Frame:
    """Enhance every face that ``get_many_faces`` finds in ``temp_frame``.

    Args:
        source_face: Not used by this processor.
        reference_face: Not used by this processor.
        temp_frame: Frame to enhance.

    Returns:
        The frame after each detected face has gone through ``enhance_face``,
        or the frame as it stood when the enhancer was unavailable or an
        error occurred (errors are printed).
    """
    try:
        if get_frame_processor() is None:
            print("⚠️ Face enhancer not available, skipping enhancement")
            return temp_frame

        # A falsy detection result means there is nothing to enhance.
        for detected_face in get_many_faces(temp_frame) or ():
            temp_frame = enhance_face(detected_face, temp_frame)
    except Exception as e:
        print(f"⚠️ Error in process_frame: {e}")
    return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], update: Callable[[], None]) -> None:
    """Enhance each frame file in place, reporting progress once per frame.

    Every path is read with ``cv2.imread``, passed through ``process_frame``
    and written back to the same path. A failure on one frame is printed and
    does not stop the remaining frames.

    Args:
        source_path: Not used by this function.
        temp_frame_paths: Paths of the frame images to enhance in place.
        update: Progress callback invoked once per entry of
            ``temp_frame_paths``; a falsy value disables reporting.
    """
    try:
        if get_frame_processor() is None:
            print("⚠️ Face enhancer not available, skipping frame enhancement")
            # The frames stay untouched, but progress is still reported once
            # per frame so the caller's count matches the normal path below.
            if update:
                for _ in temp_frame_paths:
                    update()
            return

        for temp_frame_path in temp_frame_paths:
            try:
                temp_frame = cv2.imread(temp_frame_path)
                if temp_frame is not None:
                    result_frame = process_frame(None, None, temp_frame)
                    cv2.imwrite(temp_frame_path, result_frame)
                else:
                    print(f"⚠️ Failed to read frame: {temp_frame_path}")
            except Exception as e:
                print(f"⚠️ Error processing frame {temp_frame_path}: {e}")
            # Count the frame as handled whether or not it was enhanced.
            if update:
                update()
    except Exception as e:
        print(f"⚠️ Error in process_frames: {e}")
def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """Write an enhanced copy of ``target_path`` to ``output_path``.

    When no enhancer is available the target file is copied unchanged. When
    the target cannot be read, a message is printed and nothing is written.

    Args:
        source_path: Not used by this function.
        target_path: Image file to enhance.
        output_path: Destination file for the result.
    """
    try:
        if get_frame_processor() is None:
            print("⚠️ Face enhancer not available, copying original image")
            from shutil import copy2
            copy2(target_path, output_path)
            return

        loaded = cv2.imread(target_path)
        if loaded is None:
            print(f"⚠️ Failed to read image: {target_path}")
            return

        cv2.imwrite(output_path, process_frame(None, None, loaded))
    except Exception as e:
        print(f"⚠️ Error in process_image: {e}")
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """Hand the extracted frames to the frame-processor core.

    Calls ``SwitcherAI.processors.frame.core.process_video`` with
    ``process_frames`` as the per-batch worker. ``None`` is passed in place
    of a source path. Any exception is printed rather than raised.

    Args:
        source_path: Not used by this function.
        temp_frame_paths: Paths of the video's extracted frame images.
    """
    try:
        run_video = SwitcherAI.processors.frame.core.process_video
        run_video(None, temp_frame_paths, process_frames)
    except Exception as e:
        print(f"⚠️ Error in process_video: {e}")