File size: 9,861 Bytes
95bd630
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import gradio as gr
from huggingface_hub import InferenceClient
import os
import numpy as np
from scipy.io.wavfile import write as write_wav
from PIL import Image
from tools import audio_to_str, image_to_str  # project helpers: speech-to-text and image OCR

# Hosted inference endpoint used for chat completions.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")

# Relative directory where uploaded audio/images are persisted.
SAVE_DIR = 'download'  # relative path
os.makedirs(SAVE_DIR, exist_ok=True)  # make sure the directory exists at startup

def get_client_ip(request: gr.Request, debug_mode=False):
    """Return the client's real IP address.

    Prefers the ``x-forwarded-for`` header (set by reverse proxies); the
    first entry of that comma-separated list is the original client.
    Falls back to the direct socket peer, or ``"unknown"`` when no
    request object is available.

    Args:
        request: Incoming request (injected by gradio via the annotation).
        debug_mode: When True, print the detected IP for troubleshooting.
    """
    if request:
        x_forwarded_for = request.headers.get("x-forwarded-for", "")
        if x_forwarded_for:
            # Entries are typically "client, proxy1, proxy2" — the first one
            # is the client.  Strip surrounding whitespace; the raw split
            # could otherwise leak spaces into IPs (and into filenames built
            # from them elsewhere in this file).
            client_ip = x_forwarded_for.split(",")[0].strip()
        else:
            client_ip = request.client.host
        if debug_mode:
            print(f"Debug: Client IP detected as {client_ip}")
        return client_ip
    return "unknown"

def save_audio(audio, filename):
    """Write a ``(sample_rate, samples)`` tuple out as a .wav file."""
    rate, samples = audio
    write_wav(filename, rate, samples)

def save_image(image, filename):
    """Persist a numpy image array to disk (format chosen by extension)."""
    pil_image = Image.fromarray(image.astype('uint8'))
    pil_image.save(filename)

def process(audio, image, text, request: gr.Request):
    """Handle the standalone "Audio/Image Processing" tab.

    Saves any uploaded audio/image under SAVE_DIR (filenames keyed by the
    client IP), transcribes the audio and OCRs the image via tools.py, and
    returns five strings: audio info, image info, text info, transcript,
    and OCR text.
    """
    # gradio injects `request` automatically because of the type annotation.
    client_ip = get_client_ip(request, True)
    print(f"Processing request from IP: {client_ip}")

    # Default placeholder strings mean "no audio/image/text received".
    audio_info = "未收到音频"
    image_info = "未收到图片"
    text_info = "未收到文本"
    audio_filename = None
    image_filename = None
    audio_text = ""
    image_text = ""

    if audio is not None:
        sample_rate, audio_data = audio
        audio_info = f"音频采样率: {sample_rate}Hz, 数据长度: {len(audio_data)}"
        # Persist the upload as a .wav file keyed by client IP.
        # NOTE(review): concurrent requests from the same IP overwrite
        # each other's files — consider unique per-request names.
        audio_filename = os.path.join(SAVE_DIR, f"audio_{client_ip}.wav")
        save_audio(audio, audio_filename)
        print(f"Audio saved as {audio_filename}")
        # Speech-to-text via tools.audio_to_str.
        # NOTE(review): API credentials are hard-coded — move them to
        # environment variables / a secrets store and rotate these keys.
        audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
        if audio_text:
            print(f"Audio text: {audio_text}")
        else:
            print("Audio processing failed")

    if image is not None:
        image_info = f"图片尺寸: {image.shape}"
        # Persist the upload as a .jpg file keyed by client IP.
        image_filename = os.path.join(SAVE_DIR, f"image_{client_ip}.jpg")
        save_image(image, image_filename)
        print(f"Image saved as {image_filename}")
        # OCR via tools.image_to_str.
        # NOTE(review): endpoint/key are hard-coded — move to env vars and rotate.
        image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
        if image_text:
            print(f"Image text: {image_text}")
        else:
            print("Image processing failed")

    if text:
        text_info = f"接收到文本: {text}"

    return audio_info, image_info, text_info, audio_text, image_text

# Build the custom chat interface.
with gr.Blocks() as app:
    gr.Markdown("# ToDoAgent Multi-Modal Interface")

    # Two tabs: the chat itself, and a standalone audio/image processing tab.
    with gr.Tab("Chat"):
        # type="messages" avoids the deprecated tuple-format Chatbot warning.
        chatbot = gr.Chatbot(height=500, type="messages")

        msg = gr.Textbox(label="输入消息", placeholder="输入您的问题...")

        # Upload area: voice and picture inputs feeding the chat.
        with gr.Row():
            audio_input = gr.Audio(label="上传语音", type="numpy", sources=["upload", "microphone"])
            image_input = gr.Image(label="上传图片", type="numpy")

        # Generation settings, collapsed by default.
        with gr.Accordion("高级设置", open=False):
            system_msg = gr.Textbox(value="You are a friendly Chatbot.", label="系统提示")
            max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="最大生成长度")
            temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="温度")
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p")

        # Send button
        submit_btn = gr.Button("发送", variant="primary")

        # Clear-chat button
        clear = gr.Button("清除聊天")

        # Event handlers
        def user(user_message, chat_history):
            """Append the user's message to the history and clear the textbox."""
            new_entry = {"role": "user", "content": user_message}
            return "", [*chat_history, new_entry]
