pod / app.py
deeme's picture
Upload app.py
2cc3649 verified
raw
history blame
26.7 kB
import gradio as gr
import os
import tempfile
import logging
from podcastfy.client import generate_podcast
from dotenv import load_dotenv
import requests
import json
# Configure logging: the root logger is set to DEBUG, so every logger in the
# process (including third-party libraries) emits debug output by default.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)
# Load environment variables (presumably from a local .env file via
# python-dotenv -- confirm the lookup path if keys are not picked up).
load_dotenv()
# Voice options offered in the UI.
# Each row is (voice id, display name). The display name is what the dropdowns
# show; the id is what gets placed into the TTS configuration.
# NOTE(review): the 32-character hex ids look like custom/cloned voice ids and
# the "*Neural" ids look like Edge/Azure voice names -- confirm against the
# TTS backend that is actually configured.
# Order matters: the UI picks its default voices by position in this table.
_VOICE_TABLE = (
    ("3b55b3d84d2f453a98d8ca9bb24182d6", "邓紫琪"),
    ("fa756c4628b94b7394d1822e5848cf59", "杨幂"),
    ("08f18a5692544543a6ca5fdd1eaa328c", "宋雨琦"),
    ("f2ed19ca0ea246bf9cbc6382be00e4fc", "王志文"),
    ("738d0cc1a3e9430a9de2b544a466a7fc", "雷军"),
    ("1512d05841734931bf905d0520c272b1", "周杰伦"),
    ("e4642e5edccd4d9ab61a69e82d4f8a14", "蔡徐坤"),
    ("e04a3dc718864c999ef7db3035764aa8", "刘华强"),
    ("7c66db6e457c4d53b1fe428a8c547953", "郭德纲"),
    ("f6f293aabfe24e46aff0fc309c233d31", "曹操"),
    ("22e8eb5f1f424c749592cd9db3927368", "李云龙"),
    ("5e680ebc2eeb4f78a2224f2e1003b8c6", "刘备"),
    ("zh-HK-HiuGaaiNeural", "曉佳(粤语女声)"),
    ("zh-HK-HiuMaanNeural", "曉曼(粤语女声)"),
    ("zh-HK-WanLungNeural", "雲龍(粤语男声)"),
    ("zh-CN-XiaoxiaoNeural", "晓晓(活泼女声)"),
    ("zh-CN-XiaoyiNeural", "晓伊(女声)"),
    ("zh-CN-YunjianNeural", "云健(解说男声)"),
    ("zh-CN-YunxiNeural", "云希(阳光男声)"),
    ("zh-CN-YunxiaNeural", "云夏(少年男声)"),
    ("zh-CN-YunyangNeural", "云扬(专业男声)"),
    ("zh-CN-liaoning-XiaobeiNeural", "晓贝(辽宁女声)"),
    ("zh-TW-HsiaoChenNeural", "曉臻(湾湾女声)"),
    ("zh-TW-YunJheNeural", "雲哲(湾湾男声)"),
    ("zh-TW-HsiaoYuNeural", "曉雨(湾湾女声)"),
    ("zh-CN-shaanxi-XiaoniNeural", "晓妮(陕西女声)"),
    ("alloy", "alloy(用于官方)"),
    ("echo", "echo"),
    ("fable", "fable"),
    ("onyx", "onyx"),
    ("nova", "nova"),
    ("shimmer", "shimmer"),
)

VOICE_OPTIONS = [
    {"id": voice_id, "name": display_name}
    for voice_id, display_name in _VOICE_TABLE
]
# Round-robin rotation over a comma-separated list of API keys
def get_next_gemini_key(api_keys):
    """Return the next key from a comma-separated key list, round-robin.

    The rotation position is kept between calls on the function attribute
    ``get_next_gemini_key.current_index``.

    Args:
        api_keys: String of API keys separated by commas. Surrounding
            whitespace and empty entries are ignored.

    Returns:
        The key at the current rotation position.

    Raises:
        ValueError: If ``api_keys`` contains no non-empty key.
    """
    keys = [key.strip() for key in api_keys.split(",") if key.strip()]
    if not keys:
        raise ValueError("No API key found in the comma-separated key list")

    # The stored position may come from an earlier call with a longer list;
    # wrapping it keeps the lookup in range instead of raising IndexError.
    index = getattr(get_next_gemini_key, "current_index", 0) % len(keys)
    get_next_gemini_key.current_index = (index + 1) % len(keys)
    return keys[index]
