File size: 5,123 Bytes
1ea2ba0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from __future__ import annotations

import base64
import json
import logging
import os
import uuid
from io import BytesIO

import requests
from PIL import Image

from ..index_func import *
from ..presets import *
from ..utils import *
from .base_model import BaseLLMModel


class XMChat(BaseLLMModel):
    """Chat model backed by the XMChat HTTP endpoint, with optional image input.

    Every request is a JSON POST to ``self.url`` carrying the API key (sent as
    ``user_id``), the current ``session_id`` and a fresh per-message ``uuid``.
    An uploaded image is sent as its own message and starts a new session.
    """

    # Longest edge, in pixels, an image may have before it is downscaled.
    _MAX_IMAGE_DIMENSION = 2048
    # File extensions that try_read_image treats as images.
    _IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff")
    # Pillow modes the JPEG encoder writes as-is; any other mode is converted
    # to RGB first.
    _JPEG_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    def __init__(self, api_key, user_name=""):
        """Create a client.

        Args:
            api_key: Sent as ``user_id`` with every message.
            user_name: Forwarded to the base class as ``user``.
        """
        super().__init__(model_name="xmchat", user=user_name)
        self.api_key = api_key
        self.session_id = None
        self.reset()
        self.image_bytes = None  # Base64 text of the current image, if any.
        self.image_path = None  # Path of the current image, if any.
        self.xm_history = []
        self.url = "https://xmbot.net/web"
        self.last_conv_id = None  # uuid of the last text message sent.

    def reset(self, remain_system_prompt=False):
        """Start a new session id, forget the last message id, reset the base.

        NOTE(review): ``remain_system_prompt`` is accepted but not forwarded to
        ``super().reset()``; confirm against the base class whether it should be.
        """
        self.session_id = str(uuid.uuid4())
        self.last_conv_id = None
        return super().reset()

    def image_to_base64(self, image_path):
        """Return the image at ``image_path`` as Base64-encoded JPEG text.

        The image is downscaled (aspect ratio kept) so that neither side
        exceeds ``_MAX_IMAGE_DIMENSION`` pixels; smaller images are not
        enlarged.

        Args:
            image_path: Path of an image file readable by Pillow.

        Returns:
            The JPEG bytes, Base64-encoded and decoded as a UTF-8 ``str``.
        """
        buffer = BytesIO()
        # The context manager closes the underlying file once encoding is done.
        with Image.open(image_path) as source:
            width, height = source.size
            limit = self._MAX_IMAGE_DIMENSION
            scale_ratio = min(limit / width, limit / height)

            picture = source
            if scale_ratio < 1:
                # Clamp to 1 px so extreme aspect ratios cannot yield a
                # zero-sized side, which resize() rejects.
                new_size = (
                    max(1, int(width * scale_ratio)),
                    max(1, int(height * scale_ratio)),
                )
                picture = picture.resize(new_size, Image.LANCZOS)

            # JPEG cannot store alpha or palette data (RGBA, LA, P, ...), so
            # such images are converted to RGB before saving.
            if picture.mode not in self._JPEG_MODES:
                picture = picture.convert("RGB")
            picture.save(buffer, format='JPEG')

        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def try_read_image(self, filepath):
        """Load ``filepath`` as the current image if its extension is an image's.

        On a recognised extension, ``image_bytes`` and ``image_path`` are set
        from the file; otherwise both are cleared to ``None``.
        """
        file_extension = os.path.splitext(filepath)[1].lower()
        if file_extension in self._IMAGE_EXTENSIONS:
            logging.info(f"读取图片文件: {filepath}")
            self.image_bytes = self.image_to_base64(filepath)
            self.image_path = filepath
        else:
            self.image_bytes = None
            self.image_path = None

    def _send_appraise(self, appraise):
        """POST a feedback value for the last sent message to the endpoint."""
        data = {
            "uuid": self.last_conv_id,
            "appraise": appraise
        }
        requests.post(self.url, json=data)

    def _post_message(self, data_type, data, conv_id):
        """POST one message of ``data_type`` and return the raw HTTP response."""
        payload = {
            "user_id": self.api_key,
            "session_id": self.session_id,
            "uuid": conv_id,
            "data_type": data_type,
            "data": data
        }
        return requests.post(self.url, json=payload)

    def like(self):
        """Send positive feedback for the last message; return a status text."""
        if self.last_conv_id is None:
            return "点赞失败,你还没发送过消息"
        self._send_appraise("good")
        return "👍点赞成功,感谢反馈~"

    def dislike(self):
        """Send negative feedback for the last message; return a status text."""
        if self.last_conv_id is None:
            return "点踩失败,你还没发送过消息"
        self._send_appraise("bad")
        return "👎点踩成功,感谢反馈~"

    def prepare_inputs(self, real_inputs, use_websearch, files, reply_language, chatbot):
        """Pass the user input through unchanged.

        ``use_websearch``, ``files`` and ``reply_language`` are not used.

        Returns:
            ``(limited_context, fake_inputs, display_append, real_inputs,
            chatbot)`` where ``limited_context`` is ``False``, ``fake_inputs``
            is ``real_inputs`` and ``display_append`` is ``""``.
        """
        return False, real_inputs, "", real_inputs, chatbot

    def handle_file_upload(self, files, chatbot, language):
        """Read uploaded files and send the resulting image to the endpoint.

        Each named file goes through ``try_read_image``, so the last file
        decides which image (if any) is current. When an image is current it
        is appended to ``chatbot``, the session is reset and the image is
        uploaded as its own message.

        Returns:
            ``(None, chatbot, None)``.
        """
        if files:
            for file in files:
                if file.name:
                    logging.info(f"尝试读取图像: {file.name}")
                    self.try_read_image(file.name)
            if self.image_path is not None:
                chatbot = chatbot + [((self.image_path,), None)]
            if self.image_bytes is not None:
                logging.info("使用图片作为输入")
                # XMChat can only handle one image per conversation round, so
                # a new image starts a fresh session.
                self.reset()
                conv_id = str(uuid.uuid4())
                raw_response = self._post_message(
                    "imgbase64", self.image_bytes, conv_id)
                # The reply is only logged, so an unparsable or unexpected
                # body must not abort the upload: log the raw text instead.
                try:
                    reply = json.loads(raw_response.text)["data"]
                except (ValueError, KeyError, TypeError):
                    reply = raw_response.text
                logging.info(f"图片回复: {reply}")
        return None, chatbot, None

    def get_answer_at_once(self):
        """Send the latest history entry as text and return the reply.

        Returns:
            ``(answer, len(answer))``. ``answer`` is the ``"data"`` field of
            the JSON reply, or the raw response text when the body is not
            JSON or has no usable ``"data"`` field.
        """
        question = self.history[-1]["content"]
        conv_id = str(uuid.uuid4())
        self.last_conv_id = conv_id
        raw_response = self._post_message("text", question, conv_id)
        # Keep the HTTP response under its own name so the fallback below can
        # still read ``.text`` after the body has been parsed.
        try:
            answer = json.loads(raw_response.text)["data"]
            return answer, len(answer)
        except (ValueError, KeyError, TypeError):
            return raw_response.text, len(raw_response.text)