# knowledgeCast / app.py
# nihalaninihal's picture
# Update app.py
# 34bfe42 verified
import gradio as gr
import base64
import mimetypes
import os
import re
import struct
import tempfile
import asyncio
import logging
from google import genai
from google.genai import types
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# SECURITY WARNING: a credential committed to source control is readable by
# everyone who can see the repository. Supply the key through the
# GEMINI_API_KEY environment variable instead (it takes precedence here).
# The embedded literal is kept only as a backward-compatible fallback; it
# should be rotated and then removed from the code.
GEMINI_API_KEY = (
    os.environ.get("GEMINI_API_KEY")
    or "AIzaSyDy5hjn9NFamWhBjqsVsD2WSoFNr2MrHSw"
)
def save_binary_file(file_name, data):
    """Write ``data`` to ``file_name`` in binary mode and return the path.

    Args:
        file_name: Destination path; an existing file is overwritten.
        data: Bytes-like payload to write.

    Returns:
        The same ``file_name`` that was passed in.
    """
    with open(file_name, mode="wb") as sink:
        sink.write(data)
    return file_name
def convert_to_wav(audio_data: bytes, mime_type: str) -> bytes:
    """Return ``audio_data`` prefixed with a 44-byte RIFF/WAVE PCM header.

    The sample width and sample rate are taken from ``mime_type`` via
    ``parse_audio_mime_type``; the stream is always declared as mono.

    Args:
        audio_data: Raw PCM sample bytes.
        mime_type: Audio MIME type string describing ``audio_data``.

    Returns:
        The header followed by the unmodified ``audio_data``.
    """
    fmt = parse_audio_mime_type(mime_type)
    sample_bits = fmt["bits_per_sample"]
    frames_per_second = fmt["rate"]
    channels = 1
    payload_len = len(audio_data)
    frame_bytes = channels * (sample_bits // 8)

    header_fields = (
        b"RIFF",                          # ChunkID
        36 + payload_len,                 # ChunkSize (total file size - 8)
        b"WAVE",                          # Format
        b"fmt ",                          # Subchunk1ID
        16,                               # Subchunk1Size (16 for PCM)
        1,                                # AudioFormat (1 for PCM)
        channels,                         # NumChannels
        frames_per_second,                # SampleRate
        frames_per_second * frame_bytes,  # ByteRate
        frame_bytes,                      # BlockAlign
        sample_bits,                      # BitsPerSample
        b"data",                          # Subchunk2ID
        payload_len,                      # Subchunk2Size
    )
    return struct.pack("<4sI4s4sIHHIIHH4sI", *header_fields) + audio_data
def parse_audio_mime_type(mime_type: str) -> dict:
    """Extract bits-per-sample and sample rate from an audio MIME type.

    The string is split on ``;``. A segment starting with ``rate=`` sets the
    sample rate, and a segment starting with ``audio/L`` (e.g. ``audio/L16``)
    sets the sample width. Both prefixes are matched case-insensitively,
    because MIME type and parameter names are not case-sensitive.

    Args:
        mime_type: MIME type such as ``"audio/L16;codec=pcm;rate=24000"``.
            ``None`` or an empty string yields the defaults.

    Returns:
        ``{"bits_per_sample": int, "rate": int}``. Values that are missing,
        non-numeric or not positive fall back to 16 bits and 24000 Hz.
    """
    parsed = {"bits_per_sample": 16, "rate": 24000}

    for raw_param in (mime_type or "").split(";"):
        param = raw_param.strip()
        lowered = param.lower()
        if lowered.startswith("rate="):
            target, digits = "rate", param[len("rate="):]
        elif lowered.startswith("audio/l"):
            target, digits = "bits_per_sample", param[len("audio/l"):]
        else:
            continue

        try:
            value = int(digits)
        except ValueError:
            continue  # Unparseable number: keep the default.

        # A zero or negative value cannot describe real audio and would make
        # the unsigned fields of a WAV header impossible to pack.
        if value > 0:
            parsed[target] = value

    return parsed
def fetch_web_content(url, progress=None):
    """Ask Gemini to read ``url`` and draft a two-host podcast conversation.

    The request enables the URL-context and Google Search tools and streams
    the plain-text response, which is concatenated and returned.

    Args:
        url: Address of the page to analyze.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``
            to report status.

    Returns:
        The generated text (may be empty if the model produced none).

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is empty.
        Exception: Any error from the Gemini client is logged and re-raised.
    """

    def report(fraction, message):
        # Progress reporting is optional; do nothing when no callback is set.
        if progress:
            progress(fraction, desc=message)

    try:
        report(0.1, "Initializing Gemini client...")
        logger.info("Initializing Gemini client...")
        if not GEMINI_API_KEY:
            raise ValueError("GEMINI_API_KEY is not set")
        client = genai.Client(api_key=GEMINI_API_KEY)

        report(0.2, "Fetching web content...")
        logger.info(f"Fetching content from URL: {url}")

        model = "gemini-2.0-flash-exp"  # Updated model name
        analysis_prompt = f"""Please analyze the content from this URL: {url}
Create a comprehensive summary that would be suitable for a podcast discussion between two hosts.
Focus on the key points, interesting aspects, and discussion-worthy topics.
Format your response as a natural conversation between two podcast hosts discussing the content."""
        user_turn = types.Content(
            role="user",
            parts=[types.Part.from_text(text=analysis_prompt)],
        )
        request_config = types.GenerateContentConfig(
            tools=[
                types.Tool(url_context=types.UrlContext()),
                types.Tool(google_search=types.GoogleSearch()),
            ],
            response_mime_type="text/plain",
        )

        report(0.4, "Analyzing content with AI...")
        logger.info("Generating content with Gemini...")

        stream = client.models.generate_content_stream(
            model=model,
            contents=[user_turn],
            config=request_config,
        )
        # Chunks without text (e.g. tool-call metadata) are skipped.
        content_text = "".join(chunk.text for chunk in stream if chunk.text)

        report(0.6, "Content analysis complete!")
        logger.info(f"Content generation complete. Length: {len(content_text)} characters")
        return content_text
    except Exception as e:
        logger.error(f"Error in fetch_web_content: {e}")
        raise
def _iter_inline_audio(stream):
    """Yield ``(data, mime_type)`` for each streamed chunk that carries audio."""
    for chunk in stream:
        candidates = chunk.candidates
        if not candidates:  # None or an empty list
            continue
        content = candidates[0].content
        if content is None or not content.parts:
            continue
        inline_data = content.parts[0].inline_data
        if inline_data and inline_data.data:
            yield inline_data.data, inline_data.mime_type


def generate_podcast_from_content(content_text, speaker1_name="Anna Chope", speaker2_name="Adam Chan", progress=None):
    """Generate a two-voice podcast WAV file from ``content_text``.

    The text is sent to Gemini with an audio response modality and a
    two-speaker voice configuration. All streamed audio chunks are collected,
    joined, wrapped in a single WAV header and written to a temporary file.

    Args:
        content_text: Script or summary to be read aloud.
        speaker1_name: Display name of the first host used in the prompt.
        speaker2_name: Display name of the second host used in the prompt.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``.

    Returns:
        Path of the temporary ``.wav`` file (not deleted automatically).

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is empty or no audio was returned.
        Exception: Any error from the Gemini client is logged and re-raised.
    """
    try:
        if progress:
            progress(0.7, desc="Generating podcast audio...")
        logger.info("Starting audio generation...")
        if not GEMINI_API_KEY:
            raise ValueError("GEMINI_API_KEY is not set")
        client = genai.Client(api_key=GEMINI_API_KEY)
        model = "gemini-2.0-flash-exp"  # Updated model name
        podcast_prompt = f"""Please read aloud the following content in a natural podcast interview style with two distinct speakers.
Make it sound conversational and engaging:
{content_text}
If the content is not already in dialogue format, please convert it into a natural conversation between two podcast hosts Speaker 1 {speaker1_name} and Speaker 2 {speaker2_name} discussing the topic. They should introduce themselves at the beginning."""
        contents = [
            types.Content(
                role="user",
                parts=[
                    types.Part.from_text(text=podcast_prompt),
                ],
            ),
        ]
        generate_content_config = types.GenerateContentConfig(
            temperature=1,
            response_modalities=[
                "audio",
            ],
            speech_config=types.SpeechConfig(
                multi_speaker_voice_config=types.MultiSpeakerVoiceConfig(
                    speaker_voice_configs=[
                        types.SpeakerVoiceConfig(
                            speaker="Speaker 1",
                            voice_config=types.VoiceConfig(
                                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                                    voice_name="Zephyr"
                                )
                            ),
                        ),
                        types.SpeakerVoiceConfig(
                            speaker="Speaker 2",
                            voice_config=types.VoiceConfig(
                                prebuilt_voice_config=types.PrebuiltVoiceConfig(
                                    voice_name="Puck"
                                )
                            ),
                        ),
                    ]
                ),
            ),
        )
        if progress:
            progress(0.8, desc="Converting to audio...")
        logger.info("Generating audio stream...")

        stream = client.models.generate_content_stream(
            model=model,
            contents=contents,
            config=generate_content_config,
        )
        audio_parts = list(_iter_inline_audio(stream))
        if not audio_parts:
            raise ValueError("No audio data generated")

        first_data, first_mime_type = audio_parts[0]
        if first_mime_type == "audio/wav":
            # A WAV chunk already carries its own header, and complete WAV
            # files cannot be byte-concatenated, so keep the first one as-is.
            if len(audio_parts) > 1:
                logger.warning(
                    "Received %d WAV chunks; only the first is saved",
                    len(audio_parts),
                )
            final_audio = first_data
        else:
            # Join the raw samples of every chunk and add ONE header, so the
            # file contains the whole recording rather than its first chunk.
            raw_audio = b"".join(
                data for data, mime_type in audio_parts
                if mime_type != "audio/wav"
            )
            final_audio = convert_to_wav(raw_audio, first_mime_type or "")

        # Create the temp file only once there is audio to store, so a failed
        # request does not leave an empty file behind.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
        temp_file.close()
        save_binary_file(temp_file.name, final_audio)

        if progress:
            progress(1.0, desc="Podcast generated successfully!")
        logger.info(f"Audio file saved: {temp_file.name}")
        return temp_file.name
    except Exception as e:
        logger.error(f"Error in generate_podcast_from_content: {e}")
        raise
