|
import math |
|
from typing import List, Optional |
|
import json |
|
import timm |
|
import torch |
|
import torchvision |
|
import deepspeed |
|
from PIL import Image |
|
from timm.data import IMAGENET_INCEPTION_MEAN, IMAGENET_INCEPTION_STD |
|
from torchvision import transforms |
|
from transformers import LlamaTokenizer |
|
from transformers.integrations import is_deepspeed_zero3_enabled |
|
from .configuration_minicpm import MiniCPMVConfig |
|
from .modeling_minicpm import MiniCPMForCausalLM, MiniCPMPreTrainedModel |
|
from .resampler import Resampler |
|
|
|
|
|
class MiniCPMVPreTrainedModel(MiniCPMPreTrainedModel):
    """Pretrained-model base for MiniCPM-V that binds ``MiniCPMVConfig`` as its config class."""

    config_class = MiniCPMVConfig
|
|
|
|
|
class MiniCPMV(MiniCPMVPreTrainedModel):
    """MiniCPM language model combined with a timm vision encoder and a resampler.

    Images are encoded by ``self.vpm``, compressed by ``self.resampler`` and
    written into the token-embedding sequence at the positions delimited by the
    tokenizer's image start/end markers.
    """

    def __init__(self, config):
        """Build the language model, vision encoder, resampler and image transform."""
        super().__init__(config)

        self.llm = MiniCPMForCausalLM(config)
        self.vpm = self.init_vision_module()
        self.vision_dim = self.vpm.embed_dim
        self.embed_dim = self.llm.config.hidden_size
        self.resampler = self.init_resampler(self.embed_dim, self.vision_dim)
        self.transform = self.init_transform()

    def init_vision_module(self):
        """Create the (non-pretrained) timm vision encoder named by the config.

        For ``VisionTransformer`` models any attention pooling is replaced by an
        identity. When ``config.drop_vision_last_layer`` is set, the last
        transformer block is removed.
        """
        model = timm.create_model(
            self.config.vision_encoder,
            pretrained=False,
            num_classes=0,
            dynamic_img_size=True,
            dynamic_img_pad=True,
        )

        if isinstance(model, timm.models.VisionTransformer):
            if model.attn_pool is not None:
                model.attn_pool = torch.nn.Identity()

        if self.config.drop_vision_last_layer:
            model.blocks = model.blocks[:-1]

        return model

    def init_resampler(self, embed_dim, vision_dim):
        """Create the adaptive resampler mapping ``vision_dim`` features to ``embed_dim``."""
        return Resampler(
            grid_size=int(math.sqrt(self.config.query_num)),
            embed_dim=embed_dim,
            num_heads=embed_dim // 128,
            kv_dim=vision_dim,
            adaptive=True,
        )

    def init_transform(self):
        """Return the image preprocessing pipeline: to-tensor, then Inception mean/std normalisation."""
        return transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=IMAGENET_INCEPTION_MEAN, std=IMAGENET_INCEPTION_STD
                ),
            ]
        )

    def get_input_embeddings(self):
        """Return the language model's input embedding module."""
        return self.llm.get_input_embeddings()

    def set_input_embeddings(self, value):
        """Replace the language model's input embedding module.

        Delegates to the language model so the module that is actually read by
        ``get_input_embeddings`` / ``get_vllm_embedding`` is the one replaced
        (assigning ``self.llm.embed_tokens`` would only add an unused attribute).
        """
        self.llm.set_input_embeddings(value)

    def get_vision_embedding(self, pixel_values):
        """Encode each image in ``pixel_values`` and stack the resampled features.

        Args:
            pixel_values: iterable of image tensors whose last two dimensions are
                height and width (each item is given a batch dimension here).

        Returns:
            ``torch.vstack`` of the per-image resampler outputs.
        """
        dtype = self.llm.lm_head.weight.dtype
        patch_size = self.config.patch_size
        vpm = self.vpm
        resampler = self.resampler

        def embed_one(pixel_value):
            height, width = pixel_value.shape[-2:]
            target_size = (
                math.ceil(height / patch_size),
                math.ceil(width / patch_size),
            )
            features = vpm.forward_features(pixel_value.unsqueeze(0).type(dtype))
            # Drop prefix tokens so only patch tokens reach the resampler.
            if hasattr(vpm, "num_prefix_tokens") and vpm.num_prefix_tokens > 0:
                features = features[:, vpm.num_prefix_tokens:]
            return resampler(features, target_size)

        def embed_all():
            return [embed_one(pixel_value) for pixel_value in pixel_values]

        if is_deepspeed_zero3_enabled():
            # Under ZeRO-3 the position embedding is gathered for the duration of the encoding.
            with deepspeed.zero.GatheredParameters(self.vpm.pos_embed, modifier_rank=0):
                res = embed_all()
        else:
            res = embed_all()
        return torch.vstack(res)

    def get_vllm_embedding(self, data):
        """Build input embeddings with image features written over placeholder tokens.

        Args:
            data: dict with ``"input_ids"`` and ``"image_bound"`` plus either
                ``"pixel_values"`` (one list of images per sample) or
                pre-computed ``"vision_hidden_states"``.

        Returns:
            ``(vllm_embedding, vision_hidden_states)``.
        """
        if "vision_hidden_states" not in data:
            vision_hidden_states = []
            for pixel_values in data["pixel_values"]:
                if len(pixel_values) > 0:
                    vision_hidden_states.append(self.get_vision_embedding(pixel_values))
                elif self.training:
                    # Image-free sample during training: run a dummy image so the
                    # vision modules still take part in the forward pass.
                    dtype = self.llm.lm_head.weight.dtype
                    device = self.llm.lm_head.weight.device
                    dummy_image = torch.zeros(
                        (1, 3, 224, 224), device=device, dtype=dtype
                    )
                    vision_hidden_states.append(self.get_vision_embedding(dummy_image))
                else:
                    vision_hidden_states.append([])
        else:
            vision_hidden_states = data["vision_hidden_states"]

        vllm_embedding = (
            self.llm.model.embed_tokens(data["input_ids"]) * self.llm.config.scale_emb
        )
        vision_hidden_states = [
            i.type(vllm_embedding.dtype) if isinstance(i, torch.Tensor) else i
            for i in vision_hidden_states
        ]

        bs = len(data["input_ids"])
        for i in range(bs):
            cur_vs_hs = vision_hidden_states[i]
            if len(cur_vs_hs) == 0:
                continue
            cur_vllm_emb = vllm_embedding[i]
            cur_image_bound = data["image_bound"][i]
            if len(cur_image_bound) > 0:
                # Each bound is a [start, end) token range; torch.stack requires
                # all ranges of a sample to have the same length.
                image_indices = torch.stack(
                    [
                        torch.arange(r[0], r[1], dtype=torch.long)
                        for r in cur_image_bound
                    ]
                ).to(vllm_embedding.device)

                # In-place overwrite of the placeholder rows with image features.
                cur_vllm_emb.scatter_(
                    0,
                    image_indices.view(-1, 1).repeat(1, cur_vllm_emb.shape[-1]),
                    cur_vs_hs.view(-1, cur_vs_hs.shape[-1]),
                )
            elif self.training:
                # Adds zero, but keeps the vision output connected to the result.
                cur_vllm_emb += cur_vs_hs[0].mean() * 0

        return vllm_embedding, vision_hidden_states

    def forward(self, data, **kwargs):
        """Run the language model on embeddings built from ``data``.

        ``data`` must additionally contain ``"position_ids"``; they are cast to
        int64 when necessary. Extra keyword arguments go to the language model.
        """
        vllm_embedding, vision_hidden_states = self.get_vllm_embedding(data)
        position_ids = data["position_ids"]
        if position_ids.dtype != torch.int64:
            position_ids = position_ids.long()

        return self.llm(
            input_ids=None,
            position_ids=position_ids,
            inputs_embeds=vllm_embedding,
            **kwargs
        )

    def _convert_to_tensors(
        self, tokenizer, input_str, max_inp_length: Optional[int] = None
    ):
        """Tokenise one prompt and locate its image placeholder ranges.

        Returns:
            dict with ``"input_ids"`` (shape ``(1, seq)``, int32, on the model
            device) and ``"image_bound"`` (one ``[start, end)`` row per image,
            where ``start`` is the position right after the image-start marker
            and ``end`` is the position of the image-end marker).
        """
        if tokenizer.add_bos_token:
            input_ids = tokenizer.encode(input_str)
        else:
            input_ids = [tokenizer.bos_id] + tokenizer.encode(input_str)
        if max_inp_length is not None:
            input_ids = input_ids[:max_inp_length]
        input_ids = torch.tensor(input_ids, dtype=torch.int32)

        image_start_tokens = torch.where(input_ids == tokenizer.im_start_id)[0]
        # Skip the start marker itself: image content begins on the next token.
        image_start_tokens += 1
        image_end_tokens = torch.where(input_ids == tokenizer.im_end_id)[0]

        # Truncation can cut off a closing marker, leaving more starts than ends.
        # Only complete pairs are usable, and hstack needs equal-length columns.
        valid_image_nums = min(len(image_start_tokens), len(image_end_tokens))
        image_bound = torch.hstack(
            [
                image_start_tokens[:valid_image_nums].unsqueeze(-1),
                image_end_tokens[:valid_image_nums].unsqueeze(-1),
            ]
        )

        return {
            "input_ids": input_ids.unsqueeze(0).to(self.device),
            "image_bound": image_bound,
        }

    def _process_list(
        self, tokenizer, data_list: List[str], max_inp_length: Optional[int] = None
    ):
        """Tokenise a batch of prompts and left-pad ``input_ids`` into one tensor.

        ``"image_bound"`` is returned as a per-sample list and is not padded.
        """
        pad_keys = ["input_ids"]
        input_tensors = [
            self._convert_to_tensors(tokenizer, data, max_inp_length)
            for data in data_list
        ]
        padded = {
            key: pad(input_tensors, key, padding_side="left").to(self.device)
            for key in pad_keys
        }
        padded["image_bound"] = [i["image_bound"] for i in input_tensors]
        return padded

    def _decode(self, inputs_embeds, tokenizer, **kwargs):
        """Generate from ``inputs_embeds`` (pad id 0) and decode the result to text."""
        output = self.llm.generate(
            inputs_embeds=inputs_embeds,
            pad_token_id=0,
            eos_token_id=tokenizer.eos_token_id,
            **kwargs
        )
        return self._decode_text(output, tokenizer)

    def _decode_text(self, result_ids, tokenizer):
        """Decode generated id sequences, dropping id-0 padding and BOS/EOS.

        Sequences that are empty after the padding filter (or consist of a lone
        BOS token) decode to whatever the tokenizer returns for an empty input
        instead of raising ``IndexError``.
        """
        result_text = []
        for result in result_ids:
            result = result[result != 0]
            if len(result) > 0 and result[0] == tokenizer.bos_id:
                result = result[1:]
            if len(result) > 0 and result[-1] == tokenizer.eos_id:
                result = result[:-1]
            result_text.append(tokenizer.decode(result).strip())
        return result_text

    def slice_image(self, image):
        """Slice ``image`` with the module-level ``slice_image`` using this model's config."""
        return slice_image(
            image,
            self.config.max_slice_nums,
            self.config.scale_resolution,
            self.config.patch_size,
        )

    def get_slice_image_placeholder(self, image, tokenizer):
        """Slice ``image`` and build the matching placeholder text.

        Returns:
            ``(slice_images, final_placeholder)``: the resized source image
            followed by every patch in row-major order, and the placeholder
            string for the source image plus (when patches exist) the grid.
        """
        image_placeholder = (
            tokenizer.im_start
            + tokenizer.unk_token * self.config.query_num
            + tokenizer.im_end
        )

        source_image, patches, best_grid = self.slice_image(image)

        slice_images = [source_image]
        final_placeholder = image_placeholder

        if len(patches) > 0:
            for row in patches:
                slice_images.extend(row)

            final_placeholder += get_grid_placeholder(
                tokenizer, best_grid, self.config.query_num
            )

        return slice_images, final_placeholder

    def generate(
        self,
        data_list=None,
        img_list=None,
        tokenizer=None,
        max_inp_length: Optional[int] = None,
        vision_hidden_states=None,
        return_vision_hidden_states=False,
        **kwargs
    ):
        """Generate text for a batch of prompts with optional images.

        Args:
            data_list: list of prompt strings (required).
            img_list: per-prompt lists of images; defaults to no images.
            tokenizer: tokenizer providing the image marker ids.
            max_inp_length: optional cap on the number of prompt tokens.
            vision_hidden_states: pre-computed vision features; when given,
                ``img_list`` is not encoded.
            return_vision_hidden_states: also return the vision features.
            **kwargs: forwarded to the language model's ``generate``.

        Returns:
            The decoded strings, or ``(strings, vision_hidden_states)``.
        """
        assert data_list is not None
        bs = len(data_list)
        if img_list is None:
            img_list = [[] for _ in range(bs)]
        assert bs == len(img_list)

        model_inputs = self._process_list(tokenizer, data_list, max_inp_length)

        if vision_hidden_states is None:
            model_inputs["pixel_values"] = [
                [self.transform(img).to(self.device) for img in img_list[i]]
                for i in range(bs)
            ]
        else:
            model_inputs["vision_hidden_states"] = vision_hidden_states

        with torch.inference_mode():
            (
                model_inputs["inputs_embeds"],
                vision_hidden_states,
            ) = self.get_vllm_embedding(model_inputs)

            result = self._decode(model_inputs["inputs_embeds"], tokenizer, **kwargs)

        if return_vision_hidden_states:
            return result, vision_hidden_states

        return result

    def chat(
        self,
        image,
        msgs,
        context,
        tokenizer,
        vision_hidden_states=None,
        max_new_tokens=1024,
        sampling=True,
        max_inp_length=2048,
        **kwargs
    ):
        """Answer a conversation about ``image``.

        Args:
            image: the image the conversation refers to.
            msgs: list of ``{"role", "content"}`` dicts (or its JSON string);
                roles must be ``"user"``/``"assistant"`` and the first is a user.
            context: accepted for interface compatibility; its incoming value is
                not read (the returned context is rebuilt from ``msgs``).
            tokenizer: tokenizer providing the image markers.
            vision_hidden_states: optional pre-computed vision features.
            max_new_tokens: generation length limit.
            sampling: use the sampling preset, otherwise the beam-search preset.
            max_inp_length: cap on the number of prompt tokens.
            **kwargs: may override keys already present in the chosen preset;
                other keys are ignored.

        Returns:
            ``(answer, context, generation_config)``.
        """
        if isinstance(msgs, str):
            msgs = json.loads(msgs)
        # Without a first message there is nowhere to attach the image.
        assert len(msgs) > 0, "msgs is empty"

        prompt_parts = []
        for i, msg in enumerate(msgs):
            role = msg["role"]
            content = msg["content"]
            assert role in ["user", "assistant"]
            if i == 0:
                assert role == "user", "The role of first msg should be user"
                if self.config.slice_mode:
                    images, final_placeholder = self.get_slice_image_placeholder(
                        image, tokenizer
                    )
                    content = final_placeholder + "\n" + content
                else:
                    images = [image]
                    content = (
                        tokenizer.im_start
                        + tokenizer.unk_token * self.config.query_num
                        + tokenizer.im_end
                        + "\n"
                        + content
                    )
            prompt_parts.append("<用户>" if role == "user" else "<AI>")
            prompt_parts.append(content)
        prompt_parts.append("<AI>")
        final_input = "".join(prompt_parts)

        if sampling:
            generation_config = {
                "top_p": 0.8,
                "top_k": 100,
                "temperature": 0.7,
                "do_sample": True,
                "repetition_penalty": 1.05,
            }
        else:
            generation_config = {
                "num_beams": 3,
                "repetition_penalty": 1.2,
            }

        # Only keys that already exist in the preset can be overridden by the caller.
        generation_config.update(
            (k, kwargs[k]) for k in generation_config.keys() & kwargs.keys()
        )

        with torch.inference_mode():
            res, vision_hidden_states = self.generate(
                data_list=[final_input],
                max_inp_length=max_inp_length,
                img_list=[images],
                tokenizer=tokenizer,
                max_new_tokens=max_new_tokens,
                vision_hidden_states=vision_hidden_states,
                return_vision_hidden_states=True,
                **generation_config
            )
        answer = res[0]
        context = msgs.copy()
        context.append({"role": "assistant", "content": answer})

        return answer, context, generation_config
