wav2lip-gfpgan / main.py
lorneluo's picture
run.py to download video audio and process
61ba537
raw
history blame
4 kB
import os
import subprocess
import cv2
from datetime import datetime
from functools import partial
from concurrent.futures import ThreadPoolExecutor
from collections import deque
from tqdm import tqdm
root_dir = '/content/wav2lip-gfpgan'
def stream_command(
args,
*,
stdout_handler=print,
stderr_handler=print,
check=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
):
"""Mimic subprocess.run, while processing the command output in real time."""
with subprocess.Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
with ThreadPoolExecutor(2) as pool: # two threads to handle the streams
exhaust = partial(pool.submit, partial(deque, maxlen=0))
exhaust(stdout_handler(line[:-1]) for line in process.stdout)
exhaust(stderr_handler(line[:-1]) for line in process.stderr)
retcode = process.poll()
if check and retcode:
raise subprocess.CalledProcessError(retcode, process.args)
return subprocess.CompletedProcess(process.args, retcode)
def call_wav2lip(video_path, audio_path, output_path):
checkpoint_path = os.path.join(root_dir, 'wav2lip/checkpoints/wav2lip.pth')
assert os.path.isfile(video_path), f'Video path {video_path} not exist.'
assert os.path.isfile(audio_path), f'Audio path {audio_path} not exist.'
assert os.path.isfile(checkpoint_path), f'Checkpoint file {checkpoint_path} not exist.'
# python inference.py \
# --checkpoint_path checkpoints/wav2lip.pth \
# --face {inputVideoPath} \
# --audio {inputAudioPath} \
# --outfile {lipSyncedOutputPath}
start = datetime.now()
cmd = [
"python",
"wav2lip/inference.py",
"--checkpoint_path", checkpoint_path, #
# "--segmentation_path", "checkpoints/face_segmentation.pth",
"--face", video_path,
"--audio", audio_path,
"--outfile", output_path,
]
print(f'Call subprocess: {cmd}')
stream_command(cmd)
duration = datetime.now() - start
print(f'wav2lip finished in {duration}')
origin_frames_folder = _get_frames(output_path)
return output_path
def _get_frames(video_path):
folder_path = os.path.dirname(video_path)
origin_frames_folder = os.path.join(folder_path, 'frames')
os.makedirs(origin_frames_folder, exist_ok=True)
# get frames pics
vidcap = cv2.VideoCapture(video_path)
numberOfFrames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = vidcap.get(cv2.CAP_PROP_FPS)
print("FPS: ", fps, "Frames: ", numberOfFrames)
for frameNumber in tqdm(range(numberOfFrames)):
_, image = vidcap.read()
cv2.imwrite(os.path.join(origin_frames_folder, str(frameNumber).zfill(4) + '.jpg'), image)
return origin_frames_folder
def call_gfpgan(wav2lip_mp4):
assert os.path.isfile(wav2lip_mp4), f'Video path {wav2lip_mp4} not exist.'
folder_path = os.path.dirname(wav2lip_mp4)
origin_frames_folder = os.path.join(folder_path, 'frames')
# python inference_gfpgan.py
# -i "$unProcessedFramesFolderPath"
# -o "$outputPath"
# -v 1.3
# -s 2
# --only_center_face
# --bg_upsampler None
start = datetime.now()
cmd = [
"python",
"gfpgan/inference_gfpgan.py",
"-i", origin_frames_folder,
"-o", folder_path,
# "-v", str(1.4),
# "-s", str(2),
"--only_center_face",
"--bg_upsampler", 'realesrgan'
]
print(cmd)
stream_command(cmd)
duration = datetime.now() - start
print(f'inference_gfpgan finished in {duration}')
def merge(folder_path, audio_path, output_mp4):
start = datetime.now()
cmd = [
"python",
"merge.py",
"-j", folder_path,
"-a", audio_path,
"-o", output_mp4,
]
stream_command(cmd)
duration = datetime.now() - start
print(f'Merge output in {duration}')
print(output_mp4)