|
import requests |
|
import json |
|
import time |
|
import subprocess |
|
import gradio as gr |
|
import uuid |
|
import os |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load variables from a .env file (if any) into the process environment
# before the lookups below read them.
load_dotenv()

# Service credentials; each is None when its environment variable is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
REPLICATE_API_TOKEN = os.environ.get("REPLICATE_API_TOKEN")

# Endpoint used both to create predictions and, with an id appended, to poll them.
REPLICATE_API_URL = "https://api.replicate.com/v1/predictions"

# Endpoint that upload_file posts to; None when the variable is unset.
UPLOAD_URL = os.environ.get("UPLOAD_URL")
|
|
|
def get_voices():
    """Return the available voices as a list of two-element tuples.

    Both elements of every tuple hold the same voice name; a new list is
    built on each call.
    """
    names = ("alloy", "echo", "fable", "onyx", "nova", "shimmer")
    return [(name, name) for name in names]
|
|
|
def text_to_speech(voice, text, session_id):
    """Synthesize ``text`` via the OpenAI speech endpoint and save it as MP3.

    Args:
        voice: Voice name sent as the ``voice`` field of the request.
        text: Text to speak, sent as the ``input`` field.
        session_id: Suffix that keeps the output file name unique.

    Returns:
        Path of the written ``tempvoice<session_id>.mp3`` file, or ``None``
        when the request cannot be completed or the API answers with a
        status other than 200.
    """
    url = "https://api.openai.com/v1/audio/speech"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "tts-1",
        "input": text,
        "voice": voice,
    }

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(url, json=payload, headers=headers, timeout=120)
    except requests.RequestException:
        # Network-level failures share the "no audio" signal used for HTTP errors.
        return None

    if response.status_code != 200:
        return None

    audio_file_path = f"tempvoice{session_id}.mp3"
    with open(audio_file_path, "wb") as audio_file:
        audio_file.write(response.content)
    return audio_file_path
|
|
|
def upload_file(file_path):
    """Upload a local file to ``UPLOAD_URL`` as a multipart form post.

    Args:
        file_path: Path of the file to send in the ``fileToUpload`` field.

    Returns:
        The stripped response body (callers use it as the file's URL) when
        the server answers 200 with a non-empty body; ``None`` when
        ``UPLOAD_URL`` is not configured, the request fails, the status is
        not 200, or the body is empty.

    Raises:
        OSError: If ``file_path`` cannot be opened for reading.
    """
    if not UPLOAD_URL:
        # Nothing to post to; report it the same way as a failed upload.
        return None

    with open(file_path, "rb") as file:
        files = {"fileToUpload": (os.path.basename(file_path), file)}
        data = {"reqtype": "fileupload"}
        try:
            response = requests.post(UPLOAD_URL, files=files, data=data, timeout=120)
        except requests.RequestException:
            return None

    if response.status_code != 200:
        return None
    # An empty body is not a usable URL, so normalize it to None.
    return response.text.strip() or None
|
|
|
def lipsync_api_call(video_url, audio_url):
    """Create a lipsync prediction on Replicate.

    Args:
        video_url: URL passed to the model as its ``face`` input.
        audio_url: URL passed to the model as its ``input_audio`` input.

    Returns:
        The decoded JSON body of the API response. When the request cannot
        be completed or the body is not JSON, a dict of the form
        ``{"error": "<message>"}`` is returned instead of raising.
    """
    headers = {
        "Authorization": f"Bearer {REPLICATE_API_TOKEN}",
        "Content-Type": "application/json",
        # NOTE(review): per the Replicate docs this asks the API to hold the
        # response open while the prediction runs, hence the generous timeout.
        "Prefer": "wait",
    }
    payload = {
        "version": "db5a650c807b007dc5f9e5abe27c53e1b62880d1f94d218d27ce7fa802711d67",
        "input": {
            "face": video_url,
            "input_audio": audio_url,
        },
    }

    try:
        response = requests.post(
            REPLICATE_API_URL, headers=headers, json=payload, timeout=120
        )
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decode error is not a RequestException subclass.
        return {"error": f"Lipsync request failed: {exc}"}
|
|
|
def check_job_status(prediction_id, max_attempts=30, poll_interval=10):
    """Poll a Replicate prediction until it finishes or the attempts run out.

    Args:
        prediction_id: Id appended to ``REPLICATE_API_URL`` to build the
            status URL.
        max_attempts: Maximum number of status requests to make.
        poll_interval: Seconds to sleep between two status requests.

    Returns:
        The prediction's ``output`` value once its status is ``"succeeded"``;
        ``None`` when it ends as ``"failed"`` or ``"canceled"``, when the
        response is not a prediction object, or when every attempt is used up.
    """
    headers = {"Authorization": f"Bearer {REPLICATE_API_TOKEN}"}
    status_url = f"{REPLICATE_API_URL}/{prediction_id}"

    for attempt in range(max_attempts):
        response = requests.get(status_url, headers=headers, timeout=30)
        data = response.json()
        status = data.get("status")

        if status == "succeeded":
            return data.get("output")
        if status in ("failed", "canceled"):
            return None
        if not isinstance(status, str):
            # No textual status means this is not a prediction object
            # (presumably an API error body); polling again will not help.
            return None

        # Skip the pointless wait after the final attempt.
        if attempt < max_attempts - 1:
            time.sleep(poll_interval)
    return None