|
|
|
|
|
class LlamaTokenizerWrapper(LlamaTokenizer):
    """``LlamaTokenizer`` with multimodal marker strings and token-id shortcuts."""

    def __init__(self, **kwargs):
        """Initialise the base tokenizer, then attach the opening/closing marker strings."""
        super().__init__(**kwargs)
        self.im_start, self.im_end = "<image>", "</image>"
        self.ref_start, self.ref_end = "<ref>", "</ref>"
        self.box_start, self.box_end = "<box>", "</box>"
        self.quad_start, self.quad_end = "<quad>", "</quad>"
        self.point_start, self.point_end = "<point>", "</point>"
        self.slice_start, self.slice_end = "<slice>", "</slice>"

    @property
    def eos_id(self):
        """End-of-sequence id as reported by the SentencePiece model."""
        return self.sp_model.eos_id()

    @property
    def bos_id(self):
        """Beginning-of-sequence id as reported by the SentencePiece model."""
        return self.sp_model.bos_id()

    @property
    def unk_id(self):
        """Unknown-token id as reported by the SentencePiece model."""
        return self.sp_model.unk_id()

    @property
    def im_start_id(self):
        """Token id of the image-start marker."""
        return self._convert_token_to_id(self.im_start)

    @property
    def im_end_id(self):
        """Token id of the image-end marker."""
        return self._convert_token_to_id(self.im_end)
