podcast / app.py
orrinin's picture
Update app.py
7155424 verified
#Using codes from killerz3/PodGen & eswardivi/Podcastify
import json
import httpx
import os
import re
import feedparser
import asyncio
import random
import edge_tts
import tempfile
import cohere
import gradio as gr
from pydub import AudioSegment
from moviepy.editor import AudioFileClip, concatenate_audioclips
double_prompt = '''
[SYSTEM_INSTRUCT] You are an insightful podcast generator for The Daily Show. You have to create short conversations between Xiao and Yang that gives an overview of the Info given by the user.
Please provide the script and output strictly in the following JSON format:
{
"title": "[string]",
"content": {
"Xiao_0: "[string]",
"Yang_0": "[string]",
...
}
}
#Be concise, keep Show style. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in native Chinese.
'''
single_prompt = """
[SYSTEM_INSTRUCT] You are a podcast generator in The Daily Show. You have to create short scripts for Yang to comment on current events in the Info given by the user.
Please provide the script and output strictly in the following JSON format:
{
"title": "[string]",
"content": {
"Yang_0: "[string]",
"Yang_1": "[string]",
...
}
}
#Be concise, keep Show style. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in native Chinese.
"""
DESCRIPTION = '''
<div>
<h1 style="text-align: center;">📻听说demo</h1>
<p>一个轻量的中文播客</p>
<p>🔎 输入完整的网页链接发送即可。</p>
<p>🦕 部分网址可能无法解析,请尝试更换。</p>
<p>🍀 点击随机将随机获取资讯。</p>
</div>
'''
css = """
h1 {
text-align: center;
display: block;
}
p {
text-align: center;
}
footer {
display:none !important
}
"""
rss_feed = 'https://www.scmp.com/rss/4/feed'
apikey = os.environ.get("API_KEY")
co = cohere.Client(api_key=apikey)
# RSS feeds
def is_url(string):
url_pattern = re.compile(
r'^(?:http|ftp)s?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
return re.match(url_pattern, string) is not None
def validate_url(url):
try:
response = httpx.get(url, timeout=60.0)
response.raise_for_status()
return response.text
except httpx.RequestError as e:
return f"An error occurred while requesting {url}: {str(e)}"
except httpx.HTTPStatusError as e:
return f"Error response {e.response.status_code} while requesting {url}"
except Exception as e:
return f"An unexpected error occurred: {str(e)}"
def fetch_text(url):
print("Entered Webpage Extraction")
prefix_url = "https://r.jina.ai/"
full_url = prefix_url + url
print(full_url)
print("Exited Webpage Extraction")
return validate_url(full_url)
async def text_to_speech(text, voice, filename):
communicate = edge_tts.Communicate(text, voice)
await communicate.save(filename)
async def gen_show(script):
title = script['title']
content = script['content']
temp_files = []
tasks = []
for key, text in content.items():
speaker = key.split('_')[0] # Extract the speaker name
index = key.split('_')[1] # Extract the dialogue index
voice = "zh-CN-XiaoxiaoNeural" if speaker == "Xiao" else "zh-CN-YunyangNeural"
# Create temporary file for each speaker's dialogue
temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
temp_files.append(temp_file.name)
filename = temp_file.name
tasks.append(text_to_speech(text, voice, filename))
print(f"Generated audio for {speaker}_{index}: {filename}")
await asyncio.gather(*tasks)
# Combine the audio files using moviepy
audio_clips = [AudioFileClip(temp_file) for temp_file in temp_files]
combined = concatenate_audioclips(audio_clips)
# Create temporary file for the combined output
output_filename = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False).name
# Save the combined file
combined.write_audiofile(output_filename)
print(f"Combined audio saved as: {output_filename}")
# Clean up temporary files
for temp_file in temp_files:
os.remove(temp_file)
print(f"Deleted temporary file: {temp_file}")
return output_filename
def extract_content(text):
"""Extracts the JSON content from the given text."""
match = re.search(r'\{(?:[^{}]|\{[^{}]*\})*\}', text, re.DOTALL)
if match:
return match.group(0)
else:
return None
async def main(link, peoples):
if not link.startswith("http://") and not link.startswith("https://"):
return "URL must start with 'http://' or 'https://'",None
text = fetch_text(link)
if "Error" in text:
return text, None
prompt = f"Info: {text}"
if peoples == "双人":
system_prompt = double_prompt
else:
system_prompt = single_prompt
messages = system_prompt + "\n\n\n" + prompt
completion = co.chat(
model="command-r",
message=messages
)
print(completion)
generated_script = extract_content(completion.text)
#print("Generated Script:"+generated_script)
# Check if the generated_script is empty or not valid JSON
if not generated_script or not generated_script.strip().startswith('{'):
raise ValueError("Failed to generate a valid script.")
script_json = json.loads(generated_script) # Use the generated script as input
output_filename = await gen_show(script_json)
print("Output File:"+output_filename)
# Read the generated audio file
return output_filename
async def random_news(peoples):
global rss_feed
if not is_url(rss_feed):
raise ValueError(f"{rss_feed} is not a valid RSS feed.")
news = []
feed = feedparser.parse(rss_feed)
for entry in feed.entries:
news.append(entry.link)
random_url = random.choice(news)
print(random_url)
output = await main(random_url, peoples)
return output
Examples = [
["https://www.yahoo.com/news/shes-worlds-most-expensive-cow-040156493.html","单人"],
["https://www.yahoo.com/entertainment/kevin-spacey-says-owes-many-220432469.html","双人"],
["https://www.yahoo.com/tech/super-hornet-armed-sm-6-180853983.html","双人"],
["https://www.yahoo.com/news/harvard-scientists-may-unknown-technologically-150917239.html","单人"],
]
with gr.Blocks(theme='soft', css=css, title="听说") as iface:
with gr.Accordion(""):
gr.Markdown(DESCRIPTION)
with gr.Row():
output_box = gr.Audio(label="播客", type="filepath", interactive=False, autoplay=True, elem_classes="audio") # Create an output textbox
with gr.Row():
input_box = gr.Textbox(label="网址", placeholder="请输入https开头的网址")
with gr.Row():
peoples = gr.Radio(["单人","双人"],value="双人",label="播音员人数")
with gr.Row():
submit_btn = gr.Button("🚀 发送") # Create a submit button
random_btn = gr.Button("🤙 随机")
clear_btn = gr.ClearButton(output_box, value="🗑️ 清除") # Create a clear button
gr.Examples(examples=Examples, inputs=[input_box,peoples], outputs=output_box, fn=main, label="示例", cache_examples="lazy")
# Set up the event listeners
submit_btn.click(main, inputs=[input_box,peoples], outputs=output_box)
random_btn.click(fn=random_news, inputs=peoples, outputs=output_box)
#gr.close_all()
iface.queue().launch(show_api=False) # Launch the Gradio interface