Spaces:
Running
Running
import json | |
import asyncio | |
import edge_tts | |
from pydub import AudioSegment | |
import os | |
import gradio as gr | |
from gradio_client import Client | |
import shutil | |
import uuid | |
from dotenv import load_dotenv | |
import re | |
load_dotenv() | |
def sanitize_filename(filename):
    """Return a lowercase, filesystem-friendly slug of at most 50 characters.

    Spaces are turned into underscores first; afterwards every character that
    is not an ASCII letter, digit, underscore or hyphen is dropped.
    """
    underscored = filename.replace(' ', '_')
    cleaned = re.sub(r'[^a-zA-Z0-9_-]', '', underscored)
    lowered = cleaned.lower()
    return lowered[:50]
async def get_voices():
    """List the English edge-tts voices as dropdown-ready labels.

    Fetches the voice catalogue with ``edge_tts.list_voices()`` and keeps the
    entries whose ``Locale`` starts with en-US, en-GB, en-AU, en-CA or en-IN.

    Returns:
        A list of strings shaped like ``"<ShortName> (<Gender>, <Locale>)"``,
        in the order the catalogue returned them.
    """
    english_prefixes = ("en-US", "en-GB", "en-AU", "en-CA", "en-IN")
    labels = []
    for entry in await edge_tts.list_voices():
        if not entry["Locale"].startswith(english_prefixes):
            continue
        labels.append(
            f"{entry['ShortName']} ({entry['Gender']}, {entry['Locale']})"
        )
    return labels
def extract_voice_name(voice_string):
    """Return the part of a voice label that precedes the first ``" ("``.

    For a label such as ``"en-US-AndrewNeural (Male, en-US)"`` this is the
    voice short name; a string without ``" ("`` is returned unchanged.
    """
    short_name, _separator, _details = voice_string.partition(" (")
    return short_name
async def generate_audio(text, voice, filename):
    """Synthesize *text* with edge-tts and save the result to *filename*.

    *voice* is a dropdown label; it is reduced to the voice short name with
    ``extract_voice_name`` before being handed to ``edge_tts.Communicate``.
    """
    short_name = extract_voice_name(voice)
    speech = edge_tts.Communicate(text, short_name)
    await speech.save(filename)
async def create_podcast_versions(data, speaker1_name, speaker2_name, speaker1_voice, speaker2_voice, title):
    """Render a conversation into three MP3 files: one per speaker and a mix.

    Each entry of ``data['conversation']`` may carry a ``'speaker1text'``
    and/or a ``'speaker2text'`` value. Every utterance is synthesized with the
    matching voice and appended to that speaker's track and to the combined
    track, while the other speaker's track receives silence of the same
    length so all three tracks stay aligned.

    Args:
        data: Mapping with a ``'conversation'`` sequence of entries.
        speaker1_name: Display name of the first speaker (used in file names).
        speaker2_name: Display name of the second speaker (used in file names).
        speaker1_voice: Voice label passed to ``generate_audio`` for speaker 1.
        speaker2_voice: Voice label passed to ``generate_audio`` for speaker 2.
        title: Podcast title; its sanitized form prefixes the output files.

    Returns:
        ``(speaker1_path, speaker2_path, combined_path, temp_dir)``. The
        temporary directory still exists on success; the caller removes it.

    Raises:
        Any exception from synthesis, decoding or export is re-raised after
        the temporary directory has been removed.
    """
    session_id = str(uuid.uuid4())
    temp_dir = f'temp_{session_id}'
    safe_title = sanitize_filename(title)

    # Speaker names come from free-text input and end up in file names, so
    # they are reduced to safe slugs (no path separators or "..").
    speaker1_slug = sanitize_filename(speaker1_name) or 'speaker1'
    speaker2_slug = sanitize_filename(speaker2_name) or 'speaker2'
    if speaker1_slug == speaker2_slug:
        # Identical names would make the second export overwrite the first.
        speaker1_slug = f'{speaker1_slug}_1'
        speaker2_slug = f'{speaker2_slug}_2'

    os.makedirs(temp_dir, exist_ok=True)
    try:
        tracks = {
            'speaker1': AudioSegment.empty(),
            'speaker2': AudioSegment.empty(),
        }
        voices = {'speaker1': speaker1_voice, 'speaker2': speaker2_voice}
        combined_version = AudioSegment.empty()

        for i, entry in enumerate(data['conversation']):
            # Within one entry speaker 1 is always rendered before speaker 2.
            for speaker, listener in (('speaker1', 'speaker2'), ('speaker2', 'speaker1')):
                key = f'{speaker}text'
                if key not in entry:
                    continue
                text = entry[key]
                # A blank utterance has nothing to synthesize; skip it instead
                # of asking the TTS service for empty audio.
                if text is None or not str(text).strip():
                    continue

                temp_file = f'{temp_dir}/{speaker}_{i}.mp3'
                await generate_audio(text, voices[speaker], temp_file)
                audio = AudioSegment.from_file(temp_file)
                tracks[speaker] += audio
                tracks[listener] += AudioSegment.silent(duration=len(audio))
                combined_version += audio
                os.remove(temp_file)

        speaker1_path = f"{safe_title}_{speaker1_slug}_only.mp3"
        speaker2_path = f"{safe_title}_{speaker2_slug}_only.mp3"
        combined_path = f"{safe_title}_combined.mp3"
        tracks['speaker1'].export(speaker1_path, format="mp3")
        tracks['speaker2'].export(speaker2_path, format="mp3")
        combined_version.export(combined_path, format="mp3")
        return speaker1_path, speaker2_path, combined_path, temp_dir
    except Exception:
        # Best-effort cleanup; a failure here must not mask the real error.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
