|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import torch |
|
from torch import nn |
|
import math |
|
from dataclasses import dataclass |
|
from typing import Optional, Tuple |
|
|
|
from transformers.utils import ModelOutput |
|
from transformers.modeling_utils import PreTrainedModel |
|
|
|
from .configuration_siglip import SiglipVisionConfig |
|
from .configuration_minicpm import MiniCPMConfig |
|
from .configuration_minicpmv import MiniCPMVConfig |
|
|
|
from .resampler import Resampler |
|
from .modeling_minicpm import MiniCPMForCausalLM |
|
from .modeling_siglip import SiglipVisionModel |
|
|
|
from transformers import LlamaTokenizer |
|
|
|
|
|
@dataclass |
|
class CausalVLMOutput(ModelOutput): |
|
loss: Optional[torch.FloatTensor] = None |
|
logits: torch.FloatTensor = None |
|
hidden_states: Optional[Tuple[torch.FloatTensor]] = None |
|
vision_hidden_states: Optional[Tuple[torch.FloatTensor]] = None |
|
attentions: Optional[Tuple[torch.FloatTensor]] = None |
|
|
|
|
|
class MiniCPMVForCausalLM(PreTrainedModel): |
|
model_type = "minicpm" |
|
_supports_flash_attn_2 = True |
|
|
|
def __init__(self, config: MiniCPMVConfig, adaptive=False): |
|
super().__init__(config) |
|
|
|
llm_config = config.llm_config |
|
vpm_config = config.vpm_config |
|
|
|
self.query_num = config.query_num |
|
self.patch_size = vpm_config.patch_size |
|
self.adaptive = adaptive |
|
self.slice_mode = config.slice_mode |
|
self.max_slice_nums = config.max_slice_nums |
|
self.mm_use_im_start_end = config.mm_use_im_start_end |
|
|
|
drop_vision_last_layer = config.drop_vision_last_layer |
|
|
|
|
|
vpm = SiglipVisionModel(vpm_config).vision_model |
|
|
|
if drop_vision_last_layer: |
|
vpm.encoder.layers = nn.ModuleList(vpm.encoder.layers[:-1]) |
|
|
|
self.vpm = vpm |
|
|
|
|
|
self.llm = MiniCPMForCausalLM(llm_config) |
|
|
|
embed_dim = llm_config.hidden_size |
|
|
|
self.resampler = Resampler( |
|
num_queries=config.query_num, |
|
embed_dim=embed_dim, |
|
num_heads=embed_dim // 128, |
|
kv_dim=vpm_config.hidden_size, |
|
adaptive=adaptive |
|
) |
|
|
|
return |
|
|
|
def vpm_forward(self, data): |
|
if 'vision_hidden_states' not in data: |
|
dtype = self.vpm.embeddings.position_embedding.weight.dtype |
|
device = self.vpm.embeddings.position_embedding.weight.device |
|
|
|
pixel_values_list = data['pixel_values'] |
|
tgt_sizes = data['tgt_sizes'] |
|
|
|
vision_hidden_states = [] |
|
|
|
all_pixel_values = [] |
|
img_cnt = [] |
|
|
|
for pixel_values in pixel_values_list: |
|
img_cnt.append(len(pixel_values)) |
|
all_pixel_values.extend([i.flatten(end_dim=1).permute(1, 0) for i in pixel_values]) |
|
|
|
|
|
if all_pixel_values: |
|
tgt_sizes = torch.vstack(tgt_sizes).type(torch.int32) |
|
max_patches = torch.max(tgt_sizes[:, 0] * tgt_sizes[:, 1]) |
|
|
|
all_pixel_values = torch.nn.utils.rnn.pad_sequence(all_pixel_values, batch_first=True, padding_value=0.0) |
|
all_pixel_values = all_pixel_values.to(device) |
|
|
|
B, L, _ = all_pixel_values.shape |
|
all_pixel_values = all_pixel_values.permute(0, 2, 1).reshape(B, 3, -1, L) |
|
|
|
patch_attn_mask = torch.zeros((B, 1, max_patches), dtype=torch.bool, device=device) |
|
for i in range(B): |
|
patch_attn_mask[i, :tgt_sizes[i][0] * tgt_sizes[i][1]] = True |
|
|
|
vision_embedding = self.vpm(all_pixel_values.type(dtype), patch_attention_mask=patch_attn_mask).last_hidden_state |
|
vision_embedding = self.resampler(vision_embedding, tgt_sizes) |
|
|
|
start = 0 |
|
for pixel_values in pixel_values_list: |
|
img_cnt = len(pixel_values) |
|
if img_cnt > 0: |
|
vision_hidden_states.append(vision_embedding[start: start + img_cnt]) |
|
start += img_cnt |
|
else: |
|
vision_hidden_states.append([]) |
|
else: |
|
if self.training: |
|
dummy_image = torch.zeros( |
|
(1, 3, 224, 224), |
|
device=device, dtype=dtype |
|
) |
|
|
|
tgt_sizes = torch.Tensor([[(224 // self.patch_size), math.ceil(224 / self.patch_size)]]).type(torch.int32) |
|
dummy_feature = self.resampler(self.vpm(dummy_image).last_hidden_state, tgt_sizes) |
|
else: |
|
dummy_feature = [] |
|
for _ in range(len(pixel_values_list)): |
|
vision_hidden_states.append(dummy_feature) |
|
|
|
else: |
|
vision_hidden_states = data['vision_hidden_states'] |
|
|
|
if hasattr(self.llm.config, 'scale_emb'): |
|
vllm_embedding = self.llm.model.embed_tokens(data['input_ids']) * self.llm.config.scale_emb |
|
else: |
|
vllm_embedding = self.llm.model.embed_tokens(data['input_ids']) |
|
|
|
vision_hidden_states = [i.type(vllm_embedding.dtype) if isinstance( |
|
i, torch.Tensor) else i for i in vision_hidden_states] |
|
|
|
bs = len(data['input_ids']) |
|
for i in range(bs): |
|
cur_vs_hs = vision_hidden_states[i] |
|
|
|
if len(cur_vs_hs) > 0: |
|
|
|
cur_vllm_emb = vllm_embedding[i] |
|
|
|
cur_image_bound = data['image_bound'][i] |
|
|
|
if len(cur_image_bound) > 0: |
|
|
|
image_indices = torch.stack( |
|
[torch.arange(r[0], r[1], dtype=torch.long) for r in cur_image_bound] |
|
).to(vllm_embedding.device) |
|
|
|
cur_vllm_emb.scatter_( |
|
0, |
|
image_indices.view(-1, 1).repeat(1, cur_vllm_emb.shape[-1]), |
|
cur_vs_hs.view(-1, cur_vs_hs.shape[-1]) |
|
) |
|
|
|
return vllm_embedding, vision_hidden_states |
|
|
|
def forward(self, data, **kwargs): |
|
vllm_embedding, vision_hidden_states = self.vpm_forward(data) |
|
|
|
output = self.llm( |
|
inputs_embeds=vllm_embedding, |
|
attention_mask=data["attention_mask"], |
|
return_dict=True |
|
) |
|
|
|
return CausalVLMOutput( |
|
logits=output.logits, |
|
hidden_states=output.hidden_states, |
|
vision_hidden_states=vision_hidden_states |
|
) |
|
|
|
def generate(self, data, **kwargs): |
|
vllm_embedding, vision_hidden_states = self.vpm_forward(data) |
|
|
|
|
|
|
|
|
|
|
|
|
|
output = self.llm.generate( |
|
inputs_embeds=vllm_embedding, |
|
|
|
attention_mask=data["attention_mask"], |
|
**kwargs |
|
) |
|
|
|
return output |
|
|
|
|