import os import subprocess import cv2 from datetime import datetime from functools import partial from concurrent.futures import ThreadPoolExecutor from collections import deque from tqdm import tqdm root_dir = '/content/wav2lip-gfpgan' def stream_command( args, *, stdout_handler=print, stderr_handler=print, check=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs, ): """Mimic subprocess.run, while processing the command output in real time.""" with subprocess.Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process: with ThreadPoolExecutor(2) as pool: # two threads to handle the streams exhaust = partial(pool.submit, partial(deque, maxlen=0)) exhaust(stdout_handler(line[:-1]) for line in process.stdout) exhaust(stderr_handler(line[:-1]) for line in process.stderr) retcode = process.poll() if check and retcode: raise subprocess.CalledProcessError(retcode, process.args) return subprocess.CompletedProcess(process.args, retcode) def call_wav2lip(video_path, audio_path, output_path): checkpoint_path = os.path.join(root_dir, 'wav2lip/checkpoints/wav2lip.pth') assert os.path.isfile(video_path), f'Video path {video_path} not exist.' assert os.path.isfile(audio_path), f'Audio path {audio_path} not exist.' assert os.path.isfile(checkpoint_path), f'Checkpoint file {checkpoint_path} not exist.' # python inference.py \ # --checkpoint_path checkpoints/wav2lip.pth \ # --face {inputVideoPath} \ # --audio {inputAudioPath} \ # --outfile {lipSyncedOutputPath} start = datetime.now() cmd = [ "python", "wav2lip/inference.py", "--checkpoint_path", checkpoint_path, # # "--segmentation_path", "checkpoints/face_segmentation.pth", "--face", video_path, "--audio", audio_path, "--outfile", output_path, ] print(f'Call subprocess: {cmd}') stream_command(cmd) duration = datetime.now() - start print(f'wav2lip finished in {duration}') origin_frames_folder = _get_frames(output_path) return output_path def _get_frames(video_path): folder_path = os.path.dirname(video_path) origin_frames_folder = os.path.join(folder_path, 'frames') os.makedirs(origin_frames_folder, exist_ok=True) # get frames pics vidcap = cv2.VideoCapture(video_path) numberOfFrames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = vidcap.get(cv2.CAP_PROP_FPS) print("FPS: ", fps, "Frames: ", numberOfFrames) for frameNumber in tqdm(range(numberOfFrames)): _, image = vidcap.read() cv2.imwrite(os.path.join(origin_frames_folder, str(frameNumber).zfill(4) + '.jpg'), image) return origin_frames_folder def call_gfpgan(wav2lip_mp4): assert os.path.isfile(wav2lip_mp4), f'Video path {wav2lip_mp4} not exist.' folder_path = os.path.dirname(wav2lip_mp4) origin_frames_folder = os.path.join(folder_path, 'frames') # python inference_gfpgan.py # -i "$unProcessedFramesFolderPath" # -o "$outputPath" # -v 1.3 # -s 2 # --only_center_face # --bg_upsampler None start = datetime.now() cmd = [ "python", "gfpgan/inference_gfpgan.py", "-i", origin_frames_folder, "-o", folder_path, # "-v", str(1.4), # "-s", str(2), "--only_center_face", "--bg_upsampler", 'realesrgan' ] print(cmd) stream_command(cmd) duration = datetime.now() - start print(f'inference_gfpgan finished in {duration}') def merge(folder_path, audio_path, output_mp4): start = datetime.now() cmd = [ "python", "merge.py", "-j", folder_path, "-a", audio_path, "-o", output_mp4, ] stream_command(cmd) duration = datetime.now() - start print(f'Merge output in {duration}') print(output_mp4)