# Multimodal handling for the chat flow (addition #1)
        def respond(message, chat_history, system_message, max_tokens, temperature, top_p, audio=None, image=None, text=None, request=None):
            """Stream a chat completion for *message*, enriched with audio/image.

            Uploaded audio is transcribed and uploaded images are OCR'd via
            tools.py; the extracted text is appended to the user message
            before calling the model.  Yields the accumulated partial
            response as tokens stream in.

            Args:
                message: The user's text message.
                chat_history: Prior {"role", "content"} message dicts.
                system_message: System prompt prepended to the conversation.
                max_tokens, temperature, top_p: Generation parameters.
                audio: Optional (sample_rate, samples) tuple from gr.Audio.
                image: Optional numpy array from gr.Image.
                text, request: Unused; kept for interface compatibility.
            """
            import uuid  # local import: only used for unique temp-file names

            multimodal_content = ""
            if audio is not None:
                # Unique name per request: the previous fixed "temp_audio.wav"
                # let concurrent users clobber each other's uploads.
                audio_filename = os.path.join(SAVE_DIR, f"temp_audio_{uuid.uuid4().hex}.wav")
                try:
                    save_audio(audio, audio_filename)
                    # NOTE(review): API credentials are hard-coded — move to
                    # environment variables / a secrets store and rotate them.
                    audio_text = audio_to_str("33c1b63d", "40bf7cd82e31ace30a9cfb76309a43a3", "OTY1YzIyZWM3YTg0OWZiMGE2ZjA2ZmE4", audio_filename)
                    if audio_text:
                        multimodal_content += f"音频内容: {audio_text}\n"
                except Exception as e:
                    print(f"Audio processing error: {e}")
                finally:
                    # Remove the temp file so SAVE_DIR does not grow unboundedly.
                    try:
                        os.remove(audio_filename)
                    except OSError:
                        pass

            if image is not None:
                image_filename = os.path.join(SAVE_DIR, f"temp_image_{uuid.uuid4().hex}.jpg")
                try:
                    save_image(image, image_filename)
                    # NOTE(review): endpoint/key are hard-coded — move to env vars.
                    image_text = image_to_str(endpoint="https://ai-siyuwang5414995ai361208251338.cognitiveservices.azure.com/", key="45PYY2Av9CdMCveAjVG43MGKrnHzSxdiFTK9mWBgrOsMAHavxKj0JQQJ99BDACHYHv6XJ3w3AAAAACOGeVpQ", unused_param=None, file_path=image_filename)
                    if image_text:
                        multimodal_content += f"图片内容: {image_text}\n"
                except Exception as e:
                    print(f"Image processing error: {e}")
                finally:
                    try:
                        os.remove(image_filename)
                    except OSError:
                        pass

            # Append any extracted audio/image text to the user's message.
            final_message = message
            if multimodal_content:
                final_message = f"{message}\n\n{multimodal_content}"

            # Build the message list: system prompt, valid prior history, new message.
            messages = [{"role": "system", "content": system_message}]
            for chat in chat_history:
                if isinstance(chat, dict) and "role" in chat and "content" in chat:
                    messages.append(chat)

            messages.append({"role": "user", "content": final_message})

            # Stream tokens from the HuggingFace Inference API, yielding the
            # accumulated text after each chunk.
            try:
                response = client.chat_completion(
                    messages,
                    max_tokens=max_tokens,
                    stream=True,
                    temperature=temperature,
                    top_p=top_p,
                )

                partial_message = ""
                for token in response:
                    if token.choices[0].delta.content is not None:
                        partial_message += token.choices[0].delta.content
                        yield partial_message
            except Exception as e:
                yield f"抱歉,生成响应时出现错误: {str(e)}"

        def bot(chat_history, system_message, max_tokens, temperature, top_p, audio, image, text):
            """Stream the assistant reply for the newest user message in *chat_history*."""
            # Nothing to answer when the history is empty.
            if not chat_history:
                return

            latest = chat_history[-1]
            # Guard against malformed or falsy history entries.
            if not isinstance(latest, dict) or "content" not in latest:
                return

            # Stream the response, re-yielding the history with the growing
            # assistant message appended after every chunk.
            stream = respond(
                latest["content"],
                chat_history[:-1],
                system_message,
                max_tokens,
                temperature,
                top_p,
                audio,
                image,
                text,
            )
            for partial in stream:
                yield chat_history + [{"role": "assistant", "content": partial}]

        # Wire Enter-in-textbox and the send button to the same two-step flow:
        # `user` appends the message, then `bot` streams the reply.
        # NOTE(review): `user` clears the textbox before `bot` runs, so the
        # `msg` value passed to `bot` here may already be empty — confirm
        # (it is currently unused by `respond` anyway).
        msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
        )

        submit_btn.click(user, [msg, chatbot], [msg, chatbot], queue=False).then(
            bot, [chatbot, system_msg, max_tokens, temperature, top_p, audio_input, image_input, msg], chatbot
        )

        clear.click(lambda: None, None, chatbot, queue=False)

    with gr.Tab("Audio/Image Processing"):
        gr.Markdown("## 处理音频和图片")
        audio_processor = gr.Audio(label="上传音频", type="numpy")
        image_processor = gr.Image(label="上传图片", type="numpy")
        text_input = gr.Textbox(label="输入文本")
        process_btn = gr.Button("处理", variant="primary")
        audio_output = gr.Textbox(label="音频信息")
        image_output = gr.Textbox(label="图片信息")
        text_output = gr.Textbox(label="文本信息")
        audio_text_output = gr.Textbox(label="音频转文字结果")
        image_text_output = gr.Textbox(label="图片转文字结果")

        # Only three inputs are wired; gradio injects the gr.Request argument
        # automatically because of process()'s type annotation.
        process_btn.click(
            process,
            inputs=[audio_processor, image_processor, text_input],
            outputs=[audio_output, image_output, text_output, audio_text_output, image_text_output]
        )

if __name__ == "__main__":
    # Launch the gradio app when run as a script.
    app.launch()