def get_api_key(key_name, ui_value):
    """Resolve an API key or setting, preferring the UI value over the environment.

    Args:
        key_name: Name of the environment variable used as the fallback.
        ui_value: Value typed into the UI; may be empty or None.

    Returns:
        ``ui_value`` when it is non-empty, otherwise ``os.getenv(key_name)``
        (which is None when the variable is unset). For ``GEMINI_API_KEY``
        a comma-containing ``ui_value`` is treated as a key list and the
        next key in the rotation is returned.
    """
    if not ui_value:
        # Nothing entered in the UI: fall back to the environment.
        return os.getenv(key_name)

    is_gemini_key_list = key_name == "GEMINI_API_KEY" and "," in ui_value
    if is_gemini_key_list:
        return get_next_gemini_key(ui_value)

    return ui_value
def _export_env(name, value):
    """Set environment variable ``name`` to ``value``, skipping a None value.

    ``os.environ`` only accepts strings, so assigning None raises TypeError.
    """
    if value is not None:
        os.environ[name] = value


def _voice_id_from_name(display_name):
    """Return the id of the VOICE_OPTIONS entry whose name is ``display_name``.

    Raises:
        ValueError: If no entry carries that display name.
    """
    for voice in VOICE_OPTIONS:
        if voice["name"] == display_name:
            return voice["id"]
    raise ValueError(f"Unknown TTS voice: {display_name!r}")


def _remove_temp_dirs(temp_dirs):
    """Best-effort removal of the temporary upload directories and their files."""
    import shutil

    for dir_path in temp_dirs:
        # ignore_errors: a failed cleanup must not hide the real result/error.
        shutil.rmtree(dir_path, ignore_errors=True)
        logger.debug(f"Removed temp directory: {dir_path}")


