import math
from typing import Optional, Union, Dict, Any

import numpy as np
import PIL
import PIL.Image
import PIL.ImageSequence
import torch
from PIL import Image
from transformers import AutoImageProcessor
from transformers.image_processing_utils import BaseImageProcessor, BatchFeature
from transformers.image_transforms import to_channel_dimension_format
from transformers.image_utils import (
    ImageInput,
    make_list_of_images,
    valid_images,
    is_torch_tensor,
    to_numpy_array,
    infer_channel_dimension_format,
    ChannelDimension
)
from transformers.utils import TensorType, requires_backends, is_torch_dtype, is_torch_device


def recursive_converter(converter, value):
    """Apply `converter` to every leaf of an arbitrarily nested list.

    Only `list` instances are recursed into; any other value (including
    tuples) is treated as a leaf and passed to `converter` as-is.

    Args:
        converter: Callable taking one leaf value and returning its replacement.
        value: A leaf value or a (possibly nested) list of leaf values.

    Returns:
        A structure with the same list nesting as `value`, with each leaf
        replaced by `converter(leaf)`.
    """
    if isinstance(value, list):
        return [recursive_converter(converter, item) for item in value]
    return converter(value)


class MiniCPMVBatchFeature(BatchFeature):
    r"""
    Extension of `BatchFeature` that supports images of varying size.

    Values may be nested lists of arrays/tensors instead of a single stacked
    tensor, so tensor conversion and device/dtype casting are applied leaf by
    leaf through `recursive_converter`.
    """

    def __init__(self, data: Optional[Dict[str, Any]] = None, tensor_type: Union[None, str, TensorType] = None):
        """Store `data` and convert its leaves to `tensor_type` when one is given."""
        super().__init__(data)
        self.convert_to_tensors(tensor_type=tensor_type)

    def convert_to_tensors(self, tensor_type: Optional[Union[str, TensorType]] = None):
        """Convert every leaf value to the requested tensor type, in place.

        Args:
            tensor_type: Target framework. When `None`, nothing is converted.

        Returns:
            `self`, to allow chaining.

        Raises:
            ValueError: If a leaf cannot be converted to a tensor.
        """
        if tensor_type is None:
            return self

        is_tensor, as_tensor = self._get_is_as_tensor_fns(tensor_type)

        def converter(value, key):
            # Leaves that are already tensors are kept untouched.
            if is_tensor(value):
                return value
            try:
                return as_tensor(value)
            except Exception as err:
                if key == "overflowing_values":
                    raise ValueError(
                        "Unable to create tensor returning overflowing values of different lengths. "
                    ) from err
                raise ValueError(
                    "Unable to create tensor, you should probably activate padding "
                    "with 'padding=True' to have batched tensors with the same length."
                ) from err

        for key, value in self.items():
            # Bind the current key explicitly so the error branch reports the right entry.
            self[key] = recursive_converter(lambda leaf, k=key: converter(leaf, k), value)
        return self

    def to(self, *args, **kwargs) -> "MiniCPMVBatchFeature":
        """Send tensors to a device and/or cast floating point tensors.

        `*args` / `**kwargs` are forwarded to `torch.Tensor.to` for floating
        point tensors. Non-floating tensors are only moved to the target
        device (never dtype-cast), and non-tensor leaves and `None` are
        returned unchanged.

        Returns:
            `self`, with `self.data` replaced by the converted values.

        Raises:
            ValueError: If the first positional argument is neither a dtype
                nor something usable as a device (str, int, torch device).
        """
        requires_backends(self, ["torch"])

        device = kwargs.get("device")
        # Check if the args are a device or a dtype
        if device is None and len(args) > 0:
            # device should be always the first argument
            arg = args[0]
            if is_torch_dtype(arg):
                # The first argument is a dtype
                pass
            elif isinstance(arg, str) or is_torch_device(arg) or isinstance(arg, int):
                device = arg
            else:
                # it's something else
                raise ValueError(f"Attempting to cast a BatchFeature to type {str(arg)}. This is not supported.")

        def cast_tensor(v):
            # Non-tensor leaves (None, tuples, numpy arrays, ...) pass through;
            # torch.is_floating_point would raise TypeError on them.
            if not isinstance(v, torch.Tensor):
                return v
            if torch.is_floating_point(v):
                # cast and send to device
                return v.to(*args, **kwargs)
            if device is not None:
                return v.to(device=device)
            return v

        # We cast only floating point tensors to avoid issues with tokenizers casting `LongTensor` to `FloatTensor`
        self.data = {k: recursive_converter(cast_tensor, v) for k, v in self.items()}
        return self


