from __future__ import annotations

import base64
import json
import logging
import os
import uuid
from io import BytesIO

import requests
from PIL import Image
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

from ..index_func import *
from ..presets import *
from ..utils import *
from .base_model import BaseLLMModel
from .. import shared

# The Imp checkpoint and its tokenizer are loaded once, at import time, and are
# used by XMChat.get_answer_at_once through these module-level names.
imp_model = AutoModelForCausalLM.from_pretrained(
    "MILVLG/imp-v1-3b",
    torch_dtype=torch.float16,
    device_map="auto",
    trust_remote_code=True)
imp_tokenizer = AutoTokenizer.from_pretrained("MILVLG/imp-v1-3b", trust_remote_code=True)

# Legacy loader for a local checkpoint, kept for reference:
# print('model loading')
# model = AutoModelForCausalLM.from_pretrained(
#     "/home/shaozw/labs/imp-v0",
#     torch_dtype=torch.float16,
#     device_map="auto",
#     trust_remote_code=True)
# tokenizer = AutoTokenizer.from_pretrained("/home/shaozw/labs/imp-v0", trust_remote_code=True)
# print('model loaded')


class XMChat(BaseLLMModel):
    """Chat model that answers with the module-level Imp model and tokenizer.

    The instance keeps at most one uploaded image (``image_bytes`` /
    ``image_path``); ``image_flag`` marks that an image was read and has not
    yet been consumed by ``_get_imp_style_inputs``.
    """

    # File extensions that try_read_image treats as images.
    _IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff")
    # Pillow modes that are left unconverted before a JPEG save; every other
    # mode (RGBA, P, LA, ...) is converted to RGB first.
    _JPEG_WRITABLE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})
    # Seconds to wait for the feedback endpoint before giving up.
    _REQUEST_TIMEOUT = 10

    def __init__(self, api_key, user_name="", common_model=None, common_tokenizer=None):
        """Create a chat session.

        Args:
            api_key: Stored on the instance as ``self.api_key``.
            user_name: Forwarded to the base class as ``user``.
            common_model: Stored as ``self.common_model``; not used elsewhere
                in this class.
            common_tokenizer: Stored as ``self.common_tokenizer``; not used
                elsewhere in this class.
        """
        super().__init__(model_name="xmchat", user=user_name)
        self.api_key = api_key
        self.image_flag = False
        self.session_id = None
        self.reset()
        self.image_bytes = None
        self.image_path = None
        self.xm_history = []
        self.url = "https://xmbot.net/web"
        self.last_conv_id = None
        self.max_generation_token = 100
        # [Edited by zhenwei - 2024-01-26 10:35]
        self.common_model = common_model
        self.common_tokenizer = common_tokenizer
        self.system_prompt = "A chat between a curious user and an artificial intelligence assistant. This artificial intelligence assistant is a chatbot named as Imp, and developed by MILVLG team. Imp gives helpful, detailed, and polite answers to the user's questions."

    def reset(self, remain_system_prompt=False):
        """Start a fresh session and drop any pending image.

        Assigns a new random ``session_id``, clears ``last_conv_id``,
        ``image_bytes`` and ``image_flag``, then returns the result of the
        base-class ``reset()``.

        NOTE(review): ``remain_system_prompt`` is accepted but not forwarded to
        the base class; confirm whether that is intentional.
        """
        logging.info("Reseting...")
        self.session_id = str(uuid.uuid4())
        self.last_conv_id = None
        self.image_bytes = None
        self.image_flag = False
        return super().reset()

    def image_to_base64(self, image_path, max_dimension=2048):
        """Return the image at ``image_path`` as a Base64-encoded JPEG string.

        The image is downscaled (never upscaled) so that neither side exceeds
        ``max_dimension`` pixels, converted to RGB when its mode is not one
        that is saved as-is, and encoded as JPEG.

        Args:
            image_path: Path (or file object) accepted by ``PIL.Image.open``.
            max_dimension: Upper bound, in pixels, for width and height.

        Returns:
            The JPEG bytes, Base64-encoded and decoded as UTF-8 text.
        """
        # The context manager closes the underlying file handle once the JPEG
        # bytes have been produced.
        with Image.open(image_path) as source:
            img = source
            width, height = img.size
            # Shrink factor that brings the longer side down to max_dimension.
            scale_ratio = min(max_dimension / width, max_dimension / height)
            if scale_ratio < 1:
                # Clamp to 1 px so extreme aspect ratios cannot produce a
                # zero-sized target.
                new_width = max(1, int(width * scale_ratio))
                new_height = max(1, int(height * scale_ratio))
                img = img.resize((new_width, new_height), Image.LANCZOS)
            # JPEG has no alpha channel or palette; convert such modes to RGB.
            if img.mode not in self._JPEG_WRITABLE_MODES:
                img = img.convert("RGB")
            buffer = BytesIO()
            img.save(buffer, format='JPEG')
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def try_read_image(self, filepath):
        """Load ``filepath`` as the current image if its extension is an image type.

        On a recognised extension the opened PIL image is stored in
        ``image_bytes``, the path in ``image_path`` and ``image_flag`` is set.
        Otherwise ``image_bytes`` and ``image_path`` are cleared;
        ``image_flag`` is left as it was.
        """
        file_extension = os.path.splitext(filepath)[1].lower()
        if file_extension in self._IMAGE_EXTENSIONS:
            logging.info(f"读取图片文件: {filepath}")
            self.image_bytes = Image.open(filepath)
            self.image_path = filepath
            self.image_flag = True
        else:
            self.image_bytes = None
            self.image_path = None
            # self.image_flag = False

    def _send_appraisal(self, appraise):
        """POST an appraisal ("good" or "bad") for the last conversation to ``self.url``."""
        data = {
            "uuid": self.last_conv_id,
            "appraise": appraise
        }
        # A timeout keeps an unresponsive endpoint from blocking the caller
        # indefinitely.
        requests.post(self.url, json=data, timeout=self._REQUEST_TIMEOUT)

    def like(self):
        """Send positive feedback for the last conversation and return a status message."""
        if self.last_conv_id is None:
            return "点赞失败,你还没发送过消息"
        self._send_appraisal("good")
        return "👍点赞成功,感谢反馈~"

    def dislike(self):
        """Send negative feedback for the last conversation and return a status message."""
        if self.last_conv_id is None:
            return "点踩失败,你还没发送过消息"
        self._send_appraisal("bad")
        return "👎点踩成功,感谢反馈~"

    def prepare_inputs(self, real_inputs, use_websearch, files, reply_language, chatbot):
        """Pass the user input through unchanged.

        Returns:
            ``(limited_context, fake_inputs, display_append, real_inputs,
            chatbot)`` where ``limited_context`` is ``False``, ``fake_inputs``
            is ``real_inputs`` and ``display_append`` is the empty string.
        """
        fake_inputs = real_inputs
        display_append = ""
        limited_context = False
        return limited_context, fake_inputs, display_append, real_inputs, chatbot

    def handle_file_upload(self, files, chatbot, language):
        """Read uploaded files as images and show the current image in the chat.

        Each file with a non-empty ``name`` is passed to ``try_read_image``.
        If an image path is set afterwards, a ``((image_path,), None)`` entry
        is appended to a copy of ``chatbot``.

        Returns:
            ``(None, chatbot, None)``.
        """
        if files:
            for file in files:
                if file.name:
                    logging.info(f"尝试读取图像: {file.name}")
                    self.try_read_image(file.name)
            if self.image_path is not None:
                chatbot = chatbot + [((self.image_path,), None)]
            # Legacy remote-upload path, kept for reference:
            # if self.image_bytes is not None:
            #     logging.info("使用图片作为输入")
            #     # XMChat can actually handle only one image per conversation round
            #     self.reset()
            #     conv_id = str(uuid.uuid4())
            #     data = {
            #         "user_id": self.api_key,
            #         "session_id": self.session_id,
            #         "uuid": conv_id,
            #         "data_type": "imgbase64",
            #         "data": self.image_bytes
            #     }
            #     response = requests.post(self.url, json=data)
            #     response = json.loads(response.text)
            #     logging.info(f"图片回复: {response['data']}")
        return None, chatbot, None

    def _get_imp_style_inputs(self):
        """Build the prompt string from ``self.history``.

        The prompt is a fixed preamble followed by `` USER: ...`` /
        `` ASSISTANT: ...`` turns and a trailing `` ASSISTANT:``. When
        ``image_flag`` is set and the last history entry is a user turn, that
        entry's ``content`` is rewritten in place and the flag is cleared.

        NOTE(review): the '' and '\\n' literals below look like an image
        placeholder / end-of-sequence token that was lost from this copy of
        the file; confirm against the model's documented prompt format.
        """
        context = (
            "A chat between a curious user and an artificial intelligence assistant. "
            "This artificial intelligence assistant is a multimodal chatbot named as Imp, "
            "and developed by MILVLG team from Hangzhou Dianzi University. "
            "Imp gives helpful, detailed, and polite answers to the user's questions."
        )
        last_index = len(self.history) - 1
        for idx, message in enumerate(self.history):
            if message["role"] == "user":
                if self.image_flag and idx == last_index:
                    context = context.replace('\n', '')
                    # Mutates the stored history entry, not just the prompt.
                    message["content"] = '\n' + message["content"]
                    self.image_flag = False
                context += ' USER: ' + message["content"].strip()  # + ' '
            else:
                context += ' ASSISTANT: ' + message["content"].strip() + ''
        context += ' ASSISTANT:'
        return context

    def get_answer_at_once(self):
        """Generate one reply for the current history.

        Returns:
            ``(response, len(response))``: the decoded reply text and its
            length in characters.
        """
        # question = self.history[-1]["content"].strip()
        # question = f"{self.system_prompt.strip()} USER: \n{question} ASSISTANT:"
        prompt = self._get_imp_style_inputs()
        logging.info(prompt)
        input_ids = imp_tokenizer(prompt, return_tensors='pt').input_ids
        image_tensor = None
        # Only preprocess when an image is actually loaded; the previous
        # `'' in prompt` test was always true and handed None to
        # image_preprocess when no image had been uploaded.
        if self.image_bytes is not None:
            image_tensor = imp_model.image_preprocess(self.image_bytes)
        output_ids = imp_model.generate(
            input_ids,
            max_new_tokens=3000,
            images=image_tensor,
            # max_length=self.token_upper_limit,
            do_sample=self.temperature > 0,
            # top_k=self.top_k,
            top_p=self.top_p,
            temperature=self.temperature,
            # repetition_penalty=self.repetition_penalty,
            num_return_sequences=1,
            use_cache=True)[0]
        # Decode only the tokens after the prompt.
        response = imp_tokenizer.decode(output_ids[input_ids.shape[1]:], skip_special_tokens=True).strip()
        return response, len(response)