|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import re |
|
from typing import List, Optional, Union, Dict |
|
|
|
import math |
|
import torch |
|
from torchvision import transforms |
|
from PIL import Image |
|
|
|
import transformers |
|
from transformers.feature_extraction_utils import BatchFeature |
|
from transformers.image_utils import ImageInput |
|
from transformers.processing_utils import ProcessorMixin |
|
from transformers.tokenization_utils_base import PaddingStrategy, TextInput, TruncationStrategy, PreTokenizedInput |
|
from transformers.utils import TensorType |
|
from transformers.image_processing_utils import BaseImageProcessor, BatchFeature |
|
|
|
|
|
|
|
|
|
from transformers import LlamaTokenizer |
|
|
|
from transformers.utils import logging |
|
|
|
# Module-level logger created through the `transformers.utils.logging` helper imported above.
logger = logging.get_logger(__name__)
|
|
|
|
|
|
|
def ensure_divide(length, patch_size):
    """Round ``length`` to the nearest multiple of ``patch_size``, with a floor of one patch.

    Rounding uses Python's built-in ``round`` (ties go to the even multiple).
    """
    nearest_multiple = round(length / patch_size) * patch_size
    return max(nearest_multiple, patch_size)
|
|
|
def find_best_resize(original_size, scale_resolution, patch_size, allow_upscale=False):
    """Pick a patch-aligned ``(width, height)`` for an image of ``original_size``.

    When the image area exceeds ``scale_resolution ** 2`` (or ``allow_upscale`` is
    set), the size is first rescaled so that its area is roughly
    ``scale_resolution ** 2`` while keeping the aspect ratio. Both sides are then
    snapped to multiples of ``patch_size`` via ``ensure_divide``.

    Args:
        original_size: ``(width, height)`` pair.
        scale_resolution: side length of the square whose area is the budget.
        patch_size: both returned sides are multiples of this value.
        allow_upscale: rescale even when the image is already within budget.

    Returns:
        ``(best_width, best_height)`` tuple.
    """
    width, height = original_size
    over_budget = width * height > scale_resolution * scale_resolution
    if over_budget or allow_upscale:
        aspect = width / height
        # Height is derived first; width follows from the truncated height.
        height = int(scale_resolution / math.sqrt(aspect))
        width = int(height * aspect)
    return (ensure_divide(width, patch_size), ensure_divide(height, patch_size))
|
|
|
def get_refine_size(
    original_size, grid, scale_resolution, patch_size, allow_upscale=False
):
    """Compute the full-image size to resize to before cutting it into ``grid`` cells.

    The image size is first snapped so each side divides evenly by the grid,
    the resulting single-cell size is passed through ``find_best_resize``, and
    the chosen cell size is multiplied back up by the grid dimensions.

    Args:
        original_size: ``(width, height)`` of the image.
        grid: ``(grid_x, grid_y)`` — number of cells along width and height.
        scale_resolution: forwarded to ``find_best_resize`` for one cell.
        patch_size: forwarded to ``find_best_resize`` for one cell.
        allow_upscale: forwarded to ``find_best_resize``.

    Returns:
        ``(width, height)`` tuple; each side is ``cell_side * grid_count``.
    """
    width, height = original_size
    cols, rows = grid

    # Size of a single grid cell once the image sides are grid-aligned.
    cell_size = (
        ensure_divide(width, cols) / cols,
        ensure_divide(height, rows) / rows,
    )

    best_cell_width, best_cell_height = find_best_resize(
        cell_size,
        scale_resolution,
        patch_size,
        allow_upscale=allow_upscale,
    )

    return (best_cell_width * cols, best_cell_height * rows)
|
|
|
def split_to_patches(image, grid):
    """Cut ``image`` into a ``grid[1]`` x ``grid[0]`` matrix of equally sized crops.

    Args:
        image: object exposing ``.size`` as ``(width, height)`` and
            ``.crop(box)`` (a PIL image in this file's callers).
        grid: ``(columns, rows)`` pair.

    Returns:
        A list of ``grid[1]`` rows, each a list of ``grid[0]`` crops, ordered
        top-to-bottom and left-to-right.

    Raises:
        ValueError: if the image is smaller than the grid along either axis,
            so that a cell would be zero pixels wide or tall.
    """
    width, height = image.size
    cols, rows = grid[0], grid[1]
    cell_width = int(width / cols)
    cell_height = int(height / rows)
    if cell_width <= 0 or cell_height <= 0:
        raise ValueError(
            f"image of size {(width, height)} is too small to split into grid {tuple(grid)}"
        )

    patches = []
    # Iterate over grid indices rather than stepping through pixel offsets:
    # when the image size is not an exact multiple of the grid, offset stepping
    # yields an extra partial row/column whose crop box leaves the image.
    # Any remainder pixels on the right/bottom edge are left out instead.
    for row in range(rows):
        top = row * cell_height
        row_patches = []
        for col in range(cols):
            left = col * cell_width
            box = (left, top, left + cell_width, top + cell_height)
            patch = image.crop(box)
            logger.info(f"I don't think it is so called `patch`. split_to_patches: patch size = {box}")
            row_patches.append(patch)
        patches.append(row_patches)

    return patches
