import os import requests import json import time import subprocess import gradio as gr import uuid from dotenv import load_dotenv # Load environment variables load_dotenv() # API Keys A_KEY = os.getenv("A_KEY") B_KEY = os.getenv("B_KEY") # URLs API_URL = os.getenv("API_URL") UPLOAD_URL = os.getenv("UPLOAD_URL") def get_voices(): url = "https://api.elevenlabs.io/v1/voices" headers = { "Accept": "application/json", "xi-api-key": A_KEY } response = requests.get(url, headers=headers) if response.status_code != 200: return [] return [(voice['name'], voice['voice_id']) for voice in response.json().get('voices', [])] def get_video_models(): return [f for f in os.listdir("models") if f.endswith((".mp4", ".avi", ".mov"))] def text_to_speech(voice_id, text, session_id): url = f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}" headers = { "Accept": "audio/mpeg", "Content-Type": "application/json", "xi-api-key": A_KEY } data = { "text": text, "model_id": "eleven_turbo_v2_5", "voice_settings": { "stability": 0.5, "similarity_boost": 0.5 } } response = requests.post(url, json=data, headers=headers) if response.status_code != 200: return None # Save temporary audio file with session ID audio_file_path = f'temp_voice_{session_id}.mp3' with open(audio_file_path, 'wb') as audio_file: audio_file.write(response.content) return audio_file_path def upload_file(file_path): with open(file_path, 'rb') as file: files = {'fileToUpload': (os.path.basename(file_path), file)} data = {'reqtype': 'fileupload'} response = requests.post(UPLOAD_URL, files=files, data=data) if response.status_code == 200: return response.text.strip() return None def lipsync_api_call(video_url, audio_url): headers = { "Content-Type": "application/json", "x-api-key": B_KEY } data = { "audioUrl": audio_url, "videoUrl": video_url, "maxCredits": 1000, "model": "sync-1.6.0", "synergize": True, "pads": [0, 5, 0, 0], "synergizerStrength": 1 } response = requests.post(API_URL, headers=headers, data=json.dumps(data)) return response.json() def check_job_status(job_id): headers = {"x-api-key": B_KEY} max_attempts = 30 # Limit the number of attempts for _ in range(max_attempts): response = requests.get(f"{API_URL}/{job_id}", headers=headers) data = response.json() if data["status"] == "COMPLETED": return data["videoUrl"] elif data["status"] == "FAILED": return None time.sleep(10) return None def combine_audio_video(video_path, audio_path, output_path): cmd = [ 'ffmpeg', '-i', video_path, '-i', audio_path, '-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'aac', '-shortest', '-y', output_path ] subprocess.run(cmd, check=True) def process_video(voice, model, text, progress=gr.Progress()): session_id = str(uuid.uuid4()) # Generate a unique session ID progress(0, desc="Generating speech...") audio_path = text_to_speech(voice, text, session_id) if not audio_path: return None, "Failed to generate speech audio." progress(0.2, desc="Processing video...") video_path = os.path.join("models", model) try: progress(0.3, desc="Uploading files...") video_url = upload_file(video_path) audio_url = upload_file(audio_path) if not video_url or not audio_url: raise Exception("Failed to upload files") progress(0.4, desc="Initiating lipsync...") job_data = lipsync_api_call(video_url, audio_url) if "error" in job_data or "message" in job_data: raise Exception(job_data.get("error", job_data.get("message", "Unknown error"))) job_id = job_data["id"] progress(0.5, desc="Processing lipsync...") result_url = check_job_status(job_id) if result_url: progress(0.9, desc="Downloading result...") response = requests.get(result_url) output_path = f"output_{session_id}.mp4" with open(output_path, "wb") as f: f.write(response.content) progress(1.0, desc="Complete!") return output_path, "Lipsync completed successfully!" else: raise Exception("Lipsync processing failed or timed out") except Exception as e: progress(0.8, desc="Falling back to simple combination...") try: output_path = f"output_{session_id}.mp4" combine_audio_video(video_path, audio_path, output_path) progress(1.0, desc="Complete!") return output_path, f"Used fallback method. Original error: {str(e)}" except Exception as fallback_error: return None, f"All methods failed. Error: {str(fallback_error)}" finally: # Cleanup if os.path.exists(audio_path): os.remove(audio_path) def create_interface(): voices = get_voices() models = get_video_models() with gr.Blocks() as app: gr.Markdown("# JSON Train") with gr.Row(): with gr.Column(): voice_dropdown = gr.Dropdown(choices=[v[0] for v in voices], label="Select", value=voices[0][0] if voices else None) model_dropdown = gr.Dropdown(choices=models, label="Select", value=models[0] if models else None) text_input = gr.Textbox(label="json", lines=3) generate_btn = gr.Button("Generate Video") with gr.Column(): video_output = gr.Video(label="Generated Video") status_output = gr.Textbox(label="Status", interactive=False) def on_generate(voice_name, model_name, text): voice_id = next((v[1] for v in voices if v[0] == voice_name), None) if not voice_id: return None, "Invalid voice selected." return process_video(voice_id, model_name, text) generate_btn.click( fn=on_generate, inputs=[voice_dropdown, model_dropdown, text_input], outputs=[video_output, status_output] ) return app if __name__ == "__main__": app = create_interface() app.launch()