Spaces:
Runtime error
Runtime error
File size: 6,787 Bytes
9c040e3 7dd4b4a 9c040e3 6f8c97f 9c040e3 7dd4b4a 9c040e3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
import os
import time
import cv2
from flask import Flask, request, jsonify, send_file
from flask_ngrok2 import run_with_ngrok
from subprocess import call
from tqdm import tqdm
import numpy as np
from ffmpy import FFmpeg
# !pip install flask flask-ngrok2 pyngrok
app = Flask(__name__)
auth_token = '2XQTtjJqVg4B4ryQVgauTPIGeiK_3JUWVJcMhXwpPxz2sc9KB'
root_dir = '/content/wav2lip-gfpgan'
jobs_dir = os.path.join('/content', 'jobs')
@app.route('/', methods=['GET'])
def index():
return jsonify({'hello': 'world!'})
# New route to serve files from /content/input
@app.route('/job/<job_id>/<filename>', methods=['GET'])
def download_file(job_id, filename):
try:
file_path = os.path.join(jobs_dir, job_id, filename)
print(file_path)
return send_file(file_path, as_attachment=True)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/wav2lip', methods=['POST'])
def wav2lip():
try:
# Get uploaded files
video_file = request.files['video']
audio_file = request.files['audio']
job_id = str(int(time.time()))
job_path = os.path.join(jobs_dir, job_id)
os.makedirs(job_path, exist_ok=True)
# Save the files to a temporary directory
video_path = os.path.join(job_path, video_file.filename)
audio_path = os.path.join(job_path, audio_file.filename)
video_file.save(video_path)
audio_file.save(audio_path)
wav2lip_mp4 = os.path.join(job_path, 'wav2lip.mp4')
call_wav2lip(video_path, audio_path, wav2lip_mp4)
output_filename = 'output.mp4'
output_mp4 = os.path.join(job_path, output_filename)
call_gfpgan(wav2lip_mp4, audio_path, output_mp4)
return jsonify({'url': f'/job/{job_id}/{output_filename}'})
except Exception as e:
return jsonify({'error': str(e)}), 500
def call_wav2lip(video_path, audio_path, output_path):
checkpoint_path = os.path.join(root_dir, 'wav2lip/checkpoints/wav2lip.pth')
assert os.path.isfile(video_path), f'Video path {video_path} not exist.'
assert os.path.isfile(audio_path), f'Audio path {audio_path} not exist.'
assert os.path.isfile(checkpoint_path), f'Checkpoint file {checkpoint_path} not exist.'
# python inference.py \
# --checkpoint_path checkpoints/wav2lip.pth \
# --face {inputVideoPath} \
# --audio {inputAudioPath} \
# --outfile {lipSyncedOutputPath}
cmd = [
"python",
"wav2lip/inference.py",
"--checkpoint_path", checkpoint_path,
# "--segmentation_path", "checkpoints/face_segmentation.pth",
"--face", video_path,
"--audio", audio_path,
"--outfile", output_path,
]
call(cmd)
return output_path
def _get_frames(video_path):
folder_path = os.path.dirname(video_path)
origin_frames_folder = os.path.join(folder_path, 'frames')
os.makedirs(origin_frames_folder, exist_ok=True)
# get frames pics
vidcap = cv2.VideoCapture(video_path)
numberOfFrames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = vidcap.get(cv2.CAP_PROP_FPS)
print("FPS: ", fps, "Frames: ", numberOfFrames)
for frameNumber in tqdm(range(numberOfFrames)):
_, image = vidcap.read()
cv2.imwrite(os.path.join(origin_frames_folder, str(frameNumber).zfill(4) + '.jpg'), image)
return origin_frames_folder
def call_gfpgan(wav2lip_mp4, audio_path, output_mp4):
assert os.path.isfile(wav2lip_mp4), f'Video path {wav2lip_mp4} not exist.'
origin_frames_folder = _get_frames(wav2lip_mp4)
folder_path = os.path.dirname(wav2lip_mp4)
# python inference_gfpgan.py
# -i "$unProcessedFramesFolderPath"
# -o "$outputPath"
# -v 1.3
# -s 2
# --only_center_face
# --bg_upsampler None
cmd = [
"python",
"gfpgan/inference_gfpgan.py",
"-i", origin_frames_folder,
"-o", folder_path,
"-v", 1.3,
"-s", 2,
"--only_center_face",
"--bg_upsampler", None
]
call(cmd)
restoredFramesPath = os.path.join(folder_path, 'restored_imgs')
os.makedirs(restoredFramesPath, exist_ok=True)
folder_path = folder_path
dir_list = os.listdir(restoredFramesPath)
dir_list.sort()
batch = 0
batchSize = 300
for i in tqdm(range(0, len(dir_list), batchSize)):
img_array = []
start, end = i, i + batchSize
print("processing ", start, end)
for filename in tqdm(dir_list[start:end]):
filename = restoredFramesPath + filename;
img = cv2.imread(filename)
if img is None:
continue
height, width, layers = img.shape
size = (width, height)
img_array.append(img)
fourcc = cv2.VideoWriter_fourcc(*'X264')
filename = 'batch_' + str(batch).zfill(4) + '.mp4'
out = cv2.VideoWriter(os.path.join(folder_path, filename),
fourcc, 30, size)
batch = batch + 1
for i in range(len(img_array)):
out.write(img_array[i])
out.release()
concatTextFilePath = os.path.join(folder_path, "concat.txt")
concatTextFile = open(concatTextFilePath, "w")
for ips in range(batch):
concatTextFile.write("file batch_" + str(ips).zfill(4) + ".mp4\n")
concatTextFile.close()
concatedVideoOutputPath = os.path.join(folder_path, "concated_output.mp4")
ff = FFmpeg(
inputs={concatTextFilePath: None},
outputs={concatedVideoOutputPath: '-y -c copy'}
)
# !ffmpeg -y -f concat -i {concatTextFilePath} -c copy {concatedVideoOutputPath}
ff.run()
ff = FFmpeg(
inputs={concatedVideoOutputPath: None, audio_path: None},
outputs={output_mp4: '-y -map 0 -map 1:a -c:v libx264 -c:a aac -shortest'}
)
# !ffmpeg -y -i {concatedVideoOutputPath} -i {inputAudioPath} -map 0 -map 1:a -c:v copy -shortest {finalProcessedOuputVideo}
ff.run()
# from google.colab import files
# files.download(finalProcessedOuputVideo)
if __name__ == '__main__':
run_with_ngrok(app, auth_token=auth_token)
app.run()
def test():
# request
import requests
ngrok_url = f"http://74c0-34-87-172-60.ngrok-free.app"
url = f"{ngrok_url}/wav2lip"
print(url)
video_path = '/Users/taoluo/Downloads/oIy5B4-vHVw.4.6588496370531551262.0.jpg'
audio_path = '/Users/taoluo/Downloads/test_audio.mp3'
files = {'video': ('video.jpg', open(video_path, 'rb')), 'audio': ('audio.mp3', open(audio_path, 'rb'))}
headers = {'ngrok-skip-browser-warning': 'true'}
response = requests.post(url, files=files, headers=headers)
# Print the response
print(response.json())
data = response.json()
print(ngrok_url + data['url'])
|