wav2lip-gfpgan / colab_flask.py
lorneluo's picture
add test inputs
6f8c97f
raw
history blame
No virus
6.79 kB
import os
import time
import cv2
from flask import Flask, request, jsonify, send_file
from flask_ngrok2 import run_with_ngrok
from subprocess import call
from tqdm import tqdm
import numpy as np
from ffmpy import FFmpeg
# !pip install flask flask-ngrok2 pyngrok
app = Flask(__name__)
auth_token = '2XQTtjJqVg4B4ryQVgauTPIGeiK_3JUWVJcMhXwpPxz2sc9KB'
root_dir = '/content/wav2lip-gfpgan'
jobs_dir = os.path.join('/content', 'jobs')
@app.route('/', methods=['GET'])
def index():
return jsonify({'hello': 'world!'})
# New route to serve files from /content/input
@app.route('/job/<job_id>/<filename>', methods=['GET'])
def download_file(job_id, filename):
try:
file_path = os.path.join(jobs_dir, job_id, filename)
print(file_path)
return send_file(file_path, as_attachment=True)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/wav2lip', methods=['POST'])
def wav2lip():
try:
# Get uploaded files
video_file = request.files['video']
audio_file = request.files['audio']
job_id = str(int(time.time()))
job_path = os.path.join(jobs_dir, job_id)
os.makedirs(job_path, exist_ok=True)
# Save the files to a temporary directory
video_path = os.path.join(job_path, video_file.filename)
audio_path = os.path.join(job_path, audio_file.filename)
video_file.save(video_path)
audio_file.save(audio_path)
wav2lip_mp4 = os.path.join(job_path, 'wav2lip.mp4')
call_wav2lip(video_path, audio_path, wav2lip_mp4)
output_filename = 'output.mp4'
output_mp4 = os.path.join(job_path, output_filename)
call_gfpgan(wav2lip_mp4, audio_path, output_mp4)
return jsonify({'url': f'/job/{job_id}/{output_filename}'})
except Exception as e:
return jsonify({'error': str(e)}), 500
def call_wav2lip(video_path, audio_path, output_path):
checkpoint_path = os.path.join(root_dir, 'wav2lip/checkpoints/wav2lip.pth')
assert os.path.isfile(video_path), f'Video path {video_path} not exist.'
assert os.path.isfile(audio_path), f'Audio path {audio_path} not exist.'
assert os.path.isfile(checkpoint_path), f'Checkpoint file {checkpoint_path} not exist.'
# python inference.py \
# --checkpoint_path checkpoints/wav2lip.pth \
# --face {inputVideoPath} \
# --audio {inputAudioPath} \
# --outfile {lipSyncedOutputPath}
cmd = [
"python",
"wav2lip/inference.py",
"--checkpoint_path", checkpoint_path,
# "--segmentation_path", "checkpoints/face_segmentation.pth",
"--face", video_path,
"--audio", audio_path,
"--outfile", output_path,
]
call(cmd)
return output_path
def _get_frames(video_path):
folder_path = os.path.dirname(video_path)
origin_frames_folder = os.path.join(folder_path, 'frames')
os.makedirs(origin_frames_folder, exist_ok=True)
# get frames pics
vidcap = cv2.VideoCapture(video_path)
numberOfFrames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = vidcap.get(cv2.CAP_PROP_FPS)
print("FPS: ", fps, "Frames: ", numberOfFrames)
for frameNumber in tqdm(range(numberOfFrames)):
_, image = vidcap.read()
cv2.imwrite(os.path.join(origin_frames_folder, str(frameNumber).zfill(4) + '.jpg'), image)
return origin_frames_folder
def call_gfpgan(wav2lip_mp4, audio_path, output_mp4):
assert os.path.isfile(wav2lip_mp4), f'Video path {wav2lip_mp4} not exist.'
origin_frames_folder = _get_frames(wav2lip_mp4)
folder_path = os.path.dirname(wav2lip_mp4)
# python inference_gfpgan.py
# -i "$unProcessedFramesFolderPath"
# -o "$outputPath"
# -v 1.3
# -s 2
# --only_center_face
# --bg_upsampler None
cmd = [
"python",
"gfpgan/inference_gfpgan.py",
"-i", origin_frames_folder,
"-o", folder_path,
"-v", 1.3,
"-s", 2,
"--only_center_face",
"--bg_upsampler", None
]
call(cmd)
restoredFramesPath = os.path.join(folder_path, 'restored_imgs')
os.makedirs(restoredFramesPath, exist_ok=True)
folder_path = folder_path
dir_list = os.listdir(restoredFramesPath)
dir_list.sort()
batch = 0
batchSize = 300
for i in tqdm(range(0, len(dir_list), batchSize)):
img_array = []
start, end = i, i + batchSize
print("processing ", start, end)
for filename in tqdm(dir_list[start:end]):
filename = restoredFramesPath + filename;
img = cv2.imread(filename)
if img is None:
continue
height, width, layers = img.shape
size = (width, height)
img_array.append(img)
fourcc = cv2.VideoWriter_fourcc(*'X264')
filename = 'batch_' + str(batch).zfill(4) + '.mp4'
out = cv2.VideoWriter(os.path.join(folder_path, filename),
fourcc, 30, size)
batch = batch + 1
for i in range(len(img_array)):
out.write(img_array[i])
out.release()
concatTextFilePath = os.path.join(folder_path, "concat.txt")
concatTextFile = open(concatTextFilePath, "w")
for ips in range(batch):
concatTextFile.write("file batch_" + str(ips).zfill(4) + ".mp4\n")
concatTextFile.close()
concatedVideoOutputPath = os.path.join(folder_path, "concated_output.mp4")
ff = FFmpeg(
inputs={concatTextFilePath: None},
outputs={concatedVideoOutputPath: '-y -c copy'}
)
# !ffmpeg -y -f concat -i {concatTextFilePath} -c copy {concatedVideoOutputPath}
ff.run()
ff = FFmpeg(
inputs={concatedVideoOutputPath: None, audio_path: None},
outputs={output_mp4: '-y -map 0 -map 1:a -c:v libx264 -c:a aac -shortest'}
)
# !ffmpeg -y -i {concatedVideoOutputPath} -i {inputAudioPath} -map 0 -map 1:a -c:v copy -shortest {finalProcessedOuputVideo}
ff.run()
# from google.colab import files
# files.download(finalProcessedOuputVideo)
if __name__ == '__main__':
run_with_ngrok(app, auth_token=auth_token)
app.run()
def test():
# request
import requests
ngrok_url = f"http://74c0-34-87-172-60.ngrok-free.app"
url = f"{ngrok_url}/wav2lip"
print(url)
video_path = '/Users/taoluo/Downloads/oIy5B4-vHVw.4.6588496370531551262.0.jpg'
audio_path = '/Users/taoluo/Downloads/test_audio.mp3'
files = {'video': ('video.jpg', open(video_path, 'rb')), 'audio': ('audio.mp3', open(audio_path, 'rb'))}
headers = {'ngrok-skip-browser-warning': 'true'}
response = requests.post(url, files=files, headers=headers)
# Print the response
print(response.json())
data = response.json()
print(ngrok_url + data['url'])