jsonop / app.py
sheikhed's picture
Update app.py
dc2c5a4 verified
import requests
import json
import time
import subprocess
import gradio as gr
import uuid
import os
from dotenv import load_dotenv
# Load environment variables via python-dotenv before they are read below.
load_dotenv()

# Credentials for the external services (None when the variable is unset).
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
REPLICATE_API_TOKEN = os.environ.get("REPLICATE_API_TOKEN")

# Service endpoints: the Replicate predictions endpoint is fixed, while the
# upload endpoint is supplied through the environment.
REPLICATE_API_URL = "https://api.replicate.com/v1/predictions"
UPLOAD_URL = os.environ.get("UPLOAD_URL")
def get_voices():
    """Return the OpenAI TTS voices as ``(label, voice_id)`` pairs.

    The label and the id are the same string for every voice.
    """
    voice_names = ("alloy", "echo", "fable", "onyx", "nova", "shimmer")
    return [(voice_name, voice_name) for voice_name in voice_names]
def text_to_speech(voice, text, session_id):
    """Synthesize ``text`` with the OpenAI speech endpoint and save it as MP3.

    Args:
        voice: Voice name sent as the ``voice`` field of the request.
        text: Text sent as the ``input`` field of the request.
        session_id: Suffix that keeps the temporary file name unique.

    Returns:
        Path of the written ``tempvoice{session_id}.mp3`` file, or ``None``
        when the request cannot be completed or the API answers with a
        status other than 200.
    """
    url = "https://api.openai.com/v1/audio/speech"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "model": "tts-1",
        "input": text,
        "voice": voice,
    }
    try:
        # Without a timeout a stalled connection would block the UI forever.
        response = requests.post(url, json=data, headers=headers, timeout=120)
    except requests.RequestException:
        # Failure is reported through the None return value, like the
        # non-200 case below, so callers need only one check.
        return None
    if response.status_code != 200:
        return None

    # Save temporary audio file with session ID
    audio_file_path = f'tempvoice{session_id}.mp3'
    with open(audio_file_path, 'wb') as audio_file:
        audio_file.write(response.content)
    return audio_file_path
def upload_file(file_path):
    """Upload a local file to ``UPLOAD_URL`` and return the hosted URL.

    The file is sent as the multipart field ``fileToUpload`` together with
    the form field ``reqtype=fileupload``.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        The stripped response body (expected to be the file's URL) on HTTP
        200, otherwise ``None``. ``None`` is also returned when no upload
        endpoint is configured.
    """
    if not UPLOAD_URL:
        # No endpoint configured: report it as a failed upload instead of
        # letting requests fail on an invalid URL.
        return None

    with open(file_path, 'rb') as file:
        files = {'fileToUpload': (os.path.basename(file_path), file)}
        data = {'reqtype': 'fileupload'}
        # A timeout keeps a stalled upload from hanging the request forever.
        response = requests.post(UPLOAD_URL, files=files, data=data, timeout=300)

    if response.status_code == 200:
        return response.text.strip()
    return None
def lipsync_api_call(video_url, audio_url):
    """Create a lipsync prediction on Replicate and return the JSON reply.

    Args:
        video_url: URL passed as the model's ``face`` input.
        audio_url: URL passed as the model's ``input_audio`` input.

    Returns:
        The decoded JSON body of the Replicate response.
    """
    headers = {
        "Authorization": f"Bearer {REPLICATE_API_TOKEN}",
        "Content-Type": "application/json",
        # Ask the API to hold the connection until the prediction finishes
        # (or its own wait limit elapses) instead of returning immediately.
        "Prefer": "wait",
    }
    data = {
        "version": "db5a650c807b007dc5f9e5abe27c53e1b62880d1f94d218d27ce7fa802711d67",
        "input": {
            "face": video_url,
            "input_audio": audio_url,
        },
    }
    # The timeout is generous because of "Prefer: wait", but still bounded
    # so a dead connection cannot block the caller indefinitely.
    response = requests.post(
        REPLICATE_API_URL, headers=headers, json=data, timeout=120
    )
    return response.json()
