Spaces:
Sleeping
Sleeping
import gradio as gr | |
import base64 | |
import mimetypes | |
import os | |
import re | |
import struct | |
import tempfile | |
import asyncio | |
import logging | |
from google import genai | |
from google.genai import types | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Gemini API key is read from the environment so that no credential is
# stored in the source tree. An empty value is allowed at import time; the
# functions that build a Gemini client check for it and raise
# "GEMINI_API_KEY is not set". Provide the key as a deployment secret /
# environment variable named GEMINI_API_KEY before launching the app.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
def save_binary_file(file_name, data):
    """Write ``data`` to ``file_name`` in binary mode and return the path.

    The file is opened with mode ``"wb"``, so an existing file at that path
    is truncated and overwritten.
    """
    with open(file_name, mode="wb") as sink:
        sink.write(data)
    return file_name
def convert_to_wav(audio_data: bytes, mime_type: str) -> bytes:
    """Wrap raw PCM bytes in a WAV container and return the result.

    The sample width and sample rate are taken from
    ``parse_audio_mime_type(mime_type)``; the audio is always declared as
    single-channel PCM. The return value is the 44-byte RIFF/WAVE header
    followed by ``audio_data`` unchanged.
    """
    audio_format = parse_audio_mime_type(mime_type)
    sample_bits = audio_format["bits_per_sample"]
    hertz = audio_format["rate"]
    channels = 1
    payload_len = len(audio_data)
    frame_bytes = channels * (sample_bits // 8)  # BlockAlign

    # RIFF descriptor: ChunkSize is the total file size minus 8 bytes,
    # i.e. 36 header bytes after this field plus the payload.
    riff_chunk = struct.pack("<4sI4s", b"RIFF", 36 + payload_len, b"WAVE")
    # "fmt " sub-chunk: size 16 and AudioFormat 1 both denote plain PCM.
    fmt_chunk = struct.pack(
        "<4sIHHIIHH",
        b"fmt ",
        16,
        1,
        channels,
        hertz,
        hertz * frame_bytes,  # ByteRate
        frame_bytes,
        sample_bits,
    )
    # "data" sub-chunk header; the samples themselves follow it.
    data_chunk_header = struct.pack("<4sI", b"data", payload_len)

    return riff_chunk + fmt_chunk + data_chunk_header + audio_data
def _positive_int(text):
    """Return ``int(text)`` when it parses to a value > 0, otherwise ``None``."""
    try:
        value = int(text)
    except (TypeError, ValueError):
        return None
    return value if value > 0 else None


def parse_audio_mime_type(mime_type: str) -> dict:
    """Extract bits-per-sample and sample rate from an audio MIME type.

    Recognises strings such as ``"audio/L16;codec=pcm;rate=24000"``: the
    number after ``audio/L`` is the sample width in bits and the ``rate``
    parameter is the sample rate. Matching is case-insensitive and tolerates
    whitespace around ``;`` and ``=``.

    Args:
        mime_type: The MIME type string. ``None`` or an empty string is
            treated as "no information".

    Returns:
        A dict with the integer keys ``"bits_per_sample"`` and ``"rate"``.
        Anything missing, unparsable, or not strictly positive falls back to
        the defaults of 16 bits and 24000 Hz, so the values are always safe
        to place in a WAV header.
    """
    bits_per_sample = 16
    rate = 24000
    linear_prefix = "audio/l"

    for raw_param in (mime_type or "").split(";"):
        param = raw_param.strip()
        name, has_value, value = param.partition("=")
        if has_value and name.strip().lower() == "rate":
            parsed = _positive_int(value)
            if parsed is not None:
                rate = parsed
        elif param.lower().startswith(linear_prefix):
            # Everything after "audio/L" must be the bit depth, e.g. "16".
            parsed = _positive_int(param[len(linear_prefix):])
            if parsed is not None:
                bits_per_sample = parsed

    return {"bits_per_sample": bits_per_sample, "rate": rate}
def fetch_web_content(url, progress=None):
    """Ask Gemini to read ``url`` and return a podcast-style script.

    A single user prompt containing the URL is streamed through
    ``client.models.generate_content_stream`` with the ``url_context`` and
    ``google_search`` tools enabled; the text of every streamed chunk is
    concatenated into the returned string.

    Args:
        url: The page to analyse; it is interpolated into the prompt as-is.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``
            to report status. Skipped when ``None``.

    Returns:
        The generated text (possibly empty if the model returned no text).

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is empty.
        Exception: Any error raised by the Gemini client is logged and
            re-raised unchanged.
    """
    try:
        # Compare against None explicitly: a progress tracker is a callable
        # object and its truthiness is not a reliable "was one supplied" test.
        if progress is not None:
            progress(0.1, desc="Initializing Gemini client...")
        logger.info("Initializing Gemini client...")

        if not GEMINI_API_KEY:
            raise ValueError("GEMINI_API_KEY is not set")
        client = genai.Client(api_key=GEMINI_API_KEY)

        if progress is not None:
            progress(0.2, desc="Fetching web content...")
        logger.info("Fetching content from URL: %s", url)

        model = "gemini-2.0-flash-exp"
        prompt = (
            f"Please analyze the content from this URL: {url}\n"
            "Create a comprehensive summary that would be suitable for a podcast discussion between two hosts.\n"
            "Focus on the key points, interesting aspects, and discussion-worthy topics.\n"
            "Format your response as a natural conversation between two podcast hosts discussing the content."
        )
        contents = [
            types.Content(
                role="user",
                parts=[types.Part.from_text(text=prompt)],
            ),
        ]
        tools = [
            types.Tool(url_context=types.UrlContext()),
            types.Tool(google_search=types.GoogleSearch()),
        ]
        generate_content_config = types.GenerateContentConfig(
            tools=tools,
            response_mime_type="text/plain",
        )

        if progress is not None:
            progress(0.4, desc="Analyzing content with AI...")
        logger.info("Generating content with Gemini...")

        # Collect the pieces and join once instead of growing a string with
        # "+=" on every streamed chunk. Chunks without text are skipped.
        text_pieces = [
            chunk.text
            for chunk in client.models.generate_content_stream(
                model=model,
                contents=contents,
                config=generate_content_config,
            )
            if chunk.text
        ]
        content_text = "".join(text_pieces)

        if progress is not None:
            progress(0.6, desc="Content analysis complete!")
        logger.info(
            "Content generation complete. Length: %d characters", len(content_text)
        )
        return content_text
    except Exception as exc:
        # Log with traceback, then re-raise the same exception for the caller.
        logger.exception("Error in fetch_web_content: %s", exc)
        raise
def _merge_audio_chunks(chunks):
    """Combine streamed ``(data, mime_type)`` pairs into one WAV byte string."""
    first_data, first_mime = chunks[0]
    if first_mime == "audio/wav":
        # Already a complete WAV container. Concatenating several WAV files
        # byte-for-byte would not produce a valid file, so keep the first one.
        return first_data

    # Raw PCM: join every chunk that shares the first chunk's format, then
    # add a single WAV header describing the whole payload.
    matching = [data for data, mime in chunks if mime == first_mime]
    if len(matching) != len(chunks):
        logger.warning(
            "Dropped %d audio chunk(s) whose MIME type differs from %r",
            len(chunks) - len(matching),
            first_mime,
        )
    return convert_to_wav(b"".join(matching), first_mime or "")


def generate_podcast_from_content(content_text, speaker1_name="Anna Chope", speaker2_name="Adam Chan", progress=None):
    """Generate a two-speaker podcast audio file from ``content_text``.

    The text is embedded in a prompt asking for a conversational read-aloud
    between "Speaker 1" (voice ``Zephyr``) and "Speaker 2" (voice ``Puck``),
    and streamed through ``client.models.generate_content_stream`` with an
    audio response modality. Inline audio data from every part of every
    streamed chunk is collected, merged into one WAV payload and written to
    a temporary ``.wav`` file.

    Args:
        content_text: The script or summary to be spoken.
        speaker1_name: Display name the prompt gives to Speaker 1.
        speaker2_name: Display name the prompt gives to Speaker 2.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``.
            Skipped when ``None``.

    Returns:
        Path of the temporary WAV file. The file is created with
        ``delete=False``, so the caller owns its cleanup.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is empty or the stream contained no
            audio data.
        Exception: Any error raised by the Gemini client is logged and
            re-raised unchanged.
    """
    try:
        # Compare against None explicitly: a progress tracker is a callable
        # object and its truthiness is not a reliable "was one supplied" test.
        if progress is not None:
            progress(0.7, desc="Generating podcast audio...")
        logger.info("Starting audio generation...")

        if not GEMINI_API_KEY:
            raise ValueError("GEMINI_API_KEY is not set")
        client = genai.Client(api_key=GEMINI_API_KEY)

        # NOTE(review): multi-speaker speech output may only be available on
        # dedicated TTS models - confirm that this model accepts the
        # speech_config below.
        model = "gemini-2.0-flash-exp"

        podcast_prompt = (
            "Please read aloud the following content in a natural podcast interview style with two distinct speakers.\n"
            "Make it sound conversational and engaging:\n"
            f"{content_text}\n"
            "If the content is not already in dialogue format, please convert it into a natural conversation "
            f"between two podcast hosts Speaker 1 {speaker1_name} and Speaker 2 {speaker2_name} discussing the topic. "
            "They should introduce themselves at the beginning."
        )
        contents = [
            types.Content(
                role="user",
                parts=[types.Part.from_text(text=podcast_prompt)],
            ),
        ]

        # The speaker labels must match the "Speaker 1"/"Speaker 2" labels
        # used in the prompt.
        speaker_voices = [
            types.SpeakerVoiceConfig(
                speaker=label,
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(
                        voice_name=voice_name
                    )
                ),
            )
            for label, voice_name in (("Speaker 1", "Zephyr"), ("Speaker 2", "Puck"))
        ]
        generate_content_config = types.GenerateContentConfig(
            temperature=1,
            response_modalities=["audio"],
            speech_config=types.SpeechConfig(
                multi_speaker_voice_config=types.MultiSpeakerVoiceConfig(
                    speaker_voice_configs=speaker_voices
                ),
            ),
        )

        if progress is not None:
            progress(0.8, desc="Converting to audio...")
        logger.info("Generating audio stream...")

        # Gather the audio from the whole stream, not just the first chunk,
        # so the saved file contains the complete podcast.
        audio_chunks = []
        for chunk in client.models.generate_content_stream(
            model=model,
            contents=contents,
            config=generate_content_config,
        ):
            if not chunk.candidates:
                continue
            content = chunk.candidates[0].content
            if content is None or content.parts is None:
                continue
            for part in content.parts:
                inline_data = part.inline_data
                if inline_data and inline_data.data:
                    audio_chunks.append((inline_data.data, inline_data.mime_type))

        if not audio_chunks:
            raise ValueError("No audio data generated")

        final_audio = _merge_audio_chunks(audio_chunks)

        # Create the temp file only once there is something to write, so a
        # failed generation does not leave an empty file behind.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
            temp_file.write(final_audio)

        if progress is not None:
            progress(1.0, desc="Podcast generated successfully!")
        logger.info("Audio file saved: %s", temp_file.name)
        return temp_file.name
    except Exception as exc:
        # Log with traceback, then re-raise the same exception for the caller.
        logger.exception("Error in generate_podcast_from_content: %s", exc)
        raise
