Spaces:
Paused
Paused
| import cv2 | |
| import os | |
| import insightface | |
| import onnxruntime | |
| from tqdm import tqdm | |
| import shutil | |
| import gfpgan | |
| import gradio as gr | |
| import subprocess | |
| from PIL import Image | |
| progress = 0 | |
| def video_to_frames(video_path, output_folder): | |
| vidcap = cv2.VideoCapture(video_path) | |
| fps = vidcap.get(cv2.CAP_PROP_FPS) | |
| success, image = vidcap.read() | |
| count = 0 | |
| if not os.path.exists(output_folder): | |
| os.makedirs(output_folder) | |
| while success: | |
| frame_name = os.path.join(output_folder, f"frame_{count}.jpg") | |
| cv2.imwrite(frame_name, image) | |
| success, image = vidcap.read() | |
| count += 1 | |
| if count > 550: | |
| break | |
| print(f"{count} frames extracted from {video_path}.") | |
| return [count, fps] | |
| def frames_to_video(frame_folder, video_path, image_path, frame_count, fps): | |
| global progress | |
| frames = [f for f in os.listdir(frame_folder) if f.endswith('.jpg')] | |
| frames.sort(key=lambda x: int(x.split('_')[1].split('.')[0])) # Sort frames in ascending order | |
| frame = cv2.imread(os.path.join(frame_folder, frames[0])) | |
| height, width, _ = frame.shape | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(video_path, fourcc, fps, (width, height)) | |
| providers = ["CUDAExecutionProvider"] | |
| app = insightface.app.FaceAnalysis(name='buffalo_l', providers=providers) | |
| app.prepare(ctx_id=0, det_size=(640, 640)) | |
| swapper = insightface.model_zoo.get_model("inswapper_128.onnx", download=False, download_zip=False, providers=providers) | |
| face_enhancer = gfpgan.GFPGANer(model_path="GFPGANv1.4.pth", upscale=1, device='cuda') | |
| for i in tqdm(range(frame_count), desc="Converting frames to video"): | |
| print("Progress:",progress) | |
| img1 = cv2.imread(os.path.join(frame_folder, frames[i])) | |
| faces1 = app.get(img1) | |
| for _ in range(20): | |
| faces2 = app.get(image_path) | |
| if faces2: | |
| break | |
| else: | |
| return | |
| if faces1: | |
| face1 = faces1[0] | |
| face2 = faces2[0] | |
| result = img1.copy() | |
| result = swapper.get(result, face1, face2, paste_back=True) | |
| _, _, result = face_enhancer.enhance(result) | |
| out.write(result) | |
| else: | |
| out.write(img1) | |
| progress = int((i + 1) / frame_count * 100) | |
| out.release() | |
| print(f"Video saved at {video_path}.") | |
| def face_swap(video_path, image_path): | |
| global progress | |
| progress = 0 | |
| output_folder = "Out_Frames" | |
| frame_count = video_to_frames(video_path, output_folder) | |
| if frame_count[0] > 400: | |
| frame_count[0] = 400 | |
| output_video_path = "output_video.mp4" | |
| frames_to_video(output_folder, output_video_path, image_path, frame_count[0], frame_count[1]) | |
| return output_video_path | |
| def get_progress(): | |
| global progress | |
| return progress | |
| iface = gr.Interface( | |
| fn=face_swap, | |
| inputs=["video", "image"], | |
| outputs="video", | |
| title="Profaker's Face Swap", | |
| description="Upload a video and an image. The faces in the video will be swapped with the face in the image.", | |
| ) | |
| progress_iface = gr.Interface( | |
| fn=get_progress, | |
| inputs=[], | |
| outputs="number", | |
| live=True, | |
| title="Progress Tracker" | |
| ) | |
| iface.launch(share=True) | |
| progress_iface.launch(share=True) | |