# coding=utf-8 # Copyright 2024 RhapsodyAI and ModelBest Inc. and Microsoft and the HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # coding=utf-8 # Copyright 2024 The HuggingFace Inc. team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import re from typing import List, Optional, Union, Dict import math import torch from torchvision import transforms from PIL import Image import transformers from transformers.feature_extraction_utils import BatchFeature from transformers.image_utils import ImageInput from transformers.processing_utils import ProcessorMixin from transformers.tokenization_utils_base import PaddingStrategy, TextInput, TruncationStrategy, PreTokenizedInput from transformers.utils import TensorType from transformers.image_processing_utils import BaseImageProcessor, BatchFeature # from transformers.image_transforms import ( # convert_to_rgb, # ) from transformers import LlamaTokenizer # for text processing from transformers.utils import logging logger = logging.get_logger(__name__) # image tokenizer def ensure_divide(length, patch_size): return max(round(length / patch_size) * patch_size, patch_size) def find_best_resize(original_size, scale_resolution, patch_size, allow_upscale=False): width, height = original_size if (width * height > scale_resolution * scale_resolution) or allow_upscale: r = width / height height = int(scale_resolution / math.sqrt(r)) width = int(height * r) best_width = ensure_divide(width, patch_size) best_height = ensure_divide(height, patch_size) return (best_width, best_height) def get_refine_size( original_size, grid, scale_resolution, patch_size, allow_upscale=False ): width, height = original_size grid_x, grid_y = grid refine_width = ensure_divide(width, grid_x) refine_height = ensure_divide(height, grid_y) grid_width = refine_width / grid_x grid_height = refine_height / grid_y best_grid_size = find_best_resize( (grid_width, grid_height), scale_resolution, patch_size, allow_upscale=allow_upscale, ) refine_size = (best_grid_size[0] * grid_x, best_grid_size[1] * grid_y) return refine_size def split_to_patches(image, grid): patches = [] width, height = image.size grid_x = int(width / grid[0]) grid_y = int(height / grid[1]) for i in range(0, height, grid_y): images = [] for j in range(0, width, grid_x): box = (j, i, j + grid_x, i + grid_y) patch = image.crop(box) logger.info(f"I don't think it is so called `patch`. split_to_patches: patch size = {box}") images.append(patch) patches.append(images) return patches def slice_image( image, max_slice_nums=9, scale_resolution=448, patch_size=14, never_split=False ): original_size = image.size original_width, original_height = original_size log_ratio = math.log(original_width / original_height) ratio = original_width * original_height / (scale_resolution * scale_resolution) multiple = min(math.ceil(ratio), max_slice_nums) source_image = None best_grid = None patches = [] if multiple <= 1 or never_split: # don't need to slice, upsample best_size = find_best_resize( original_size, scale_resolution, patch_size, allow_upscale=True ) source_image = image.resize(best_size, Image.Resampling.BICUBIC) else: candidate_split_grids_nums = [] for i in [multiple - 1, multiple, multiple + 1]: if i == 1 or i > max_slice_nums: continue candidate_split_grids_nums.append(i) # source image, down-sampling and ensure divided by patch_size best_resize = find_best_resize(original_size, scale_resolution, patch_size) source_image = image.copy().resize(best_resize, Image.Resampling.BICUBIC) candidate_grids = [] # find best grid for split_grids_nums in candidate_split_grids_nums: m = 1 while m <= split_grids_nums: if split_grids_nums % m == 0: candidate_grids.append([m, split_grids_nums // m]) m += 1 best_grid = [1, 1] min_error = float("inf") for grid in candidate_grids: error = abs(log_ratio - math.log(grid[0] / grid[1])) if error < min_error: best_grid = grid min_error = error refine_size = get_refine_size( original_size, best_grid, scale_resolution, patch_size, allow_upscale=True ) refine_image = image.resize(refine_size, Image.Resampling.BICUBIC) patches = split_to_patches(refine_image, best_grid) return source_image, patches, best_grid def reshape_by_patch(image_tensor, patch_size=14): """ :param image_tensor: shape [3, H, W] :param patch_size: :return: [3, patch_size, HW/patch_size] """ patches = torch.nn.functional.unfold( image_tensor, (patch_size, patch_size), stride=(patch_size, patch_size) ) patches = patches.reshape(image_tensor.size(0), patch_size, patch_size, -1) patches = patches.permute(0, 1, 3, 2).reshape(image_tensor.size(0), patch_size, -1) return patches class MiniCPMVImageProcessor(BaseImageProcessor): r""" MiniCPMV image processor. -> Based on Phi3 image processor -> Used LlaVa-UHD. dynamic slicing one image image. Args: image_mean (`float` or `List[float]`, *optional*, defaults to `[0.48145466, 0.4578275, 0.40821073]`): Mean to use if normalizing the image. This is a float or list of floats the length of the number of channels in the image. Can be overridden by the `image_mean` parameter in the `preprocess` method. image_std (`float` or `List[float]`, *optional*, defaults to `[0.26862954, 0.26130258, 0.27577711]`): Standard deviation to use if normalizing the image. This is a float or list of floats the length of the number of channels in the image. Can be overridden by the `image_std` parameter in the `preprocess` method. Can be overridden by the `image_std` parameter in the `preprocess` method. do_convert_rgb (`bool`, *optional*, defaults to `True`): Whether to convert the image to RGB. """ def __init__( self, query_num: int = 64, image_mean: Optional[Union[float, List[float]]] = None, image_std: Optional[Union[float, List[float]]] = None, max_slice_nums: int = 9, scale_resolution: int = 448, patch_size: int = 14, **kwargs, ) -> None: super().__init__(**kwargs) self.query_num = query_num self.image_mean = image_mean self.image_std = image_std self.max_slice_nums = max_slice_nums self.scale_resolution = scale_resolution self.patch_size = patch_size def preprocess( self, image, slice_mode: bool = True, return_tensors: Optional[Union[str, TensorType]] = None, ): """ Args: images (`ImageInput`): Image to preprocess. Expects a single or batch of images with pixel values ranging from 0 to 255. If passing in images with pixel values between 0 and 1, set `do_rescale=False`. # modified: one image per invoke. return_tensors (`str` or `TensorType`, *optional*): The type of tensors to return. Can be one of: - Unset: Return a list of `np.ndarray`. - `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`. - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`. - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`. - `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`. """ transform = transforms.Compose( [ transforms.ToTensor(), transforms.Normalize(mean=self.image_mean, std=self.image_std) ] ) images_ = [] tgt_sizes = [] if slice_mode: slice_images = [] source_image, patches, best_grid = slice_image( # 耗时 image, self.max_slice_nums, self.scale_resolution, self.patch_size, ) slice_images.append(source_image) if len(patches) > 0: for i in range(len(patches)): for j in range(len(patches[0])): slice_images.append(patches[i][j]) for image_ in slice_images: slice_image_ = transform(image_) # 耗时 H, W = slice_image_.shape[1:] slice_image_patchified_ = reshape_by_patch(slice_image_) images_.append(slice_image_patchified_) tgt_sizes.append(torch.Tensor([H // self.patch_size, W // self.patch_size]).type(torch.int32)) else: best_grid = None image_ = transform(image) H, W = image_.shape[1:] image_patchified_ = reshape_by_patch(image_) images_.append(image_patchified_) # 耗时 tgt_sizes.append(torch.Tensor([H // self.patch_size, W // self.patch_size]).type(torch.int32)) return images_, tgt_sizes, best_grid # text tokenizer class MiniCPMVTextTokenizer(LlamaTokenizer): def __init__(self, **kwargs): super().__init__(**kwargs) self.im_start = "" self.im_end = "" self.ref_start = "" self.ref_end = "" self.box_start = "" self.box_end = "" self.quad_start = "" self.quad_end = "" self.point_start = "" self.point_end = "" self.slice_start = "" self.slice_end = "" @property def eos_id(self): return self.sp_model.eos_id() @property def bos_id(self): return self.sp_model.bos_id() @property def unk_id(self): return self.sp_model.unk_id() @property def im_start_id(self): return self._convert_token_to_id(self.im_start) @property def im_end_id(self): return self._convert_token_to_id(self.im_end) def get_grid_placeholder(tokenizer, grid, query_num): image_placeholder = ( tokenizer.im_start + tokenizer.unk_token * query_num + tokenizer.im_end ) cols = grid[0] rows = grid[1] slices = [] for i in range(rows): lines = [] for j in range(cols): lines.append(image_placeholder) slices.append("".join(lines)) slice_placeholder = tokenizer.slice_start + "\n".join(slices) + tokenizer.slice_end return slice_placeholder def pad(orig_items, max_length=None, padding_value=0, padding_side="left"): """ Args: orig_items: a list of input_ids, each input_ids should be [1, length_i] """ assert isinstance(orig_items, list) assert isinstance(orig_items[0], torch.Tensor) padding_value = 2 items = [t.squeeze() for t in orig_items] batch_size = len(items) shape = items[0].shape dim = len(shape) assert dim == 1, "This pad function only expect B * Tensor([seq_len]) input." # Assuming 1D tensors for simplicity if max_length is None: max_length = max(item.shape[0] for item in items) tensor = torch.full((batch_size, max_length), padding_value, dtype=items[0].dtype) attention_mask = torch.zeros((batch_size, max_length), dtype=torch.int8) for i, item in enumerate(items): length = item.shape[0] if padding_side == "left": raise Exception("Please use right padding") tensor[i, -length:] = item.clone() attention_mask[i, -length:] = 1 else: tensor[i, 0:length] = item.clone() attention_mask[i, 0:length] = 1 return_dict = { "input_ids": tensor, "attention_mask": attention_mask, } return return_dict def convert_to_tokens(input_str, tokenizer, max_inp_length): if tokenizer.add_bos_token: input_ids = tokenizer.encode(input_str) else: input_ids = [tokenizer.bos_id] + tokenizer.encode(input_str) input_ids = input_ids[:max_inp_length] input_ids = torch.tensor(input_ids, dtype=torch.int32) image_start_tokens = torch.where(input_ids == tokenizer.im_start_id)[0] # 跳过 im_start image_start_tokens += 1 image_end_tokens = torch.where(input_ids == tokenizer.im_end_id)[0] valid_image_nums = max(len(image_start_tokens), len(image_end_tokens)) image_bound = torch.hstack( [ image_start_tokens[:valid_image_nums].unsqueeze(-1), image_end_tokens[:valid_image_nums].unsqueeze(-1), ] ) model_input = {} model_input["input_ids"] = input_ids.unsqueeze(0) model_input["image_bound"] = image_bound return model_input class MiniCPMVProcessor(ProcessorMixin): r""" Based on Siglip. Constructs a Siglip processor which wraps a Siglip image processor and a Siglip tokenizer into a single processor. [`SiglipProcessor`] offers all the functionalities of [`SiglipImageProcessor`] and [`SiglipTokenizer`]. See the [`~SiglipProcessor.__call__`] and [`~SiglipProcessor.decode`] for more information. Args: image_processor ([`SiglipImageProcessor`]): The image processor is a required input. tokenizer ([`SiglipTokenizer`]): The tokenizer is a required input. """ attributes = ["image_processor", "tokenizer"] image_processor_class = "AutoImageProcessor" # sorry, we can't find a way to make `image_processor_class` equal to `MiniCPMVImageProcessor` tokenizer_class = "AutoTokenizer" def __init__(self, image_processor, tokenizer, query_num=64, slice_mode=True, max_inp_length=2048): super().__init__(image_processor, tokenizer) self.query_num = query_num self.slice_mode = slice_mode self.max_inp_length = max_inp_length def __call__( self, messages: Dict[str, Union[str, Image.Image]] = None, # ChatML format slice_mode: bool = None, max_inp_length: int = None, padding: Union[bool, str, PaddingStrategy] = False, padding_side: str = "left", truncation: Union[bool, str, TruncationStrategy] = None, return_tensors: Optional[Union[str, TensorType]] = TensorType.PYTORCH, ) -> BatchFeature: """ Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text` and `kwargs` arguments to SiglipTokenizer's [`~SiglipTokenizer.__call__`] if `text` is not `None` to encode the text. To prepare the image(s), this method forwards the `images` argument to SiglipImageProcessor's [`~SiglipImageProcessor.__call__`] if `images` is not `None`. Please refer to the doctsring of the above two methods for more information. Args: text (`str`, `List[str]`, `List[List[str]]`): The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set `is_split_into_words=True` (to lift the ambiguity with a batch of sequences). images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`): The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch tensor. In case of a NumPy array/PyTorch tensor, each image should be of shape (C, H, W), where C is a number of channels, H and W are image height and width. padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `False`): Select a strategy to pad the returned sequences (according to the model's padding side and padding index) among: - `True` or `'longest'`: Pad to the longest sequence in the batch (or no padding if only a single sequence if provided). - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum acceptable input length for the model if that argument is not provided. - `False` or `'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of different lengths). max_input_length (`int`, *optional*): Maximum length of the returned list and optionally padding length (see above). truncation (`bool`, *optional*): Activates truncation to cut input sequences longer than `max_length` to `max_length`. Returns: [`BatchFeature`]: A [`BatchFeature`] with the following fields: - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`. - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not `None`). - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`. """ # assert len(messages) == 1, 'Do not support batch > 1' if slice_mode is None: if self.slice_mode is None: raise ValueError("`slice_mode` is not specified by config or usage") else: slice_mode = self.slice_mode if max_inp_length is None: if self.max_inp_length is None: raise ValueError("`max_inp_length` is not specified by config or usage") else: max_inp_length = self.max_inp_length processed_subimages_all_data = [] processed_text_all_data = [] tgt_sizes_all_data = [] for msgs in messages: assert len(msgs) > 0, 'msgs is empty' processed_text_all_msgs = [] processed_subimages_all_msgs = [] tgt_sizes_all_msgs = [] # process each message, each message is look like [text/image, ...] for i, msg in enumerate(msgs): role = msg["role"] c = msg["content"] assert role in ["user", "assistant"] if i == 0: assert role == "user", "The role of first msg should be user" if isinstance(c, Image.Image): processed_subimages, tgt_sizes, best_grid = self.image_processor.preprocess(image=c, slice_mode=slice_mode) # make image placeholders if slice_mode: cur_msg = ( self.tokenizer.im_start + self.tokenizer.unk_token * self.query_num + self.tokenizer.im_end ) if len(processed_subimages) > 1: cur_msg += get_grid_placeholder( self.tokenizer, best_grid, self.query_num ) else: cur_msg = ( self.tokenizer.im_start + self.tokenizer.unk_token * self.query_num + self.tokenizer.im_end ) tgt_sizes_all_msgs.extend(tgt_sizes) processed_subimages_all_msgs.extend(processed_subimages) elif isinstance(c, str): cur_msg = c else: raise NotImplementedError(f"message {type(c)}: {c} can't be handled") role_title = "<用户>" if role == "user" else "" processed_text_all_msgs.append(role_title + cur_msg) processed_text_all_msgs_concat = "".join(processed_text_all_msgs) processed_text_all_msgs_concat += "" processed_text_all_data.append(processed_text_all_msgs_concat) processed_subimages_all_data.append(processed_subimages_all_msgs) tgt_sizes_all_msgs = torch.vstack(tgt_sizes_all_msgs) tgt_sizes_all_data.append(tgt_sizes_all_msgs) # convert text string to tokens, at this step, `input_ids` and `image_bound` is added model_inputs_uncollated = [] for text in processed_text_all_data: model_inputs_ = convert_to_tokens( text, max_inp_length=max_inp_length, tokenizer=self.tokenizer ) model_inputs_uncollated.append(model_inputs_) # pad: in this step, attention mask is added model_inputs_final = pad([i["input_ids"] for i in model_inputs_uncollated], padding_side=padding_side) # add image bound back model_inputs_final["image_bound"] = [i["image_bound"] for i in model_inputs_uncollated] # add pixels values model_inputs_final["pixel_values"] = processed_subimages_all_data # add target sizes model_inputs_final["tgt_sizes"] = tgt_sizes_all_data return BatchFeature(data=model_inputs_final, tensor_type=None)