def generate_web_podcast(url, speaker1_name, speaker2_name, progress=None):
    """Fetch a web page, turn it into a script and synthesise a podcast.

    This is the UI entry point: it never raises. Every failure is caught,
    logged and reported through the returned status message instead.

    Args:
        url: Page address; surrounding whitespace is ignored and the scheme
            must be ``http://`` or ``https://`` (case-insensitive).
        speaker1_name: Name of the first host; blank falls back to
            ``"Anna Chope"``.
        speaker2_name: Name of the second host; blank falls back to
            ``"Adam Chan"``.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``
            and passed on to the two generation steps. Skipped when ``None``.

    Returns:
        A ``(audio_path, status_message, script_text)`` tuple. On failure the
        tuple is ``(None, "<error message>", "")``.
    """
    try:
        # Compare against None explicitly: a progress tracker is a callable
        # object and its truthiness is not a reliable "was one supplied" test.
        if progress is not None:
            progress(0.0, desc="Starting podcast generation...")
        logger.info("Starting podcast generation for URL: %s", url)

        # Normalise first so that pasted URLs with stray whitespace pass the
        # scheme check below instead of being rejected.
        url = (url or "").strip()
        if not url:
            raise ValueError("Please enter a valid URL")
        if not url.lower().startswith(("http://", "https://")):
            raise ValueError("Please enter a valid URL starting with http:// or https://")

        speaker1_name = (speaker1_name or "").strip() or "Anna Chope"
        speaker2_name = (speaker2_name or "").strip() or "Adam Chan"

        # Step 1: Fetch and analyze web content
        content_text = fetch_web_content(url, progress)
        if not content_text or len(content_text.strip()) < 50:
            raise ValueError("Unable to extract sufficient content from the URL")

        # Step 2: Generate podcast from the content
        audio_file = generate_podcast_from_content(
            content_text, speaker1_name, speaker2_name, progress
        )
        logger.info("Podcast generation completed successfully")
        return audio_file, "✅ Podcast generated successfully!", content_text
    except Exception as exc:
        # UI boundary: convert any failure into a status message.
        logger.error("Error in generate_web_podcast: %s", exc)
        return None, f"❌ Error generating podcast: {exc}", ""
