|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
import os |
|
import numpy as np |
|
from scipy.io.wavfile import write as write_wav |
|
from PIL import Image |
|
from tools import audio_to_str, image_to_str |
|
|
|
# Shared Hugging Face Inference client used by the chat handlers below.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")


# Directory where uploaded audio/image files are saved before processing.
SAVE_DIR = 'download'

os.makedirs(SAVE_DIR, exist_ok=True)
|
|
|
def get_client_ip(request: gr.Request, debug_mode=False):
    """Return the client's real IP address.

    Prefers the first entry of the X-Forwarded-For header (set by reverse
    proxies), falling back to the direct socket peer. Returns "unknown"
    when no request object is available.

    Args:
        request: the Gradio request object (may be None/falsy).
        debug_mode: when True, print the detected IP for debugging.

    Returns:
        The detected IP address string, or "unknown".
    """
    if not request:
        return "unknown"

    x_forwarded_for = request.headers.get("x-forwarded-for", "")
    if x_forwarded_for:
        # The first hop is the originating client. Strip surrounding
        # whitespace that proxies commonly insert after commas ("ip1, ip2"),
        # which would otherwise leak into derived filenames.
        client_ip = x_forwarded_for.split(",")[0].strip()
    else:
        client_ip = request.client.host
    if debug_mode:
        print(f"Debug: Client IP detected as {client_ip}")
    return client_ip
|
|
|
def save_audio(audio, filename):
    """Write a (sample_rate, samples) tuple to *filename* as a .wav file."""
    rate, samples = audio
    write_wav(filename, rate, samples)
|
|
|
def save_image(image, filename):
    """Write a numpy array to *filename* as a .jpg image file."""
    # Cast to uint8 first: PIL expects 8-bit pixel data for JPEG output.
    Image.fromarray(image.astype('uint8')).save(filename)
|
|
|
def process(audio, image, text, request: gr.Request):
    """Demo handler: save uploaded audio/image, run speech-to-text / OCR, echo text.

    Args:
        audio: (sample_rate, numpy array) tuple from gr.Audio, or None.
        image: numpy array from gr.Image, or None.
        text: free-form text input (may be empty).
        request: Gradio request object (injected automatically by Gradio).

    Returns:
        Tuple of five strings: audio info, image info, text info,
        audio transcription result, image OCR result.
    """
    client_ip = get_client_ip(request, True)
    print(f"Processing request from IP: {client_ip}")

    # SECURITY: client_ip originates from the spoofable X-Forwarded-For
    # header. Keep only filename-safe characters so a forged header cannot
    # inject path separators (path traversal) into the save path.
    safe_ip = "".join(c for c in client_ip if c.isalnum() or c in "._-") or "unknown"

    audio_info = "未收到音频"
    image_info = "未收到图片"
    text_info = "未收到文本"
    audio_filename = None
    image_filename = None
    audio_text = ""
    image_text = ""

    if audio is not None:
        sample_rate, audio_data = audio
        audio_info = f"音频采样率: {sample_rate}Hz, 数据长度: {len(audio_data)}"

        audio_filename = os.path.join(SAVE_DIR, f"audio_{safe_ip}.wav")
        save_audio(audio, audio_filename)
        print(f"Audio saved as {audio_filename}")

        # SECURITY NOTE(review): hardcoded API credentials — move them to
        # environment variables / secret management and rotate these keys.
        audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
        if audio_text:
            print(f"Audio text: {audio_text}")
        else:
            print("Audio processing failed")

    if image is not None:
        image_info = f"图片尺寸: {image.shape}"

        image_filename = os.path.join(SAVE_DIR, f"image_{safe_ip}.jpg")
        save_image(image, image_filename)
        print(f"Image saved as {image_filename}")

        # SECURITY NOTE(review): hardcoded Azure endpoint/key — move to
        # configuration and rotate the key.
        image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
        if image_text:
            print(f"Image text: {image_text}")
        else:
            print("Image processing failed")

    if text:
        text_info = f"接收到文本: {text}"

    return audio_info, image_info, text_info, audio_text, image_text
