|
import numpy as np |
|
import gradio as gr |
|
import roop.globals |
|
from roop.core import ( |
|
start, |
|
decode_execution_providers, |
|
suggest_max_memory, |
|
suggest_execution_threads, |
|
) |
|
from roop.processors.frame.core import get_frame_processors_modules |
|
from roop.utilities import normalize_output_path |
|
import os |
|
from PIL import Image |
|
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, padding |
|
from cryptography.hazmat.primitives import serialization, hashes |
|
from cryptography.hazmat.backends import default_backend |
|
from cryptography.hazmat.primitives.asymmetric import utils |
|
import base64 |
|
import json |
|
import datetime |
|
import pylru |
|
# Bounded LRU cache (at most 1000 entries) of auth tokens that have already
# been accepted; swap_face consults it to reject a token presented twice.
token_queue = pylru.lrucache(size=1000)
|
def load_public_key_from_file(file_path):
    """Load a PEM-encoded public key from disk.

    Args:
        file_path: Path of the PEM file to read (opened in binary mode).

    Returns:
        The key object produced by ``serialization.load_pem_public_key``.
    """
    with open(file_path, "rb") as pem_stream:
        pem_bytes = pem_stream.read()
    return serialization.load_pem_public_key(pem_bytes, backend=default_backend())
|
|
|
def verify_signature(public_key, data, signature):
    """Check ``signature`` over ``data`` with ``public_key``.

    The check uses PSS padding (MGF1 over SHA-256, maximum salt length) and
    SHA-256 as the message hash.

    Args:
        public_key: Key object exposing ``verify(signature, data, padding,
            algorithm)``.
        data: The signed message; a ``str`` is UTF-8 encoded first, anything
            else is passed through unchanged.
        signature: The raw signature bytes.

    Returns:
        ``True`` when verification succeeds. ``False`` when any exception is
        raised during verification; the exception is printed, not re-raised.
    """
    payload = data.encode('utf-8') if isinstance(data, str) else data

    try:
        # The padding object is built inside the try block so that any error
        # while constructing it is reported as a failed verification too.
        pss_padding = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        )
        public_key.verify(signature, payload, pss_padding, hashes.SHA256())
    except Exception as e:
        print("Verification failed:", e)
        return False
    else:
        return True
|
|
|
# Public key used by swap_face to verify auth tokens; loaded once at import
# time from a PEM file in the current working directory.
public_key = load_public_key_from_file(file_path="./nsfwais.pubkey.pem")
|
|
|
def _authorize_request(skey, key):
    """Validate the signed auth token, its age, and that it is not a replay.

    Args:
        skey: JSON text holding ``"t"`` (the request timestamp, as a string)
            and ``"s"`` (the base64-encoded signature).
        key: Caller-supplied string; the signed message is ``t + "_" + key``.

    Raises:
        Exception: If the token is missing, malformed, fails signature
            verification, is older than 600 seconds, or was already used.
    """
    # Reject missing values before dereferencing anything, so an absent or
    # malformed token fails with the intended message instead of a stray
    # TypeError / KeyError / JSONDecodeError.
    if not skey or not key:
        raise Exception("verify authkey failed.")
    try:
        token_info = json.loads(skey)
        token = token_info["t"]
        signature = base64.b64decode(token_info["s"])
        signed_message = token + "_" + key
    except (TypeError, ValueError, KeyError) as err:
        raise Exception("verify authkey failed.") from err
    if not verify_signature(public_key, signed_message, signature):
        raise Exception("verify authkey failed.")

    # Age check: the token timestamp is compared with the current UTC epoch
    # time. NOTE(review): only tokens that are too old are rejected; a signed
    # timestamp in the future is accepted -- confirm this is intended.
    timestamp_requested = float(token)
    timestamp_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
    token_age = timestamp_now - timestamp_requested
    if token_age > 600:
        raise Exception(f"authkey timeout, {token_age}")
    print(f"authkey pass, {token_age}")

    # Replay protection: each token may be accepted once. It is recorded
    # before processing starts, so a failed run still consumes the token.
    token_cache = token_queue.get(token, None)
    if token_cache:
        raise Exception(f"duplicated token, {token_cache}")
    token_queue[token] = True


def swap_face(source_file, target_file, doFaceEnhancer, skey, key):
    """Authorize the request, then run the roop pipeline on two images.

    Args:
        source_file: Source image as array data accepted by
            ``PIL.Image.fromarray``.
        target_file: Target image, same form as ``source_file``.
        doFaceEnhancer: When truthy, the ``face_enhancer`` processor runs
            after ``face_swapper``.
        skey: JSON text with the signed token (see ``_authorize_request``).
        key: Caller-supplied string that is part of the signed message.

    Returns:
        The path roop was configured to write the result to, or ``None`` when
        a frame processor's ``pre_check`` fails.

    Raises:
        Exception: If authorization fails or an input image is missing.
    """
    _authorize_request(skey, key)

    # Fail with a clear message rather than an AttributeError raised from
    # inside Image.fromarray when an image was not provided.
    if source_file is None or target_file is None:
        raise Exception("source and target images are required.")

    # The pipeline works on files, so both inputs are written to fixed paths
    # in the current working directory.
    source_path = "input.jpg"
    target_path = "target.jpg"
    Image.fromarray(source_file).save(source_path)
    Image.fromarray(target_file).save(target_path)

    print("source_path: ", source_path)
    print("target_path: ", target_path)

    # roop reads its configuration from module-level globals.
    roop.globals.source_path = source_path
    roop.globals.target_path = target_path
    output_path = "output.jpg"
    roop.globals.output_path = normalize_output_path(
        roop.globals.source_path, roop.globals.target_path, output_path
    )
    if doFaceEnhancer:
        roop.globals.frame_processors = ["face_swapper", "face_enhancer"]
    else:
        roop.globals.frame_processors = ["face_swapper"]
    roop.globals.headless = True
    roop.globals.keep_fps = True
    roop.globals.keep_audio = True
    roop.globals.keep_frames = False
    roop.globals.many_faces = True
    roop.globals.video_encoder = "libx264"
    roop.globals.video_quality = 18
    roop.globals.max_memory = suggest_max_memory()
    roop.globals.execution_providers = decode_execution_providers(["cuda"])
    roop.globals.execution_threads = suggest_execution_threads()

    print(
        "start process",
        roop.globals.source_path,
        roop.globals.target_path,
        roop.globals.output_path,
    )

    for frame_processor in get_frame_processors_modules(
        roop.globals.frame_processors
    ):
        if not frame_processor.pre_check():
            return None

    start()

    # Return the normalized path the pipeline was configured with, not the
    # raw name passed to normalize_output_path.
    return roop.globals.output_path
|
|
|
# Gradio front-end: one Interface wrapping swap_face, hosted in a Blocks app.
app = gr.Blocks()

with app:
    # Inputs map positionally onto swap_face's parameters. The two hidden
    # textboxes carry the signed token (skey) and the key.
    interface_inputs = [
        gr.Image(),
        gr.Image(),
        gr.Checkbox(label="face_enhancer?", info="do face enhancer?"),
        gr.Textbox(visible=False),
        gr.Textbox(visible=False),
    ]
    gr.Interface(fn=swap_face, inputs=interface_inputs, outputs="image")

# Queue at most 5 pending requests and serve them with a single worker thread.
app.queue(max_size=5, status_update_rate=5.0)
app.launch(max_threads=1)
|
|