import os import time import cv2 from flask import Flask, request, jsonify, send_file from flask_ngrok2 import run_with_ngrok from subprocess import call from tqdm import tqdm import numpy as np from ffmpy import FFmpeg # !pip install flask flask-ngrok2 pyngrok app = Flask(__name__) auth_token = '2XQTtjJqVg4B4ryQVgauTPIGeiK_3JUWVJcMhXwpPxz2sc9KB' root_dir = '/content/wav2lip-gfpgan' jobs_dir = os.path.join('/content', 'jobs') @app.route('/', methods=['GET']) def index(): return jsonify({'hello': 'world!'}) # New route to serve files from /content/input @app.route('/job//', methods=['GET']) def download_file(job_id, filename): try: file_path = os.path.join(jobs_dir, job_id, filename) print(file_path) return send_file(file_path, as_attachment=True) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/wav2lip', methods=['POST']) def wav2lip(): try: # Get uploaded files video_file = request.files['video'] audio_file = request.files['audio'] job_id = str(int(time.time())) job_path = os.path.join(jobs_dir, job_id) os.makedirs(job_path, exist_ok=True) # Save the files to a temporary directory video_path = os.path.join(job_path, video_file.filename) audio_path = os.path.join(job_path, audio_file.filename) video_file.save(video_path) audio_file.save(audio_path) wav2lip_mp4 = os.path.join(job_path, 'wav2lip.mp4') call_wav2lip(video_path, audio_path, wav2lip_mp4) output_filename = 'output.mp4' output_mp4 = os.path.join(job_path, output_filename) call_gfpgan(wav2lip_mp4, audio_path, output_mp4) return jsonify({'url': f'/job/{job_id}/{output_filename}'}) except Exception as e: return jsonify({'error': str(e)}), 500 def call_wav2lip(video_path, audio_path, output_path): checkpoint_path = os.path.join(root_dir, 'wav2lip/checkpoints/wav2lip.pth') assert os.path.isfile(video_path), f'Video path {video_path} not exist.' assert os.path.isfile(audio_path), f'Audio path {audio_path} not exist.' assert os.path.isfile(checkpoint_path), f'Checkpoint file {checkpoint_path} not exist.' # python inference.py \ # --checkpoint_path checkpoints/wav2lip.pth \ # --face {inputVideoPath} \ # --audio {inputAudioPath} \ # --outfile {lipSyncedOutputPath} cmd = [ "python", "wav2lip/inference.py", "--checkpoint_path", checkpoint_path, # "--segmentation_path", "checkpoints/face_segmentation.pth", "--face", video_path, "--audio", audio_path, "--outfile", output_path, ] call(cmd) return output_path def _get_frames(video_path): folder_path = os.path.dirname(video_path) origin_frames_folder = os.path.join(folder_path, 'frames') os.makedirs(origin_frames_folder, exist_ok=True) # get frames pics vidcap = cv2.VideoCapture(video_path) numberOfFrames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) fps = vidcap.get(cv2.CAP_PROP_FPS) print("FPS: ", fps, "Frames: ", numberOfFrames) for frameNumber in tqdm(range(numberOfFrames)): _, image = vidcap.read() cv2.imwrite(os.path.join(origin_frames_folder, str(frameNumber).zfill(4) + '.jpg'), image) return origin_frames_folder def call_gfpgan(wav2lip_mp4, audio_path, output_mp4): assert os.path.isfile(wav2lip_mp4), f'Video path {wav2lip_mp4} not exist.' origin_frames_folder = _get_frames(wav2lip_mp4) folder_path = os.path.dirname(wav2lip_mp4) # python inference_gfpgan.py # -i "$unProcessedFramesFolderPath" # -o "$outputPath" # -v 1.3 # -s 2 # --only_center_face # --bg_upsampler None cmd = [ "python", "gfpgan/inference_gfpgan.py", "-i", origin_frames_folder, "-o", folder_path, "-v", 1.3, "-s", 2, "--only_center_face", "--bg_upsampler", None ] call(cmd) restoredFramesPath = os.path.join(folder_path, 'restored_imgs') os.makedirs(restoredFramesPath, exist_ok=True) folder_path = folder_path dir_list = os.listdir(restoredFramesPath) dir_list.sort() batch = 0 batchSize = 300 for i in tqdm(range(0, len(dir_list), batchSize)): img_array = [] start, end = i, i + batchSize print("processing ", start, end) for filename in tqdm(dir_list[start:end]): filename = restoredFramesPath + filename; img = cv2.imread(filename) if img is None: continue height, width, layers = img.shape size = (width, height) img_array.append(img) fourcc = cv2.VideoWriter_fourcc(*'X264') filename = 'batch_' + str(batch).zfill(4) + '.mp4' out = cv2.VideoWriter(os.path.join(folder_path, filename), fourcc, 30, size) batch = batch + 1 for i in range(len(img_array)): out.write(img_array[i]) out.release() concatTextFilePath = os.path.join(folder_path, "concat.txt") concatTextFile = open(concatTextFilePath, "w") for ips in range(batch): concatTextFile.write("file batch_" + str(ips).zfill(4) + ".mp4\n") concatTextFile.close() concatedVideoOutputPath = os.path.join(folder_path, "concated_output.mp4") ff = FFmpeg( inputs={concatTextFilePath: None}, outputs={concatedVideoOutputPath: '-y -c copy'} ) # !ffmpeg -y -f concat -i {concatTextFilePath} -c copy {concatedVideoOutputPath} ff.run() ff = FFmpeg( inputs={concatedVideoOutputPath: None, audio_path: None}, outputs={output_mp4: '-y -map 0 -map 1:a -c:v libx264 -c:a aac -shortest'} ) # !ffmpeg -y -i {concatedVideoOutputPath} -i {inputAudioPath} -map 0 -map 1:a -c:v copy -shortest {finalProcessedOuputVideo} ff.run() # from google.colab import files # files.download(finalProcessedOuputVideo) if __name__ == '__main__': run_with_ngrok(app, auth_token=auth_token) app.run() def test(): # request import requests ngrok_url = f"http://74c0-34-87-172-60.ngrok-free.app" url = f"{ngrok_url}/wav2lip" print(url) video_path = '/Users/taoluo/Downloads/oIy5B4-vHVw.4.6588496370531551262.0.jpg' audio_path = '/Users/taoluo/Downloads/test_audio.mp3' files = {'video': ('video.jpg', open(video_path, 'rb')), 'audio': ('audio.mp3', open(audio_path, 'rb'))} headers = {'ngrok-skip-browser-warning': 'true'} response = requests.post(url, files=files, headers=headers) # Print the response print(response.json()) data = response.json() print(ngrok_url + data['url'])