class MiniCPMVImageProcessor(BaseImageProcessor):
    """Image processor that resizes an image and optionally slices it into a grid of patches.

    `preprocess` returns, for each input image, the resized source image
    followed by its grid slices (if any), normalized and channels-first.
    """

    model_input_names = ["pixel_values"]

    def __init__(
            self,
            max_slice_nums=9,
            scale_resolution=448,
            patch_size=14,
            **kwargs):
        """
        Args:
            max_slice_nums: Upper bound on the number of grid slices per image.
            scale_resolution: Side length whose square is the target pixel
                area used when resizing the source image and each slice.
            patch_size: Resized widths/heights are rounded to multiples of this.
            **kwargs: Forwarded to `BaseImageProcessor`; the optional keys
                `image_feature_size`, `im_start`, `im_end`, `slice_start`,
                `slice_end`, `unk`, `norm_mean`, `norm_std` and `version` are
                also read here.
        """
        super().__init__(**kwargs)
        self.max_slice_nums = max_slice_nums
        self.scale_resolution = scale_resolution
        self.patch_size = patch_size
        # Number of `unk_token` repetitions inside one image placeholder.
        self.image_feature_size = kwargs.pop("image_feature_size", 64)
        # NOTE(review): every special-token default below is the empty string, which
        # makes the generated placeholders empty unless tokens are supplied via
        # kwargs -- confirm these defaults are intended.
        self.im_start_token = kwargs.pop("im_start", "")
        self.im_end_token = kwargs.pop("im_end", "")
        self.slice_start_token = kwargs.pop("slice_start", "")
        self.slice_end_token = kwargs.pop("slice_end", "")
        self.unk_token = kwargs.pop("unk", "")
        self.mean = np.array(kwargs.pop("norm_mean", [0.5, 0.5, 0.5]))
        self.std = np.array(kwargs.pop("norm_std", [0.5, 0.5, 0.5]))
        self.version = kwargs.pop("version", 2.0)

    def ensure_divide(self, length, patch_size):
        """Round `length` to the nearest multiple of `patch_size`, never below `patch_size`."""
        return max(round(length / patch_size) * patch_size, patch_size)

    def find_best_resize(self,
                         original_size,
                         scale_resolution,
                         patch_size,
                         allow_upscale=False):
        """Compute a `(width, height)` whose sides are multiples of `patch_size`.

        When the image area exceeds `scale_resolution ** 2`, or when
        `allow_upscale` is set, the size is first rescaled (keeping the aspect
        ratio) towards an area of `scale_resolution ** 2`. Otherwise only the
        rounding to `patch_size` multiples is applied.
        """
        width, height = original_size
        if (width * height > scale_resolution * scale_resolution) or allow_upscale:
            r = width / height
            height = int(scale_resolution / math.sqrt(r))
            width = int(height * r)
        best_width = self.ensure_divide(width, patch_size)
        best_height = self.ensure_divide(height, patch_size)
        return (best_width, best_height)

    def get_refine_size(self,
                        original_size,
                        grid,
                        scale_resolution,
                        patch_size,
                        allow_upscale=False):
        """Compute the full-image size that splits evenly into `grid` cells.

        Each cell size is obtained from `find_best_resize`, so the returned
        `(width, height)` is `grid` times a patch-aligned cell size.
        """
        width, height = original_size
        grid_x, grid_y = grid

        # Make the image dimensions divisible by the grid before deriving the cell size.
        refine_width = self.ensure_divide(width, grid_x)
        refine_height = self.ensure_divide(height, grid_y)

        grid_width = refine_width / grid_x
        grid_height = refine_height / grid_y

        best_grid_size = self.find_best_resize((grid_width, grid_height),
                                               scale_resolution,
                                               patch_size,
                                               allow_upscale=allow_upscale)
        refine_size = (best_grid_size[0] * grid_x, best_grid_size[1] * grid_y)
        return refine_size

    def split_to_patches(self, image, grid):
        """Crop `image` into a row-major list of rows of patches.

        Args:
            image: PIL image to split.
            grid: `(columns, rows)` of the split.

        Returns:
            A list (one entry per row) of lists of cropped PIL images.
        """
        patches = []
        width, height = image.size
        grid_x = int(width / grid[0])
        grid_y = int(height / grid[1])
        for i in range(0, height, grid_y):
            images = []
            for j in range(0, width, grid_x):
                box = (j, i, j + grid_x, i + grid_y)
                patch = image.crop(box)
                images.append(patch)
            patches.append(images)
        return patches

    def _find_best_grid(self, original_size, multiple, max_slice_nums):
        """Pick the `[columns, rows]` grid whose aspect ratio best matches the image.

        Candidate slice counts are `multiple - 1`, `multiple` and
        `multiple + 1`, skipping 1 and anything above `max_slice_nums`. Every
        factorization of each candidate is scored by the absolute difference
        between the log aspect ratios of image and grid; the first grid with
        the smallest score wins. Falls back to `[1, 1]` without candidates.
        """
        width, height = original_size
        log_ratio = math.log(width / height)

        candidate_grids = []
        for split_grids_nums in (multiple - 1, multiple, multiple + 1):
            if split_grids_nums == 1 or split_grids_nums > max_slice_nums:
                continue
            candidate_grids.extend(
                [m, split_grids_nums // m]
                for m in range(1, split_grids_nums + 1)
                if split_grids_nums % m == 0
            )

        # `min` returns the first minimal element, matching a strict `<` scan.
        return min(
            candidate_grids,
            key=lambda grid: abs(log_ratio - math.log(grid[0] / grid[1])),
            default=[1, 1],
        )

    def slice_image(
            self, image, max_slice_nums=9, scale_resolution=448, patch_size=14, never_split=False
    ):
        """Resize `image` and, when it is large enough, slice it into a grid.

        Args:
            image: PIL image.
            max_slice_nums: Upper bound on the number of slices.
            scale_resolution: Side length defining the target area per image/slice.
            patch_size: Output sizes are multiples of this value.
            never_split: When true, the image is only resized, never sliced.

        Returns:
            `(source_image, patches, best_grid)`. `patches` is a list of rows
            of PIL images (empty when not sliced) and `best_grid` is
            `[columns, rows]` or `None` when not sliced.
        """
        original_size = image.size
        original_width, original_height = original_size
        ratio = original_width * original_height / (scale_resolution * scale_resolution)
        multiple = min(math.ceil(ratio), max_slice_nums)

        if multiple <= 1 or never_split:
            # dont need to slice, upsample
            best_size = self.find_best_resize(
                original_size, scale_resolution, patch_size, allow_upscale=True
            )
            source_image = image.resize(best_size, resample=Image.Resampling.BICUBIC)
            return source_image, [], None

        # source image, down-sampling and ensure divided by patch_size
        best_resize = self.find_best_resize(original_size, scale_resolution, patch_size)
        source_image = image.resize(best_resize, resample=Image.Resampling.BICUBIC)

        best_grid = self._find_best_grid(original_size, multiple, max_slice_nums)

        refine_size = self.get_refine_size(
            original_size, best_grid, scale_resolution, patch_size, allow_upscale=True
        )
        refine_image = image.resize(refine_size, resample=Image.Resampling.BICUBIC)
        patches = self.split_to_patches(refine_image, best_grid)

        return source_image, patches, best_grid

    def get_grid_placeholder(self, grid):
        """Build the text placeholder for a sliced image.

        Args:
            grid: `[columns, rows]` or `None`.

        Returns:
            `""` when `grid` is `None`; otherwise one image placeholder per
            grid cell, with rows joined by newlines and the whole wrapped in
            the slice start/end tokens.
        """
        if grid is None:
            return ""
        image_placeholder = (
            self.im_start_token
            + self.unk_token * self.image_feature_size
            + self.im_end_token
        )

        cols = grid[0]
        rows = grid[1]
        slices = [image_placeholder * cols for _ in range(rows)]

        slice_placeholder = self.slice_start_token + "\n".join(slices) + self.slice_end_token
        return slice_placeholder

    def get_sliced_images(self, image):
        """Return the resized source image followed by its slices in row-major order."""
        source_image, patches, _ = self.slice_image(
            image,
            self.max_slice_nums,  # default: 9
            self.scale_resolution,  # default: 448
            self.patch_size  # default: 14
        )

        slice_images = [source_image]
        slice_images.extend(patch for row in patches for patch in row)
        return slice_images

    def get_sliced_grid(self, image_size):
        """Return the `[columns, rows]` grid `slice_image` would use, or `None`.

        Uses the processor's own `scale_resolution` and `max_slice_nums`.
        `None` means an image of `image_size` (width, height) is not sliced.
        """
        original_width, original_height = image_size
        ratio = original_width * original_height / (self.scale_resolution * self.scale_resolution)
        multiple = min(math.ceil(ratio), self.max_slice_nums)
        if multiple <= 1:
            return None
        return self._find_best_grid(image_size, multiple, self.max_slice_nums)

    def get_slice_image_placeholder(self, image_size):
        """Return the complete text placeholder for an image of `image_size`.

        This is the source-image placeholder, followed by the grid placeholder
        (empty when the image is not sliced) and a trailing newline.
        """
        grid = self.get_sliced_grid(image_size=image_size)
        return (
            self.im_start_token
            + self.unk_token * self.image_feature_size
            + self.im_end_token
        ) + self.get_grid_placeholder(grid=grid) + "\n"

    def to_pil_image(self, image, rescale=None) -> PIL.Image.Image:
        """
        Converts `image` to a PIL Image. Optionally rescales it and puts the channel dimension back as the last axis if
        needed.

        Args:
            image (`PIL.Image.Image` or `numpy.ndarray` or `torch.Tensor`):
                The image to convert to the PIL Image format.
            rescale (`bool`, *optional*):
                Whether or not to apply the scaling factor (to make pixel values integers between 0 and 255). Will
                default to `True` if the image type is a floating type, `False` otherwise.

        Returns:
            A PIL image. Inputs that are neither PIL images, torch tensors nor numpy arrays are returned unchanged.
        """
        if isinstance(image, PIL.Image.Image):
            return image
        if is_torch_tensor(image):
            # `.numpy()` alone fails for tensors on an accelerator or tracking gradients.
            image = image.detach().cpu().numpy()

        if isinstance(image, np.ndarray):
            if rescale is None:
                # rescale default to the array being of floating type.
                rescale = np.issubdtype(image.dtype, np.floating)
            # If the channel has been moved to first dim, we put it back at the end.
            if image.ndim == 3 and image.shape[0] in [1, 3]:
                image = image.transpose(1, 2, 0)
            # PIL cannot build an image from an (H, W, 1) array; drop the single channel axis.
            if image.ndim == 3 and image.shape[-1] == 1:
                image = image[..., 0]
            if rescale:
                image = image * 255
            image = image.astype(np.uint8)
            return PIL.Image.fromarray(image)
        return image

    def reshape_by_patch(self, image):
        """
        Rearrange an image into a strip of `patch_size`-high patches.

        :param image: numpy array of shape [3, H, W]
            (presumably floating point with H and W multiples of `self.patch_size` -- TODO confirm)
        :return: numpy array of shape [3, patch_size, HW/patch_size]
        """
        image = torch.from_numpy(image)
        patch_size = self.patch_size
        # Non-overlapping patch extraction: stride equals the kernel size.
        patches = torch.nn.functional.unfold(
            image,
            (patch_size, patch_size),
            stride=(patch_size, patch_size)
        )

        patches = patches.reshape(image.size(0), patch_size, patch_size, -1)
        patches = patches.permute(0, 1, 3, 2).reshape(image.size(0), patch_size, -1)
        return patches.numpy()

    def preprocess(
            self,
            images: ImageInput,
            do_pad: Optional[bool] = True,  # TODO: add pad for MiniCPM-Llama3-V-2_5
            return_tensors: Optional[Union[str, TensorType]] = None
    ) -> MiniCPMVBatchFeature:
        """Slice, rescale to [0, 1], normalize and convert images to channels-first arrays.

        Args:
            images: One image or a list of images.
            do_pad: Currently unused (see TODO in the signature).
            return_tensors: Tensor type forwarded to `MiniCPMVBatchFeature`.

        Returns:
            A `MiniCPMVBatchFeature` with:
              - `pixel_values`: per image, the list of processed source image + slices,
              - `image_sizes`: per image, the original PIL `(width, height)`,
              - `tgt_sizes`: per image, an array with one row per entry of
                `pixel_values`, holding its height and width divided by `patch_size`.

        Raises:
            ValueError: If `images` contains an unsupported image type.
        """
        images = make_list_of_images(images)

        if not valid_images(images):
            raise ValueError(
                "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        images = [self.to_pil_image(image).convert("RGB") for image in images]
        # Every image is now an RGB PIL image, so its numpy form is always (H, W, 3).
        # Inferring the layout from the first image would fail on an empty list and
        # mis-detect images that are 1 or 3 pixels tall.
        input_data_format = ChannelDimension.LAST

        new_images = []
        image_sizes = [image.size for image in images]
        tgt_sizes = []

        for image in images:
            image_patches = self.get_sliced_images(image)
            image_patches = [to_numpy_array(patch).astype(np.float32) / 255 for patch in image_patches]
            image_patches = [
                self.normalize(image=patch, mean=self.mean, std=self.std, input_data_format=input_data_format)
                for patch in image_patches
            ]
            image_patches = [
                to_channel_dimension_format(patch, ChannelDimension.FIRST, input_channel_dim=input_data_format)
                for patch in image_patches
            ]

            # Arrays are channels-first here, so shape[1] is height and shape[2] is width.
            patches_tgt_sizes = [
                np.array((patch.shape[1] // self.patch_size, patch.shape[2] // self.patch_size))
                for patch in image_patches
            ]
            patches_tgt_sizes = np.vstack(patches_tgt_sizes)

            new_images += [image_patches]
            tgt_sizes += [patches_tgt_sizes]

        return MiniCPMVBatchFeature(
            data={"pixel_values": new_images, "image_sizes": image_sizes, "tgt_sizes": tgt_sizes},
            tensor_type=return_tensors
        )


AutoImageProcessor.register("MiniCPMVImageProcessor", MiniCPMVImageProcessor)