|
|
|
import asyncio
import contextlib
import json
import os
import random
import re
import tempfile

import cohere
import edge_tts
import feedparser
import gradio as gr
import httpx
from moviepy.editor import AudioFileClip, concatenate_audioclips
from pydub import AudioSegment
|
|
|
# System prompt for the two-host format (selected by main() when the speaker
# mode is "双人"). The JSON template must itself be well-formed: the model is
# told to follow it strictly, and gen_show() derives the speaker from the
# part of each content key before the underscore.
double_prompt = '''
[SYSTEM_INSTRUCT] You are an insightful podcast generator for The Daily Show. You have to create short conversations between Xiao and Yang that gives an overview of the Info given by the user.
Please provide the script and output strictly in the following JSON format:
{
"title": "[string]",
"content": {
"Xiao_0": "[string]",
"Yang_0": "[string]",
...
}
}
#Be concise, keep Show style. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in native Chinese.
'''
|
|
|
# System prompt for the single-host format (used by main() for every speaker
# mode other than "双人"). The JSON template must itself be well-formed
# because the model is told to follow it strictly.
single_prompt = """
[SYSTEM_INSTRUCT] You are a podcast generator in The Daily Show. You have to create short scripts for Yang to comment on current events in the Info given by the user.
Please provide the script and output strictly in the following JSON format:
{
"title": "[string]",
"content": {
"Yang_0": "[string]",
"Yang_1": "[string]",
...
}
}
#Be concise, keep Show style. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in native Chinese.
"""
|
|
|
# HTML header rendered through gr.Markdown at the top of the page: the demo
# title followed by short usage hints (all user-facing text is Chinese).
DESCRIPTION = '''
<div>
<h1 style="text-align: center;">📻听说demo</h1>
<p>一个轻量的中文播客</p>
<p>🔎 输入完整的网页链接发送即可。</p>
<p>🦕 部分网址可能无法解析,请尝试更换。</p>
<p>🍀 点击随机将随机获取资讯。</p>
</div>
'''
|
|
|
# Stylesheet passed to gr.Blocks: centres <h1> and <p> text and hides the
# page <footer>.
css = """
h1 {
text-align: center;
display: block;
}
p {
text-align: center;
}
footer {
display:none !important
}
"""
|
|
|
# RSS feed that random_news() picks a random article link from.
rss_feed = 'https://www.scmp.com/rss/4/feed'


# Cohere client used by main() to generate the script. The key is read from
# the API_KEY environment variable; os.environ.get yields None when unset.
apikey = os.environ.get("API_KEY")
co = cohere.Client(api_key=apikey)
|
|
|
|
|
|
|
# Compiled once at import time instead of on every call.
# Accepts http(s)/ftp(s) URLs whose host is a dotted domain name, "localhost"
# or a dotted-quad IPv4 address, with an optional port and path/query.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'
    r'localhost|'
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    r'(?::\d+)?'
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)


def is_url(string):
    """Return True when ``string`` is a well-formed http(s)/ftp(s) URL.

    Args:
        string: Candidate URL. Anything that is not a ``str`` is rejected
            instead of raising ``TypeError``.

    Returns:
        bool: True only if the whole string matches the URL pattern.
    """
    if not isinstance(string, str):
        return False
    # fullmatch rather than match: "$" alone also matches just before a
    # trailing newline, which would let "http://host\n" through.
    return _URL_PATTERN.fullmatch(string) is not None
|
|
|
|
|
def validate_url(url):
    """GET ``url`` and return the response body, or a message describing the failure.

    Failures are reported through the return value, never raised: transport
    errors, non-success HTTP statuses and any other exception are each mapped
    to a human-readable string.

    Args:
        url: Address to request (60 second timeout).

    Returns:
        str: ``response.text`` on success, otherwise an error description.
    """
    try:
        resp = httpx.get(url, timeout=60.0)
        resp.raise_for_status()
        return resp.text
    except httpx.HTTPStatusError as exc:
        # Raised by raise_for_status() for an unsuccessful status code.
        return f"Error response {exc.response.status_code} while requesting {url}"
    except httpx.RequestError as exc:
        # Raised while issuing the request (connection, timeout, ...).
        return f"An error occurred while requesting {url}: {exc!s}"
    except Exception as exc:
        return f"An unexpected error occurred: {exc!s}"
