LLAMA_BATCH / modeling_minicpmv.py
yuzaa's picture
support stream and system prompt (#15)
8ca9165 verified
raw
history blame
24.8 kB
import math
from typing import List, Optional
import json
import torch
import torchvision
from threading import Thread
from copy import deepcopy
from PIL import Image
from torchvision import transforms
from transformers import LlamaTokenizer, LlamaPreTrainedModel, LlamaForCausalLM, AutoModel, PreTrainedTokenizerFast, TextIteratorStreamer
from transformers.models.idefics2.modeling_idefics2 import Idefics2VisionTransformer
from .configuration_minicpm import MiniCPMVConfig
from .resampler import Resampler
IMAGENET_INCEPTION_MEAN = (0.5, 0.5, 0.5) # timm.data.IMAGENET_INCEPTION_MEAN
IMAGENET_INCEPTION_STD = (0.5, 0.5, 0.5) # timm.data.IMAGENET_INCEPTION_STD
class MiniCPMVPreTrainedModel(LlamaPreTrainedModel):
config_class = MiniCPMVConfig
class MiniCPMV(MiniCPMVPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.llm = LlamaForCausalLM(config)
self.vpm = self.init_vision_module()
self.vision_dim = self.vpm.embed_dim
self.embed_dim = self.llm.config.hidden_size
self.resampler = self.init_resampler(self.embed_dim, self.vision_dim)
self.transform = self.init_transform()
def init_vision_module(self):
# same as HuggingFaceM4/siglip-so400m-14-980-flash-attn2-navit
model = Idefics2VisionTransformer(self.config.vision_config)
if self.config.drop_vision_last_layer:
model.encoder.layers = model.encoder.layers[:-1]
setattr(model, 'embed_dim', model.embeddings.embed_dim)
setattr(model, 'patch_size', model.embeddings.patch_size)
return model
def init_resampler(self, embed_dim, vision_dim):
return Resampler(
num_queries=self.config.query_num,
embed_dim=embed_dim,
num_heads=embed_dim // 128,
kv_dim=vision_dim,
adaptive=True
)
def init_transform(self):
return transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize(
mean=IMAGENET_INCEPTION_MEAN, std=IMAGENET_INCEPTION_STD
),
]
)
def get_vllm_embedding(self, data):
if 'vision_hidden_states' not in data:
dtype = self.vpm.embeddings.position_embedding.weight.dtype
device = self.vpm.embeddings.position_embedding.weight.device
tgt_sizes = data['tgt_sizes']
pixel_values_list = data['pixel_values']
vision_hidden_states = []
all_pixel_values = []
img_cnt = []
for pixel_values in pixel_values_list:
img_cnt.append(len(pixel_values))
all_pixel_values.extend([i.flatten(end_dim=1).permute(1, 0) for i in pixel_values])
# exist image
if all_pixel_values:
tgt_sizes = torch.vstack(tgt_sizes).type(torch.int32)
if self.config.batch_vision_input:
max_patches = torch.max(tgt_sizes[:, 0] * tgt_sizes[:, 1])
all_pixel_values = torch.nn.utils.rnn.pad_sequence(all_pixel_values, batch_first=True,
padding_value=0.0)
B, L, _ = all_pixel_values.shape
all_pixel_values = all_pixel_values.permute(0, 2, 1).reshape(B, 3, -1, L)
patch_attn_mask = torch.zeros((B, 1, max_patches), dtype=torch.bool, device=device)
for i in range(B):
patch_attn_mask[i, :tgt_sizes[i][0] * tgt_sizes[i][1]] = True
vision_embedding = self.vpm(all_pixel_values.type(dtype), patch_attention_mask=patch_attn_mask).last_hidden_state
vision_embedding = self.resampler(vision_embedding, tgt_sizes)
else:
# get vision_embedding foreach
vision_embedding = []
for single_tgt_size, single_pixel_values in zip(tgt_sizes, all_pixel_values):
single_pixel_values = single_pixel_values.unsqueeze(0)
B, L, _ = single_pixel_values.shape
single_pixel_values = single_pixel_values.permute(0, 2, 1).reshape(B, 3, -1, L)
single_vision_embedding = self.vpm(single_pixel_values.type(dtype)).last_hidden_state
single_vision_embedding = self.resampler(single_vision_embedding, single_tgt_size.unsqueeze(0))
vision_embedding.append(single_vision_embedding)
vision_embedding = torch.vstack(vision_embedding)
start = 0
for pixel_values in pixel_values_list:
img_cnt = len(pixel_values)
if img_cnt > 0:
vision_hidden_states.append(vision_embedding[start: start + img_cnt])
start += img_cnt
else:
vision_hidden_states.append([])
else: # no image
if self.training:
dummy_image = torch.zeros(
(1, 3, 224, 224),
device=device, dtype=dtype
)
tgt_sizes = torch.Tensor([[(224 // self.config.patch_size), math.ceil(224 / self.config.patch_size)]]).type(torch.int32)
dummy_feature = self.resampler(self.vpm(dummy_image).last_hidden_state, tgt_sizes)
else:
dummy_feature = []
for _ in range(len(pixel_values_list)):
vision_hidden_states.append(dummy_feature)
else:
vision_hidden_states = data['vision_hidden_states']
if hasattr(self.llm.config, 'scale_emb'):
vllm_embedding = self.llm.model.embed_tokens(data['input_ids']) * self.llm.config.scale_emb
else:
vllm_embedding = self.llm.model.embed_tokens(data['input_ids'])
vision_hidden_states = [i.type(vllm_embedding.dtype) if isinstance(
i, torch.Tensor) else i for i in vision_hidden_states]
bs = len(data['input_ids'])
for i in range(bs):
cur_vs_hs = vision_hidden_states[i]
if len(cur_vs_hs) > 0:
cur_vllm_emb = vllm_embedding[i]
cur_image_bound = data['image_bound'][i]
if len(cur_image_bound) > 0:
image_indices = torch.stack(
[torch.arange(r[0], r[1], dtype=torch.long) for r in cur_image_bound]
).to(vllm_embedding.device)
cur_vllm_emb.scatter_(0, image_indices.view(-1, 1).repeat(1, cur_vllm_emb.shape[-1]),
cur_vs_hs.view(-1, cur_vs_hs.shape[-1]))
elif self.training:
cur_vllm_emb += cur_vs_hs[0].mean() * 0
return vllm_embedding, vision_hidden_states
def forward(self, data, **kwargs):
vllm_embedding, vision_hidden_states = self.get_vllm_embedding(data)
position_ids = data["position_ids"]
if position_ids.dtype != torch.int64:
position_ids = position_ids.long()
return self.llm(
input_ids=None,
position_ids=position_ids,
inputs_embeds=vllm_embedding,
**kwargs
)
def _convert_to_tensors(
self, tokenizer, input_ids, max_inp_length: Optional[int] = None
):
if max_inp_length is not None:
input_ids = input_ids[:max_inp_length]
input_ids = torch.tensor(input_ids, dtype=torch.int32)
image_start_tokens = torch.where(input_ids == tokenizer.im_start_id)[0]
# 跳过 im_start
image_start_tokens += 1
image_end_tokens = torch.where(input_ids == tokenizer.im_end_id)[0]
valid_image_nums = max(len(image_start_tokens), len(image_end_tokens))
image_bound = torch.hstack(
[
image_start_tokens[:valid_image_nums].unsqueeze(-1),
image_end_tokens[:valid_image_nums].unsqueeze(-1),
]
)
model_input = {}
model_input["input_ids"] = input_ids.unsqueeze(0).to(self.device)
model_input["image_bound"] = image_bound
return model_input
def _process_list(
self, tokenizer, input_id_list, max_inp_length: Optional[int] = None
):
pad_keys = ["input_ids"]
input_tensors = []
for input_ids in input_id_list:
input_tensors.append(
self._convert_to_tensors(tokenizer, input_ids, max_inp_length)
)
padded = {}
for key in pad_keys:
padded[key] = pad(input_tensors, key, padding_side="left").to(self.device)
padded["image_bound"] = [i["image_bound"] for i in input_tensors]
return padded
def _decode(self, inputs_embeds, tokenizer, **kwargs):
terminators = [
tokenizer.eos_token_id,
tokenizer.convert_tokens_to_ids("<|eot_id|>")
]
output = self.llm.generate(
inputs_embeds=inputs_embeds,
pad_token_id=0,
eos_token_id=terminators,
**kwargs
)
return self._decode_text(output, tokenizer)
def _decode_stream(self, inputs_embeds, tokenizer, **kwargs):
terminators = [
tokenizer.eos_token_id,
tokenizer.convert_tokens_to_ids("<|eot_id|>")
]
streamer = TextIteratorStreamer(tokenizer=tokenizer)
generation_kwargs = {
'inputs_embeds': inputs_embeds,
'pad_token_id': 0,
'eos_token_id': terminators,
'streamer': streamer
}
generation_kwargs.update(kwargs)
thread = Thread(target=self.llm.generate, kwargs=generation_kwargs)
thread.start()
return streamer
def _decode_text(self, result_ids, tokenizer):
result_text = []
for result in result_ids:
result = result[result != 0]
if result[0] == tokenizer.bos_id:
result = result[1:]
if result[-1] == tokenizer.eos_id or result[-1] == tokenizer.eot_id:
result = result[:-1]
result_text.append(tokenizer.decode(result).strip())
return result_text
def slice_image(self, image):
return slice_image(
image,
self.config.slice_config.max_slice_nums,
self.config.slice_config.scale_resolution,
self.config.slice_config.patch_size,
)
def get_slice_image_placeholder(self, image, tokenizer):
image_placeholder = (
tokenizer.im_start
+ tokenizer.unk_token * self.config.query_num
+ tokenizer.im_end
)
slice_images = []
source_image, patches, best_grid = slice_image(
image,
self.config.slice_config.max_slice_nums,
self.config.slice_config.scale_resolution,
self.config.slice_config.patch_size,
)
slice_images.append(source_image)
final_placeholder = image_placeholder
if len(patches) > 0:
for i in range(len(patches)):
for j in range(len(patches[0])):
slice_images.append(patches[i][j])
final_placeholder += get_grid_placeholder(
tokenizer, best_grid, self.config.query_num
)
return slice_images, final_placeholder
def reshape_by_patch(self, image_tensor):
"""
:param image_tensor: shape [3, H, W]
:param patch_size:
:return: [3, patch_size, HW/patch_size]
"""
patch_size = self.config.patch_size
patches = torch.nn.functional.unfold(
image_tensor,
(patch_size, patch_size),
stride=(patch_size, patch_size)
)
patches = patches.reshape(image_tensor.size(0), patch_size, patch_size, -1)
patches = patches.permute(0, 1, 3, 2).reshape(image_tensor.size(0), patch_size, -1)
return patches
def generate(
self,
input_id_list=None,
img_list=None,
tgt_sizes=None,
tokenizer=None,
max_inp_length: Optional[int] = None,
vision_hidden_states=None,
return_vision_hidden_states=False,
stream=False,
**kwargs
):
assert input_id_list is not None
bs = len(input_id_list)
if img_list == None:
img_list = [[] for i in range(bs)]
assert bs == len(img_list)
model_inputs = self._process_list(tokenizer, input_id_list, max_inp_length)
if vision_hidden_states is None:
pixel_values = []
for i in range(bs):
img_inps = []
for img in img_list[i]:
img_inps.append(img.to(self.device))
if img_inps:
pixel_values.append(img_inps)
else:
pixel_values.append([])
model_inputs["pixel_values"] = pixel_values
model_inputs['tgt_sizes'] = tgt_sizes
else:
model_inputs["vision_hidden_states"] = vision_hidden_states
with torch.inference_mode():
(
model_inputs["inputs_embeds"],
vision_hidden_states,
) = self.get_vllm_embedding(model_inputs)
if stream:
result = self._decode_stream(model_inputs["inputs_embeds"], tokenizer, **kwargs)
else:
result = self._decode(model_inputs["inputs_embeds"], tokenizer, **kwargs)
if return_vision_hidden_states:
return result, vision_hidden_states
return result
def chat(
self,
image,
msgs,
tokenizer,
vision_hidden_states=None,
max_new_tokens=1024,
sampling=True,
max_inp_length=2048,
system_prompt='',
stream=False,
**kwargs
):
if isinstance(msgs, str):
msgs = json.loads(msgs)
copy_msgs = deepcopy(msgs)
assert len(copy_msgs) > 0, 'msgs is empty'
assert sampling or not stream, 'if use stream mode, make sure sampling=True'
if image is not None and isinstance(copy_msgs[0]['content'], str):
copy_msgs[0]['content'] = [image, copy_msgs[0]['content']]
images = []
tgt_sizes = []
for i, msg in enumerate(copy_msgs):
role = msg["role"]
content = msg["content"]
assert role in ["user", "assistant"]
if i == 0:
assert role == "user", "The role of first msg should be user"
if isinstance(content, str):
content = [content]
cur_msgs = []
for c in content:
if isinstance(c, Image.Image):
image = c
if self.config.slice_mode:
slice_images, image_placeholder = self.get_slice_image_placeholder(
image, tokenizer
)
cur_msgs.append(image_placeholder)
for slice_image in slice_images:
slice_image = self.transform(slice_image)
H, W = slice_image.shape[1:]
images.append(self.reshape_by_patch(slice_image))
tgt_sizes.append(torch.Tensor([H // self.config.patch_size, W // self.config.patch_size]).type(torch.int32))
else:
images.append(self.transform(image))
cur_msgs.append(
tokenizer.im_start
+ tokenizer.unk_token * self.config.query_num
+ tokenizer.im_end
)
elif isinstance(c, str):
cur_msgs.append(c)
msg['content'] = '\n'.join(cur_msgs)
if tgt_sizes:
tgt_sizes = torch.vstack(tgt_sizes)
if system_prompt:
sys_msg = {'role': 'system', 'content': system_prompt}
copy_msgs = [sys_msg] + copy_msgs
input_ids = tokenizer.apply_chat_template(copy_msgs, tokenize=True, add_generation_prompt=False)
if sampling:
generation_config = {
"top_p": 0.8,
"top_k": 100,
"temperature": 0.7,
"do_sample": True,
"repetition_penalty": 1.05
}
else:
generation_config = {
"num_beams": 3,
"repetition_penalty": 1.2,
}
generation_config.update(
(k, kwargs[k]) for k in generation_config.keys() & kwargs.keys()
)
with torch.inference_mode():
res, vision_hidden_states = self.generate(
input_id_list=[input_ids],
max_inp_length=max_inp_length,
img_list=[images],
tgt_sizes=[tgt_sizes],
tokenizer=tokenizer,
max_new_tokens=max_new_tokens,
vision_hidden_states=vision_hidden_states,
return_vision_hidden_states=True,
stream=stream,
**generation_config
)
if stream:
def stream_gen():
for text in res:
text = text.replace(tokenizer.eot_token, '').replace(tokenizer.eos_token, '')
yield text
return stream_gen()
else:
answer = res[0]
return answer
class PreTrainedTokenizerFastWrapper(PreTrainedTokenizerFast):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.eot_token = "<|eot_id|>"
self.im_start = "<image>"
self.im_end = "</image>"
self.ref_start = "<ref>"
self.ref_end = "</ref>"
self.box_start = "<box>"
self.box_end = "</box>"
self.quad_start = "<quad>"
self.quad_end = "</quad>"
self.slice_start = "<slice>"
self.slice_end = "</slice>"
@property
def eos_id(self):
return self.eos_token_id
@property
def bos_id(self):
return self.bos_token_id
@property
def unk_id(self):
return self.unk_token_id
@property
def eot_id(self):
return self.convert_tokens_to_ids(self.eot_token)
@property
def im_start_id(self):
return self.convert_tokens_to_ids(self.im_start)
@property
def im_end_id(self):
return self.convert_tokens_to_ids(self.im_end)
@staticmethod
def escape(text: str) -> str:
return text
@staticmethod
def unescape(text: str) -> str:
return text
def pad(orig_items, key, max_length=None, padding_value=0, padding_side="left"):
items = []
if isinstance(orig_items[0][key], list):
assert isinstance(orig_items[0][key][0], torch.Tensor)
for it in orig_items:
for tr in it[key]:
items.append({key: tr})
else:
assert isinstance(orig_items[0][key], torch.Tensor)
items = orig_items
batch_size = len(items)
shape = items[0][key].shape
dim = len(shape)
assert dim <= 3
if max_length is None:
max_length = 0
max_length = max(max_length, max(item[key].shape[-1] for item in items))
min_length = min(item[key].shape[-1] for item in items)
dtype = items[0][key].dtype
if dim == 1:
return torch.cat([item[key] for item in items], dim=0)
elif dim == 2:
if max_length == min_length:
return torch.cat([item[key] for item in items], dim=0)
tensor = torch.zeros((batch_size, max_length), dtype=dtype) + padding_value
else:
tensor = (
torch.zeros((batch_size, max_length, shape[-1]), dtype=dtype)
+ padding_value
)
for i, item in enumerate(items):
if dim == 2:
if padding_side == "left":
tensor[i, -len(item[key][0]) :] = item[key][0].clone()
else:
tensor[i, : len(item[key][0])] = item[key][0].clone()
elif dim == 3:
if padding_side == "left":
tensor[i, -len(item[key][0]) :, :] = item[key][0].clone()
else:
tensor[i, : len(item[key][0]), :] = item[key][0].clone()
return tensor
def slice_image(
image, max_slice_nums=9, scale_resolution=448, patch_size=14, never_split=False
):
original_size = image.size
original_width, original_height = original_size
log_ratio = math.log(original_width / original_height)
ratio = original_width * original_height / (scale_resolution * scale_resolution)
multiple = min(math.ceil(ratio), max_slice_nums)
source_image = None
best_grid = None
patches = []
if multiple <= 1 or never_split:
# dont need to slice, upsample
best_size = find_best_resize(
original_size, scale_resolution, patch_size, allow_upscale=True
)
source_image = image.resize(best_size, Image.Resampling.BICUBIC)
else:
candidate_split_grids_nums = []
for i in [multiple - 1, multiple, multiple + 1]:
if i == 1 or i > max_slice_nums:
continue
candidate_split_grids_nums.append(i)
# source image, down-sampling and ensure divided by patch_size
best_resize = find_best_resize(original_size, scale_resolution, patch_size)
source_image = image.copy().resize(best_resize, Image.Resampling.BICUBIC)
candidate_grids = []
# find best grid
for split_grids_nums in candidate_split_grids_nums:
m = 1
while m <= split_grids_nums:
if split_grids_nums % m == 0:
candidate_grids.append([m, split_grids_nums // m])
m += 1
best_grid = [1, 1]
min_error = float("inf")
for grid in candidate_grids:
error = abs(log_ratio - math.log(grid[0] / grid[1]))
if error < min_error:
best_grid = grid
min_error = error
refine_size = get_refine_size(
original_size, best_grid, scale_resolution, patch_size, allow_upscale=True
)
refine_image = image.resize(refine_size, Image.Resampling.BICUBIC)
patches = split_to_patches(refine_image, best_grid)
return source_image, patches, best_grid
def ensure_divide(length, patch_size):
return max(round(length / patch_size) * patch_size, patch_size)
def find_best_resize(original_size, scale_resolution, patch_size, allow_upscale=False):
width, height = original_size
if (width * height > scale_resolution * scale_resolution) or allow_upscale:
r = width / height
height = int(scale_resolution / math.sqrt(r))
width = int(height * r)
best_width = ensure_divide(width, patch_size)
best_height = ensure_divide(height, patch_size)
return (best_width, best_height)
def get_refine_size(
original_size, grid, scale_resolution, patch_size, allow_upscale=False
):
width, height = original_size
grid_x, grid_y = grid
refine_width = ensure_divide(width, grid_x)
refine_height = ensure_divide(height, grid_y)
grid_width = refine_width / grid_x
grid_height = refine_height / grid_y
best_grid_size = find_best_resize(
(grid_width, grid_height),
scale_resolution,
patch_size,
allow_upscale=allow_upscale,
)
refine_size = (best_grid_size[0] * grid_x, best_grid_size[1] * grid_y)
return refine_size
def split_to_patches(image, grid):
patches = []
width, height = image.size
grid_x = int(width / grid[0])
grid_y = int(height / grid[1])
for i in range(0, height, grid_y):
images = []
for j in range(0, width, grid_x):
box = (j, i, j + grid_x, i + grid_y)
patch = image.crop(box)
images.append(patch)
patches.append(images)
return patches
def get_grid_placeholder(tokenizer, grid, query_num):
image_placeholder = (
tokenizer.im_start + tokenizer.unk_token * query_num + tokenizer.im_end
)
cols = grid[0]
rows = grid[1]
slices = []
for i in range(rows):
lines = []
for j in range(cols):
lines.append(image_placeholder)
slices.append("".join(lines))
slice_placeholder = tokenizer.slice_start + "\n".join(slices) + tokenizer.slice_end
return slice_placeholder