|
|
|
def slice_image(
    image,
    max_slice_nums=9,
    scale_resolution=448,
    patch_size=14,
    never_split=False
):
    """Resize an image and, when it is large enough, also cut it into a grid of slices.

    The number of slices is estimated from the image area relative to
    ``scale_resolution ** 2`` and capped at ``max_slice_nums``. Among all
    factorisations of the candidate slice counts, the grid whose column/row
    ratio is closest (in log space) to the image aspect ratio is chosen.

    Args:
        image: PIL image.
        max_slice_nums: upper bound on the number of slices.
        scale_resolution: target side length used for the area budget.
        patch_size: sides of resized images are multiples of this value.
        never_split: when true, only the resized overview image is produced.

    Returns:
        ``(source_image, patches, best_grid)``. ``source_image`` is the resized
        overview. ``patches`` is a list of rows of crops (empty when the image
        is not split) and ``best_grid`` is ``[columns, rows]`` or ``None``.
    """
    original_size = image.size
    original_width, original_height = original_size
    log_ratio = math.log(original_width / original_height)
    area_ratio = original_width * original_height / (scale_resolution * scale_resolution)
    multiple = min(math.ceil(area_ratio), max_slice_nums)

    # Small image (or splitting disabled): a single, possibly upscaled, overview.
    if multiple <= 1 or never_split:
        best_size = find_best_resize(
            original_size, scale_resolution, patch_size, allow_upscale=True
        )
        return image.resize(best_size, Image.Resampling.BICUBIC), [], None

    # Consider the estimated slice count and its neighbours, skipping the
    # trivial count of 1 and anything above the cap.
    slice_counts = [
        count
        for count in (multiple - 1, multiple, multiple + 1)
        if count != 1 and count <= max_slice_nums
    ]

    # Overview image: downscaled only if it exceeds the area budget.
    best_resize = find_best_resize(original_size, scale_resolution, patch_size)
    source_image = image.copy().resize(best_resize, Image.Resampling.BICUBIC)

    # Every [columns, rows] factorisation of every candidate slice count.
    candidate_grids = [
        [cols, count // cols]
        for count in slice_counts
        for cols in range(1, count + 1)
        if count % cols == 0
    ]

    # First grid with the smallest log-aspect error wins; [1, 1] if none exist.
    best_grid = min(
        candidate_grids,
        key=lambda grid: abs(log_ratio - math.log(grid[0] / grid[1])),
        default=[1, 1],
    )

    refine_size = get_refine_size(
        original_size, best_grid, scale_resolution, patch_size, allow_upscale=True
    )
    refine_image = image.resize(refine_size, Image.Resampling.BICUBIC)
    patches = split_to_patches(refine_image, best_grid)

    return source_image, patches, best_grid
|
|
|
def reshape_by_patch(image_tensor, patch_size=14):
    """
    Lay the non-overlapping ``patch_size`` x ``patch_size`` patches of an image side by side.

    :param image_tensor: shape [C, H, W] (C is 3 in the original note)
    :param patch_size: side length of each square patch
    :return: shape [C, patch_size, L * patch_size], where L is the number of
        patches extracted by ``unfold``; patches are concatenated along the last axis
    """
    channels = image_tensor.size(0)
    kernel = (patch_size, patch_size)

    # unfold yields [C * patch_size * patch_size, L]: one column per patch.
    columns = torch.nn.functional.unfold(image_tensor, kernel, stride=kernel)

    # Split the flattened kernel axis, then move the patch index in front of
    # the in-patch column so patches end up adjacent along the last axis.
    per_patch = columns.reshape(channels, patch_size, patch_size, -1)
    return per_patch.permute(0, 1, 3, 2).reshape(channels, patch_size, -1)
|
|
|
class MiniCPMVImageProcessor(BaseImageProcessor):
    r"""
    MiniCPM-V image processor that optionally slices one image into a grid of sub-images.

    Per the original author's note, the design is based on the Phi3 image processor and uses
    LLaVA-UHD style dynamic slicing. One image is handled per `preprocess` call.

    Args:
        query_num (`int`, *optional*, defaults to 64):
            Stored on the instance; not read by `preprocess`.
        image_mean (`float` or `List[float]`, *optional*, defaults to `None`):
            Mean handed to `transforms.Normalize`. Must be set before `preprocess` is called.
        image_std (`float` or `List[float]`, *optional*, defaults to `None`):
            Standard deviation handed to `transforms.Normalize`. Must be set before `preprocess`
            is called.
        max_slice_nums (`int`, *optional*, defaults to 9):
            Upper bound on the number of slices, forwarded to `slice_image`.
        scale_resolution (`int`, *optional*, defaults to 448):
            Target resolution forwarded to `slice_image`.
        patch_size (`int`, *optional*, defaults to 14):
            Patch side length used for slicing, patchifying and computing target sizes.
    """

    def __init__(
        self,
        query_num: int = 64,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        max_slice_nums: int = 9,
        scale_resolution: int = 448,
        patch_size: int = 14,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.query_num = query_num
        self.image_mean = image_mean
        self.image_std = image_std
        self.max_slice_nums = max_slice_nums
        self.scale_resolution = scale_resolution
        self.patch_size = patch_size

    def preprocess(
        self,
        image,
        slice_mode: bool = True,
        return_tensors: Optional[Union[str, TensorType]] = None,
    ):
        """
        Normalize a single image (and, in slice mode, its slices) and patchify each result.

        Args:
            image (`PIL.Image.Image`):
                The one image to preprocess.
            slice_mode (`bool`, *optional*, defaults to `True`):
                When true, the image is passed through `slice_image`; the resized overview comes
                first in the output, followed by the slices in row-major order. When false, the
                image is used as-is without resizing.
            return_tensors (`str` or `TensorType`, *optional*):
                Accepted for interface compatibility; currently unused. Outputs are always
                `torch.Tensor` objects.

        Returns:
            `(images, tgt_sizes, best_grid)`: `images` is a list of patchified tensors (see
            `reshape_by_patch`), `tgt_sizes` is a list of int32 tensors
            `[H // patch_size, W // patch_size]`, one per entry of `images`, and `best_grid` is the
            grid chosen by `slice_image` (`None` when not in slice mode or when the image was not
            split).

        Raises:
            ValueError: if `image_mean` or `image_std` has not been set.
        """
        if self.image_mean is None or self.image_std is None:
            # Fail early with a clear message instead of deep inside torchvision.
            raise ValueError("`image_mean` and `image_std` must be set before calling `preprocess`")

        transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize(mean=self.image_mean, std=self.image_std),
            ]
        )

        if slice_mode:
            source_image, patches, best_grid = slice_image(
                image,
                self.max_slice_nums,
                self.scale_resolution,
                self.patch_size,
            )
            # Overview first, then every slice row by row.
            pending_images = [source_image]
            for row in patches:
                pending_images.extend(row)
        else:
            best_grid = None
            pending_images = [image]

        images_ = []
        tgt_sizes = []
        for image_ in pending_images:
            normalized = transform(image_)
            H, W = normalized.shape[1:]
            # Patchify with the configured patch size so it always agrees with
            # the target size computed on the next line.
            images_.append(reshape_by_patch(normalized, self.patch_size))
            tgt_sizes.append(
                torch.tensor([H // self.patch_size, W // self.patch_size], dtype=torch.int32)
            )

        return images_, tgt_sizes, best_grid
|
|
|
|
|
|
|
class MiniCPMVTextTokenizer(LlamaTokenizer):
    """Llama tokenizer carrying the marker strings used to delimit images, slices and regions."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Opening/closing marker pairs, kept as plain string attributes.
        self.im_start, self.im_end = "<image>", "</image>"
        self.ref_start, self.ref_end = "<ref>", "</ref>"
        self.box_start, self.box_end = "<box>", "</box>"
        self.quad_start, self.quad_end = "<quad>", "</quad>"
        self.point_start, self.point_end = "<point>", "</point>"
        self.slice_start, self.slice_end = "<slice>", "</slice>"

    @property
    def eos_id(self):
        """End-of-sequence id as reported by the underlying ``sp_model``."""
        return self.sp_model.eos_id()

    @property
    def bos_id(self):
        """Beginning-of-sequence id as reported by the underlying ``sp_model``."""
        return self.sp_model.bos_id()

    @property
    def unk_id(self):
        """Unknown-token id as reported by the underlying ``sp_model``."""
        return self.sp_model.unk_id()

    @property
    def im_start_id(self):
        """Token id of the ``im_start`` marker."""
        return self._convert_token_to_id(self.im_start)

    @property
    def im_end_id(self):
        """Token id of the ``im_end`` marker."""
        return self._convert_token_to_id(self.im_end)
|
|
|
def get_grid_placeholder(tokenizer, grid, query_num):
    """Build the placeholder text for a grid of image slices.

    Each slice is written as ``im_start + unk_token * query_num + im_end``.
    Slices in one grid row are concatenated, rows are joined with a newline,
    and the whole text is wrapped in ``slice_start`` / ``slice_end``.

    Args:
        tokenizer: object providing ``im_start``, ``im_end``, ``unk_token``,
            ``slice_start`` and ``slice_end`` strings.
        grid: ``(columns, rows)`` pair.
        query_num: number of ``unk_token`` repetitions per slice.

    Returns:
        The placeholder string.
    """
    single_slice = tokenizer.im_start + tokenizer.unk_token * query_num + tokenizer.im_end

    cols, rows = grid[0], grid[1]
    row_text = single_slice * cols
    body = "\n".join(row_text for _ in range(rows))
    return tokenizer.slice_start + body + tokenizer.slice_end
|
|
|
def pad(orig_items, max_length=None, padding_value=0, padding_side="left"):
    """
    Right-pad a batch of token-id tensors to a common length.

    Args:
        orig_items: a non-empty list of input_ids tensors, each of shape [1, length_i]
            (size-1 dimensions are squeezed away, so [length_i] also works).
        max_length: width of the padded batch; defaults to the longest item.
        padding_value: accepted for interface compatibility but currently ignored;
            the fill value is always 2 (see the note in the body).
        padding_side: ``"left"`` is rejected; any other value pads on the right.

    Returns:
        dict with ``"input_ids"`` (shape [batch, max_length], dtype of the first
        item) and ``"attention_mask"`` (int8, 1 on real tokens and 0 on padding).

    Raises:
        TypeError: if ``orig_items`` is not a list of ``torch.Tensor``.
        ValueError: if ``orig_items`` is empty, an item is not one-dimensional
            after squeezing, or ``padding_side`` is ``"left"``.
    """
    if not isinstance(orig_items, list):
        raise TypeError("`orig_items` must be a list of torch.Tensor")
    if not orig_items:
        raise ValueError("`orig_items` must not be empty")
    if not all(isinstance(t, torch.Tensor) for t in orig_items):
        raise TypeError("`orig_items` must be a list of torch.Tensor")

    # NOTE(review): the fill value has always been forced to 2 regardless of the
    # argument (presumably a token id the model expects — confirm). The override
    # is kept so existing callers receive exactly the same tensors as before.
    padding_value = 2

    items = []
    for t in orig_items:
        flat = t.squeeze()
        if flat.dim() == 0:
            # A single-token sequence ([1, 1]) squeezes down to a scalar;
            # restore the sequence axis so it is handled like any other item.
            flat = flat.reshape(1)
        if flat.dim() != 1:
            raise ValueError("This pad function only expect B * Tensor([seq_len]) input.")
        items.append(flat)

    # Left padding has been deliberately disabled; reject it before allocating.
    if padding_side == "left":
        raise ValueError("Please use right padding")

    batch_size = len(items)
    if max_length is None:
        max_length = max(item.shape[0] for item in items)

    tensor = torch.full((batch_size, max_length), padding_value, dtype=items[0].dtype)
    attention_mask = torch.zeros((batch_size, max_length), dtype=torch.int8)

    for i, item in enumerate(items):
        length = item.shape[0]
        tensor[i, :length] = item
        attention_mask[i, :length] = 1

    return {
        "input_ids": tensor,
        "attention_mask": attention_mask,
    }
|
|
|
def convert_to_tokens(input_str, tokenizer, max_inp_length):
    """Tokenize a prompt and locate the token spans reserved for images.

    Args:
        input_str: prompt text containing image placeholders.
        tokenizer: object providing ``encode``, ``add_bos_token``, ``bos_id``,
            ``im_start_id`` and ``im_end_id``.
        max_inp_length: the token sequence is truncated to this many tokens.

    Returns:
        dict with ``"input_ids"`` (int32 tensor of shape [1, seq_len]) and
        ``"image_bound"`` (tensor of shape [num_images, 2]; each row holds the
        index just after an image-start token and the index of the matching
        image-end token).
    """
    if tokenizer.add_bos_token:
        token_ids = tokenizer.encode(input_str)
    else:
        # The tokenizer will not add BOS on its own, so prepend it here.
        token_ids = [tokenizer.bos_id] + tokenizer.encode(input_str)

    token_ids = token_ids[:max_inp_length]
    input_ids = torch.tensor(token_ids, dtype=torch.int32)

    # Shift by one to skip the image-start marker itself.
    image_start_tokens = torch.where(input_ids == tokenizer.im_start_id)[0] + 1
    image_end_tokens = torch.where(input_ids == tokenizer.im_end_id)[0]

    # Truncation can cut a placeholder in half, leaving a start marker with no
    # end marker. Only complete pairs are kept; using the larger count here
    # would make the slices below no-ops and hstack would fail on the mismatch.
    valid_image_nums = min(len(image_start_tokens), len(image_end_tokens))

    image_bound = torch.hstack(
        [
            image_start_tokens[:valid_image_nums].unsqueeze(-1),
            image_end_tokens[:valid_image_nums].unsqueeze(-1),
        ]
    )

    return {
        "input_ids": input_ids.unsqueeze(0),
        "image_bound": image_bound,
    }
|
|
|
class MiniCPMVProcessor(ProcessorMixin):
    r"""
    Wraps an image processor and a tokenizer into a single MiniCPM-V processor (structure based on
    the Siglip processor).

    The processor turns chat-style conversations, whose message contents are either text or a PIL
    image, into padded token ids plus the patchified image tensors and their placement bounds.

    Args:
        image_processor:
            The image processor is a required input. Its `preprocess(image=..., slice_mode=...)`
            must return `(sub_images, tgt_sizes, best_grid)`.
        tokenizer:
            The tokenizer is a required input. It must expose `im_start`, `im_end` and `unk_token`,
            plus whatever `get_grid_placeholder` and `convert_to_tokens` read from it.
        query_num (`int`, *optional*, defaults to 64):
            Number of `unk_token` placeholders written per image or slice.
        slice_mode (`bool`, *optional*, defaults to `True`):
            Default slicing behaviour when `__call__` is not given one.
        max_inp_length (`int`, *optional*, defaults to 2048):
            Default token limit when `__call__` is not given one.
    """

    attributes = ["image_processor", "tokenizer"]
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = "AutoTokenizer"

    def __init__(self, image_processor, tokenizer, query_num=64, slice_mode=True, max_inp_length=2048):
        super().__init__(image_processor, tokenizer)
        self.query_num = query_num
        self.slice_mode = slice_mode
        self.max_inp_length = max_inp_length

    def _encode_conversation(self, msgs, slice_mode):
        """Render one conversation to prompt text and collect its image tensors.

        Returns `(prompt_text, sub_images, tgt_sizes)` where `tgt_sizes` is a single
        stacked tensor with one row per entry of `sub_images`.
        """
        if len(msgs) == 0:
            raise ValueError('msgs is empty')

        # Placeholder written for the overview of every image.
        image_placeholder = (
            self.tokenizer.im_start
            + self.tokenizer.unk_token * self.query_num
            + self.tokenizer.im_end
        )

        rendered = []
        sub_images = []
        tgt_sizes = []

        for i, msg in enumerate(msgs):
            role = msg["role"]
            content = msg["content"]

            if role not in ("user", "assistant"):
                raise ValueError(f"unsupported role {role!r}; expected 'user' or 'assistant'")
            if i == 0 and role != "user":
                raise ValueError("The role of first msg should be user")

            if isinstance(content, Image.Image):
                processed_subimages, sizes, best_grid = self.image_processor.preprocess(
                    image=content, slice_mode=slice_mode
                )
                cur_msg = image_placeholder
                # More than one sub-image means the image was actually sliced,
                # so the slice grid gets its own placeholders after the overview.
                if slice_mode and len(processed_subimages) > 1:
                    cur_msg += get_grid_placeholder(
                        self.tokenizer, best_grid, self.query_num
                    )
                tgt_sizes.extend(sizes)
                sub_images.extend(processed_subimages)
            elif isinstance(content, str):
                cur_msg = content
            else:
                raise NotImplementedError(f"message {type(content)}: {content} can't be handled")

            role_title = "<用户>" if role == "user" else "<AI>"
            rendered.append(role_title + cur_msg)

        # The trailing assistant tag cues the model to produce the next reply.
        prompt_text = "".join(rendered) + "<AI>"

        if tgt_sizes:
            stacked_sizes = torch.vstack(tgt_sizes)
        else:
            # Text-only conversation: vstack rejects an empty list, so return an
            # empty (0, 2) tensor with the dtype the image processor produces.
            stacked_sizes = torch.zeros((0, 2), dtype=torch.int32)

        return prompt_text, sub_images, stacked_sizes

    def __call__(
        self,
        messages: List[List[Dict[str, Union[str, Image.Image]]]] = None,
        slice_mode: bool = None,
        max_inp_length: int = None,
        padding: Union[bool, str, PaddingStrategy] = False,
        padding_side: str = "right",
        truncation: Union[bool, str, TruncationStrategy] = None,
        return_tensors: Optional[Union[str, TensorType]] = TensorType.PYTORCH,
    ) -> BatchFeature:
        """
        Prepare a batch of conversations for the model.

        Args:
            messages (`List[List[Dict]]`):
                Batch of conversations. Each conversation is a non-empty list of messages, and each
                message is a dict with a `"role"` (`"user"` or `"assistant"`; the first must be
                `"user"`) and a `"content"` that is either a `str` or a `PIL.Image.Image`.
            slice_mode (`bool`, *optional*):
                Whether images are sliced; falls back to the value given at construction.
            max_inp_length (`int`, *optional*):
                Token limit per conversation; falls back to the value given at construction.
            padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `False`):
                Accepted for interface compatibility; currently unused. The batch is always padded
                to its longest sequence.
            padding_side (`str`, *optional*, defaults to `"right"`):
                Forwarded to `pad`, which rejects `"left"`; the default is therefore `"right"`.
            truncation (`bool`, *optional*):
                Accepted for interface compatibility; currently unused. Sequences are always
                truncated to `max_inp_length`.
            return_tensors (`str` or `TensorType`, *optional*):
                Accepted for interface compatibility; currently unused.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- Padded token ids, shape `[batch, max_len]`.
            - **attention_mask** -- 1 on real tokens, 0 on padding.
            - **image_bound** -- Per conversation, a tensor of `[start, end]` token positions for
              each image placeholder.
            - **pixel_values** -- Per conversation, the list of patchified image tensors.
            - **tgt_sizes** -- Per conversation, a tensor with one row of patch-grid sizes per
              entry of `pixel_values`.

        Raises:
            ValueError: if `messages` is `None`, a conversation is empty or has an invalid role,
                or `slice_mode` / `max_inp_length` is not available from either source.
            NotImplementedError: if a message content is neither `str` nor `PIL.Image.Image`.
        """
        if messages is None:
            raise ValueError("`messages` must be provided")

        if slice_mode is None:
            if self.slice_mode is None:
                raise ValueError("`slice_mode` is not specified by config or usage")
            slice_mode = self.slice_mode

        if max_inp_length is None:
            if self.max_inp_length is None:
                raise ValueError("`max_inp_length` is not specified by config or usage")
            max_inp_length = self.max_inp_length

        processed_text_all_data = []
        processed_subimages_all_data = []
        tgt_sizes_all_data = []

        for msgs in messages:
            prompt_text, sub_images, stacked_sizes = self._encode_conversation(msgs, slice_mode)
            processed_text_all_data.append(prompt_text)
            processed_subimages_all_data.append(sub_images)
            tgt_sizes_all_data.append(stacked_sizes)

        model_inputs_uncollated = [
            convert_to_tokens(text, max_inp_length=max_inp_length, tokenizer=self.tokenizer)
            for text in processed_text_all_data
        ]

        model_inputs_final = pad(
            [inputs["input_ids"] for inputs in model_inputs_uncollated],
            padding_side=padding_side,
        )
        model_inputs_final["image_bound"] = [
            inputs["image_bound"] for inputs in model_inputs_uncollated
        ]
        model_inputs_final["pixel_values"] = processed_subimages_all_data
        model_inputs_final["tgt_sizes"] = tgt_sizes_all_data

        # tensor_type=None: the fields are ragged nested lists, so no conversion is attempted.
        return BatchFeature(data=model_inputs_final, tensor_type=None)
|
|
|
|