|
|
|
|
|
def pad(orig_items, key, max_length=None, padding_value=0, padding_side="left"):
    """Batch the tensors stored under ``key`` into one tensor, padding as needed.

    Args:
        orig_items: non-empty list of dicts. ``item[key]`` is either a tensor or
            a list of tensors (lists are flattened into one batch).
        key: dictionary key to collect.
        max_length: minimum padded length; the longest item wins if larger.
        padding_value: fill value for padded positions.
        padding_side: ``"left"`` pads before the data, anything else pads after.

    Returns:
        For 1-D items, or 2-D items that already share the target length, the
        concatenation along dim 0. Otherwise a new tensor with one row per item,
        taken from ``item[key][0]``.
    """
    first = orig_items[0][key]
    if isinstance(first, list):
        assert isinstance(first[0], torch.Tensor)
        items = [{key: tensor} for entry in orig_items for tensor in entry[key]]
    else:
        assert isinstance(first, torch.Tensor)
        items = orig_items

    batch_size = len(items)
    shape = items[0][key].shape
    dim = len(shape)
    assert dim <= 3

    # NOTE(review): lengths are read from the last axis even for 3-D items, while
    # 3-D rows are padded along axis 1 -- confirm this is intended by callers.
    lengths = [item[key].shape[-1] for item in items]
    max_length = max(max_length or 0, max(lengths))
    min_length = min(lengths)
    dtype = items[0][key].dtype

    if dim == 1 or (dim == 2 and max_length == min_length):
        return torch.cat([item[key] for item in items], dim=0)

    if dim == 2:
        out_shape = (batch_size, max_length)
    else:
        out_shape = (batch_size, max_length, shape[-1])
    tensor = torch.zeros(out_shape, dtype=dtype) + padding_value

    for i, item in enumerate(items):
        row = item[key][0]
        length = len(row)
        if padding_side == "left":
            # Explicit start index: a negative-length slice (``-0:``) would select
            # the whole row for an empty item and make the assignment fail.
            tensor[i, max_length - length:] = row
        else:
            tensor[i, :length] = row

    return tensor