|
|
|
def fetch_text(url):
    """Request ``url`` through the ``https://r.jina.ai/`` prefix.

    Args:
        url: Address of the page to extract.

    Returns:
        str: Whatever ``validate_url`` returns for the prefixed address,
        i.e. the response body or an error description.
    """
    print("Entered Webpage Extraction")
    full_url = "https://r.jina.ai/" + url
    print(full_url)
    result = validate_url(full_url)
    # Log the exit only after the request has finished, so the log brackets
    # the (potentially slow) network call instead of preceding it.
    print("Exited Webpage Extraction")
    return result
|
|
|
|
|
async def text_to_speech(text, voice, filename):
    """Synthesize ``text`` with the edge-tts ``voice`` and save the audio to ``filename``."""
    await edge_tts.Communicate(text, voice).save(filename)
|
|
|
|
|
async def gen_show(script):
    """Render a podcast script to one combined MP3 file.

    Every entry of ``script['content']`` is synthesized concurrently into its
    own temporary MP3, the segments are concatenated in dictionary order, and
    the per-line files are removed again, including when a step fails.

    Args:
        script: Mapping with a ``content`` dict. Keys look like
            ``"<Speaker>_<index>"``; the part before the first underscore
            selects the voice ("Xiao" -> zh-CN-XiaoxiaoNeural, anything else
            -> zh-CN-YunyangNeural). Values are the lines to speak.

    Returns:
        str: Path of the combined MP3. It is created with ``delete=False``,
        so the caller owns the file.

    Raises:
        KeyError: If ``script`` has no ``content`` entry.
        ValueError: If ``content`` is empty.
    """
    content = script['content']
    if not content:
        raise ValueError("Script has no content to synthesize.")

    segments = []       # (key, text, voice, path) for every line, in order
    clips = []
    try:
        for key, text in content.items():
            # partition() tolerates keys without an underscore.
            speaker = key.partition('_')[0]
            voice = "zh-CN-XiaoxiaoNeural" if speaker == "Xiao" else "zh-CN-YunyangNeural"
            # Only the unique name is needed; close the handle right away so
            # no descriptor is leaked and the file can be reopened for writing.
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as segment:
                path = segment.name
            segments.append((key, text, voice, path))

        await asyncio.gather(
            *(text_to_speech(text, voice, path) for _, text, voice, path in segments)
        )
        for key, _, _, path in segments:
            print(f"Generated audio for {key}: {path}")

        for _, _, _, path in segments:
            clips.append(AudioFileClip(path))
        combined = concatenate_audioclips(clips)

        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as out:
            output_filename = out.name
        combined.write_audiofile(output_filename)
        print(f"Combined audio saved as: {output_filename}")
        return output_filename
    finally:
        # Release the readers before deleting the files they point at.
        for clip in clips:
            clip.close()
        for _, _, _, path in segments:
            # Best-effort cleanup: a failed delete must not mask the real error.
            with contextlib.suppress(OSError):
                os.remove(path)
                print(f"Deleted temporary file: {path}")
|
|
|
|
|
def extract_content(text):
    """Extract the first JSON object embedded in ``text``.

    The text is scanned for ``{`` and the JSON parser decides where the
    object ends, so nesting of any depth and braces inside string values are
    handled correctly. If no position yields a valid object, the first
    brace-balanced span (up to one nested level) is returned instead, which
    leaves the decision about malformed JSON to the caller.

    Args:
        text: Free-form text, e.g. a model reply with prose or code fences
            around the JSON. ``None`` and ``""`` are accepted.

    Returns:
        str | None: The extracted substring, or None when nothing is found.
    """
    if not text:
        return None

    decoder = json.JSONDecoder()
    start = text.find('{')
    while start != -1:
        try:
            _, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find('{', start + 1)
        else:
            return text[start:end]

    # Fallback for replies that contain no parsable object.
    match = re.search(r'\{(?:[^{}]|\{[^{}]*\})*\}', text, re.DOTALL)
    return match.group(0) if match else None
|
|
|
# Prefixes of the failure messages produced by validate_url(). fetch_text()
# returns those messages in place of page text, so matching them is how a
# failed fetch is told apart from a page that merely mentions "Error".
_FETCH_ERROR_PREFIXES = (
    "An error occurred while requesting",
    "Error response",
    "An unexpected error occurred",
)


