# ToDoAgent / se_app.py
# siyuwang541's picture
# mvp
# 95bd630 verified
import gradio as gr
from huggingface_hub import InferenceClient
import os
import numpy as np
from scipy.io.wavfile import write as write_wav
from PIL import Image
from tools import audio_to_str, image_to_str # 导入tools.py中的方法
# Shared inference client; the chat handler calls client.chat_completion on it.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
# Relative path of the directory where uploaded files are saved
SAVE_DIR = 'download' # relative path
os.makedirs(SAVE_DIR, exist_ok=True) # make sure the directory exists
def get_client_ip(request: "gr.Request", debug_mode=False):
    """Return the client's IP address for an incoming request.

    The first entry of the ``x-forwarded-for`` header is preferred, since
    the app may be served from behind a proxy; when the header is absent or
    its first entry is empty, ``request.client.host`` is used instead.

    Args:
        request: The incoming request object, or ``None``.
        debug_mode: When true, print the detected address.

    Returns:
        The detected address as a string, or ``"unknown"`` when there is no
        request or no address can be determined from it.
    """
    if not request:
        return "unknown"

    forwarded = request.headers.get("x-forwarded-for", "")
    # The header is a comma-separated list, usually written "a, b, c";
    # strip so the surrounding spaces never end up in the address.
    client_ip = forwarded.split(",")[0].strip()
    if not client_ip:
        # No usable forwarded entry: fall back to the direct peer, which
        # may itself be missing.
        peer = getattr(request, "client", None)
        client_ip = getattr(peer, "host", None) or "unknown"

    if debug_mode:
        print(f"Debug: Client IP detected as {client_ip}")
    return client_ip
def save_audio(audio, filename):
    """Write a ``(sample_rate, samples)`` pair to *filename* as a .wav file."""
    rate, samples = audio
    write_wav(filename, rate=rate, data=samples)
def save_image(image, filename):
    """Cast the image array to ``uint8`` and save it to *filename*.

    Callers in this file pass paths ending in ``.jpg``.
    """
    pixels = image.astype('uint8')
    Image.fromarray(pixels).save(filename)
def _filename_safe(value):
    """Return *value* with every character outside ``[A-Za-z0-9._-]`` replaced by ``_``."""
    return "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "._-" else "_"
        for ch in str(value)
    )


def process(audio, image, text, request: gr.Request):
    """Describe and transcribe the uploaded audio, image and text.

    Each upload that is present is saved under ``SAVE_DIR`` with a file
    name derived from the client IP and then passed to the matching
    recognition helper from ``tools``.

    Args:
        audio: ``(sample_rate, samples)`` pair, or ``None``.
        image: Image array (its ``shape`` is reported), or ``None``.
        text: Free text typed by the user; may be empty.
        request: The incoming request, used to determine the client IP.

    Returns:
        A 5-tuple ``(audio_info, image_info, text_info, audio_text,
        image_text)``. The ``*_info`` entries are human-readable
        summaries; the ``*_text`` entries are the recognition results
        (``""`` when the corresponding input was not supplied).
    """
    client_ip = get_client_ip(request, True)
    print(f"Processing request from IP: {client_ip}")

    # The IP can come from a client-supplied header, so never trust it as a
    # path component: path separators or ":" (IPv6) must not reach the
    # file system. Plain IPv4 addresses are left unchanged.
    file_tag = _filename_safe(client_ip)

    audio_info = "未收到音频"
    image_info = "未收到图片"
    text_info = "未收到文本"
    audio_text = ""
    image_text = ""

    if audio is not None:
        sample_rate, audio_data = audio
        audio_info = f"音频采样率: {sample_rate}Hz, 数据长度: {len(audio_data)}"
        # Persist the audio as a .wav file so the recogniser can read it.
        audio_filename = os.path.join(SAVE_DIR, f"audio_{file_tag}.wav")
        save_audio(audio, audio_filename)
        print(f"Audio saved as {audio_filename}")
        # NOTE(review): service credentials are hard-coded here; they should
        # be loaded from the environment or a secret store and rotated.
        audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
        if audio_text:
            print(f"Audio text: {audio_text}")
        else:
            print("Audio processing failed")

    if image is not None:
        image_info = f"图片尺寸: {image.shape}"
        # Persist the image as a .jpg file so the recogniser can read it.
        image_filename = os.path.join(SAVE_DIR, f"image_{file_tag}.jpg")
        save_image(image, image_filename)
        print(f"Image saved as {image_filename}")
        # NOTE(review): endpoint key is hard-coded here; it should be loaded
        # from the environment or a secret store and rotated.
        image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
        if image_text:
            print(f"Image text: {image_text}")
        else:
            print("Image processing failed")

    if text:
        text_info = f"接收到文本: {text}"

    return audio_info, image_info, text_info, audio_text, image_text