# Create Gradio interface
def create_interface():
    """Build the Gradio ``Blocks`` UI and return it without launching.

    Layout: a URL box, two host-name boxes and a generate button on the left;
    static instructions on the right; then a status box, an audio player and
    a collapsed script preview. Clicking the button calls
    ``generate_web_podcast`` with the three text inputs and routes its
    ``(audio, status, script)`` result to the three outputs.
    """
    intro_markdown = "\n".join(
        [
            "# 🎙️ Web-to-Podcast Generator",
            "Transform any website into an engaging podcast conversation between two AI hosts!",
            "Simply paste a URL and let AI create a natural dialogue discussing the content.",
        ]
    )
    instructions_markdown = "\n".join(
        [
            "### Instructions:",
            "1. Enter a website URL",
            "2. Customize host names (optional)",
            '3. Click "Generate Podcast"',
            "4. Wait for the AI to analyze content and create audio",
            "5. Download your podcast!",
            "### Examples:",
            "- News articles",
            "- Blog posts",
            "- Product pages",
            "- Documentation",
            "- Research papers",
        ]
    )
    # The footer deliberately says nothing about how credentials are
    # supplied; that is a deployment detail, not user-facing information.
    footer_markdown = "\n".join(
        [
            "---",
            "**Note:** The generated podcast will feature two AI voices having a natural conversation about the website content.",
        ]
    )

    with gr.Blocks(
        title="🎙️ Web-to-Podcast Generator",
        theme=gr.themes.Soft(),
        analytics_enabled=False,
    ) as demo:
        gr.Markdown(intro_markdown)

        with gr.Row():
            with gr.Column(scale=2):
                url_input = gr.Textbox(
                    label="Website URL",
                    placeholder="https://example.com",
                    info="Enter the URL of the website you want to convert to a podcast",
                    lines=1,
                )
                with gr.Row():
                    speaker1_input = gr.Textbox(
                        label="Host 1 Name",
                        value="Anna Chope",
                        info="Name of the first podcast host",
                        lines=1,
                    )
                    speaker2_input = gr.Textbox(
                        label="Host 2 Name",
                        value="Adam Chan",
                        info="Name of the second podcast host",
                        lines=1,
                    )
                generate_btn = gr.Button(
                    "🎙️ Generate Podcast", variant="primary", size="lg"
                )
            with gr.Column(scale=1):
                gr.Markdown(instructions_markdown)

        with gr.Row():
            status_output = gr.Textbox(label="Status", interactive=False, lines=2)
        with gr.Row():
            audio_output = gr.Audio(label="Generated Podcast", type="filepath")
        with gr.Accordion("📝 Generated Script Preview", open=False):
            script_output = gr.Textbox(
                label="Podcast Script",
                lines=10,
                interactive=False,
                info="Preview of the conversation script generated from the website content",
            )

        # Event handlers: output order matches the tuple returned by
        # generate_web_podcast (audio path, status message, script text).
        generate_btn.click(
            fn=generate_web_podcast,
            inputs=[url_input, speaker1_input, speaker2_input],
            outputs=[audio_output, status_output, script_output],
            show_progress=True,
        )

        # Examples
        gr.Examples(
            examples=[
                ["https://github.com/weaviate/weaviate", "Anna", "Adam"],
                ["https://huggingface.co/blog", "Sarah", "Mike"],
                ["https://openai.com/blog", "Emma", "John"],
            ],
            inputs=[url_input, speaker1_input, speaker2_input],
        )

        gr.Markdown(footer_markdown)

    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces,
    # port 7860. Any startup failure is logged, echoed to stdout and
    # re-raised so the process exits with a traceback.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
        "show_error": True,
    }
    try:
        logger.info("Starting Web-to-Podcast Generator...")
        demo = create_interface()
        demo.launch(**launch_options)
    except Exception as launch_error:
        logger.error("Failed to launch application: %s", launch_error)
        print(f"Error: {launch_error}")
        raise