def process_inputs(
    text_input,
    urls_input,
    pdf_files,
    image_files,
    gemini_key,
    openai_key,
    openai_base_url,
    elevenlabs_key,
    max_num_chunks,
    min_chunk_size,
    conversation_style,
    roles_person1,
    roles_person2,
    dialogue_structure,
    podcast_name,
    podcast_tagline,
    output_language,
    tts_model,
    creativity_level,
    user_instructions,
    engagement_techniques,
    tts_openai_question,
    tts_openai_answer,
    ending_message,
    longform,
    llm_model_name,
):
    """Collect the UI inputs, call ``generate_podcast`` and return its result.

    Args:
        text_input: Free text to turn into a podcast; may be empty.
        urls_input: Newline-separated URLs; blank lines are ignored.
        pdf_files: Uploaded PDF contents (bytes objects), or None.
        image_files: Uploaded image contents (bytes, or (name, bytes) pairs),
            or None.
        gemini_key, openai_key, openai_base_url, elevenlabs_key: Values from
            the UI; when empty the matching environment variable is used.
        max_num_chunks, min_chunk_size, conversation_style, roles_person1,
        roles_person2, dialogue_structure, podcast_name, podcast_tagline,
        output_language, creativity_level, user_instructions,
        engagement_techniques, ending_message: Copied into the conversation
            config; ``conversation_style`` and ``dialogue_structure`` are
            split on commas.
        tts_model: Text-to-speech backend name, e.g. "openai" or "elevenlabs".
        tts_openai_question, tts_openai_answer: Voice display names; for the
            "openai" backend they are translated to voice ids.
        longform: Passed through to ``generate_podcast``.
        llm_model_name: Passed through to ``generate_podcast``.

    Returns:
        The value returned by ``generate_podcast`` on success. On failure the
        exception message is returned as a string.
    """
    # Created before the try block so the cleanup in ``finally`` can always
    # see it, even when a failure happens before any upload is stored.
    temp_dirs = []
    try:
        logger.info("Starting podcast generation process")

        # API key handling
        logger.debug("Setting API keys")
        _export_env("GEMINI_API_KEY", get_api_key("GEMINI_API_KEY", gemini_key))

        logger.debug("Setting OpenAI API key")
        # NOTE(review): this check runs for every TTS model although the
        # message mentions only the OpenAI TTS model; generate_podcast is
        # always called with api_key_label="OPENAI_API_KEY" below, so the key
        # appears to be needed regardless -- confirm and adjust the wording.
        if not openai_key and not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OpenAI API key is required when using OpenAI TTS model")
        _export_env("OPENAI_API_KEY", get_api_key("OPENAI_API_KEY", openai_key))

        # The base URL is optional: when neither the UI nor the environment
        # provides one, nothing is exported and the client default applies.
        base_url = get_api_key("OPENAI_BASE_URL", openai_base_url)
        _export_env("OPENAI_API_BASE", base_url)

        if tts_model == "openai":
            _export_env("OPENAI_BASE_URL", base_url)
            # Translate the selected display names into voice ids
            tts_openai_question = _voice_id_from_name(tts_openai_question)
            tts_openai_answer = _voice_id_from_name(tts_openai_answer)

        if tts_model == "elevenlabs":
            logger.debug("Setting ElevenLabs API key")
            if not elevenlabs_key and not os.getenv("ELEVENLABS_API_KEY"):
                raise ValueError("ElevenLabs API key is required when using ElevenLabs TTS model")
            _export_env("ELEVENLABS_API_KEY", get_api_key("ELEVENLABS_API_KEY", elevenlabs_key))

        # Process URLs
        urls = [url.strip() for url in (urls_input or "").split('\n') if url.strip()]
        logger.debug(f"Processed URLs: {urls}")

        # Handle PDF files: each upload is written to disk and passed on as
        # a local path alongside the URLs.
        if pdf_files:
            logger.info(f"Processing {len(pdf_files)} PDF files")
            pdf_temp_dir = tempfile.mkdtemp()
            temp_dirs.append(pdf_temp_dir)
            for i, pdf_file in enumerate(pdf_files):
                pdf_path = os.path.join(pdf_temp_dir, f"input_pdf_{i}.pdf")
                with open(pdf_path, 'wb') as f:
                    f.write(pdf_file)
                urls.append(pdf_path)
                logger.debug(f"Saved PDF {i} to {pdf_path}")

        # Handle image files
        image_paths = []
        if image_files:
            logger.info(f"Processing {len(image_files)} image files")
            img_temp_dir = tempfile.mkdtemp()
            temp_dirs.append(img_temp_dir)
            for i, img_file in enumerate(image_files):
                # Use the original file name when the upload carries one
                original_name = getattr(img_file, 'orig_name', f"image_{i}.jpg")
                # splitext only inspects the last path component, so a name
                # without a suffix falls back to "jpg" instead of leaking the
                # whole name (or a path separator) into the file name.
                extension = os.path.splitext(original_name)[1].lstrip('.') or "jpg"
                logger.debug(f"Processing image file {i}: {original_name}")
                img_path = os.path.join(img_temp_dir, f"input_image_{i}.{extension}")
                try:
                    with open(img_path, 'wb') as f:
                        if isinstance(img_file, (tuple, list)):
                            f.write(img_file[1])  # (name, bytes) pair
                        else:
                            f.write(img_file)  # raw bytes
                    image_paths.append(img_path)
                    logger.debug(f"Saved image {i} to {img_path}")
                except Exception as e:
                    logger.error(f"Error saving image {i}: {str(e)}")
                    raise

        # Prepare conversation config
        logger.debug("Preparing conversation config")
        conversation_config = {
            "max_num_chunks": max_num_chunks,
            "min_chunk_size": min_chunk_size,
            "conversation_style": conversation_style.split(','),
            "roles_person1": roles_person1,
            "roles_person2": roles_person2,
            "dialogue_structure": dialogue_structure.split(','),
            "podcast_name": podcast_name,
            "podcast_tagline": podcast_tagline,
            "output_language": output_language,
            "creativity": creativity_level,
            "user_instructions": user_instructions,
            "engagement_techniques": engagement_techniques,
            'text_to_speech': {
                'ending_message': ending_message,
                'openai': {
                    'default_voices': {
                        'question': tts_openai_question,
                        'answer': tts_openai_answer
                    },
                    "model": "tts-1",
                },
            },
        }

        # Generate podcast
        logger.info("Calling generate_podcast function")
        logger.debug(f"URLs: {urls}")
        logger.debug(f"Image paths: {image_paths}")
        logger.debug(f"Text input present: {'Yes' if text_input else 'No'}")
        audio_file = generate_podcast(
            urls=urls if urls else None,
            text=text_input if text_input else None,
            image_paths=image_paths if image_paths else None,
            tts_model=tts_model,
            conversation_config=conversation_config,
            longform=longform,
            llm_model_name=llm_model_name,
            api_key_label="OPENAI_API_KEY",
        )
        logger.info("Podcast generation completed")
        return audio_file
    except Exception as e:
        logger.error(f"Error in process_inputs: {str(e)}", exc_info=True)
        # NOTE(review): the message is returned as the output value; the
        # click handler routes it to an Audio component, so raising gr.Error
        # may surface failures more clearly -- confirm before changing.
        return str(e)
    finally:
        # Single cleanup path for both success and failure
        logger.debug("Cleaning up temporary files")
        _remove_temp_dirs(temp_dirs)