|
|
|
|
|
def slice_image(
    image, max_slice_nums=9, scale_resolution=448, patch_size=14, never_split=False
):
    """Resize ``image`` and, when it is large enough, cut it into a grid of patches.

    Args:
        image: object exposing ``size`` as ``(width, height)`` plus ``resize`` and
            ``copy`` (a PIL image in this file).
        max_slice_nums: upper bound on the number of patches.
        scale_resolution: target side length that defines the per-slice pixel budget.
        patch_size: output dimensions are snapped to multiples of this value.
        never_split: when true, only the resized source image is produced.

    Returns:
        ``(source_image, patches, best_grid)``. Without splitting, ``patches`` is
        ``[]`` and ``best_grid`` is ``None``; otherwise ``patches`` is a list of
        rows of patches and ``best_grid`` is ``[columns, rows]``.
    """
    original_size = image.size
    original_width, original_height = original_size
    log_ratio = math.log(original_width / original_height)
    area_ratio = original_width * original_height / (scale_resolution * scale_resolution)
    multiple = min(math.ceil(area_ratio), max_slice_nums)

    if multiple <= 1 or never_split:
        # Small image (or splitting disabled): one resized image, upscaling allowed.
        best_size = find_best_resize(
            original_size, scale_resolution, patch_size, allow_upscale=True
        )
        return image.resize(best_size, Image.Resampling.BICUBIC), [], None

    # Consider slice counts around the ideal one, excluding 1 and anything over the cap.
    slice_counts = [
        count
        for count in (multiple - 1, multiple, multiple + 1)
        if count != 1 and count <= max_slice_nums
    ]

    best_resize = find_best_resize(original_size, scale_resolution, patch_size)
    source_image = image.copy().resize(best_resize, Image.Resampling.BICUBIC)

    # Every factorisation (columns, rows) of every candidate slice count.
    candidate_grids = [
        [cols, count // cols]
        for count in slice_counts
        for cols in range(1, count + 1)
        if count % cols == 0
    ]

    # Keep the first grid whose aspect ratio is closest (in log space) to the image's.
    best_grid = [1, 1]
    min_error = float("inf")
    for grid in candidate_grids:
        error = abs(log_ratio - math.log(grid[0] / grid[1]))
        if error < min_error:
            best_grid, min_error = grid, error

    refine_size = get_refine_size(
        original_size, best_grid, scale_resolution, patch_size, allow_upscale=True
    )
    refine_image = image.resize(refine_size, Image.Resampling.BICUBIC)
    patches = split_to_patches(refine_image, best_grid)

    return source_image, patches, best_grid
|
|
|
|
|
def ensure_divide(length, patch_size):
    """Round ``length`` to the nearest multiple of ``patch_size``, never below one patch."""
    snapped = round(length / patch_size) * patch_size
    return max(snapped, patch_size)
|
|
|
|
|
def find_best_resize(original_size, scale_resolution, patch_size, allow_upscale=False):
    """Pick a ``(width, height)`` that keeps the aspect ratio within the pixel budget.

    The size is rescaled so its area is about ``scale_resolution ** 2`` when the
    input exceeds that area, or unconditionally when ``allow_upscale`` is true.
    Both dimensions are then snapped to multiples of ``patch_size``.
    """
    width, height = original_size
    over_budget = width * height > scale_resolution * scale_resolution
    if over_budget or allow_upscale:
        aspect = width / height
        height = int(scale_resolution / math.sqrt(aspect))
        width = int(height * aspect)
    return (ensure_divide(width, patch_size), ensure_divide(height, patch_size))
|
|
|
|
|
def get_refine_size(
    original_size, grid, scale_resolution, patch_size, allow_upscale=False
):
    """Compute the full-image size whose grid cells each fit the per-slice budget.

    Args:
        original_size: ``(width, height)`` of the image.
        grid: ``(columns, rows)`` of the slicing grid.
        scale_resolution, patch_size, allow_upscale: forwarded to
            ``find_best_resize`` for a single cell.

    Returns:
        ``(width, height)`` equal to the best cell size multiplied by the grid.
    """
    width, height = original_size
    cols, rows = grid

    # Size of one cell after snapping the image to multiples of the grid.
    cell_size = (
        ensure_divide(width, cols) / cols,
        ensure_divide(height, rows) / rows,
    )

    cell_width, cell_height = find_best_resize(
        cell_size,
        scale_resolution,
        patch_size,
        allow_upscale=allow_upscale,
    )

    return (cell_width * cols, cell_height * rows)
|
|
|
|
|
def split_to_patches(image, grid):
    """Cut ``image`` into exactly ``grid[1]`` rows of ``grid[0]`` equally sized patches.

    Args:
        image: object exposing ``size`` as ``(width, height)`` and
            ``crop((left, upper, right, lower))``.
        grid: ``(columns, rows)``.

    Returns:
        List of rows, each a list of cropped patches in left-to-right order.
        When the image size is not a multiple of the grid, the remainder pixels
        on the right/bottom edge are left out, so the patch count always matches
        the grid (stepping over the pixel range would add a partial extra
        row/column).

    Raises:
        ValueError: if a cell would be zero pixels wide or high.
    """
    width, height = image.size
    cols, rows = grid[0], grid[1]
    cell_width = int(width / cols)
    cell_height = int(height / rows)
    if cell_width <= 0 or cell_height <= 0:
        raise ValueError("grid is too fine for the image size")

    return [
        [
            image.crop(
                (
                    col * cell_width,
                    row * cell_height,
                    (col + 1) * cell_width,
                    (row + 1) * cell_height,
                )
            )
            for col in range(cols)
        ]
        for row in range(rows)
    ]
|
|
|
|
|
def get_grid_placeholder(tokenizer, grid, query_num):
    """Build the placeholder text for a grid of image slices.

    Each slice is the image-start marker, ``query_num`` unknown tokens and the
    image-end marker. Slices of one row are concatenated, rows are separated by
    newlines, and the whole text is wrapped in the slice start/end markers.

    Args:
        tokenizer: object providing ``im_start``, ``im_end``, ``unk_token``,
            ``slice_start`` and ``slice_end`` strings.
        grid: ``(columns, rows)``.
        query_num: number of unknown tokens per slice.
    """
    one_slice = tokenizer.im_start + tokenizer.unk_token * query_num + tokenizer.im_end
    cols, rows = grid[0], grid[1]

    row_text = one_slice * cols
    body = "\n".join([row_text] * rows)
    return tokenizer.slice_start + body + tokenizer.slice_end
|
|