import base64
import dataclasses
from io import BytesIO
from enum import auto, Enum
from typing import List, Optional, Tuple  # Tuple kept: rest of file may use it

from PIL import Image

from .constants import LOGDIR, NUM_FRAMES  # LOGDIR kept: rest of file may use it


class SeparatorStyle(Enum):
    """Different separator style."""
    SINGLE = auto()
    TWO = auto()
    MPT = auto()
    PLAIN = auto()
    LLAMA_2 = auto()


@dataclasses.dataclass
class Conversation:
    """A class that keeps all conversation history."""
    system: str
    roles: List[str]
    messages: List[List[str]]
    offset: int
    sep_style: SeparatorStyle = SeparatorStyle.SINGLE
    sep: str = "###"
    # Fixed annotation: default is None, so the type must be Optional.
    sep2: Optional[str] = None
    version: str = "Unknown"
    skip_next: bool = False
    # "image" or "video"; used to build the placeholder token "<image>"/"<video>".
    modality: str = "image"

    def get_prompt(self):
        """Serialize the conversation into a single prompt string.

        The join format depends on ``self.sep_style``. A first message whose
        payload is a tuple (text, media, process_mode) is normalized so the
        modality token (e.g. "<image>") is moved to the front of the text, or
        handled via the 'mmtag' convention when the version requires it.

        Returns:
            str: the full prompt.

        Raises:
            ValueError: if ``self.sep_style`` is not a known style.
        """
        messages = self.messages
        modality_token = f"<{self.modality}>"
        if len(messages) > 0 and type(messages[0][1]) is tuple:
            # Work on a shallow copy so the stored history is not mutated.
            messages = self.messages.copy()
            init_role, init_msg = messages[0].copy()
            init_msg = init_msg[0].replace(modality_token, "").strip()
            if 'mmtag' in self.version:
                messages[0] = (init_role, init_msg)
                messages.insert(0, (self.roles[0], ""))
                messages.insert(1, (self.roles[1], "Received."))
            else:
                messages[0] = (init_role, f"{modality_token}\n" + init_msg)

        if self.sep_style == SeparatorStyle.SINGLE:
            ret = self.system + self.sep
            for role, message in messages:
                if message:
                    if type(message) is tuple:
                        message, _, _ = message
                    ret += role + ": " + message + self.sep
                else:
                    # Empty message: emit the role header only (generation cue).
                    ret += role + ":"
        elif self.sep_style == SeparatorStyle.TWO:
            # Alternate between sep (after user turns) and sep2 (after model turns).
            seps = [self.sep, self.sep2]
            ret = self.system + seps[0]
            for i, (role, message) in enumerate(messages):
                if message:
                    if type(message) is tuple:
                        message, _, _ = message
                    ret += role + ": " + message + seps[i % 2]
                else:
                    ret += role + ":"
        elif self.sep_style == SeparatorStyle.MPT:
            ret = self.system + self.sep
            for role, message in messages:
                if message:
                    if type(message) is tuple:
                        message, _, _ = message
                    ret += role + message + self.sep
                else:
                    ret += role
        elif self.sep_style == SeparatorStyle.LLAMA_2:
            # NOTE(review): restored the standard LLaMA-2 system markers; the
            # extracted source read f"<>\n{msg}\n<>\n\n" because the angle-tag
            # content was stripped during extraction.
            wrap_sys = lambda msg: f"<<SYS>>\n{msg}\n<</SYS>>\n\n"
            wrap_inst = lambda msg: f"[INST] {msg} [/INST]"
            ret = ""
            for i, (role, message) in enumerate(messages):
                if i == 0:
                    assert message, "first message should not be none"
                    assert role == self.roles[0], "first message should come from user"
                if message:
                    if type(message) is tuple:
                        message, _, _ = message
                    if i == 0:
                        # System prompt is folded into the first user turn.
                        message = wrap_sys(self.system) + message
                    if i % 2 == 0:
                        message = wrap_inst(message)
                        ret += self.sep + message
                    else:
                        ret += " " + message + " " + self.sep2
                else:
                    ret += ""
            ret = ret.lstrip(self.sep)
        elif self.sep_style == SeparatorStyle.PLAIN:
            seps = [self.sep, self.sep2]
            ret = self.system
            for i, (role, message) in enumerate(messages):
                if message:
                    if type(message) is tuple:
                        message, _, _ = message
                    # PLAIN style drops the role labels entirely.
                    ret += message + seps[i % 2]
                else:
                    ret += ""
        else:
            raise ValueError(f"Invalid style: {self.sep_style}")

        return ret

    def append_message(self, role, message):
        """Append one [role, message] turn to the history."""
        self.messages.append([role, message])

    def process_image(self, image, image_process_mode, return_pil=False,
                      image_format='PNG', max_len=800, min_len=400):
        """Apply a processing mode to a PIL image and optionally encode it.

        Args:
            image: PIL.Image.Image to process.
            image_process_mode: one of "Pad" (letterbox to a square),
                "Default"/"Crop" (no-op), or "Resize" (force 336x336).
            return_pil: if True return the PIL image; otherwise return a
                base64-encoded string of the saved image.
            image_format: format used when encoding (default 'PNG').
            max_len / min_len: bounds used to downscale oversized images while
                preserving aspect ratio.

        Raises:
            ValueError: for an unknown ``image_process_mode``.
        """
        if image_process_mode == "Pad":
            def expand2square(pil_img, background_color=(122, 116, 104)):
                # Pad the shorter side with the background color, centering
                # the original image in a square canvas.
                width, height = pil_img.size
                if width == height:
                    return pil_img
                elif width > height:
                    result = Image.new(pil_img.mode, (width, width), background_color)
                    result.paste(pil_img, (0, (width - height) // 2))
                    return result
                else:
                    result = Image.new(pil_img.mode, (height, height), background_color)
                    result.paste(pil_img, ((height - width) // 2, 0))
                    return result
            image = expand2square(image)
        elif image_process_mode in ["Default", "Crop"]:
            pass
        elif image_process_mode == "Resize":
            image = image.resize((336, 336))
        else:
            raise ValueError(f"Invalid image_process_mode: {image_process_mode}")

        if max(image.size) > max_len:
            # Downscale so the shorter edge is at most min_len (or whatever
            # max_len/aspect_ratio allows), keeping the aspect ratio.
            max_hw, min_hw = max(image.size), min(image.size)
            aspect_ratio = max_hw / min_hw
            shortest_edge = int(min(max_len / aspect_ratio, min_len, min_hw))
            longest_edge = int(shortest_edge * aspect_ratio)
            W, H = image.size
            if H > W:
                H, W = longest_edge, shortest_edge
            else:
                H, W = shortest_edge, longest_edge
            image = image.resize((W, H))

        if return_pil:
            return image
        else:
            buffered = BytesIO()
            image.save(buffered, format=image_format)
            img_b64_str = base64.b64encode(buffered.getvalue()).decode()
            return img_b64_str

    def get_videos(self, return_pil=False):
        """Collect videos attached to the user turns of the conversation.

        Args:
            return_pil: if False, return the raw video file paths; if True,
                decode NUM_FRAMES evenly spaced frames per video with decord
                and run each through :meth:`process_image`.

        Returns:
            list: file paths, or processed frames (PIL images / base64
            strings, per process_image's return_pil semantics).
        """
        video_frames = []
        for i, (role, msg) in enumerate(self.messages[self.offset:]):
            if i % 2 == 0:
                if type(msg) is tuple:
                    # here `video` is the file path of the input video
                    msg, video, image_process_mode = msg
                    if not return_pil:
                        # return filepath
                        video_frames.append(video)
                    else:
                        # Lazy imports, only on the decoding path: the
                        # filepath-only path must not require decord/numpy.
                        from decord import VideoReader, cpu
                        import numpy as np
                        # read video using decord.VideoReader
                        decord_vr = VideoReader(uri=video, ctx=cpu(0))
                        duration = len(decord_vr)
                        frame_id_list = np.linspace(0, duration - 1, NUM_FRAMES, dtype=int)
                        # convert the extracted image frames into PIL objects
                        all_images = [Image.fromarray(f) for f in decord_vr.get_batch(frame_id_list).asnumpy()]
                        video_frames.extend([self.process_image(image, image_process_mode, return_pil=return_pil)
                                             for image in all_images])
        return video_frames

    def get_images(self, return_pil=False):
        """Collect images attached to the user turns of the conversation.

        Each tuple-valued user message carries (text, image, process_mode);
        the image is run through :meth:`process_image`. Removed a ~50-line
        commented-out duplicate of process_image that previously lived here.

        Returns:
            list: processed images (PIL objects or base64 strings).
        """
        images = []
        for i, (role, msg) in enumerate(self.messages[self.offset:]):
            if i % 2 == 0:
                if type(msg) is tuple:
                    msg, image, image_process_mode = msg
                    image = self.process_image(image, image_process_mode, return_pil=return_pil)
                    images.append(image)
        return images

    # NOTE(review): a to_gradio_chatbot() method follows in the original
    # file, but its HTML string literals were garbled and the source was
    # truncated during extraction; it is intentionally left untouched here.
' msg = vid_str + msg.replace('