def _parse_podcast_json(result):
    """Parse the service reply as JSON, unwrapping a ``` code fence if needed.

    Raises:
        ValueError: The reply is not JSON and contains no usable code fence.
        json.JSONDecodeError: The fenced content is not valid JSON either.
    """
    try:
        return json.loads(result)
    except json.JSONDecodeError:
        pass

    json_start = result.find('```') + 3
    json_end = result.rfind('```')
    # json_start <= 2 means no fence was found at all (find returned -1).
    if json_start <= 2 or json_end <= json_start:
        raise ValueError("Could not parse JSON from response")
    if result[json_start:json_start + 4] == 'json':
        # Skip the "json" language tag: content starts on the next line.
        json_start = result.find('\n', json_start) + 1
    return json.loads(result[json_start:json_end].strip())


def generate_podcast(title, channel_name, speaker1_name, speaker2_name, speaker1_voice, speaker2_voice):
    """Generate a two-speaker podcast script and render it to audio.

    Sends a prompt containing a JSON template to the ``/chat`` endpoint of the
    service at ``API_URL`` (``API_MESSAGE`` and ``API_REQUEST`` are read from
    the environment as well), parses the reply as JSON and passes it to
    ``create_podcast_versions``.

    Args:
        title: Podcast topic; also used to name the output files.
        channel_name: Channel name inserted into the prompt.
        speaker1_name: Name of the first speaker.
        speaker2_name: Name of the second speaker.
        speaker1_voice: Voice label for the first speaker.
        speaker2_voice: Voice label for the second speaker.

    Returns:
        ``[speaker1_path, speaker2_path, combined_path, podcast_data]`` on
        success. On any failure ``[None, None, None, {"error": message}]``;
        the last element is a dict because it feeds a JSON output component,
        which cannot display a plain non-JSON string.
    """
    try:
        if not all([title, channel_name, speaker1_name, speaker2_name, speaker1_voice, speaker2_voice]):
            raise ValueError("All fields must be filled out")

        # json.dumps supplies the surrounding quotes and escapes quotes,
        # backslashes and newlines, so user text cannot break the template.
        title_json = json.dumps(title, ensure_ascii=False)
        channel_json = json.dumps(channel_name, ensure_ascii=False)
        speaker1_json = json.dumps(speaker1_name, ensure_ascii=False)
        speaker2_json = json.dumps(speaker2_name, ensure_ascii=False)

        client = Client(os.getenv('API_URL'))
        result = client.predict(
            message=f"""{os.getenv('API_MESSAGE')} {{
"title": {title_json},
"channel": {channel_json},
"speaker1": {speaker1_json},
"speaker2": {speaker2_json},
"conversation": [
{{
"speaker1text": ""
}},
{{
"speaker2text": ""
}}
]
}}
give 36 sentences for both.
""",
            request=os.getenv('API_REQUEST'),
            param_3=0.5,
            param_4=8100,
            param_5=0.5,
            param_6=0,
            api_name="/chat"
        )

        podcast_data = _parse_podcast_json(result)
        if not isinstance(podcast_data, dict) or 'conversation' not in podcast_data:
            raise ValueError("Response JSON does not contain a 'conversation' list")

        speaker1_path, speaker2_path, combined_path, temp_dir = asyncio.run(
            create_podcast_versions(
                podcast_data,
                speaker1_name,
                speaker2_name,
                speaker1_voice,
                speaker2_voice,
                title
            )
        )
        # The audio is already exported; a cleanup hiccup must not turn a
        # successful run into an error.
        shutil.rmtree(temp_dir, ignore_errors=True)

        return [
            speaker1_path,
            speaker2_path,
            combined_path,
            podcast_data
        ]
    except Exception as e:
        return [
            None,
            None,
            None,
            {"error": str(e)}
        ]
