import os from importlib import import_module from typing import List, Callable, Union, Optional import PIL.Image import torch import torch.nn.functional as F from torch import LongTensor, IntTensor, Tensor from transformers import CLIPImageProcessor, CLIPVisionModel, SiglipImageProcessor, SiglipVisionModel from transformers import PreTrainedModel, AutoModel, AutoTokenizer, AutoModelForCausalLM, AutoImageProcessor from transformers.generation.utils import GenerateOutput from transformers.cache_utils import HybridCache from .configuration_ovis import BaseVisualTokenizerConfig, ClipVisualTokenizerConfig, SiglipVisualTokenizerConfig from .configuration_ovis import OvisConfig, ConversationFormatter, IGNORE_INDEX, IMAGE_TOKEN_INDEX # ---------------------------------------------------------------------- # Visual Tokenizer # ---------------------------------------------------------------------- class BaseVisualTokenizer(PreTrainedModel): base_model_prefix = "backbone" main_input_name = None _image_processor_class = None _image_processor_kwargs = {} _backbone_class = None _backbone_name_or_path = None def __init__(self, config: BaseVisualTokenizerConfig, *inputs, **kwargs): super().__init__(config, *inputs, **kwargs) if kwargs.get('train_from_scratch'): self.image_processor = self._image_processor_class.from_pretrained( self._backbone_name_or_path, **self._image_processor_kwargs) self.backbone = self._backbone_class.from_pretrained( self._backbone_name_or_path, **self.config.backbone_kwargs) self.config.backbone_config = self.backbone.config else: self.image_processor = AutoImageProcessor.from_pretrained( kwargs['image_processor_name_or_path']) self.backbone = AutoModel.from_config(self.config.backbone_config) self.head = None assert all((self.image_processor.do_resize, not getattr(self.image_processor, 'do_center_crop', False), self.image_processor.do_rescale, self.image_processor.do_normalize )), f"image_processor `{self.image_processor}` is not supported currently" def get_backbone(self): return self.backbone def get_image_processor(self): return self.image_processor def get_zero_pixel_values(self, n=1): height, width = self.get_image_size() if self.config.hd_booster is None: return torch.zeros(n, 3, height, width) elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']: return torch.zeros(n, 3 * 5, height, width) else: raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}') def get_head(self): return self.head def get_image_size(self): raise NotImplementedError def preprocess_image(self, image: PIL.Image.Image, convert_to_rgb=True): def _preprocess(img: PIL.Image.Image): # first resize and preprocess sides = self.get_image_size() if sides[0] != sides[1]: raise ValueError('get_image_size() returns non-square size') side = sides[0] w, h = img.size if w == h: new_width = new_height = side elif w > h: new_width = side new_height = int(h / w * new_width) else: new_height = side new_width = int(w / h * new_height) new_size = dict(height=new_height, width=new_width) pixel_values = self.image_processor.preprocess( img, size=new_size, return_tensors='pt')['pixel_values'] # then pad to square square_values = torch.zeros( [1, 3, side, side], dtype=pixel_values.dtype, device=pixel_values.device) new_height, new_width = pixel_values.shape[2:] if new_height == new_width: square_values[:, :, :, :] = pixel_values elif new_height > new_width: from_index = (side - new_width) // 2 square_values[:, :, :, from_index:from_index + new_width] = pixel_values else: from_index = (side - new_height) // 2 square_values[:, :, from_index:from_index + new_height, :] = pixel_values return square_values if convert_to_rgb and image.mode != 'RGB': image = image.convert('RGB') if self.config.hd_booster is None: return _preprocess(image) # [1, 3, side, side] elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']: width, height = image.size is_low_resolution = (height < self.get_image_size()[0] * 1.5 or width < self.get_image_size()[1] * 1.5) if self.config.hd_booster == 's2wrapper-adaptive' and is_low_resolution: values = self.get_zero_pixel_values() + torch.inf values[0][:3] = _preprocess(image)[0] else: center_x, center_y = width // 2, height // 2 image_top_left = image.crop((0, 0, center_x, center_y)) image_top_right = image.crop((center_x, 0, width, center_y)) image_bottom_left = image.crop((0, center_y, center_x, height)) image_bottom_right = image.crop((center_x, center_y, width, height)) imgs = [image, image_top_left, image_top_right, image_bottom_left, image_bottom_right] values = torch.cat([_preprocess(img) for img in imgs], dim=1) return values # [1, 3*5, side, side] else: raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}') def get_backbone_layer(self, index): return self.backbone.vision_model.encoder.layers[index] def tokenize(self, logits): def st_argmax(y_soft, dim): # straight-through softmax index = y_soft.max(dim, keepdim=True)[1] y_hard = torch.zeros_like( y_soft, memory_format=torch.legacy_contiguous_format).scatter_(dim, index, 1.0) ret = y_hard - y_soft.detach() + y_soft return ret if self.config.tokenize_function == 'softmax': tokens = F.softmax(logits, dim=-1) elif self.config.tokenize_function == 'gumbel_argmax': tokens = F.gumbel_softmax(logits, tau=self.config.tau, hard=True) elif self.config.tokenize_function == 'st_argmax': tokens = st_argmax(logits, dim=-1) else: raise ValueError( f'Invalid `max_type`, expected softmax or gumbel_argmax or st_argmax,' f' but got {self.config.tokenize_function}') return tokens class ClipVisualTokenizer(BaseVisualTokenizer): config_class = ClipVisualTokenizerConfig supports_gradient_checkpointing = True _no_split_modules = ["CLIPEncoderLayer"] _image_processor_class = CLIPImageProcessor _image_processor_kwargs = dict(do_center_crop=False) _backbone_class = CLIPVisionModel _backbone_name_or_path = "openai/clip-vit-large-patch14-336" def __init__(self, config: ClipVisualTokenizerConfig = None, *inputs, **kwargs): super().__init__(config, *inputs, **kwargs) head_dim = self.config.vocab_size if self.config.use_indicators: head_dim -= 2 # reserved for two image indicator tokens if self.config.hd_booster is None: self.head = torch.nn.Sequential( torch.nn.Linear(self.backbone.config.hidden_size, head_dim, bias=False), torch.nn.LayerNorm(head_dim) ) elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']: self.head = torch.nn.Sequential( torch.nn.Linear(self.backbone.config.hidden_size * 2, head_dim, bias=False), torch.nn.LayerNorm(head_dim) ) else: raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}') def get_image_size(self): height = self.image_processor.crop_size["height"] width = self.image_processor.crop_size["width"] return height, width def encode(self, pixel_values): if self.config.hd_booster is None: output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True) features = output.hidden_states[-1] if self.config.drop_cls_token: features = features[:, 1:, :] elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']: n, c, side, _ = pixel_values.shape if self.config.hd_booster == 's2wrapper-adaptive': pixel_values_mask = torch.isinf(pixel_values) # [n, c, side, side] pixel_values = torch.masked_fill(pixel_values, pixel_values_mask, 0.0) pixel_values = pixel_values.reshape(n * 5, c // 5, side, side) output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True) features = output.hidden_states[-1] if self.config.drop_cls_token: features = features[:, 1:, :] _, l, d = features.shape features = features.reshape(n, 5, l, d) features_overall = features[:, 0, :, :] # [n, l, d] features_parts = features[:, 1:, :, :] # [n, 4, l, d] sqrt_l = int(l ** 0.5) assert sqrt_l ** 2 == l, "The token sequence length should be a perfect square." features_parts = features_parts.reshape(n, 4, sqrt_l, sqrt_l, d) # [n, 4, sqrt(l), sqrt(l), d] features_top = torch.concat( [features_parts[:, 0, :, :, :], features_parts[:, 1, :, :, :]], dim=-2) # [n, sqrt(l), sqrt(l)*2, d] features_bottom = torch.concat( [features_parts[:, 2, :, :, :], features_parts[:, 3, :, :, :]], dim=-2) # [n, sqrt(l), sqrt(l)*2, d] features_merge = torch.concat([features_top, features_bottom], dim=-3) # [n, sqrt(l)*2, sqrt(l)*2, d] features_pool = F.interpolate( features_merge.permute(0, 3, 1, 2).to(torch.float32), size=sqrt_l, mode='area' ) # [n, d, sqrt_l, sqrt_l] features_pool = features_pool.flatten(2).permute(0, 2, 1).to(features.dtype) # [n, l, d] if self.config.hd_booster == 's2wrapper-adaptive': features_pool_mask = torch.unsqueeze( torch.unsqueeze(pixel_values_mask[:, -1, -1, -1], dim=-1), dim=-1) # [n, 1, 1] features_pool = torch.masked_fill(features_pool, features_pool_mask, 0.0) features = torch.cat([features_overall, features_pool], dim=-1) # [n, l, 2*d] else: raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}') return features def forward(self, pixel_values) -> Tensor: # [BatchSize, ImageShape] -> [BatchSize, #Token, VocabSize] features = self.encode(pixel_values) logits = self.head(features) tokens = self.tokenize(logits) if self.config.use_indicators: # tokens' shape is [BatchSize, #Token, VocabSize-2], so padding with [BatchSize, #Token, 2], # after which, tokens' shape should become [BatchSize, #Token, VocabSize] batch_size, token_len, _ = tokens.shape padding_tensor = torch.zeros( size=(batch_size, token_len, 2), dtype=tokens.dtype, device=tokens.device, layout=tokens.layout, requires_grad=False ) tokens = torch.cat((tokens, padding_tensor), dim=2) # adding indicator tokens, after which tokens' shape should become [BatchSize, 1+#Token+1, VocabSize] begin_indicator = torch.zeros( size=(batch_size, 1), dtype=torch.long, device=tokens.device, requires_grad=False ) + self.config.vocab_size - 2 begin_indicator_token = F.one_hot( begin_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype) end_indicator = torch.zeros( size=(batch_size, 1), dtype=torch.long, device=tokens.device, requires_grad=False ) + self.config.vocab_size - 1 end_indicator_token = F.one_hot( end_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype) tokens = torch.cat((begin_indicator_token, tokens, end_indicator_token), dim=1) return tokens class SiglipVisualTokenizer(BaseVisualTokenizer): config_class = SiglipVisualTokenizerConfig supports_gradient_checkpointing = True _no_split_modules = ["SiglipVisionTransformer"] _image_processor_class = SiglipImageProcessor _image_processor_kwargs = {} _backbone_class = SiglipVisionModel _backbone_name_or_path = "google/siglip-so400m-patch14-384" def __init__(self, config: SiglipVisualTokenizerConfig = None, *inputs, **kwargs): super().__init__(config, *inputs, **kwargs) head_dim = self.config.vocab_size if self.config.use_indicators: head_dim -= 2 # reserved for two image indicator tokens if self.config.hd_booster is None: self.head = torch.nn.Sequential( torch.nn.Linear( self.backbone.config.hidden_size * self.config.hidden_stride * self.config.hidden_stride, head_dim, bias=False ), torch.nn.LayerNorm(head_dim) ) elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']: self.head = torch.nn.Sequential( torch.nn.Linear( self.backbone.config.hidden_size * self.config.hidden_stride * self.config.hidden_stride * 2, head_dim, bias=False ), torch.nn.LayerNorm(head_dim) ) else: raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}') def get_image_size(self): height = self.image_processor.size["height"] width = self.image_processor.size["width"] return height, width def encode(self, pixel_values): if self.config.hd_booster is None: output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True) features = output.hidden_states[-1] if self.config.drop_cls_token: features = features[:, 1:, :] elif self.config.hd_booster in ['s2wrapper', 's2wrapper-adaptive']: n, c, side, _ = pixel_values.shape if self.config.hd_booster == 's2wrapper-adaptive': pixel_values_mask = torch.isinf(pixel_values) # [n, c, side, side] pixel_values = torch.masked_fill(pixel_values, pixel_values_mask, 0.0) pixel_values = pixel_values.reshape(n * 5, c // 5, side, side) output = self.backbone(pixel_values, output_hidden_states=True, return_dict=True) features = output.hidden_states[-1] if self.config.drop_cls_token: features = features[:, 1:, :] _, l, d = features.shape features = features.reshape(n, 5, l, d) features_overall = features[:, 0, :, :] # [n, l, d] features_parts = features[:, 1:, :, :] # [n, 4, l, d] sqrt_l = int(l ** 0.5) assert sqrt_l ** 2 == l, "The token sequence length should be a perfect square." features_parts = features_parts.reshape(n, 4, sqrt_l, sqrt_l, d) # [n, 4, sqrt(l), sqrt(l), d] features_top = torch.concat( [features_parts[:, 0, :, :, :], features_parts[:, 1, :, :, :]], dim=-2) # [n, sqrt(l), sqrt(l)*2, d] features_bottom = torch.concat( [features_parts[:, 2, :, :, :], features_parts[:, 3, :, :, :]], dim=-2) # [n, sqrt(l), sqrt(l)*2, d] features_merge = torch.concat([features_top, features_bottom], dim=-3) # [n, sqrt(l)*2, sqrt(l)*2, d] features_pool = F.interpolate( features_merge.permute(0, 3, 1, 2).to(torch.float32), size=sqrt_l, mode='area' ) # [n, d, sqrt_l, sqrt_l] features_pool = features_pool.flatten(2).permute(0, 2, 1).to(features.dtype) # [n, l, d] if self.config.hd_booster == 's2wrapper-adaptive': features_pool_mask = torch.unsqueeze( torch.unsqueeze(pixel_values_mask[:, -1, -1, -1], dim=-1), dim=-1) # [n, 1, 1] features_pool = torch.masked_fill(features_pool, features_pool_mask, 0.0) features = torch.cat([features_overall, features_pool], dim=-1) # [n, l, 2*d] else: raise ValueError(f'Unsupported hd_booster {self.config.hd_booster}') # merge number of `hidden_stride * hidden_stride` hidden states together to reduce token sequence length # e.g., for hidden_stride=3, this leads to a token length reduction: 729 -> 81 if self.config.hidden_stride > 1: n, l, d = features.shape # this `d` maybe different from the above `d sqrt_l = int(l ** 0.5) assert sqrt_l ** 2 == l, "The token sequence length should be a perfect square." assert l % (self.config.hidden_stride ** 2) == 0, \ "The token sequence length should be divisible by `hidden_stride**2`." features = features.reshape(n, sqrt_l, sqrt_l, d) features = features.reshape(n, sqrt_l // self.config.hidden_stride, self.config.hidden_stride, sqrt_l // self.config.hidden_stride, self.config.hidden_stride, d) features = features.permute(0, 1, 3, 2, 4, 5) # [n, sqrt_l/hs, sqrt_l/hs, hs, hs, d] features = features.flatten(3) # [n, sqrt_l/hs, sqrt_l/hs, hs*hs*d] features = features.reshape(n, l // (self.config.hidden_stride * self.config.hidden_stride), self.config.hidden_stride * self.config.hidden_stride * d) return features def forward(self, pixel_values) -> Tensor: # [BatchSize, ImageShape] -> [BatchSize, #Token, VocabSize] features = self.encode(pixel_values) logits = self.head(features) tokens = self.tokenize(logits) if self.config.use_indicators: # tokens' shape is [BatchSize, #Token, VocabSize-2], so padding with [BatchSize, #Token, 2], after # which, tokens' shape should become [BatchSize, #Token, VocabSize] batch_size, token_len, _ = tokens.shape padding_tensor = torch.zeros( size=(batch_size, token_len, 2), dtype=tokens.dtype, device=tokens.device, layout=tokens.layout, requires_grad=False ) tokens = torch.cat((tokens, padding_tensor), dim=2) # adding indicator tokens, after which tokens' shape should become [BatchSize, 1+#Token+1, VocabSize] begin_indicator = torch.zeros( size=(batch_size, 1), dtype=torch.long, device=tokens.device, requires_grad=False ) + self.config.vocab_size - 2 begin_indicator_token = F.one_hot( begin_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype) end_indicator = torch.zeros( size=(batch_size, 1), dtype=torch.long, device=tokens.device, requires_grad=False ) + self.config.vocab_size - 1 end_indicator_token = F.one_hot( end_indicator, num_classes=self.config.vocab_size).to(dtype=tokens.dtype) tokens = torch.cat((begin_indicator_token, tokens, end_indicator_token), dim=1) return tokens AutoModel.register(ClipVisualTokenizerConfig, ClipVisualTokenizer) AutoModel.register(SiglipVisualTokenizerConfig, SiglipVisualTokenizer) # ---------------------------------------------------------------------- # Ovis # ---------------------------------------------------------------------- class VisualEmbedding(torch.nn.Embedding): def forward(self, input: Tensor) -> Tensor: if any((isinstance(input, LongTensor), isinstance(input, IntTensor))): return super().forward(input) return torch.matmul(input, self.weight) class OvisPreTrainedModel(PreTrainedModel): config_class = OvisConfig base_model_prefix = "ovis" class Ovis(OvisPreTrainedModel): def __init__(self, config: OvisConfig, *inputs, **kwargs): super().__init__(config, *inputs, **kwargs) self.llm = AutoModelForCausalLM.from_config(self.config.llm_config, attn_implementation="sdpa") assert self.config.hidden_size == self.llm.config.hidden_size, "hidden size mismatch" self.text_tokenizer = AutoTokenizer.from_pretrained(self.config.name_or_path) self.visual_tokenizer = AutoModel.from_config( self.config.visual_tokenizer_config, image_processor_name_or_path=self.config.name_or_path ) self.vte = VisualEmbedding( self.config.visual_tokenizer_config.vocab_size, self.config.hidden_size, device=self.visual_tokenizer.device, dtype=self.visual_tokenizer.dtype ) def _merge_modules(modules_list: tuple): merged_modules = [] for modules in modules_list: merged_modules.extend(modules if modules else []) return merged_modules self._no_split_modules = _merge_modules( (self.llm._no_split_modules, self.visual_tokenizer._no_split_modules)) self._skip_keys_device_placement = self.llm._skip_keys_device_placement self._keep_in_fp32_modules = _merge_modules( (self.llm._keep_in_fp32_modules, self.visual_tokenizer._keep_in_fp32_modules)) self.is_parallelizable = all((self.llm.is_parallelizable, self.visual_tokenizer.is_parallelizable)) self.supports_gradient_checkpointing = all( (self.llm.supports_gradient_checkpointing, self.visual_tokenizer.supports_gradient_checkpointing)) self._supports_flash_attn_2 = all( (self.llm._supports_flash_attn_2, self.visual_tokenizer._supports_flash_attn_2)) self._supports_sdpa = all((self.llm._supports_sdpa, self.visual_tokenizer._supports_sdpa)) def get_text_tokenizer(self): return self.text_tokenizer def get_visual_tokenizer(self): return self.visual_tokenizer def get_llm(self): return self.llm def get_vte(self): return self.vte def get_wte(self): return self.llm.get_input_embeddings() def get_conversation_formatter(self) -> ConversationFormatter: if getattr(self, 'conversation_formatter', None) is None: self.conversation_formatter = getattr( import_module(".configuration_ovis", __package__), self.config.conversation_formatter_class )(self.text_tokenizer) return self.conversation_formatter def forward( self, input_ids: torch.Tensor, attention_mask: torch.Tensor, labels: Optional[torch.Tensor], pixel_values: List[Optional[torch.Tensor]], **kwargs ): assert self.training, "`forward` can only be used in training. For inference, use `generate`." _, inputs_embeds, labels, attention_mask = self.merge_multimodal( text_input_ids=input_ids, text_attention_masks=attention_mask, text_labels=labels, pixel_values=pixel_values ) return self.llm(inputs_embeds=inputs_embeds, labels=labels, attention_mask=attention_mask, **kwargs) def merge_multimodal( self, text_input_ids: torch.Tensor, text_attention_masks: torch.Tensor, text_labels: Optional[torch.Tensor], pixel_values: List[Optional[torch.Tensor]] ): input_device = text_input_ids.device if self.training: # When training, to be compatible with deepspeed zero, each sample has to include pixel_value tensor. # For text-only sample, one can simply use a full zero tensor as pixel_value, which will be ignored # (see below in this function); so, the gradient will not be affected. num_images = [x.shape[0] for x in pixel_values] visual_tokens = self.visual_tokenizer(torch.cat([x for x in pixel_values], dim=0)) visual_embeds = torch.split( self.get_vte()(visual_tokens).to(dtype=self.dtype, device=input_device), split_size_or_sections=num_images, dim=0 ) visual_input_ids = torch.split( torch.argmax(visual_tokens, dim=-1).to(device=input_device), split_size_or_sections=num_images, dim=0 ) visual_labels = [ torch.full( x.shape, IGNORE_INDEX, dtype=torch.long, device=input_device ) for x in visual_input_ids ] else: # When inference, sample can include only text with `None` pixel_value num_images = [x.shape[0] if x is not None else 0 for x in pixel_values] if sum(num_images) > 0: visual_tokens = self.visual_tokenizer(torch.cat([x for x in pixel_values if x is not None], dim=0)) visual_embeds = torch.split( self.get_vte()(visual_tokens).to(dtype=self.dtype, device=input_device), split_size_or_sections=num_images, dim=0 ) visual_input_ids = torch.split( torch.argmax(visual_tokens, dim=-1).to(device=input_device), split_size_or_sections=num_images, dim=0 ) visual_labels = [ torch.full( x.shape, IGNORE_INDEX, dtype=torch.long, device=input_device ) for x in visual_input_ids ] else: # just placeholders visual_embeds = [None] * len(num_images) visual_input_ids = [None] * len(num_images) visual_labels = [None] * len(num_images) # just placeholders text_labels = torch.full(text_input_ids.shape, IGNORE_INDEX, dtype=torch.long, device=input_device) input_embeds = [] attention_masks = [] labels = [] for text_input_id, text_label, text_attention_mask, visual_embed, visual_input_id, visual_label in zip( text_input_ids, text_labels, text_attention_masks, visual_embeds, visual_input_ids, visual_labels ): image_token_mask = torch.eq(text_input_id, IMAGE_TOKEN_INDEX) text_embed = self.get_wte()(torch.masked_fill(text_input_id, image_token_mask, 0)) image_token_positions = torch.where(image_token_mask)[0].tolist() if len(image_token_positions) > 0: input_embed_parts = [] attention_mask_parts = [] label_parts = [] prev_image_token_position = -1 for index, image_token_position in enumerate(image_token_positions): input_embed_parts.append( text_embed[prev_image_token_position + 1:image_token_position, :]) label_parts.append( text_label[prev_image_token_position + 1:image_token_position]) attention_mask_parts.append( text_attention_mask[prev_image_token_position + 1:image_token_position]) input_embed_parts.append(visual_embed[index]) attention_mask_parts.append( torch.ones_like(visual_label[index], dtype=torch.bool)) label_parts.append(visual_label[index]) prev_image_token_position = image_token_position if prev_image_token_position + 1 < text_input_id.shape[0]: input_embed_parts.append( text_embed[prev_image_token_position + 1:, :]) attention_mask_parts.append( text_attention_mask[prev_image_token_position + 1:]) label_parts.append( text_label[prev_image_token_position + 1:]) input_embed = torch.cat(input_embed_parts, dim=0) attention_mask = torch.cat(attention_mask_parts, dim=0) label = torch.cat(label_parts, dim=0) else: input_embed = text_embed attention_mask = text_attention_mask label = text_label if self.training: # Make visual_embed involved in the backward graph, # to be compatible with deepspeed zero and ddp. input_embed += torch.sum(visual_embed * 0.0) input_embeds.append(input_embed) attention_masks.append(attention_mask) labels.append(label) batch_input_embeds = torch.nn.utils.rnn.pad_sequence( input_embeds, batch_first=True, padding_value=0.0)[:, :self.config.multimodal_max_length, :] batch_attention_mask = torch.nn.utils.rnn.pad_sequence( attention_masks, batch_first=True, padding_value=False)[:, :self.config.multimodal_max_length] batch_labels = torch.nn.utils.rnn.pad_sequence( labels, batch_first=True, padding_value=IGNORE_INDEX)[:, :self.config.multimodal_max_length] return visual_input_ids, batch_input_embeds, batch_labels, batch_attention_mask def save_pretrained( self, save_directory: Union[str, os.PathLike], is_main_process: bool = True, state_dict: Optional[dict] = None, save_function: Callable = torch.save, push_to_hub: bool = False, max_shard_size: Union[int, str] = "5GB", safe_serialization: bool = True, variant: Optional[str] = None, token: Optional[Union[str, bool]] = None, save_peft_format: bool = True, **kwargs ): super().save_pretrained(save_directory, is_main_process=is_main_process, state_dict=state_dict, save_function=save_function, safe_serialization=safe_serialization) self.get_text_tokenizer().save_pretrained(save_directory) self.get_visual_tokenizer().get_image_processor().save_pretrained(save_directory) # uncomment the following will additionally save a separate visual tokenizer # visual_tokenizer_directory = os.path.join(save_directory, 'visual_tokenizer') # self.get_visual_tokenizer().save_pretrained(visual_tokenizer_directory, # is_main_process=is_main_process, # state_dict=None, # save_function=save_function, # safe_serialization=safe_serialization) # self.get_visual_tokenizer().get_image_processor().save_pretrained(visual_tokenizer_directory) def _get_hybrid_cache_for_llm(self, max_batch_size: int, max_cache_len: int): cache_cls = HybridCache llm = self.get_llm() need_new_cache = ( not hasattr(llm, "_cache") or (not isinstance(llm._cache, cache_cls)) or llm._cache.max_batch_size != max_batch_size or llm._cache.max_cache_len < max_cache_len ) if need_new_cache: if hasattr(llm.config, "_pre_quantization_dtype"): cache_dtype = llm.config._pre_quantization_dtype else: cache_dtype = llm.dtype llm._cache = cache_cls( config=llm.config, max_batch_size=max_batch_size, max_cache_len=max_cache_len, device=llm.device, dtype=cache_dtype, ) else: llm._cache.reset() return llm._cache # TODO: support batch generation def generate( self, inputs: Optional[torch.Tensor] = None, **kwargs ) -> Union[GenerateOutput, torch.LongTensor]: assert inputs.shape[0] == 1, 'Currently, only support `batch_size=1`' _, inputs_embeds, labels, attention_mask = self.merge_multimodal( text_input_ids=inputs, text_attention_masks=kwargs.pop('attention_mask'), text_labels=None, pixel_values=kwargs.pop('pixel_values') ) if getattr(self.generation_config, 'cache_implementation') == 'hybrid': # mainly for Gemma2 kwargs['past_key_values'] = self._get_hybrid_cache_for_llm( getattr(kwargs, "num_beams", 1), kwargs['max_new_tokens'] + inputs_embeds.shape[-2]) self.get_llm()._supports_cache_class = True kwargs['cache_implementation'] = None return self.llm.generate(inputs=None, inputs_embeds=inputs_embeds, attention_mask=attention_mask, **kwargs)