|
|
|
|
|
# Build the Gradio UI: a chat tab (with optional audio/image attachments)
# and a standalone audio/image processing tab.
with gr.Blocks() as app:
    gr.Markdown("# ToDoAgent Multi-Modal Interface")

    with gr.Tab("Chat"):
        # Chat transcript; "messages" type stores {"role", "content"} dicts.
        chatbot = gr.Chatbot(height=500, type="messages")

        msg = gr.Textbox(label="输入消息", placeholder="输入您的问题...")

        with gr.Row():
            audio_input = gr.Audio(label="上传语音", type="numpy", sources=["upload", "microphone"])
            image_input = gr.Image(label="上传图片", type="numpy")

        # Advanced generation settings, collapsed by default.
        with gr.Accordion("高级设置", open=False):
            system_msg = gr.Textbox(value="You are a friendly Chatbot.", label="系统提示")
            max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="最大生成长度")
            temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="温度")
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p")

        submit_btn = gr.Button("发送", variant="primary")

        clear = gr.Button("清除聊天")

        def user(user_message, chat_history):
            # Append the user's message to the history and clear the textbox.
            return "", chat_history + [{"role": "user", "content": user_message}]

        def respond(message, chat_history, system_message, max_tokens, temperature, top_p, audio=None, image=None, text=None, request=None):
            """Generate a streamed chat response, optionally enriched with
            transcribed audio and OCR'd image content.

            Yields progressively longer partial responses (generator).
            """
            # Collect text extracted from the optional audio/image inputs.
            multimodal_content = ""
            if audio is not None:
                try:
                    # NOTE(review): fixed temp filename — concurrent users
                    # will overwrite each other's uploads; confirm intended.
                    audio_filename = os.path.join(SAVE_DIR, "temp_audio.wav")
                    save_audio(audio, audio_filename)
                    # SECURITY NOTE(review): hardcoded credentials, duplicated
                    # from process(); move to config and rotate.
                    audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
                    if audio_text:
                        multimodal_content += f"音频内容: {audio_text}\n"
                except Exception as e:
                    # Best-effort: a failed transcription should not kill the chat.
                    print(f"Audio processing error: {e}")

            if image is not None:
                try:
                    image_filename = os.path.join(SAVE_DIR, "temp_image.jpg")
                    save_image(image, image_filename)
                    # SECURITY NOTE(review): hardcoded Azure endpoint/key.
                    image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
                    if image_text:
                        multimodal_content += f"图片内容: {image_text}\n"
                except Exception as e:
                    print(f"Image processing error: {e}")

            # Append any extracted multimodal text to the user's message.
            final_message = message
            if multimodal_content:
                final_message = f"{message}\n\n{multimodal_content}"

            # Rebuild the OpenAI-style message list: system prompt, valid
            # history entries, then the (possibly enriched) user message.
            messages = [{"role": "system", "content": system_message}]
            for chat in chat_history:
                if isinstance(chat, dict) and "role" in chat and "content" in chat:
                    messages.append(chat)

            messages.append({"role": "user", "content": final_message})

            try:
                response = client.chat_completion(
                    messages,
                    max_tokens=max_tokens,
                    stream=True,
                    temperature=temperature,
                    top_p=top_p,
                )

                # Accumulate streamed tokens, yielding the growing response.
                partial_message = ""
                for token in response:
                    if token.choices[0].delta.content is not None:
                        partial_message += token.choices[0].delta.content
                        yield partial_message
            except Exception as e:
                # Surface the error as the assistant's reply instead of crashing.
                yield f"抱歉,生成响应时出现错误: {str(e)}"

        def bot(chat_history, system_message, max_tokens, temperature, top_p, audio, image, text):
            # Generate the assistant's reply for the last user message.
            if not chat_history or len(chat_history) == 0:
                return

            last_message = chat_history[-1]
            if not last_message or not isinstance(last_message, dict) or "content" not in last_message:
                return

            user_message = last_message["content"]

            # NOTE(review): this loop drains the respond() generator and only
            # yields once at the end, so the UI does not actually stream even
            # though respond() does — consider yielding inside the loop.
            bot_response = ""
            for response in respond(
                user_message,
                chat_history[:-1],
                system_message,
                max_tokens,
                temperature,
                top_p,
                audio,
                image,
                text
            ):
                bot_response = response

            updated_history = chat_history + [{"role": "assistant", "content": bot_response}]
            yield updated_history

        # Enter key: record the user message, then generate the reply.
        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
        )

        # Send button: same pipeline as pressing Enter.
        submit_btn.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
        )

        clear.click(lambda: None, None, chatbot, queue=False)

    with gr.Tab("Audio/Image Processing"):
        gr.Markdown("## 处理音频和图片")
        audio_processor = gr.Audio(label="上传音频", type="numpy")
        image_processor = gr.Image(label="上传图片", type="numpy")
        text_input = gr.Textbox(label="输入文本")
        process_btn = gr.Button("处理", variant="primary")
        audio_output = gr.Textbox(label="音频信息")
        image_output = gr.Textbox(label="图片信息")
        text_output = gr.Textbox(label="文本信息")
        audio_text_output = gr.Textbox(label="音频转文字结果")
        image_text_output = gr.Textbox(label="图片转文字结果")

        # process() also takes a gr.Request parameter, which Gradio injects
        # automatically — it is not listed in inputs.
        process_btn.click(
            process,
            inputs=[audio_processor, image_processor, text_input],
            outputs=[audio_output, image_output, text_output, audio_text_output, image_text_output]
        )
|
|
|
if __name__ == "__main__":
    # Launch the Gradio server (blocking) when run as a script.
    app.launch()