# Spaces:
# Running
# Running
# File size: 3,647 Bytes
# 8fb085a 08814b6 dc8b067 08814b6 8fb085a 08814b6 ddf360c 8c9e7c2 8fb085a ddf360c 8fb085a ddf360c 8c9e7c2 8fb085a ddf360c 8c9e7c2 ddf360c 8c9e7c2 8fb085a |
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
import os
import base64
import time
from fastapi import FastAPI, APIRouter, Body
from facefusion.api.model import Params, print_globals
import facefusion.globals as globals
import facefusion.processors.frame.globals as frame_processors_globals
from facefusion import core
from facefusion.utilities import normalize_output_path
# Application object and the router whose endpoints are registered on it
# further down in this module.
app = FastAPI()
router = APIRouter()

# Directory holding this module, and its parent directory with a trailing
# slash; the latter is the prefix used when building per-request file paths.
app_dir = os.path.dirname(__file__)
work_dir = f"{os.path.dirname(app_dir)}/"
print("工作目录", work_dir)
@app.get("/")
def root():
    """Answer ``GET /`` with a fixed greeting payload."""
    greeting = {"API": "hello"}
    return greeting
@router.post("/")
async def process_frames(params: Params = Body(...)) -> dict:
    """Run one processing job from base64-encoded source and target payloads.

    The request fields are copied onto the facefusion globals, both payloads
    are decoded to temporary files, ``core.api_conditional_process()`` is
    invoked, and the file found at ``globals.output_path`` is returned as a
    base64 string. Timing information is printed along the way.

    Args:
        params: Request body. ``source`` and ``target`` carry the base64
            payloads; ``user_id``, ``source_type`` and ``target_type`` are
            used to build the temporary file names.

    Returns:
        ``{'output': <base64 str>}`` on success,
        ``{"message": "Source image or path is required"}`` when either
        payload is missing, or ``{"message": "Error"}`` when processing
        raises.
    """
    start_time = time.time()
    # Both payloads are decoded and written below, so a request that lacks
    # either one cannot be processed (the previous check only rejected
    # requests where *both* were missing and crashed on the rest).
    if not (params.source and params.target):
        return {"message": "Source image or path is required"}
    update_global_variables(params)
    # One timestamp for all three files keeps the names of a request aligned.
    stamp = int(time.time())
    # NOTE(review): user_id, source_type and target_type come straight from
    # the request body and are concatenated into filesystem paths that are
    # written and later deleted; consider validating them against path
    # traversal. Left unchanged here because user_id may legitimately carry a
    # directory prefix (save_file creates missing directories).
    globals.source_path = f"{work_dir+params.user_id}source-{stamp}.{params.source_type}"
    globals.target_path = f"{work_dir+params.user_id}target-{stamp}.{params.target_type}"
    globals.output_path = f"{work_dir+params.user_id}output-{stamp}.{params.target_type}"
    print(globals.output_path)
    print_globals()
    try:
        save_file(globals.source_path, params.source)
        save_file(globals.target_path, params.target)
        end_time = time.time()
        print("程序处理时间:", end_time - start_time)
        try:
            # Presumably reads its inputs from the facefusion globals set
            # above and writes the result to globals.output_path -- confirm.
            core.api_conditional_process()
        except Exception as e:
            print(e)
            return {"message": "Error"}
        output = image_to_base64_str(globals.output_path)
        end_time2 = time.time()
        print("生成时间:", end_time2 - end_time)
    finally:
        # Runs on success, on the error return above, and when an exception
        # propagates, so temporary files are never left behind.
        _remove_temp_files(
            globals.source_path,
            globals.target_path,
            globals.output_path,
        )
    end_time3 = time.time()
    print("删除文件时间:", end_time3 - end_time2)
    print("最终运行时间:", end_time3 - start_time)
    return {'output': output}


def _remove_temp_files(*paths):
    """Delete each path in *paths*, ignoring those that do not exist."""
    for path in paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Nothing was written at this path (e.g. processing failed early).
            pass
def update_global_variables(params: Params):
    """Copy the non-None fields of *params* onto the matching module settings.

    For every attribute of *params* whose value is not ``None``, the value is
    assigned to the attribute of the same name on ``globals`` if it exists
    there, otherwise on ``frame_processors_globals`` if it exists there.
    Fields that match neither module are ignored.
    """
    for field_name, field_value in vars(params).items():
        if field_value is None:
            continue
        # ``globals`` takes precedence over the frame-processor settings.
        if hasattr(globals, field_name):
            destination = globals
        elif hasattr(frame_processors_globals, field_name):
            destination = frame_processors_globals
        else:
            continue
        setattr(destination, field_name, field_value)
def image_to_base64_str(image_path):
    """Return the contents of the file at *image_path* as a base64 text string."""
    with open(image_path, mode="rb") as handle:
        raw_bytes = handle.read()
    # b64encode yields bytes; convert them to str for the JSON response.
    return str(base64.b64encode(raw_bytes), 'utf-8')
def save_file(file_path: str, encoded_image: str):
    """Decode *encoded_image* from base64 and write the bytes to *file_path*.

    Missing parent directories of *file_path* are created first. An existing
    file at *file_path* is overwritten.

    Args:
        file_path: Destination path; may be a bare file name.
        encoded_image: Base64-encoded file contents.
    """
    data = base64.b64decode(encoded_image)
    directory = os.path.dirname(file_path)
    # dirname() is '' for a bare file name, and os.makedirs('') raises, so
    # only create directories when there is a directory component.
    # exist_ok avoids the race between checking for and creating the path.
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(file_path, "wb") as file:
        file.write(data)
def delete_files_in_directory(directory_path):
    """Remove every regular file directly inside *directory_path*.

    Subdirectories are left alone. Each removed path is printed.
    """
    candidates = (
        os.path.join(directory_path, entry)
        for entry in os.listdir(directory_path)
    )
    for candidate in candidates:
        if not os.path.isfile(candidate):
            continue
        os.remove(candidate)
        print(f"Deleted {candidate}")
# Register the endpoints declared on ``router`` with the application.
app.include_router(router)
def launch(host: str = "0.0.0.0", port: int = 7860):
    """Serve ``app`` with uvicorn.

    Args:
        host: Interface to bind; the default listens on all interfaces.
        port: TCP port to listen on.
    """
    # Imported inside the function, so uvicorn is only needed when the
    # server is actually launched.
    import uvicorn

    uvicorn.run(app, host=host, port=port)