Spaces:
Runtime error
Runtime error
import os | |
import subprocess | |
import cv2 | |
from datetime import datetime | |
from functools import partial | |
from concurrent.futures import ThreadPoolExecutor | |
from collections import deque | |
from tqdm import tqdm | |
root_dir = '/content/wav2lip-gfpgan' | |
def stream_command( | |
args, | |
*, | |
stdout_handler=print, | |
stderr_handler=print, | |
check=True, | |
text=True, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
**kwargs, | |
): | |
"""Mimic subprocess.run, while processing the command output in real time.""" | |
with subprocess.Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process: | |
with ThreadPoolExecutor(2) as pool: # two threads to handle the streams | |
exhaust = partial(pool.submit, partial(deque, maxlen=0)) | |
exhaust(stdout_handler(line[:-1]) for line in process.stdout) | |
exhaust(stderr_handler(line[:-1]) for line in process.stderr) | |
retcode = process.poll() | |
if check and retcode: | |
raise subprocess.CalledProcessError(retcode, process.args) | |
return subprocess.CompletedProcess(process.args, retcode) | |
def call_wav2lip(video_path, audio_path, output_path): | |
checkpoint_path = os.path.join(root_dir, 'wav2lip/checkpoints/wav2lip.pth') | |
assert os.path.isfile(video_path), f'Video path {video_path} not exist.' | |
assert os.path.isfile(audio_path), f'Audio path {audio_path} not exist.' | |
assert os.path.isfile(checkpoint_path), f'Checkpoint file {checkpoint_path} not exist.' | |
# python inference.py \ | |
# --checkpoint_path checkpoints/wav2lip.pth \ | |
# --face {inputVideoPath} \ | |
# --audio {inputAudioPath} \ | |
# --outfile {lipSyncedOutputPath} | |
start = datetime.now() | |
cmd = [ | |
"python", | |
"wav2lip/inference.py", | |
"--checkpoint_path", checkpoint_path, # | |
# "--segmentation_path", "checkpoints/face_segmentation.pth", | |
"--face", video_path, | |
"--audio", audio_path, | |
"--outfile", output_path, | |
] | |
print(f'Call subprocess: {cmd}') | |
stream_command(cmd) | |
duration = datetime.now() - start | |
print(f'wav2lip finished in {duration}') | |
origin_frames_folder = _get_frames(output_path) | |
return output_path | |
def _get_frames(video_path): | |
folder_path = os.path.dirname(video_path) | |
origin_frames_folder = os.path.join(folder_path, 'frames') | |
os.makedirs(origin_frames_folder, exist_ok=True) | |
# get frames pics | |
vidcap = cv2.VideoCapture(video_path) | |
numberOfFrames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
fps = vidcap.get(cv2.CAP_PROP_FPS) | |
print("FPS: ", fps, "Frames: ", numberOfFrames) | |
for frameNumber in tqdm(range(numberOfFrames)): | |
_, image = vidcap.read() | |
cv2.imwrite(os.path.join(origin_frames_folder, str(frameNumber).zfill(4) + '.jpg'), image) | |
return origin_frames_folder | |
def call_gfpgan(wav2lip_mp4): | |
assert os.path.isfile(wav2lip_mp4), f'Video path {wav2lip_mp4} not exist.' | |
folder_path = os.path.dirname(wav2lip_mp4) | |
origin_frames_folder = os.path.join(folder_path, 'frames') | |
# python inference_gfpgan.py | |
# -i "$unProcessedFramesFolderPath" | |
# -o "$outputPath" | |
# -v 1.3 | |
# -s 2 | |
# --only_center_face | |
# --bg_upsampler None | |
start = datetime.now() | |
cmd = [ | |
"python", | |
"gfpgan/inference_gfpgan.py", | |
"-i", origin_frames_folder, | |
"-o", folder_path, | |
# "-v", str(1.4), | |
# "-s", str(2), | |
"--only_center_face", | |
"--bg_upsampler", 'realesrgan' | |
] | |
print(cmd) | |
stream_command(cmd) | |
duration = datetime.now() - start | |
print(f'inference_gfpgan finished in {duration}') | |
def merge(folder_path, audio_path, output_mp4): | |
start = datetime.now() | |
cmd = [ | |
"python", | |
"merge.py", | |
"-j", folder_path, | |
"-a", audio_path, | |
"-o", output_mp4, | |
] | |
stream_command(cmd) | |
duration = datetime.now() - start | |
print(f'Merge output in {duration}') | |
print(output_mp4) | |