def generate_web_podcast(url, speaker1_name, speaker2_name, progress=None):
    """Fetch a web page's content and turn it into a podcast audio file.

    Errors are never raised to the caller; they are reported through the
    status string so the UI can display them.

    Args:
        url: Page address; must use the ``http`` or ``https`` scheme.
            Surrounding whitespace is ignored.
        speaker1_name: First host name; blank falls back to "Anna Chope".
        speaker2_name: Second host name; blank falls back to "Adam Chan".
        progress: Optional callable invoked as ``progress(fraction, desc=...)``.

    Returns:
        ``(audio_path, status_message, script_text)`` on success, or
        ``(None, error_message, "")`` on failure.
    """
    try:
        if progress:
            progress(0.0, desc="Starting podcast generation...")
        logger.info(f"Starting podcast generation for URL: {url}")

        # Strip BEFORE validating so a URL pasted with leading or trailing
        # whitespace is not rejected by the scheme check.
        url = (url or "").strip()
        if not url:
            raise ValueError("Please enter a valid URL")
        # URL schemes are case-insensitive, so compare in lower case.
        if not url.lower().startswith(("http://", "https://")):
            raise ValueError("Please enter a valid URL starting with http:// or https://")

        speaker1_name = (speaker1_name or "").strip() or "Anna Chope"
        speaker2_name = (speaker2_name or "").strip() or "Adam Chan"

        # Step 1: Fetch and analyze web content
        content_text = fetch_web_content(url, progress)
        if not content_text or len(content_text.strip()) < 50:
            raise ValueError("Unable to extract sufficient content from the URL")

        # Step 2: Generate podcast from the content
        audio_file = generate_podcast_from_content(content_text, speaker1_name, speaker2_name, progress)
        logger.info("Podcast generation completed successfully")
        return audio_file, "✅ Podcast generated successfully!", content_text
    except Exception as e:
        error_msg = f"❌ Error generating podcast: {e}"
        logger.error(f"Error in generate_web_podcast: {e}")
        return None, error_msg, ""
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks UI for the web-to-podcast generator.

    The page has a URL field, two host-name fields and a generate button,
    an instructions panel, a status box, an audio player, a collapsible
    script preview and a set of example inputs. Clicking the button calls
    ``generate_web_podcast`` with the three text inputs and routes its
    result to the audio, status and script components.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(
        title="🎙️ Web-to-Podcast Generator",
        theme=gr.themes.Soft(),
        analytics_enabled=False
    ) as demo:
        gr.Markdown("""
# 🎙️ Web-to-Podcast Generator
Transform any website into an engaging podcast conversation between two AI hosts!
Simply paste a URL and let AI create a natural dialogue discussing the content.
""")
        with gr.Row():
            with gr.Column(scale=2):
                url_input = gr.Textbox(
                    label="Website URL",
                    placeholder="https://example.com",
                    info="Enter the URL of the website you want to convert to a podcast",
                    lines=1
                )
                with gr.Row():
                    speaker1_input = gr.Textbox(
                        label="Host 1 Name",
                        value="Anna Chope",
                        info="Name of the first podcast host",
                        lines=1
                    )
                    speaker2_input = gr.Textbox(
                        label="Host 2 Name",
                        value="Adam Chan",
                        info="Name of the second podcast host",
                        lines=1
                    )
                generate_btn = gr.Button("🎙️ Generate Podcast", variant="primary", size="lg")
            with gr.Column(scale=1):
                gr.Markdown("""
### Instructions:
1. Enter a website URL
2. Customize host names (optional)
3. Click "Generate Podcast"
4. Wait for the AI to analyze content and create audio
5. Download your podcast!
### Examples:
- News articles
- Blog posts
- Product pages
- Documentation
- Research papers
""")
        with gr.Row():
            status_output = gr.Textbox(label="Status", interactive=False, lines=2)
        with gr.Row():
            audio_output = gr.Audio(label="Generated Podcast", type="filepath")
        with gr.Accordion("📝 Generated Script Preview", open=False):
            script_output = gr.Textbox(
                label="Podcast Script",
                lines=10,
                interactive=False,
                info="Preview of the conversation script generated from the website content"
            )
        # Event handlers: output order matches the tuple returned by
        # generate_web_podcast (audio path, status message, script text).
        generate_btn.click(
            fn=generate_web_podcast,
            inputs=[url_input, speaker1_input, speaker2_input],
            outputs=[audio_output, status_output, script_output],
            show_progress=True
        )
        # Examples
        gr.Examples(
            examples=[
                ["https://github.com/weaviate/weaviate", "Anna", "Adam"],
                ["https://huggingface.co/blog", "Sarah", "Mike"],
                ["https://openai.com/blog", "Emma", "John"],
            ],
            inputs=[url_input, speaker1_input, speaker2_input],
        )
        gr.Markdown("""
---
**Note:** API key is now directly embedded in the code for convenience.
The generated podcast will feature two AI voices having a natural conversation about the website content.
""")
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces at
    # port 7860. Startup failures are logged, printed and re-raised.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
        "show_error": True,
    }
    try:
        logger.info("Starting Web-to-Podcast Generator...")
        demo = create_interface()
        demo.launch(**launch_options)
    except Exception as exc:
        logger.error(f"Failed to launch application: {exc}")
        print(f"Error: {exc}")
        raise