# Create Gradio interface with updated theme.
# Layout: one tab holding four sections (API keys, content input,
# customization, output), followed by the event wiring for the generate
# button and the theme toggle.
# NOTE(review): container nesting below is reconstructed from statement
# order -- confirm which components belong to which Row/Group/Accordion.
with gr.Blocks(
    title="AI播客plus",
    theme=gr.themes.Base(
        primary_hue="blue",
        secondary_hue="slate",
        neutral_hue="slate"
    ),
    css="""
/* Move toggle arrow to left side */
.gr-accordion {
--accordion-arrow-size: 1.5em;
}
.gr-accordion > .label-wrap {
flex-direction: row !important;
justify-content: flex-start !important;
gap: 1em;
}
.gr-accordion > .label-wrap > .icon {
order: -1;
}
"""
) as demo:
    with gr.Tab("默认环境变量已设置 Gemini、OpenAI API Key "):
        # API Keys Section: header plus the theme toggle button
        with gr.Row():
            gr.Markdown(
                """
<h2 style='color: #2196F3; margin-bottom: 10px; padding: 10px 0;'>
🔑 API Keys
</h2>
""",
                elem_classes=["section-header"]
            )
            theme_btn = gr.Button("🌓", scale=0, min_width=0)
        # Key fields start empty; process_inputs falls back to environment
        # variables for any field left blank.
        with gr.Accordion("配置 API Keys", open=False):
            gemini_key = gr.Textbox(
                label="Gemini API Key",
                type="password",
                value="",
                info="必须的,多个key请用逗号分隔"
            )
            openai_key = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                value="",
                info="只有在使用OpenAI文本转语音模型的情况下才需要此项"
            )
            openai_base_url = gr.Textbox(
                label="OpenAI Base URL",
                value="",
                info="可选,留空使用默认URL:https://api.openai.com/v1"
            )
            elevenlabs_key = gr.Textbox(
                label="ElevenLabs API Key",
                type="password",
                value="",
                info="建议使用ElevenLabs TTS模型,仅在使用该模型时才需要此项"
            )
        # Content Input Section
        gr.Markdown(
            """
<h2 style='color: #2196F3; margin-bottom: 10px; padding: 10px 0;'>
📝 输入内容
</h2>
""",
            elem_classes=["section-header"]
        )
        with gr.Accordion("设置输入内容", open=False):
            with gr.Group():
                text_input = gr.Textbox(
                    label="文本输入",
                    placeholder="在此输入或粘贴文字...",
                    lines=3
                )
                urls_input = gr.Textbox(
                    label="URLs",
                    placeholder="请逐行输入网址,支持网站和YouTube视频链接.",
                    lines=3
                )
                # Place PDF and Image uploads side by side
                with gr.Row():
                    with gr.Column():
                        # type="binary": the handler receives file contents,
                        # not file paths.
                        pdf_files = gr.Files(  # multi-file upload
                            label="上传 PDFs",
                            file_types=[".pdf"],
                            type="binary"
                        )
                        gr.Markdown("*上传一个或多个PDF文件来创建播客*", elem_classes=["file-info"])
                    with gr.Column():
                        image_files = gr.Files(
                            label="上传图片",
                            file_types=["image"],
                            type="binary"
                        )
                        gr.Markdown("*上传一个或多个图片文件来创建播客*", elem_classes=["file-info"])
        # Customization Section
        gr.Markdown(
            """
<h2 style='color: #2196F3; margin-bottom: 10px; padding: 10px 0;'>
⚙️ 自定义选项
</h2>
""",
            elem_classes=["section-header"]
        )
        with gr.Accordion("自定义选项", open=False):
            # Basic Settings
            gr.Markdown(
                """
<h3 style='color: #1976D2; margin: 15px 0 10px 0;'>
📊 基本设置
</h3>
""",
            )
            llm_model_name = gr.Radio(
                choices=["gemini-1.5-pro-latest", "gemini-exp-1121", "learnlm-1.5-pro-experimental", "o1-mini", "o1-preview", "gpt-4o-mini", "gpt-4o", "gpt-4-turbo", "gpt-4", "gpt-4-turbo-2024-04-09", "claude-3-5-sonnet-20240620", "claude-3-5-sonnet-20241022", "claude-3-5-haiku-20241022"],
                value="gemini-1.5-pro-latest",
                label="文本生成模型",
                info="默认使用 gemini-1.5-pro-latest "
            )
            longform = gr.Checkbox(
                label="长篇模式",
                value=False,
                info="启用长篇内容生成模式,启用长篇需要Google Cloud支持,设置好GOOGLE_API_KEY"
            )
            # Long-form tuning sliders; hidden until the checkbox is ticked
            with gr.Group(visible=False) as longform_settings_group:
                max_num_chunks = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=8,
                    step=1,
                    label="最大轮数",
                    info="长篇模式下,生成的最大轮数"
                )
                min_chunk_size = gr.Slider(
                    minimum=300,
                    maximum=2000,
                    value=600,
                    step=100,
                    label="一轮最小字符数",
                    info="长篇模式下,生成一轮所需的最小字符数"
                )

            # Callback: show the long-form group only while long-form is on
            def update_longform_settings(is_longform):
                return gr.update(visible=is_longform)

            # Register the change listener for the long-form checkbox
            longform.change(
                fn=update_longform_settings,
                inputs=[longform],
                outputs=[longform_settings_group]
            )
            conversation_style = gr.Textbox(
                label="对话风格",
                value="engaging,fast-paced,enthusiastic",
                info="用于对话的风格列表(以逗号分隔)默认:生动活泼,节奏明快,热情洋溢。学术辩论: formal,analytical,critical;讲故事: narrative,suspenseful,descriptive"
            )
            # Roles and Structure
            gr.Markdown(
                """
<h3 style='color: #1976D2; margin: 15px 0 10px 0;'>
👥 角色设定与结构安排
</h3>
""",
            )
            roles_person1 = gr.Textbox(
                label="第一位发言者的角色",
                value="main summarizer",
                info="在对话中,第一个说话人扮演的角色,默认:主要负责总结的人。学术辩论: thesis presenter;讲故事: storyteller"
            )
            roles_person2 = gr.Textbox(
                label="第二位发言者的角色",
                value="questioner/clarifier",
                info="在对话中,第二个说话人所扮演的角色或承担的任务,默认:提问者/释疑者。学术辩论: counterargument provider;讲故事: audience participator"
            )
            dialogue_structure = gr.Textbox(
                label="对话结构",
                value="Introduction,Main Content Summary,Conclusion",
                info="对话结构的各个部分(用逗号隔开)默认:引言,主要内容的概括,总结。学术辩论: Opening Statements,Thesis Presentation,Counterarguments,Rebuttals,Closing Remarks;讲故事: Scene Setting,Character Introduction,Rising Action,Climax,Resolution"
            )
            engagement_techniques = gr.Textbox(
                label="沟通技巧",
                value="rhetorical questions,anecdotes,analogies,humor",
                info="一些沟通和交流方式(用逗号隔开)默认:各种修辞、生动例子、形象比喻、诙谐幽默。学术辩论: socratic questioning,historical references,thought experiments;讲故事: cliffhangers,vivid imagery,audience prompts"
            )
            creativity_level = gr.Slider(
                minimum=0,
                maximum=1,
                value=0.7,
                step=0.1,
                label="创意等级",
                info="调节生成对话的创意程度(0 为注重事实,1 为更具创意)。学术辩论:0。讲故事:0.9"
            )
            # Podcast Identity
            gr.Markdown(
                """
<h3 style='color: #1976D2; margin: 15px 0 10px 0;'>
🎙️ 播客特色
</h3>
""",
            )
            podcast_name = gr.Textbox(
                label="播客名",
                value="猛然间",
                info="播客的名字"
            )
            podcast_tagline = gr.Textbox(
                label="播客宣传语",
                value="猛然回首,太匆匆",
                info="播客的宣传语或副标题"
            )
            output_language = gr.Textbox(
                label="输出语言",
                value="Chinese",
                info="播客使用的语言"
            )
            # Voice Settings
            gr.Markdown(
                """
<h3 style='color: #1976D2; margin: 15px 0 10px 0;'>
🗣️ 语音设置
</h3>
""",
            )
            ending_message = gr.Textbox(
                label="结束语",
                value="撒由那拉!",
                info="结束语"
            )
            tts_model = gr.Radio(
                choices=["openai", "geminimulti", "elevenlabs", "gemini", "edge"],
                value="openai",
                label="文本转语音模型",
                info="选择语音合成模型 (edge 免费但音质较差, 其他模型音质更好但需申请 API keys)"
            )
            # Voice pickers; visible only while the "openai" TTS model is
            # selected. Defaults are chosen by position in VOICE_OPTIONS
            # (index 27 is the "echo" entry, index 31 the "shimmer" entry),
            # so reordering that list changes the defaults.
            with gr.Group(visible=True) as openai_voice_group:
                tts_openai_question = gr.Dropdown(
                    choices=[voice["name"] for voice in VOICE_OPTIONS],
                    value=VOICE_OPTIONS[27]["name"],
                    label="OpenAI TTS 主持人",
                    info="选择OpenAI TTS 主持人角色语音"
                )
                tts_openai_answer = gr.Dropdown(
                    choices=[voice["name"] for voice in VOICE_OPTIONS],
                    value=VOICE_OPTIONS[31]["name"],
                    label="OpenAI TTS 嘉宾",
                    info="选择OpenAI TTS 嘉宾角色语音"
                )

            # Callback: show the voice pickers only for the "openai" model
            def update_voice_options(tts_model):
                return gr.update(visible=(tts_model == "openai"))

            # Register the change listener for the TTS model selector
            tts_model.change(
                fn=update_voice_options,
                inputs=[tts_model],
                outputs=[openai_voice_group]
            )
            # Advanced Settings
            gr.Markdown(
                """
<h3 style='color: #1976D2; margin: 15px 0 10px 0;'>
🔧 高级选项
</h3>
""",
            )
            user_instructions = gr.Textbox(
                label="个性化指令",
                value="",
                lines=2,
                placeholder="在此处添加你希望AI遵循的具体指令,以控制对话的走向和内容...",
                info="一些额外的指令,用来帮助AI更好地理解你想要聊天的内容和方向"
            )
            # --- Disabled code, kept for reference: a provider selector with
            # --- separate Gemini/OpenAI model pickers. The single
            # --- llm_model_name radio above is what the app uses instead.
            # api_key_label = gr.Radio(
            #     choices=["GEMINI_API_KEY", "OPENAI_API_KEY"],
            #     value="GEMINI_API_KEY",
            #     label="文本生成模型供应商",
            #     info="默认使用 Gemini "
            # )
            # with gr.Group(visible=True) as gemini_llm_group:
            #     gemini_model = gr.Radio(
            #         choices=["gemini-1.5-pro-latest", "gemini-exp-1121", "learnlm-1.5-pro-experimental"],
            #         value="gemini-1.5-pro-latest",
            #         label="Gemini 文本生成模型",
            #         info="默认使用 gemini-1.5-pro-latest "
            #     )
            # def fetch_openai_models():
            #     try:
            #         response = requests.get("https://api.168369.xyz/v1/models")
            #         data = response.json()
            #         (extract the id of every model)
            #         model_ids = [model["id"] for model in data["data"]]
            #         return model_ids
            #     except Exception as e:
            #         print(f"获取模型列表失败: {str(e)}")
            #         return ["获取模型列表失败"]
            # with gr.Group(visible=False) as openai_llm_group:
            #     openai_model = gr.Radio(
            #         #choices=fetch_openai_models(),  (fetch the model list from the API)
            #         choices=["o1-mini", "o1-preview", "gpt-4o-mini", "gpt-4o", "gpt-4-turbo", "gpt-4", "gpt-4-turbo-2024-04-09"],
            #         value="gpt-4o-mini",
            #         label="Openai 文本生成模型",
            #         info="默认为 gpt-4o-mini"
            #     )
            # (helper returning the currently active model)
            # def get_active_model(api_key_label, gemini_model, openai_model):
            #     if api_key_label == "GEMINI_API_KEY":
            #         return gemini_model
            #     else:  # OPENAI_API_KEY
            #         return openai_model
            # (visibility-update callback)
            # def update_llm_options(api_key_label):
            #     if api_key_label == "GEMINI_API_KEY":
            #         return gr.update(visible=True), gr.update(visible=False)
            #     else:  # OPENAI_API_KEY
            #         return gr.update(visible=False), gr.update(visible=True)
            # (change listener registration)
            # api_key_label.change(
            #     fn=update_llm_options,
            #     inputs=[api_key_label],
            #     outputs=[gemini_llm_group, openai_llm_group]
            # )
        # Output Section
        gr.Markdown(
            """
<h2 style='color: #2196F3; margin-bottom: 10px; padding: 10px 0;'>
🎵 生成结果
</h2>
""",
            elem_classes=["section-header"]
        )
        with gr.Group():
            generate_btn = gr.Button("🎙️ 生成播客", variant="primary")
            audio_output = gr.Audio(
                type="filepath",
                label="生成的播客"
            )
    # Handle generation. The order of `inputs` must match the positional
    # parameter order of process_inputs exactly.
    generate_btn.click(
        process_inputs,
        inputs=[
            text_input, urls_input, pdf_files, image_files,
            gemini_key, openai_key, openai_base_url,
            elevenlabs_key,
            max_num_chunks, min_chunk_size, conversation_style,
            roles_person1, roles_person2,
            dialogue_structure, podcast_name,
            podcast_tagline, output_language, tts_model,
            creativity_level, user_instructions,
            engagement_techniques, tts_openai_question, tts_openai_answer, ending_message,
            longform, llm_model_name,  # disabled: api_key_label, gemini_model, openai_model
        ],
        outputs=audio_output
    )
    # Add theme toggle functionality: no Python callback, inputs or outputs;
    # the JavaScript snippet toggles the 'dark' class on <body> in the browser.
    theme_btn.click(
        None,
        None,
        None,
        js="""
function() {
document.querySelector('body').classList.toggle('dark');
return [];
}
"""
    )
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    # Call queue() first, then launch whatever it returns with share=True.
    queued_demo = demo.queue()
    queued_demo.launch(share=True)