def _default_voice(voices, keyword):
    """Return the first label containing *keyword*, else the first label.

    Returns ``None`` when *voices* is empty so building the UI does not fail
    with an IndexError if no voice could be listed.
    """
    for label in voices:
        if keyword in label:
            return label
    return voices[0] if voices else None


# Build the Gradio UI: text inputs, two voice dropdowns, a generate button and
# the three audio players plus the JSON view of the generated conversation.
with gr.Blocks(theme=gr.themes.Soft()) as interface:
    # The voice catalogue is fetched once, while the module is being loaded.
    available_voices = asyncio.run(get_voices())

    gr.Markdown("# Easy Podcast")
    gr.Markdown("Generate a podcast conversation between two speakers on any topic. Choose voices and customize speaker details to create your perfect podcast.<br>To use elevelabs voices or cloned voices, or to automate the podcast video creation with avatar contact me at aheedsajid@gmail.com<br>Support me USDT (TRC-20) (TAe7hsSVWtMEYz3G5V1UiUdYPQVqm28bKx)")

    with gr.Row():
        with gr.Column():
            title = gr.Textbox(
                label="Podcast Topic",
                placeholder="e.g., The Future of AI",
                show_label=True
            )
            channel_name = gr.Textbox(
                label="Channel Name",
                placeholder="e.g., TechTalks",
                value="WeePakistan",
                show_label=True
            )
        with gr.Column():
            speaker1_name = gr.Textbox(
                label="First Speaker Name",
                placeholder="e.g., John",
                value="Andrew",
                show_label=True
            )
            speaker2_name = gr.Textbox(
                label="Second Speaker Name",
                placeholder="e.g., Sarah",
                value="Priya",
                show_label=True
            )

    with gr.Row():
        with gr.Column():
            speaker1_voice = gr.Dropdown(
                choices=available_voices,
                value=_default_voice(available_voices, "Andrew"),
                label="First Speaker Voice",
                info="Select voice for the first speaker"
            )
        with gr.Column():
            speaker2_voice = gr.Dropdown(
                choices=available_voices,
                value=_default_voice(available_voices, "Ava"),
                label="Second Speaker Voice",
                info="Select voice for the second speaker"
            )

    generate_btn = gr.Button("Generate Podcast", variant="primary")

    with gr.Row():
        speaker1_audio = gr.Audio(label="First Speaker Audio")
        speaker2_audio = gr.Audio(label="Second Speaker Audio")
        combined_audio = gr.Audio(label="Combined Audio")

    conversation_json = gr.JSON(label="Generated Conversation")

    # Input and output order must match generate_podcast's parameters and
    # the four-element list it returns.
    generate_btn.click(
        fn=generate_podcast,
        inputs=[
            title,
            channel_name,
            speaker1_name,
            speaker2_name,
            speaker1_voice,
            speaker2_voice
        ],
        outputs=[
            speaker1_audio,
            speaker2_audio,
            combined_audio,
            conversation_json
        ]
    )

if __name__ == "__main__":
    interface.launch()