# Build the custom chat interface
# NOTE(review): leading indentation appears to have been lost in this copy of
# the file; the statements below are presumably nested under the `with`
# blocks — confirm against the original.
with gr.Blocks() as app:
gr.Markdown("# ToDoAgent Multi-Modal Interface")
# Create the two tabs
with gr.Tab("Chat"):
# Use the "messages" format to avoid the Chatbot type warning
chatbot = gr.Chatbot(height=500, type="messages")
msg = gr.Textbox(label="输入消息", placeholder="输入您的问题...")
# Upload area
with gr.Row():
audio_input = gr.Audio(label="上传语音", type="numpy", sources=["upload", "microphone"])
image_input = gr.Image(label="上传图片", type="numpy")
# Settings area
with gr.Accordion("高级设置", open=False):
system_msg = gr.Textbox(value="You are a friendly Chatbot.", label="系统提示")
max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="最大生成长度")
temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="温度")
top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p")
# Submit button
submit_btn = gr.Button("发送", variant="primary")
# Clear button
clear = gr.Button("清除聊天")
# Event handlers
def user(user_message, chat_history):
    """Append the user's message to the chat history and clear the input box.

    Args:
        user_message: Text the user just submitted.
        chat_history: Existing list of ``{"role", "content"}`` message
            dicts; ``None`` is treated as an empty history.

    Returns:
        A pair ``("", new_history)``: the empty string resets the textbox
        and ``new_history`` is a new list ending with the user's message
        (the list passed in is not mutated).
    """
    history = list(chat_history) if chat_history else []
    history.append({"role": "user", "content": user_message})
    return "", history
# Added multimodal handling (part 1)
def respond(message, chat_history, system_message, max_tokens, temperature, top_p, audio=None, image=None, text=None, request=None):
    """Stream the model's reply to *message*.

    Any attached audio or image is first saved under ``SAVE_DIR`` and
    converted to text with the helpers from ``tools``; the recognised text
    is appended to the user's message before the model is called.

    Args:
        message: The user's latest message.
        chat_history: Earlier messages as ``{"role", "content"}`` dicts;
            entries of any other shape are skipped. ``None`` is treated as
            an empty history.
        system_message: System prompt placed first in the conversation.
        max_tokens, temperature, top_p: Generation settings forwarded to
            ``client.chat_completion``.
        audio: ``(sample_rate, samples)`` pair, or ``None``.
        image: Image array, or ``None``.
        text: Accepted for interface compatibility; not used.
        request: Accepted for interface compatibility; not used.

    Yields:
        The reply text accumulated so far, once per received content
        chunk. If the model call fails, a single error message string is
        yielded instead of raising.
    """
    # Turn attached media into text that can be appended to the prompt.
    multimodal_content = ""
    if audio is not None:
        try:
            audio_filename = os.path.join(SAVE_DIR, "temp_audio.wav")
            save_audio(audio, audio_filename)
            # NOTE(review): service credentials are hard-coded here; they
            # should be loaded from the environment or a secret store.
            audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
            if audio_text:
                multimodal_content += f"音频内容: {audio_text}\n"
        except Exception as e:
            # Best effort: a failed transcription must not block the chat.
            print(f"Audio processing error: {e}")
    if image is not None:
        try:
            image_filename = os.path.join(SAVE_DIR, "temp_image.jpg")
            save_image(image, image_filename)
            # NOTE(review): endpoint key is hard-coded here; it should be
            # loaded from the environment or a secret store.
            image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
            if image_text:
                multimodal_content += f"图片内容: {image_text}\n"
        except Exception as e:
            # Best effort: a failed recognition must not block the chat.
            print(f"Image processing error: {e}")

    # Combine the typed message with whatever was recognised.
    final_message = message
    if multimodal_content:
        final_message = f"{message}\n\n{multimodal_content}"

    # Build the conversation. Only "role" and "content" are forwarded so
    # that any extra keys carried by the UI's history entries are not sent
    # to the inference API.
    messages = [{"role": "system", "content": system_message}]
    for chat in chat_history or []:
        if isinstance(chat, dict) and "role" in chat and "content" in chat:
            messages.append({"role": chat["role"], "content": chat["content"]})
    messages.append({"role": "user", "content": final_message})

    # Call the inference API and stream the reply.
    try:
        response = client.chat_completion(
            messages,
            max_tokens=max_tokens,
            stream=True,
            temperature=temperature,
            top_p=top_p,
        )
        partial_message = ""
        for token in response:
            # Skip chunks that carry no choice instead of failing the
            # whole reply with an IndexError.
            if not token.choices:
                continue
            piece = token.choices[0].delta.content
            if piece is not None:
                partial_message += piece
                yield partial_message
    except Exception as e:
        yield f"抱歉,生成响应时出现错误: {str(e)}"