|
|
|
def get_media_duration(file_path):
    """Return the duration in seconds that ffprobe reports for a media file.

    Args:
        file_path: Path (or URL) handed to ffprobe.

    Returns:
        The container duration as a float.

    Raises:
        ValueError: If ffprobe prints nothing that parses as a number; the
            message includes ffprobe's stderr when there is any.
        OSError: If the ffprobe executable cannot be started.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    result = subprocess.run(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError:
        # Surface ffprobe's own diagnostics instead of a bare float() error.
        detail = result.stderr.strip() or f"unexpected ffprobe output {raw!r}"
        raise ValueError(
            f"Could not determine duration of {file_path!r}: {detail}"
        ) from None
|
|
|
def combine_audio_video(video_path, audio_path, output_path):
    """Mux a video's picture with a separate audio track using ffmpeg.

    The output is cut to the audio's duration. When the video is not longer
    than the audio, the video input is looped so it can cover the audio.

    Args:
        video_path: Source of the video stream (copied without re-encoding).
        audio_path: Source of the audio stream (encoded as AAC).
        output_path: File to write; an existing file is overwritten.

    Raises:
        ValueError: If the video's reported duration is not positive.
        subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    video_duration = get_media_duration(video_path)
    audio_duration = get_media_duration(audio_path)

    if video_duration <= 0:
        # Guard the division below and give a readable reason for the failure.
        raise ValueError(f"Video {video_path!r} has no usable duration")

    needs_loop = video_duration <= audio_duration

    cmd = ["ffmpeg"]
    if needs_loop:
        # -stream_loop is an input option, so it must precede the video's -i.
        loop_count = int(audio_duration // video_duration) + 1
        cmd += ["-stream_loop", str(loop_count)]
    cmd += [
        "-i", video_path, "-i", audio_path,
        "-t", str(audio_duration),
        "-map", "0:v", "-map", "1:a",
        "-c:v", "copy", "-c:a", "aac",
    ]
    if needs_loop:
        cmd.append("-shortest")
    cmd += ["-y", output_path]

    subprocess.run(cmd, check=True)
|
|
|
def create_video_from_image(image_url, session_id):
    """Download an image and render it as a 10-second H.264 video.

    Args:
        image_url: URL the image is fetched from.
        session_id: Suffix that keeps the temporary file names unique.

    Returns:
        Path of the written ``tempvideo<session_id>.mp4`` file.

    Raises:
        requests.RequestException: If the download fails or the server
            answers with an error status.
        subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    response = requests.get(image_url, timeout=60)
    # Fail here rather than feeding an HTML error page to ffmpeg as an image.
    response.raise_for_status()

    image_path = f"tempimage{session_id}.jpg"
    video_path = f"tempvideo{session_id}.mp4"

    try:
        with open(image_path, "wb") as f:
            f.write(response.content)

        cmd = [
            "ffmpeg", "-y", "-loop", "1", "-i", image_path,
            # Round both dimensions down to even numbers.
            "-vf", "scale=trunc(iw/2)*2:trunc(ih/2)*2",
            "-c:v", "libx264", "-t", "10", "-pix_fmt", "yuv420p",
            video_path,
        ]
        subprocess.run(cmd, check=True)
    finally:
        # The downloaded image is only an intermediate; remove it even when
        # ffmpeg fails so failed runs do not leave files behind.
        if os.path.exists(image_path):
            os.remove(image_path)

    return video_path
|
|
|
def process_video(voice, url, text, progress=gr.Progress()):
    """Produce a video at ``url`` speaking ``text`` in the given voice.

    Steps: synthesize the speech, turn an image URL into a short video when
    needed, upload the media, run the Replicate lipsync model and download
    its result. If any step after speech synthesis fails, fall back to simply
    muxing the speech over the video with ffmpeg.

    Args:
        voice: Voice id passed to ``text_to_speech``.
        url: URL of the source video or image.
        text: Text to be spoken.
        progress: Gradio progress tracker used to report each stage.

    Returns:
        ``(output_path, status_message)``. ``output_path`` is ``None`` when
        speech synthesis fails or when both the lipsync path and the
        fallback fail.
    """
    session_id = str(uuid.uuid4())
    temp_video_path = f"tempvideo{session_id}.mp4"
    output_path = f"output{session_id}.mp4"

    progress(0, desc="Generating speech...")
    audio_path = text_to_speech(voice, text, session_id)
    if not audio_path:
        return None, "Failed to generate speech audio."

    progress(0.2, desc="Processing media...")

    # Bound before the try block so the fallback can always tell whether a
    # local video file already exists, whatever step raised.
    video_path = None

    try:
        # HEAD requests do not follow redirects by default; follow them so the
        # Content-Type describes the final resource, not the redirect.
        head = requests.head(url, allow_redirects=True, timeout=30)
        content_type = head.headers.get('Content-Type', '')

        video_url = url
        if content_type.startswith('image'):
            progress(0.3, desc="Converting image to video...")
            video_path = create_video_from_image(url, session_id)
            video_url = upload_file(video_path)

        progress(0.4, desc="Uploading audio...")
        audio_url = upload_file(audio_path)

        if not audio_url or not video_url:
            raise RuntimeError("Failed to upload audio or video file")

        progress(0.5, desc="Initiating lipsync...")
        job_data = lipsync_api_call(video_url, audio_url)

        # Replicate prediction objects carry an "error" field that is null on
        # success (see the Replicate API reference), so test its value rather
        # than the presence of the key.
        if job_data.get("error"):
            raise RuntimeError(job_data["error"])

        prediction_id = job_data.get("id")
        if not prediction_id:
            # NOTE(review): API error bodies appear to describe the problem
            # under "detail" -- confirm against the Replicate docs.
            raise RuntimeError(job_data.get("detail") or "Unknown error")

        progress(0.6, desc="Processing lipsync...")
        result_url = check_job_status(prediction_id)
        if not result_url:
            raise RuntimeError("Lipsync processing failed or timed out")

        progress(0.9, desc="Downloading result...")
        result = requests.get(result_url, timeout=120)
        # Do not save an error page as the output video.
        result.raise_for_status()
        with open(output_path, "wb") as f:
            f.write(result.content)
        progress(1.0, desc="Complete!")
        return output_path, "Lipsync completed successfully!"

    except Exception as e:
        progress(0.8, desc="Falling back to simple combination...")
        try:
            if video_path is None:
                # No local video yet, which only happens when the source was
                # not converted from an image: fetch the original URL.
                video_response = requests.get(url, timeout=120)
                video_response.raise_for_status()
                video_path = temp_video_path
                with open(video_path, "wb") as f:
                    f.write(video_response.content)

            combine_audio_video(video_path, audio_path, output_path)
            progress(1.0, desc="Complete!")
            return output_path, f"Used fallback method. Original error: {str(e)}"
        except Exception as fallback_error:
            return None, f"All methods failed. Error: {str(fallback_error)}"
    finally:
        # Intermediate files are removed on every exit path; the output file
        # is kept because it is handed back to the caller.
        for leftover in (audio_path, temp_video_path):
            if os.path.exists(leftover):
                os.remove(leftover)
|
|
|
def create_interface():
    """Build the Gradio Blocks UI for the generator.

    The left column holds the voice dropdown, the media URL box, the text
    box and the button; the right column shows the resulting video and a
    read-only status line.

    Returns:
        The ``gr.Blocks`` app, not yet launched.
    """
    voices = get_voices()
    voice_labels = [voice[0] for voice in voices]

    with gr.Blocks() as app:
        gr.Markdown("# Generator")
        with gr.Row():
            with gr.Column():
                voice_dropdown = gr.Dropdown(
                    choices=voice_labels,
                    label="Select Voice",
                    value=voice_labels[0] if voice_labels else None,
                )
                url_input = gr.Textbox(label="Enter Video or Image URL")
                text_input = gr.Textbox(label="Enter text", lines=3)
                generate_btn = gr.Button("Generate Video")
            with gr.Column():
                video_output = gr.Video(label="Generated Video")
                status_output = gr.Textbox(label="Status", interactive=False)

        # The tracker is declared on the function given to .click, which is
        # the documented place for Gradio to attach it, and handed on to
        # process_video.
        def on_generate(voice_name, url, text, progress=gr.Progress()):
            # Map the label chosen in the dropdown back to its voice id.
            voice_id = next((v[1] for v in voices if v[0] == voice_name), None)
            if not voice_id:
                return None, "Invalid voice selected."
            return process_video(voice_id, url, text, progress)

        generate_btn.click(
            fn=on_generate,
            inputs=[voice_dropdown, url_input, text_input],
            outputs=[video_output, status_output],
        )

    return app
|
|
|
if __name__ == "__main__":
    # Script entry point: build the interface, then start serving it.
    app = create_interface()
    app.launch()