|
import torch |
|
import os |
|
import warnings |
|
import shutil |
|
import base64 |
|
import dataclasses |
|
|
|
from PIL import Image |
|
from io import BytesIO |
|
from enum import auto, Enum |
|
from typing import List, Tuple |
|
from transformers import StoppingCriteria |
|
|
|
|
|
|
|
IGNORE_INDEX = -100 |
|
IMAGE_TOKEN_INDEX = -200 |
|
DEFAULT_IMAGE_TOKEN = "<image>" |
|
DEFAULT_IMAGE_PATCH_TOKEN = "<im_patch>" |
|
DEFAULT_IM_START_TOKEN = "<im_start>" |
|
DEFAULT_IM_END_TOKEN = "<im_end>" |
|
|
|
|
|
def disable_torch_init(): |
|
""" |
|
Disable the redundant torch default initialization to accelerate model creation. |
|
""" |
|
import torch |
|
setattr(torch.nn.Linear, "reset_parameters", lambda self: None) |
|
setattr(torch.nn.LayerNorm, "reset_parameters", lambda self: None) |
|
|
|
|
|
def load_image_from_base64(image): |
|
return Image.open(BytesIO(base64.b64decode(image))) |
|
|
|
|
|
def expand2square(pil_img, background_color): |
|
width, height = pil_img.size |
|
if width == height: |
|
return pil_img |
|
elif width > height: |
|
result = Image.new(pil_img.mode, (width, width), background_color) |
|
result.paste(pil_img, (0, (width - height) // 2)) |
|
return result |
|
else: |
|
result = Image.new(pil_img.mode, (height, height), background_color) |
|
result.paste(pil_img, ((height - width) // 2, 0)) |
|
return result |
|
|
|
|
|
def process_images(images, image_processor, model_cfg): |
|
image_aspect_ratio = getattr(model_cfg, "image_aspect_ratio", None) |
|
new_images = [] |
|
if image_aspect_ratio == 'pad': |
|
for image in images: |
|
image = expand2square(image, tuple(int(x*255) for x in image_processor.image_mean)) |
|
image = image_processor.preprocess(image, return_tensors='pt')['pixel_values'][0] |
|
new_images.append(image) |
|
else: |
|
return image_processor(images, return_tensors='pt')['pixel_values'] |
|
if all(x.shape == new_images[0].shape for x in new_images): |
|
new_images = torch.stack(new_images, dim=0) |
|
return new_images |
|
|
|
|
|
def tokenizer_image_token(prompt, tokenizer, image_token_index=IMAGE_TOKEN_INDEX, return_tensors=None): |
|
prompt_chunks = [tokenizer(chunk).input_ids for chunk in prompt.split('<image>')] |
|
|
|
def insert_separator(X, sep): |
|
return [ele for sublist in zip(X, [sep]*len(X)) for ele in sublist][:-1] |
|
|
|
input_ids = [] |
|
offset = 0 |
|
if len(prompt_chunks) > 0 and len(prompt_chunks[0]) > 0 and prompt_chunks[0][0] == tokenizer.bos_token_id: |
|
offset = 1 |
|
input_ids.append(prompt_chunks[0][0]) |
|
|
|
for x in insert_separator(prompt_chunks, [image_token_index] * (offset + 1)): |
|
input_ids.extend(x[offset:]) |
|
|
|
if return_tensors is not None: |
|
if return_tensors == 'pt': |
|
return torch.tensor(input_ids, dtype=torch.long) |
|
raise ValueError(f'Unsupported tensor type: {return_tensors}') |
|
return input_ids |
|
|
|
|
|
def get_model_name_from_path(model_path): |
|
model_path = model_path.strip("/") |
|
model_paths = model_path.split("/") |
|
if model_paths[-1].startswith('checkpoint-'): |
|
return model_paths[-2] + "_" + model_paths[-1] |
|
else: |
|
return model_paths[-1] |
|
|
|
class KeywordsStoppingCriteria(StoppingCriteria): |
|
def __init__(self, keywords, tokenizer, input_ids): |
|
self.keywords = keywords |
|
self.keyword_ids = [] |
|
self.max_keyword_len = 0 |
|
for keyword in keywords: |
|
cur_keyword_ids = tokenizer(keyword).input_ids |
|
if len(cur_keyword_ids) > 1 and cur_keyword_ids[0] == tokenizer.bos_token_id: |
|
cur_keyword_ids = cur_keyword_ids[1:] |
|
if len(cur_keyword_ids) > self.max_keyword_len: |
|
self.max_keyword_len = len(cur_keyword_ids) |
|
self.keyword_ids.append(torch.tensor(cur_keyword_ids)) |
|
self.tokenizer = tokenizer |
|
self.start_len = input_ids.shape[1] |
|
|
|
def call_for_batch(self, output_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: |
|
offset = min(output_ids.shape[1] - self.start_len, self.max_keyword_len) |
|
self.keyword_ids = [keyword_id.to(output_ids.device) for keyword_id in self.keyword_ids] |
|
for keyword_id in self.keyword_ids: |
|
if (output_ids[0, -keyword_id.shape[0]:] == keyword_id).all(): |
|
return True |
|
outputs = self.tokenizer.batch_decode(output_ids[:, -offset:], skip_special_tokens=True)[0] |
|
for keyword in self.keywords: |
|
if keyword in outputs: |
|
return True |
|
return False |
|
|
|
def __call__(self, output_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: |
|
outputs = [] |
|
for i in range(output_ids.shape[0]): |
|
outputs.append(self.call_for_batch(output_ids[i].unsqueeze(0), scores)) |
|
return all(outputs) |
|
|
|
|
|
""" |
|
Conversation related |
|
""" |
|
|
|
class SeparatorStyle(Enum): |
|
"""Different separator style.""" |
|
SINGLE = auto() |
|
TWO = auto() |
|
MPT = auto() |
|
PLAIN = auto() |
|
LLAMA_2 = auto() |
|
|
|
|
|
@dataclasses.dataclass |
|
class Conversation: |
|
"""A class that keeps all conversation history.""" |
|
system: str |
|
roles: List[str] |
|
messages: List[List[str]] |
|
offset: int |
|
sep_style: SeparatorStyle = SeparatorStyle.SINGLE |
|
sep: str = "###" |
|
sep2: str = None |
|
version: str = "Unknown" |
|
|
|
skip_next: bool = False |
|
|
|
def get_prompt(self): |
|
messages = self.messages |
|
if len(messages) > 0 and type(messages[0][1]) is tuple: |
|
messages = self.messages.copy() |
|
init_role, init_msg = messages[0].copy() |
|
init_msg = init_msg[0].replace("<image>", "").strip() |
|
if 'mmtag' in self.version: |
|
messages[0] = (init_role, init_msg) |
|
messages.insert(0, (self.roles[0], "<Image><image></Image>")) |
|
messages.insert(1, (self.roles[1], "Received.")) |
|
else: |
|
messages[0] = (init_role, "<image>\n" + init_msg) |
|
|
|
if self.sep_style == SeparatorStyle.SINGLE: |
|
ret = self.system + self.sep |
|
for role, message in messages: |
|
if message: |
|
if type(message) is tuple: |
|
message, _, _ = message |
|
ret += role + ": " + message + self.sep |
|
else: |
|
ret += role + ":" |
|
elif self.sep_style == SeparatorStyle.TWO: |
|
seps = [self.sep, self.sep2] |
|
ret = self.system + seps[0] |
|
for i, (role, message) in enumerate(messages): |
|
if message: |
|
if type(message) is tuple: |
|
message, _, _ = message |
|
ret += role + ": " + message + seps[i % 2] |
|
else: |
|
ret += role + ":" |
|
elif self.sep_style == SeparatorStyle.MPT: |
|
ret = self.system + self.sep |
|
for role, message in messages: |
|
if message: |
|
if type(message) is tuple: |
|
message, _, _ = message |
|
ret += role + message + self.sep |
|
else: |
|
ret += role |
|
elif self.sep_style == SeparatorStyle.LLAMA_2: |
|
wrap_sys = lambda msg: f"<<SYS>>\n{msg}\n<</SYS>>\n\n" |
|
if self.version == 'llama_v2_alaya': |
|
wrap_inst = lambda msg: f"{self.roles[0]} {msg} {self.roles[1]}" |
|
else: |
|
wrap_inst = lambda msg: f"[INST] {msg} [/INST]" |
|
ret = "" |
|
|
|
for i, (role, message) in enumerate(messages): |
|
if i == 0: |
|
assert message, "first message should not be none" |
|
assert role == self.roles[0], "first message should come from user" |
|
if message: |
|
if type(message) is tuple: |
|
message, _, _ = message |
|
if i == 0: |
|
if self.system: |
|
message = wrap_sys(self.system) + message |
|
if i % 2 == 0: |
|
message = wrap_inst(message) |
|
ret += self.sep + message |
|
else: |
|
ret += " " + message + " " + self.sep2 |
|
else: |
|
ret += "" |
|
ret = ret.lstrip(self.sep) |
|
elif self.sep_style == SeparatorStyle.PLAIN: |
|
seps = [self.sep, self.sep2] |
|
ret = self.system |
|
for i, (role, message) in enumerate(messages): |
|
if message: |
|
if type(message) is tuple: |
|
message, _, _ = message |
|
ret += message + seps[i % 2] |
|
else: |
|
ret += "" |
|
else: |
|
raise ValueError(f"Invalid style: {self.sep_style}") |
|
|
|
return ret |
|
|
|
def append_message(self, role, message): |
|
self.messages.append([role, message]) |
|
|
|
def get_images(self, return_pil=False): |
|
images = [] |
|
for i, (role, msg) in enumerate(self.messages[self.offset:]): |
|
if i % 2 == 0: |
|
if type(msg) is tuple: |
|
import base64 |
|
from io import BytesIO |
|
from PIL import Image |
|
msg, image, image_process_mode = msg |
|
if image_process_mode == "Pad": |
|
def expand2square(pil_img, background_color=(122, 116, 104)): |
|
width, height = pil_img.size |
|
if width == height: |
|
return pil_img |
|
elif width > height: |
|
result = Image.new(pil_img.mode, (width, width), background_color) |
|
result.paste(pil_img, (0, (width - height) // 2)) |
|
return result |
|
else: |
|
result = Image.new(pil_img.mode, (height, height), background_color) |
|
result.paste(pil_img, ((height - width) // 2, 0)) |
|
return result |
|
image = expand2square(image) |
|
elif image_process_mode in ["Default", "Crop"]: |
|
pass |
|
elif image_process_mode == "Resize": |
|
image = image.resize((336, 336)) |
|
else: |
|
raise ValueError(f"Invalid image_process_mode: {image_process_mode}") |
|
max_hw, min_hw = max(image.size), min(image.size) |
|
aspect_ratio = max_hw / min_hw |
|
max_len, min_len = 800, 400 |
|
shortest_edge = int(min(max_len / aspect_ratio, min_len, min_hw)) |
|
longest_edge = int(shortest_edge * aspect_ratio) |
|
W, H = image.size |
|
if longest_edge != max(image.size): |
|
if H > W: |
|
H, W = longest_edge, shortest_edge |
|
else: |
|
H, W = shortest_edge, longest_edge |
|
image = image.resize((W, H)) |
|
if return_pil: |
|
images.append(image) |
|
else: |
|
buffered = BytesIO() |
|
image.save(buffered, format="PNG") |
|
img_b64_str = base64.b64encode(buffered.getvalue()).decode() |
|
images.append(img_b64_str) |
|
return images |
|
|
|
def to_gradio_chatbot(self): |
|
ret = [] |
|
for i, (role, msg) in enumerate(self.messages[self.offset:]): |
|
if i % 2 == 0: |
|
if type(msg) is tuple: |
|
import base64 |
|
from io import BytesIO |
|
msg, image, image_process_mode = msg |
|
max_hw, min_hw = max(image.size), min(image.size) |
|
aspect_ratio = max_hw / min_hw |
|
max_len, min_len = 800, 400 |
|
shortest_edge = int(min(max_len / aspect_ratio, min_len, min_hw)) |
|
longest_edge = int(shortest_edge * aspect_ratio) |
|
W, H = image.size |
|
if H > W: |
|
H, W = longest_edge, shortest_edge |
|
else: |
|
H, W = shortest_edge, longest_edge |
|
image = image.resize((W, H)) |
|
buffered = BytesIO() |
|
image.save(buffered, format="JPEG") |
|
img_b64_str = base64.b64encode(buffered.getvalue()).decode() |
|
img_str = f'<img src="data:image/png;base64,{img_b64_str}" alt="user upload image" />' |
|
msg = img_str + msg.replace('<image>', '').strip() |
|
ret.append([msg, None]) |
|
else: |
|
ret.append([msg, None]) |
|
else: |
|
ret[-1][-1] = msg |
|
return ret |
|
|
|
def copy(self): |
|
return Conversation( |
|
system=self.system, |
|
roles=self.roles, |
|
messages=[[x, y] for x, y in self.messages], |
|
offset=self.offset, |
|
sep_style=self.sep_style, |
|
sep=self.sep, |
|
sep2=self.sep2, |
|
version=self.version) |
|
|
|
def dict(self): |
|
if len(self.get_images()) > 0: |
|
return { |
|
"system": self.system, |
|
"roles": self.roles, |
|
"messages": [[x, y[0] if type(y) is tuple else y] for x, y in self.messages], |
|
"offset": self.offset, |
|
"sep": self.sep, |
|
"sep2": self.sep2, |
|
} |
|
return { |
|
"system": self.system, |
|
"roles": self.roles, |
|
"messages": self.messages, |
|
"offset": self.offset, |
|
"sep": self.sep, |
|
"sep2": self.sep2, |
|
} |
|
|
|
|
|
conv_mmalaya_llama = Conversation( |
|
system="", |
|
roles=("### Instruction:\t\n", "### Output:\t\n"), |
|
version="llama_v2_alaya", |
|
messages=(), |
|
offset=0, |
|
sep_style=SeparatorStyle.LLAMA_2, |
|
sep="<s>", |
|
sep2="</s>", |
|
) |
|
|
|
default_conversation = conv_mmalaya_llama |
|
conv_templates = { |
|
"mmalaya_llama": conv_mmalaya_llama, |
|
} |
|
|