def bot(chat_history, system_message, max_tokens, temperature, top_p, audio, image, text):
    """Generate the assistant's reply to the last message in the history.

    Args:
        chat_history: List of ``{"role", "content"}`` message dicts whose
            last entry is the message to answer.
        system_message, max_tokens, temperature, top_p: Forwarded to
            ``respond``.
        audio, image, text: Optional attachments forwarded to ``respond``.

    Yields:
        ``chat_history`` plus one assistant message holding the reply
        received so far, once per partial response, so the chat window
        updates while the reply is being generated. Nothing is yielded
        when the history is empty or its last entry is not a dict with a
        ``"content"`` key.
    """
    # Nothing to answer.
    if not chat_history:
        return

    last_message = chat_history[-1]
    if not isinstance(last_message, dict) or "content" not in last_message:
        return

    # The last entry is sent as the new message, so it is excluded from
    # the history handed to respond().
    for partial in respond(
        last_message["content"],
        chat_history[:-1],
        system_message,
        max_tokens,
        temperature,
        top_p,
        audio,
        image,
        text,
    ):
        yield chat_history + [{"role": "assistant", "content": partial}]
# Submitting the textbox first runs `user` (appends the message to the chat and
# empties the textbox), then runs `bot` to produce the reply.
# NOTE(review): `msg` is also passed as bot's last input, but `user` has already
# set it to "" by then — confirm this is intended.
msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
)
# The send button runs the same two-step chain.
submit_btn.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
)
# The clear button sets the chatbot value to None.
clear.click(lambda: None, None, chatbot, queue=False)
# Second tab: run `process` on an uploaded audio clip, image and text.
with gr.Tab("Audio/Image Processing"):
gr.Markdown("## 处理音频和图片")
# Inputs
audio_processor = gr.Audio(label="上传音频", type="numpy")
image_processor = gr.Image(label="上传图片", type="numpy")
text_input = gr.Textbox(label="输入文本")
process_btn = gr.Button("处理", variant="primary")
# Outputs, in the same order as the tuple returned by `process`
audio_output = gr.Textbox(label="音频信息")
image_output = gr.Textbox(label="图片信息")
text_output = gr.Textbox(label="文本信息")
audio_text_output = gr.Textbox(label="音频转文字结果")
image_text_output = gr.Textbox(label="图片转文字结果")
# Updated wiring for the processing function
# NOTE(review): `process` also takes a `request: gr.Request` parameter that is
# not listed in `inputs`; presumably Gradio supplies it from the type hint — confirm.
process_btn.click(
process,
inputs=[audio_processor, image_processor, text_input],
outputs=[audio_output, image_output, text_output, audio_text_output, image_text_output]
)
# Launch the app only when this file is executed as a script.
if __name__ == "__main__":
app.launch()