# NOTE(review): viewer metadata (file size, commit hash, line-number gutter)
# removed from this scraped copy; original indentation was lost in the scrape.
import gradio as gr
from huggingface_hub import InferenceClient
import os
import numpy as np
from scipy.io.wavfile import write as write_wav
from PIL import Image
from tools import audio_to_str, image_to_str  # speech/vision helpers from local tools.py
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
# 指定保存文件的相对路径
SAVE_DIR = 'download' # 相对路径
os.makedirs(SAVE_DIR, exist_ok=True) # 确保目录存在
def get_client_ip(request: gr.Request, debug_mode=False):
"""获取客户端真实IP地址"""
if request:
# 从请求头中获取真实IP(考虑代理情况)
x_forwarded_for = request.headers.get("x-forwarded-for", "")
if x_forwarded_for:
client_ip = x_forwarded_for.split(",")[0]
else:
client_ip = request.client.host
if debug_mode:
print(f"Debug: Client IP detected as {client_ip}")
return client_ip
return "unknown"
def save_audio(audio, filename):
"""保存音频为.wav文件"""
sample_rate, audio_data = audio
write_wav(filename, sample_rate, audio_data)
def save_image(image, filename):
"""保存图片为.jpg文件"""
img = Image.fromarray(image.astype('uint8'))
img.save(filename)
def process(audio, image, text, request: gr.Request):
"""处理语音、图片和文本的示例函数"""
client_ip = get_client_ip(request, True)
print(f"Processing request from IP: {client_ip}")
audio_info = "未收到音频"
image_info = "未收到图片"
text_info = "未收到文本"
audio_filename = None
image_filename = None
audio_text = ""
image_text = ""
if audio is not None:
sample_rate, audio_data = audio
audio_info = f"音频采样率: {sample_rate}Hz, 数据长度: {len(audio_data)}"
# 保存音频为.wav文件
audio_filename = os.path.join(SAVE_DIR, f"audio_{client_ip}.wav")
save_audio(audio, audio_filename)
print(f"Audio saved as {audio_filename}")
# 调用tools.py中的audio_to_str方法处理音频
audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
if audio_text:
print(f"Audio text: {audio_text}")
else:
print("Audio processing failed")
if image is not None:
image_info = f"图片尺寸: {image.shape}"
# 保存图片为.jpg文件
image_filename = os.path.join(SAVE_DIR, f"image_{client_ip}.jpg")
save_image(image, image_filename)
print(f"Image saved as {image_filename}")
# 调用tools.py中的image_to_str方法处理图片
image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
if image_text:
print(f"Image text: {image_text}")
else:
print("Image processing failed")
if text:
text_info = f"接收到文本: {text}"
return audio_info, image_info, text_info, audio_text, image_text
# 创建自定义的聊天界面
with gr.Blocks() as app:
gr.Markdown("# ToDoAgent Multi-Modal Interface")
# 创建两个标签页
with gr.Tab("Chat"):
# 修复Chatbot类型警告
chatbot = gr.Chatbot(height=500, type="messages")
msg = gr.Textbox(label="输入消息", placeholder="输入您的问题...")
# 上传区域
with gr.Row():
audio_input = gr.Audio(label="上传语音", type="numpy", sources=["upload", "microphone"])
image_input = gr.Image(label="上传图片", type="numpy")
# 设置区域
with gr.Accordion("高级设置", open=False):
system_msg = gr.Textbox(value="You are a friendly Chatbot.", label="系统提示")
max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="最大生成长度")
temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="温度")
top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p")
# 提交按钮
submit_btn = gr.Button("发送", variant="primary")
# 清除按钮
clear = gr.Button("清除聊天")
# 事件处理
def user(user_message, chat_history):
return "", chat_history + [{"role": "user", "content": user_message}]
#新增多模态处理--1
def respond(message, chat_history, system_message, max_tokens, temperature, top_p, audio=None, image=None, text=None, request=None):
"""生成响应的函数"""
# 处理多模态输入
multimodal_content = ""
if audio is not None:
try:
audio_filename = os.path.join(SAVE_DIR, "temp_audio.wav")
save_audio(audio, audio_filename)
audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
if audio_text:
multimodal_content += f"音频内容: {audio_text}\n"
except Exception as e:
print(f"Audio processing error: {e}")
if image is not None:
try:
image_filename = os.path.join(SAVE_DIR, "temp_image.jpg")
save_image(image, image_filename)
image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
if image_text:
multimodal_content += f"图片内容: {image_text}\n"
except Exception as e:
print(f"Image processing error: {e}")
# 组合最终消息
final_message = message
if multimodal_content:
final_message = f"{message}\n\n{multimodal_content}"
# 构建消息历史
messages = [{"role": "system", "content": system_message}]
for chat in chat_history:
if isinstance(chat, dict) and "role" in chat and "content" in chat:
messages.append(chat)
messages.append({"role": "user", "content": final_message})
# 调用HuggingFace API
try:
response = client.chat_completion(
messages,
max_tokens=max_tokens,
stream=True,
temperature=temperature,
top_p=top_p,
)
partial_message = ""
for token in response:
if token.choices[0].delta.content is not None:
partial_message += token.choices[0].delta.content
yield partial_message
except Exception as e:
yield f"抱歉,生成响应时出现错误: {str(e)}"
def bot(chat_history, system_message, max_tokens, temperature, top_p, audio, image, text):
# 检查chat_history是否为空
if not chat_history or len(chat_history) == 0:
return
# 获取最后一条用户消息
last_message = chat_history[-1]
if not last_message or not isinstance(last_message, dict) or "content" not in last_message:
return
user_message = last_message["content"]
# 生成响应
bot_response = ""
for response in respond(
user_message,
chat_history[:-1],
system_message,
max_tokens,
temperature,
top_p,
audio,
image,
text
):
bot_response = response
# 添加助手回复到聊天历史
updated_history = chat_history + [{"role": "assistant", "content": bot_response}]
yield updated_history
msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
)
submit_btn.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
)
clear.click(lambda: None, None, chatbot, queue=False)
with gr.Tab("Audio/Image Processing"):
gr.Markdown("## 处理音频和图片")
audio_processor = gr.Audio(label="上传音频", type="numpy")
image_processor = gr.Image(label="上传图片", type="numpy")
text_input = gr.Textbox(label="输入文本")
process_btn = gr.Button("处理", variant="primary")
audio_output = gr.Textbox(label="音频信息")
image_output = gr.Textbox(label="图片信息")
text_output = gr.Textbox(label="文本信息")
audio_text_output = gr.Textbox(label="音频转文字结果")
image_text_output = gr.Textbox(label="图片转文字结果")
# 修改后的处理函数调用
process_btn.click(
process,
inputs=[audio_processor, image_processor, text_input],
outputs=[audio_output, image_output, text_output, audio_text_output, image_text_output]
)
if __name__ == "__main__":
app.launch() |