async def main(link, peoples):
    """Turn the article at ``link`` into a podcast audio file.

    Args:
        link: Article URL; must start with ``http://`` or ``https://``.
            Surrounding whitespace is ignored.
        peoples: ``"双人"`` selects the two-host prompt; any other value
            selects the single-host prompt.

    Returns:
        str: Path of the generated MP3 file.

    Raises:
        gr.Error: If the URL is malformed or the page could not be fetched.
            The handler's only output is an Audio component, so a message
            cannot be returned as a value; raising lets Gradio display it.
        ValueError: If the model reply does not contain a usable JSON script.
    """
    link = (link or "").strip()
    if not link.startswith(("http://", "https://")):
        raise gr.Error("URL must start with 'http://' or 'https://'")

    # NOTE(review): fetch_text() and co.chat() are synchronous calls made
    # from a coroutine; consider moving them to a worker thread.
    text = fetch_text(link)
    if text.startswith(_FETCH_ERROR_PREFIXES):
        raise gr.Error(text)

    system_prompt = double_prompt if peoples == "双人" else single_prompt
    messages = system_prompt + "\n\n\n" + f"Info: {text}"

    completion = co.chat(
        model="command-r",
        message=messages
    )
    print(completion)

    generated_script = extract_content(completion.text)
    if not generated_script:
        raise ValueError("Failed to generate a valid script.")

    try:
        script_json = json.loads(generated_script)
    except json.JSONDecodeError as err:
        raise ValueError("Failed to generate a valid script.") from err

    # gen_show() indexes script['content'] and iterates it as a dict; reject
    # any other shape here with the same message instead of a KeyError later.
    if not isinstance(script_json, dict) or not isinstance(script_json.get("content"), dict):
        raise ValueError("Failed to generate a valid script.")
    # The title is not needed to render audio; do not fail when it is omitted.
    script_json.setdefault("title", "")

    output_filename = await gen_show(script_json)
    print("Output File:" + output_filename)
    return output_filename
|
|
|
|
|
async def random_news(peoples):
    """Generate a podcast from a randomly chosen entry of the RSS feed.

    Args:
        peoples: Speaker mode, forwarded unchanged to ``main``.

    Returns:
        Whatever ``main`` returns for the chosen article.

    Raises:
        ValueError: If ``rss_feed`` is not a valid URL, or the feed yields no
            entry with a link (e.g. the download or parse failed).
    """
    if not is_url(rss_feed):
        raise ValueError(f"{rss_feed} is not a valid RSS feed.")

    feed = feedparser.parse(rss_feed)
    # Skip entries without a link instead of failing on attribute access.
    links = [link for link in (getattr(entry, "link", None) for entry in feed.entries) if link]
    if not links:
        # random.choice() would raise a bare IndexError on an empty list.
        raise ValueError(f"No news entries found in {rss_feed}.")

    random_url = random.choice(links)
    print(random_url)
    return await main(random_url, peoples)
|
|
|
# Sample inputs for the gr.Examples widget: each row is
# [article URL, speaker mode], matching the (input_box, peoples) inputs.
Examples = [
    ["https://www.yahoo.com/news/shes-worlds-most-expensive-cow-040156493.html","单人"],
    ["https://www.yahoo.com/entertainment/kevin-spacey-says-owes-many-220432469.html","双人"],
    ["https://www.yahoo.com/tech/super-hornet-armed-sm-6-180853983.html","双人"],
    ["https://www.yahoo.com/news/harvard-scientists-may-unknown-technologically-150917239.html","单人"],
]
|
# Gradio front end.
# NOTE(review): the nesting below was reconstructed from a paste that had
# lost its indentation; confirm the layout against the deployed app.
with gr.Blocks(theme='soft', css=css, title="听说") as iface:
    # Panel with the HTML description.
    with gr.Accordion(""):
        gr.Markdown(DESCRIPTION)
    # Player for the generated podcast; receives a file path from the handlers.
    with gr.Row():
        output_box = gr.Audio(label="播客", type="filepath", interactive=False, autoplay=True, elem_classes="audio")
    # Article URL input.
    with gr.Row():
        input_box = gr.Textbox(label="网址", placeholder="请输入https开头的网址")
    # Speaker mode: single host ("单人") or two hosts ("双人", the default).
    with gr.Row():
        peoples = gr.Radio(["单人","双人"],value="双人",label="播音员人数")
    # Action buttons: submit the URL, pick a random article, clear the player.
    with gr.Row():
        submit_btn = gr.Button("🚀 发送")
        random_btn = gr.Button("🤙 随机")
        clear_btn = gr.ClearButton(output_box, value="🗑️ 清除")

    # Clickable samples that run main(); cache_examples is set to "lazy".
    gr.Examples(examples=Examples, inputs=[input_box,peoples], outputs=output_box, fn=main, label="示例", cache_examples="lazy")

    # Both handlers write their result to the audio player.
    submit_btn.click(main, inputs=[input_box,peoples], outputs=output_box)
    random_btn.click(fn=random_news, inputs=peoples, outputs=output_box)

# Enable the request queue and start the server with the API page hidden.
iface.queue().launch(show_api=False)