# MiniCPM-V / modeling_minicpmv.py
import json
import math
from typing import List, Optional

import timm
import torch
import torchvision
from timm.data import IMAGENET_INCEPTION_MEAN, IMAGENET_INCEPTION_STD
from torchvision import transforms
from transformers import LlamaTokenizer

from .configuration_minicpm import MiniCPMVConfig
from .modeling_minicpm import MiniCPMPreTrainedModel, MiniCPMForCausalLM
from .resampler import Resampler
class MiniCPMVPreTrainedModel(MiniCPMPreTrainedModel):
    config_class = MiniCPMVConfig
class MiniCPMV(MiniCPMVPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.llm = MiniCPMForCausalLM(config)
        self.vpm = self.init_vision_module()
        self.vision_dim = self.vpm.embed_dim
        self.embed_dim = self.llm.config.hidden_size
        self.resampler = self.init_resampler(self.embed_dim, self.vision_dim)
        self.transform = self.init_transform()
    def init_vision_module(self):
        model = timm.create_model(
            self.config.vision_encoder,
            pretrained=False,
            num_classes=0,
            dynamic_img_size=True,
            dynamic_img_pad=True
        )

        if isinstance(model, timm.models.VisionTransformer):
            if model.attn_pool is not None:
                model.attn_pool = torch.nn.Identity()

        if self.config.drop_vision_last_layer:
            model.blocks = model.blocks[:-1]

        return model
    def init_resampler(self, embed_dim, vision_dim):
        # Compress the variable-length ViT feature sequence into a fixed grid of
        # query_num tokens in the LLM embedding space.
        return Resampler(
            grid_size=int(math.sqrt(self.config.query_num)),
            embed_dim=embed_dim,
            num_heads=embed_dim // 128,
            kv_dim=vision_dim,
        )

    def init_transform(self):
        # Bicubic resize to a square image_size, then Inception-style normalization.
        return transforms.Compose([
            transforms.Resize(
                (self.config.image_size, self.config.image_size),
                interpolation=torchvision.transforms.InterpolationMode.BICUBIC
            ),
            transforms.ToTensor(),
            transforms.Normalize(mean=IMAGENET_INCEPTION_MEAN, std=IMAGENET_INCEPTION_STD)
        ])
    def get_vision_embedding(self, pixel_values):
        res = []
        # Cast inputs to the vision tower's parameter dtype.
        dtype = self.vpm.pos_embed.data.dtype
        for pixel_value in pixel_values:
            vision_embedding = self.vpm.forward_features(pixel_value.unsqueeze(0).type(dtype))
            if hasattr(self.vpm, 'num_prefix_tokens') and self.vpm.num_prefix_tokens > 0:
                # Drop prefix tokens (e.g. CLS) so only patch features reach the resampler.
                vision_embedding = vision_embedding[:, self.vpm.num_prefix_tokens:]
            res.append(self.resampler(vision_embedding))
        return torch.vstack(res)
    def get_vllm_embedding(self, data):
        if 'vision_hidden_states' not in data:
            pixel_values_list = data['pixel_values']
            vision_hidden_states = []
            for pixel_values in pixel_values_list:
                if len(pixel_values) > 0:
                    vision_hidden_states.append(self.get_vision_embedding(pixel_values))
                elif self.training:
                    # Text-only sample during training: run a dummy image through the
                    # vision tower so its parameters stay in the autograd graph.
                    dtype = self.vpm.pos_embed.data.dtype
                    device = self.vpm.pos_embed.data.device
                    dummy_image = torch.zeros(
                        (1, 3, 224, 224),
                        device=device, dtype=dtype
                    )
                    vision_hidden_states.append(self.get_vision_embedding(dummy_image))
                else:
                    vision_hidden_states.append([])
        else:
            vision_hidden_states = data['vision_hidden_states']

        vllm_embedding = self.llm.model.embed_tokens(data['input_ids']) * self.llm.config.scale_emb
        vision_hidden_states = [i.type(vllm_embedding.dtype) if isinstance(
            i, torch.Tensor) else i for i in vision_hidden_states]

        bs = len(data['input_ids'])
        for i in range(bs):
            cur_vs_hs = vision_hidden_states[i]
            if len(cur_vs_hs) > 0:
                cur_vllm_emb = vllm_embedding[i]
                cur_image_bound = data['image_bound'][i]
                if len(cur_image_bound) > 0:
                    # Write the resampled image features into the placeholder positions
                    # delimited by each [start, end) pair in image_bound.
                    image_indices = torch.stack(
                        [torch.arange(r[0], r[1], dtype=torch.long) for r in cur_image_bound]
                    ).to(vllm_embedding.device)
                    cur_vllm_emb.scatter_(0, image_indices.view(-1, 1).repeat(1, cur_vllm_emb.shape[-1]),
                                          cur_vs_hs.view(-1, cur_vs_hs.shape[-1]))
                elif self.training:
                    # Zero-weighted dependency on the dummy vision features (see above).
                    cur_vllm_emb += cur_vs_hs[0].mean() * 0

        return vllm_embedding, vision_hidden_states
    def forward(self, data, **kwargs):
        vllm_embedding, vision_hidden_states = self.get_vllm_embedding(data)
        position_ids = data["position_ids"]
        if position_ids.dtype != torch.int64:
            position_ids = position_ids.long()

        return self.llm(
            input_ids=None,
            position_ids=position_ids,
            inputs_embeds=vllm_embedding,
            **kwargs
        )
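    # Note on the `data` dict used above: forward() expects 'input_ids', 'position_ids',
    # 'image_bound' (per-sample [start, end) pairs marking the image placeholder span) and
    # either 'pixel_values' (lists of transformed image tensors) or precomputed
    # 'vision_hidden_states'; the generation helpers below build the same dict minus
    # 'position_ids'.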
    def _convert_to_tensors(self, tokenizer, input_str, max_inp_length: Optional[int] = None):
        if tokenizer.add_bos_token:
            input_ids = tokenizer.encode(input_str)
        else:
            input_ids = [tokenizer.bos_id] + tokenizer.encode(input_str)
        if max_inp_length is not None:
            input_ids = input_ids[:max_inp_length]
        input_ids = torch.tensor(input_ids, dtype=torch.int32)

        image_start_tokens = torch.where(input_ids == tokenizer.im_start_id)[0]
        # skip the im_start token itself
        image_start_tokens += 1
        image_end_tokens = torch.where(input_ids == tokenizer.im_end_id)[0]
        valid_image_nums = max(len(image_start_tokens), len(image_end_tokens))
        image_bound = torch.hstack(
            [image_start_tokens[:valid_image_nums].unsqueeze(-1),
             image_end_tokens[:valid_image_nums].unsqueeze(-1)]
        )

        model_input = {}
        model_input["input_ids"] = input_ids.unsqueeze(0).to(self.device)
        model_input["image_bound"] = image_bound
        return model_input
    def _process_list(self, tokenizer, data_list: List[str], max_inp_length: Optional[int] = None):
        pad_keys = ['input_ids']
        input_tensors = []
        for data in data_list:
            input_tensors.append(self._convert_to_tensors(tokenizer, data, max_inp_length))
        padded = {}
        for key in pad_keys:
            padded[key] = pad(input_tensors, key, padding_side="left").to(self.device)
        padded['image_bound'] = [i['image_bound'] for i in input_tensors]
        return padded
    def _decode(self, inputs_embeds, tokenizer, **kwargs):
        output = self.llm.generate(
            inputs_embeds=inputs_embeds,
            pad_token_id=0,
            eos_token_id=tokenizer.eos_token_id,
            **kwargs
        )
        return self._decode_text(output, tokenizer)

    def _decode_text(self, result_ids, tokenizer):
        result_text = []
        for result in result_ids:
            result = result[result != 0]
            if result[0] == tokenizer.bos_id:
                result = result[1:]
            if result[-1] == tokenizer.eos_id:
                result = result[:-1]
            result_text.append(tokenizer.decode(result).strip())
        return result_text
    def generate(
        self,
        data_list=None,
        img_list=None,
        tokenizer=None,
        max_inp_length: Optional[int] = None,
        vision_hidden_states=None,
        return_vision_hidden_states=False,
        **kwargs
    ):
        assert data_list is not None
        bs = len(data_list)
        if img_list is None:
            img_list = [[] for _ in range(bs)]
        assert bs == len(img_list)

        model_inputs = self._process_list(tokenizer, data_list, max_inp_length)

        if vision_hidden_states is None:
            pixel_values = []
            for i in range(bs):
                img_inps = []
                for img in img_list[i]:
                    img_inps.append(self.transform(img))
                if img_inps:
                    pixel_values.append(torch.stack(img_inps).to(self.device))
                else:
                    pixel_values.append([])
            model_inputs['pixel_values'] = pixel_values
        else:
            # Reuse vision features computed in a previous call (e.g. earlier chat turns).
            model_inputs['vision_hidden_states'] = vision_hidden_states

        with torch.inference_mode():
            model_inputs['inputs_embeds'], vision_hidden_states = self.get_vllm_embedding(model_inputs)
            result = self._decode(model_inputs['inputs_embeds'], tokenizer, **kwargs)

        if return_vision_hidden_states:
            return result, vision_hidden_states
        return result
    def chat(self, image, msgs, context, tokenizer, vision_hidden_states=None,
             max_new_tokens=2048, sampling=False, **kwargs):
        if isinstance(msgs, str):
            msgs = json.loads(msgs)

        # Convert msgs to a prompt string.
        prompt = ''
        for i, msg in enumerate(msgs):
            role = msg['role']
            content = msg['content']
            assert role in ['user', 'assistant']
            if i == 0:
                assert role == 'user', 'The role of the first msg should be user'
                # Prefix the first user turn with the image placeholder:
                # <image> + query_num unk tokens + </image>.
                content = tokenizer.im_start + tokenizer.unk_token * self.config.query_num + tokenizer.im_end + '\n' + content
            # '<用户>' ("user") and '<AI>' are the literal role markers of the MiniCPM chat format.
            prompt += '<用户>' if role == 'user' else '<AI>'
            prompt += content
        prompt += '<AI>'
        final_input = prompt

        if sampling:
            generation_config = {
                'top_p': 0.8,
                'top_k': 100,
                'temperature': 0.6,
                'do_sample': True
            }
        else:
            generation_config = {
                'num_beams': 3,
                'repetition_penalty': 1.2,
            }
        # Let caller-supplied kwargs override the defaults above.
        generation_config.update((k, kwargs[k]) for k in generation_config.keys() & kwargs.keys())

        with torch.inference_mode():
            res, vision_hidden_states = self.generate(
                data_list=[final_input],
                max_inp_length=2048,
                img_list=[[image]],
                tokenizer=tokenizer,
                max_new_tokens=max_new_tokens,
                vision_hidden_states=vision_hidden_states,
                return_vision_hidden_states=True,
                **generation_config
            )
        answer = res[0]
        context = msgs
        context.append({'role': 'assistant', 'content': answer})
        return answer, context, generation_config
class LlamaTokenizerWrapper(LlamaTokenizer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.im_start = "<image>"
        self.im_end = "</image>"
        self.ref_start = "<ref>"
        self.ref_end = "</ref>"
        self.box_start = "<box>"
        self.box_end = "</box>"
        self.quad_start = "<quad>"
        self.quad_end = "</quad>"

    @property
    def eos_id(self):
        return self.sp_model.eos_id()

    @property
    def bos_id(self):
        return self.sp_model.bos_id()

    @property
    def unk_id(self):
        return self.sp_model.unk_id()

    @property
    def im_start_id(self):
        return self._convert_token_to_id(self.im_start)

    @property
    def im_end_id(self):
        return self._convert_token_to_id(self.im_end)
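# The wrapper above only stores the marker strings and exposes convenience id properties;
# _convert_to_tensors() uses im_start_id/im_end_id to locate the image placeholder span.
# The markers are presumably part of the released tokenizer vocabulary, since
# _convert_token_to_id resolves ids through the underlying SentencePiece model.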
def pad(orig_items, key, max_length=None, padding_value=0, padding_side="left"):
    # Pad the tensors stored under `key` to a common length (left padding by default).
    items = []
    if isinstance(orig_items[0][key], list):
        assert isinstance(orig_items[0][key][0], torch.Tensor)
        for it in orig_items:
            for tr in it[key]:
                items.append({key: tr})
    else:
        assert isinstance(orig_items[0][key], torch.Tensor)
        items = orig_items

    batch_size = len(items)
    shape = items[0][key].shape
    dim = len(shape)
    assert dim <= 3
    if max_length is None:
        max_length = 0
    max_length = max(max_length, max(item[key].shape[-1] for item in items))
    min_length = min(item[key].shape[-1] for item in items)
    dtype = items[0][key].dtype

    if dim == 1:
        return torch.cat([item[key] for item in items], dim=0)
    elif dim == 2:
        if max_length == min_length:
            return torch.cat([item[key] for item in items], dim=0)
        tensor = torch.zeros((batch_size, max_length), dtype=dtype) + padding_value
    else:
        tensor = torch.zeros((batch_size, max_length, shape[-1]), dtype=dtype) + padding_value

    for i, item in enumerate(items):
        if dim == 2:
            if padding_side == "left":
                tensor[i, -len(item[key][0]):] = item[key][0].clone()
            else:
                tensor[i, :len(item[key][0])] = item[key][0].clone()
        elif dim == 3:
            if padding_side == "left":
                tensor[i, -len(item[key][0]):, :] = item[key][0].clone()
            else:
                tensor[i, :len(item[key][0]), :] = item[key][0].clone()

    return tensor
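# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal example of calling
# MiniCPMV.chat() on a single image. The checkpoint id, dtype/device choices and
# the image path below are assumptions for illustration only; consult the model
# card for the exact loading recipe.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from PIL import Image
    from transformers import AutoModel, AutoTokenizer

    model_id = "openbmb/MiniCPM-V"  # assumed checkpoint id
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = AutoModel.from_pretrained(model_id, trust_remote_code=True)
    model = model.to(device).eval()
    tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

    image = Image.open("example.jpg").convert("RGB")  # hypothetical input image
    msgs = [{"role": "user", "content": "What is in this image?"}]

    # chat() builds the <image>...</image> prompt, runs the vision tower once and
    # returns the answer together with the updated message history.
    answer, context, _ = model.chat(
        image=image,
        msgs=msgs,
        context=None,
        tokenizer=tokenizer,
        sampling=True,
        max_new_tokens=512,
    )
    print(answer)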