Spaces:
Runtime error
Runtime error
import os | |
import sys | |
import cv2 | |
import re | |
from shared import path_manager | |
import modules.async_worker as worker | |
from tqdm import tqdm | |
from modules.util import generate_temp_filename | |
from PIL import Image | |
import imageio.v3 as iio | |
import numpy as np | |
import torch | |
import insightface | |
from importlib.abc import MetaPathFinder, Loader | |
from importlib.util import spec_from_loader, module_from_spec | |
class ImportRedirector(MetaPathFinder): | |
def __init__(self, redirect_map): | |
self.redirect_map = redirect_map | |
def find_spec(self, fullname, path, target=None): | |
if fullname in self.redirect_map: | |
return spec_from_loader(fullname, ImportLoader(self.redirect_map[fullname])) | |
return None | |
class ImportLoader(Loader): | |
def __init__(self, redirect): | |
self.redirect = redirect | |
def create_module(self, spec): | |
return None | |
def exec_module(self, module): | |
import importlib | |
redirected = importlib.import_module(self.redirect) | |
module.__dict__.update(redirected.__dict__) | |
# Set up the redirection | |
redirect_map = { | |
'torchvision.transforms.functional_tensor': 'torchvision.transforms.functional' | |
} | |
sys.meta_path.insert(0, ImportRedirector(redirect_map)) | |
import gfpgan | |
from facexlib.utils.face_restoration_helper import FaceRestoreHelper | |
# Requirements: | |
# insightface==0.7.3 | |
# onnxruntime-gpu==1.16.1 | |
# gfpgan==1.3.8 | |
# | |
# add to settings/powerup.json | |
# | |
# "Faceswap": { | |
# "type": "faceswap" | |
# } | |
# | |
# Models in models/faceswap/ | |
# https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth | |
# and inswapper_128.onnx from where you can find it | |
class pipeline: | |
pipeline_type = ["faceswap"] | |
analyser_model = None | |
analyser_hash = "" | |
swapper_model = None | |
swapper_hash = "" | |
gfpgan_model = None | |
def parse_gen_data(self, gen_data): | |
gen_data["original_image_number"] = gen_data["image_number"] | |
gen_data["image_number"] = 1 | |
gen_data["show_preview"] = False | |
return gen_data | |
def load_base_model(self, name): | |
model_name = "inswapper_128.onnx" | |
if not self.swapper_hash == model_name: | |
print(f"Loading swapper model: {model_name}") | |
model_path = os.path.join(path_manager.model_paths["faceswap_path"], model_name) | |
try: | |
with open(os.devnull, "w") as sys.stdout: | |
self.swapper_model = insightface.model_zoo.get_model( | |
model_path, | |
download=False, | |
download_zip=False, | |
) | |
self.swapper_hash = model_name | |
sys.stdout = sys.__stdout__ | |
except: | |
print(f"Failed loading model! {model_path}") | |
model_name = "buffalo_l" | |
det_thresh = 0.5 | |
if not self.analyser_hash == model_name: | |
print(f"Loading analyser model: {model_name}") | |
try: | |
with open(os.devnull, "w") as sys.stdout: | |
self.analyser_model = insightface.app.FaceAnalysis(name=model_name) | |
self.analyser_model.prepare( | |
ctx_id=0, det_thresh=det_thresh, det_size=(640, 640) | |
) | |
self.analyser_hash = model_name | |
sys.stdout = sys.__stdout__ | |
except: | |
print(f"Failed loading model! {model_name}") | |
def load_gfpgan_model(self): | |
if self.gfpgan_model is None: | |
channel_multiplier = 2 | |
model_name = "GFPGANv1.4.pth" | |
model_path = os.path.join(path_manager.model_paths["faceswap_path"], model_name) | |
# https://github.com/TencentARC/GFPGAN/blob/master/inference_gfpgan.py | |
self.gfpgan_model = gfpgan.GFPGANer | |
self.gfpgan_model.bg_upsampler = None | |
# initialize model | |
self.gfpgan_model.device = torch.device( | |
"cuda" if torch.cuda.is_available() else "cpu" | |
) | |
upscale = 2 | |
self.gfpgan_model.face_helper = FaceRestoreHelper( | |
upscale, | |
det_model="retinaface_resnet50", | |
model_rootpath=path_manager.model_paths["faceswap_path"], | |
) | |
# face_size=512, | |
# crop_ratio=(1, 1), | |
# save_ext='png', | |
# use_parse=True, | |
# device=self.device, | |
self.gfpgan_model.gfpgan = gfpgan.GFPGANv1Clean( | |
out_size=512, | |
num_style_feat=512, | |
channel_multiplier=channel_multiplier, | |
decoder_load_path=None, | |
fix_decoder=False, | |
num_mlp=8, | |
input_is_latent=True, | |
different_w=True, | |
narrow=1, | |
sft_half=True, | |
) | |
loadnet = torch.load(model_path) | |
if "params_ema" in loadnet: | |
keyname = "params_ema" | |
else: | |
keyname = "params" | |
self.gfpgan_model.gfpgan.load_state_dict(loadnet[keyname], strict=True) | |
self.gfpgan_model.gfpgan.eval() | |
self.gfpgan_model.gfpgan = self.gfpgan_model.gfpgan.to( | |
self.gfpgan_model.device | |
) | |
def load_keywords(self, lora): | |
return "" | |
def load_loras(self, loras): | |
return | |
def refresh_controlnet(self, name=None): | |
return | |
def clean_prompt_cond_caches(self): | |
return | |
def swap_faces(self, original_image, input_faces, out_faces): | |
idx = 0 | |
for out_face in out_faces: | |
original_image = self.swapper_model.get( | |
original_image, | |
out_face, | |
input_faces[idx % len(input_faces)], | |
paste_back=True, | |
) | |
idx += 1 | |
return original_image | |
def restore_faces(self, image): | |
self.load_gfpgan_model() | |
image_bgr = image[:, :, ::-1] | |
_cropped_faces, _restored_faces, gfpgan_output_bgr = self.gfpgan_model.enhance( | |
self.gfpgan_model, | |
image_bgr, | |
has_aligned=False, | |
only_center_face=False, | |
paste_back=True, | |
weight=0.5, | |
) | |
image = gfpgan_output_bgr[:, :, ::-1] | |
return image | |
def process( | |
self, | |
gen_data=None, | |
callback=None, | |
): | |
worker.add_result( | |
gen_data["task_id"], | |
"preview", | |
(-1, f"Generating ...", None) | |
) | |
input_image = gen_data["input_image"] | |
input_image = cv2.cvtColor(np.asarray(input_image), cv2.COLOR_RGB2BGR) | |
input_faces = sorted( | |
self.analyser_model.get(input_image), key=lambda x: x.bbox[0] | |
) | |
prompt = gen_data["prompt"].strip() | |
if re.fullmatch("https?://.*\.gif", prompt, re.IGNORECASE) is not None: | |
x = iio.immeta(prompt) | |
duration = x["duration"] | |
loop = x["loop"] | |
gif = cv2.VideoCapture(prompt) | |
# Swap | |
in_imgs = [] | |
out_imgs = [] | |
while True: | |
ret, frame = gif.read() | |
if not ret: | |
break | |
in_imgs.append(frame) | |
with tqdm(total=len(in_imgs), desc="Groop", unit="frames") as progress: | |
i=0 | |
steps=len(in_imgs) | |
for frame in in_imgs: | |
out_faces = sorted( | |
self.analyser_model.get(frame), key=lambda x: x.bbox[0] | |
) | |
frame = self.swap_faces(frame, input_faces, out_faces) | |
out_imgs.append( | |
Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) | |
) | |
i+=1 | |
callback(i, 0, 0, steps, out_imgs[-1]) | |
progress.update(1) | |
images = generate_temp_filename( | |
folder=path_manager.model_paths["temp_outputs_path"], extension="gif" | |
) | |
os.makedirs(os.path.dirname(images), exist_ok=True) | |
out_imgs[0].save( | |
images, | |
save_all=True, | |
append_images=out_imgs[1:], | |
optimize=True, | |
duration=duration, | |
loop=loop, | |
) | |
else: | |
output_image = cv2.imread(gen_data["main_view"]) | |
if output_image is None: | |
images = "html/error.png" | |
else: | |
output_faces = sorted( | |
self.analyser_model.get(output_image), key=lambda x: x.bbox[0] | |
) | |
result_image = self.swap_faces(output_image, input_faces, output_faces) | |
result_image = self.restore_faces(result_image) | |
images = Image.fromarray(cv2.cvtColor(result_image, cv2.COLOR_BGR2RGB)) | |
return [images] | |