def check_job_status(prediction_id, max_attempts=30, poll_interval=10):
    """Poll a Replicate prediction until it finishes or attempts run out.

    Args:
        prediction_id: Id of the prediction to poll.
        max_attempts: Maximum number of status requests to make.
        poll_interval: Seconds to wait between two status requests.

    Returns:
        The prediction's ``output`` once its status is ``"succeeded"``;
        ``None`` if it failed, was canceled, or did not finish in time.
    """
    headers = {"Authorization": f"Bearer {REPLICATE_API_TOKEN}"}
    status_url = f"{REPLICATE_API_URL}/{prediction_id}"

    for attempt in range(max_attempts):
        response = requests.get(status_url, headers=headers, timeout=30)
        data = response.json()
        # Error payloads may lack "status"; treat them as "not finished yet".
        status = data.get("status")
        if status == "succeeded":
            return data.get("output")
        if status in ("failed", "canceled"):
            # Both are terminal states, so further polling cannot help.
            return None
        if attempt < max_attempts - 1:
            # Skip the pointless wait after the final attempt.
            time.sleep(poll_interval)
    return None
def get_media_duration(file_path):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        The container duration as a float.

    Raises:
        ValueError: If ffprobe does not print a parseable duration. The
            message includes ffprobe's stderr output when there is any.
    """
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        file_path,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError:
        # Surface ffprobe's own diagnostics instead of a bare float() error.
        detail = result.stderr.decode(errors="replace").strip()
        if not detail:
            detail = f"unexpected ffprobe output {raw_duration!r}"
        raise ValueError(
            f"Could not determine duration of {file_path!r}: {detail}"
        ) from None
def combine_audio_video(video_path, audio_path, output_path):
    """Mux a video with a separate audio track so both have the audio's length.

    A video longer than the audio is trimmed to the audio duration; a video
    that is shorter (or equal) is looped until it covers the audio. The video
    stream is copied and the audio is encoded as AAC.

    Args:
        video_path: Path of the source video.
        audio_path: Path of the audio track to use.
        output_path: Path of the file to write (overwritten if present).

    Raises:
        ValueError: If the video has no positive duration and therefore
            cannot be looped to cover the audio.
        subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    # Get durations of both video and audio
    video_duration = get_media_duration(video_path)
    audio_duration = get_media_duration(audio_path)

    # Options shared by both variants: cut at the audio length, take video
    # from input 0 and audio from input 1, copy video and encode audio.
    shared_options = [
        '-t', str(audio_duration),
        '-map', '0:v', '-map', '1:a',
        '-c:v', 'copy', '-c:a', 'aac',
    ]

    if video_duration > audio_duration:
        # Trim video to match the audio length
        cmd = ['ffmpeg', '-i', video_path, '-i', audio_path]
        cmd += shared_options
        cmd += ['-y', output_path]
    else:
        if video_duration <= 0:
            # Guard the division below; a zero-length video cannot be looped.
            raise ValueError(
                f"Video {video_path!r} has no positive duration to loop"
            )
        # Loop video if it's shorter than audio
        loop_count = int(audio_duration // video_duration) + 1
        cmd = [
            'ffmpeg', '-stream_loop', str(loop_count),
            '-i', video_path, '-i', audio_path,
        ]
        cmd += shared_options
        cmd += ['-shortest', '-y', output_path]

    subprocess.run(cmd, check=True)
def create_video_from_image(image_url, session_id):
    """Download an image and turn it into a 10-second still video.

    Args:
        image_url: URL of the image to download.
        session_id: Suffix that keeps the temporary file names unique.

    Returns:
        Path of the created ``tempvideo{session_id}.mp4`` file.

    Raises:
        requests.RequestException: If the image cannot be downloaded.
        subprocess.CalledProcessError: If ffmpeg exits with an error.
    """
    # Download the image
    response = requests.get(image_url, timeout=60)
    # Fail here rather than feeding an HTTP error page to ffmpeg as an image.
    response.raise_for_status()

    image_path = f"tempimage{session_id}.jpg"
    with open(image_path, "wb") as f:
        f.write(response.content)

    # Create a 10-second video from the image
    video_path = f"tempvideo{session_id}.mp4"
    cmd = [
        'ffmpeg', '-loop', '1', '-i', image_path,
        # Ensure width and height are divisible by 2
        '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
        '-c:v', 'libx264', '-t', '10', '-pix_fmt', 'yuv420p',
        # Overwrite without prompting so ffmpeg never waits on stdin.
        '-y', video_path,
    ]
    try:
        subprocess.run(cmd, check=True)
    finally:
        # Clean up the temporary image file even when ffmpeg fails.
        if os.path.exists(image_path):
            os.remove(image_path)
    return video_path
def process_video(voice, url, text, progress=gr.Progress()):
    """Generate a talking video from a media URL and a piece of text.

    The text is converted to speech, then a lipsync prediction is requested
    for the given video (or for a still video built from an image URL). If
    any step of that path fails, the audio is simply muxed onto the video as
    a fallback.

    Args:
        voice: Voice id passed to ``text_to_speech``.
        url: URL of a video, or of an image to turn into a video.
        text: Text to speak.
        progress: Gradio progress tracker used to report each stage.

    Returns:
        A ``(output_path, status_message)`` tuple; ``output_path`` is
        ``None`` when no video could be produced.
    """
    session_id = str(uuid.uuid4())  # Generate a unique session ID
    temp_video_path = f"tempvideo{session_id}.mp4"
    output_path = f"output{session_id}.mp4"

    progress(0, desc="Generating speech...")
    audio_path = text_to_speech(voice, text, session_id)
    if not audio_path:
        return None, "Failed to generate speech audio."

    progress(0.2, desc="Processing media...")
    # Path of a local copy of the video; stays None until one exists on disk
    # so the fallback knows whether it still has to download the source.
    video_path = None
    try:
        # Check if the URL is an image
        response = requests.head(url)
        content_type = response.headers.get('Content-Type', '')
        if content_type.startswith('image'):
            progress(0.3, desc="Converting image to video...")
            video_path = create_video_from_image(url, session_id)
            video_url = upload_file(video_path)
        else:
            video_url = url

        progress(0.4, desc="Uploading audio...")
        audio_url = upload_file(audio_path)
        if not audio_url or not video_url:
            raise Exception("Failed to upload audio or video file")

        progress(0.5, desc="Initiating lipsync...")
        job_data = lipsync_api_call(video_url, audio_url)
        # Check the VALUE of "error": a response may carry the key with a
        # null value, and a key-presence test would treat that as a failure.
        if job_data.get("error"):
            raise Exception(job_data["error"])
        prediction_id = job_data.get("id")
        if not prediction_id:
            # No id to poll; report whatever detail the API sent, if any.
            raise Exception(
                job_data.get("detail", "Lipsync API returned no prediction id")
            )

        progress(0.6, desc="Processing lipsync...")
        result_url = check_job_status(prediction_id)
        if not result_url:
            raise Exception("Lipsync processing failed or timed out")

        progress(0.9, desc="Downloading result...")
        response = requests.get(result_url)
        # Do not save an HTTP error body as if it were the finished video.
        response.raise_for_status()
        with open(output_path, "wb") as f:
            f.write(response.content)
        progress(1.0, desc="Complete!")
        return output_path, "Lipsync completed successfully!"
    except Exception as e:
        progress(0.8, desc="Falling back to simple combination...")
        try:
            if video_path is None:
                # Download the video from the URL if it wasn't created from
                # an image. The original ``url`` is used because it is always
                # bound, even when the failure happened before any other
                # variable was assigned.
                video_response = requests.get(url)
                video_response.raise_for_status()
                video_path = temp_video_path
                with open(video_path, "wb") as f:
                    f.write(video_response.content)
            combine_audio_video(video_path, audio_path, output_path)
            progress(1.0, desc="Complete!")
            return output_path, f"Used fallback method. Original error: {str(e)}"
        except Exception as fallback_error:
            return None, f"All methods failed. Error: {str(fallback_error)}"
    finally:
        # Cleanup
        if os.path.exists(audio_path):
            os.remove(audio_path)
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
def create_interface():
    """Build the Gradio Blocks UI and wire the button to the pipeline.

    The left column holds the inputs (voice, media URL, text, button) and the
    right column shows the resulting video and a status message.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    voices = get_voices()
    voice_labels = [entry[0] for entry in voices]
    if voices:
        default_voice = voices[0][0]
    else:
        default_voice = None

    with gr.Blocks() as app:
        gr.Markdown("# Generator")
        with gr.Row():
            with gr.Column():
                voice_dropdown = gr.Dropdown(
                    choices=voice_labels,
                    label="Select Voice",
                    value=default_voice,
                )
                url_input = gr.Textbox(label="Enter Video or Image URL")
                text_input = gr.Textbox(label="Enter text", lines=3)
                generate_btn = gr.Button("Generate Video")
            with gr.Column():
                video_output = gr.Video(label="Generated Video")
                status_output = gr.Textbox(label="Status", interactive=False)

        def on_generate(voice_name, url, text):
            # Translate the selected label into its voice id (first match).
            voice_id = None
            for entry in voices:
                if entry[0] == voice_name:
                    voice_id = entry[1]
                    break
            if not voice_id:
                return None, "Invalid voice selected."
            return process_video(voice_id, url, text)

        generate_btn.click(
            fn=on_generate,
            inputs=[voice_dropdown, url_input, text_input],
            outputs=[video_output, status_output],
        )
    return app
# Script entry point: build the interface and start serving it.
if __name__ == "__main__":
    app = create_interface()
    app.launch()