|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Image processor class for Beit.""" |
|
|
|
import warnings |
|
from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
|
import numpy as np |
|
|
|
from ...image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict |
|
from ...image_transforms import resize, to_channel_dimension_format |
|
from ...image_utils import ( |
|
IMAGENET_STANDARD_MEAN, |
|
IMAGENET_STANDARD_STD, |
|
ChannelDimension, |
|
ImageInput, |
|
PILImageResampling, |
|
infer_channel_dimension_format, |
|
is_scaled_image, |
|
make_list_of_images, |
|
to_numpy_array, |
|
valid_images, |
|
) |
|
from ...utils import TensorType, is_torch_available, is_torch_tensor, is_vision_available, logging |
|
|
|
|
|
if is_vision_available(): |
|
import PIL |
|
|
|
if is_torch_available(): |
|
import torch |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
|
|
class BeitImageProcessor(BaseImageProcessor):
    r"""
    Constructs a BEiT image processor.

    Args:
        do_resize (`bool`, *optional*, defaults to `True`):
            Whether to resize the image's (height, width) dimensions to the specified `size`. Can be overridden by the
            `do_resize` parameter in the `preprocess` method.
        size (`Dict[str, int]`, *optional*, defaults to `{"height": 256, "width": 256}`):
            Size of the output image after resizing. Can be overridden by the `size` parameter in the `preprocess`
            method.
        resample (`PILImageResampling`, *optional*, defaults to `Resampling.BICUBIC`):
            Resampling filter to use if resizing the image. Can be overridden by the `resample` parameter in the
            `preprocess` method.
        do_center_crop (`bool`, *optional*, defaults to `True`):
            Whether to center crop the image. If the input size is smaller than `crop_size` along any edge, the image
            is padded with 0's and then center cropped. Can be overridden by the `do_center_crop` parameter in the
            `preprocess` method.
        crop_size (`Dict[str, int]`, *optional*, defaults to `{"height": 224, "width": 224}`):
            Desired output size when applying center-cropping. Only has an effect if `do_center_crop` is set to `True`.
            Can be overridden by the `crop_size` parameter in the `preprocess` method.
        rescale_factor (`int` or `float`, *optional*, defaults to `1/255`):
            Scale factor to use if rescaling the image. Can be overridden by the `rescale_factor` parameter in the
            `preprocess` method.
        do_rescale (`bool`, *optional*, defaults to `True`):
            Whether to rescale the image by the specified scale `rescale_factor`. Can be overridden by the `do_rescale`
            parameter in the `preprocess` method.
        do_normalize (`bool`, *optional*, defaults to `True`):
            Whether to normalize the image. Can be overridden by the `do_normalize` parameter in the `preprocess`
            method.
        image_mean (`float` or `List[float]`, *optional*, defaults to `IMAGENET_STANDARD_MEAN`):
            The mean to use if normalizing the image. This is a float or list of floats of length of the number of
            channels of the image. Can be overridden by the `image_mean` parameter in the `preprocess` method.
        image_std (`float` or `List[float]`, *optional*, defaults to `IMAGENET_STANDARD_STD`):
            The standard deviation to use if normalizing the image. This is a float or list of floats of length of the
            number of channels of the image. Can be overridden by the `image_std` parameter in the `preprocess` method.
        do_reduce_labels (`bool`, *optional*, defaults to `False`):
            Whether or not to reduce all label values of segmentation maps by 1. Usually used for datasets where 0 is
            used for background, and background itself is not included in all classes of a dataset (e.g. ADE20k). The
            background label will be replaced by 255. Can be overridden by the `do_reduce_labels` parameter in the
            `preprocess` method.
    """

    model_input_names = ["pixel_values"]

    def __init__(
        self,
        do_resize: bool = True,
        size: Optional[Dict[str, int]] = None,
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        do_center_crop: bool = True,
        crop_size: Optional[Dict[str, int]] = None,
        rescale_factor: Union[int, float] = 1 / 255,
        do_rescale: bool = True,
        do_normalize: bool = True,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_reduce_labels: bool = False,
        **kwargs,
    ) -> None:
        # Deprecated alias: `reduce_labels` overrides `do_reduce_labels` when present. It is popped
        # here so that it is not forwarded to the base-class constructor.
        if "reduce_labels" in kwargs:
            warnings.warn(
                "The `reduce_labels` parameter is deprecated and will be removed in a future version. Please use"
                " `do_reduce_labels` instead.",
                FutureWarning,
            )
            do_reduce_labels = kwargs.pop("reduce_labels")
        super().__init__(**kwargs)

        # Fall back to the documented defaults, then normalise both sizes through `get_size_dict`.
        size = size if size is not None else {"height": 256, "width": 256}
        size = get_size_dict(size)
        crop_size = crop_size if crop_size is not None else {"height": 224, "width": 224}
        crop_size = get_size_dict(crop_size, param_name="crop_size")

        self.do_resize = do_resize
        self.size = size
        self.resample = resample
        self.do_center_crop = do_center_crop
        self.crop_size = crop_size
        self.do_rescale = do_rescale
        self.rescale_factor = rescale_factor
        self.do_normalize = do_normalize
        self.image_mean = image_mean if image_mean is not None else IMAGENET_STANDARD_MEAN
        self.image_std = image_std if image_std is not None else IMAGENET_STANDARD_STD
        self.do_reduce_labels = do_reduce_labels

    @classmethod
    def from_dict(cls, image_processor_dict: Dict[str, Any], **kwargs):
        """
        Overrides the `from_dict` method from the base class to make sure `reduce_labels` is updated if image processor
        is created using from_dict and kwargs e.g. `BeitImageProcessor.from_pretrained(checkpoint, reduce_labels=True)`
        """
        # Work on a copy so the caller's dictionary is not modified.
        image_processor_dict = image_processor_dict.copy()
        if "reduce_labels" in kwargs:
            image_processor_dict["reduce_labels"] = kwargs.pop("reduce_labels")
        return super().from_dict(image_processor_dict, **kwargs)

    def resize(
        self,
        image: np.ndarray,
        size: Dict[str, int],
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        data_format: Optional[Union[str, ChannelDimension]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs,
    ) -> np.ndarray:
        """
        Resize an image to (size["height"], size["width"]).

        Args:
            image (`np.ndarray`):
                Image to resize.
            size (`Dict[str, int]`):
                Size of the output image.
            resample (`PILImageResampling`, *optional*, defaults to `PILImageResampling.BICUBIC`):
                Resampling filter to use when resizing the image.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
            input_data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the input image. If not provided, it will be inferred.

        Returns:
            `np.ndarray`: The resized image.

        Raises:
            ValueError: If the normalised `size` does not contain both `height` and `width`.
        """
        size = get_size_dict(size, default_to_square=True, param_name="size")
        if "height" not in size or "width" not in size:
            raise ValueError(f"The `size` argument must contain `height` and `width` keys. Got {size.keys()}")
        return resize(
            image,
            size=(size["height"], size["width"]),
            resample=resample,
            data_format=data_format,
            input_data_format=input_data_format,
            **kwargs,
        )

    def reduce_label(self, label: ImageInput) -> np.ndarray:
        """
        Shift all labels of a segmentation map down by one and map the background label to 255.

        Label `0` becomes `255`, a label that is already `255` stays `255`, and every other label `k` becomes
        `k - 1`.

        Args:
            label (`ImageInput`):
                Segmentation map to convert.

        Returns:
            `np.ndarray`: A new array holding the reduced labels. The input is left unmodified.
        """
        # `to_numpy_array` may return the caller's own array (or a view of it); copy before the
        # in-place assignment below so the caller's segmentation map is never altered.
        label = np.array(to_numpy_array(label))

        # Mark the background as 255 *before* subtracting so that `0 - 1` is never computed.
        label[label == 0] = 255
        label = label - 1
        # Former background (and labels that were already 255) are now 254; restore them to 255.
        label[label == 254] = 255
        return label

    def _preprocess(
        self,
        image: ImageInput,
        do_reduce_labels: Optional[bool] = None,
        do_resize: Optional[bool] = None,
        size: Optional[Dict[str, int]] = None,
        resample: Optional[PILImageResampling] = None,
        do_center_crop: Optional[bool] = None,
        crop_size: Optional[Dict[str, int]] = None,
        do_rescale: Optional[bool] = None,
        rescale_factor: Optional[float] = None,
        do_normalize: Optional[bool] = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
    ):
        """
        Apply the enabled transformations to a single array, always in this order: label reduction, resize,
        center crop, rescale, normalize. Steps whose flag is falsy (including `None`) are skipped.

        Shared by `_preprocess_image` and `_preprocess_segmentation_map`; returns the transformed array.
        """
        if do_reduce_labels:
            image = self.reduce_label(image)

        if do_resize:
            image = self.resize(image=image, size=size, resample=resample, input_data_format=input_data_format)

        if do_center_crop:
            image = self.center_crop(image=image, size=crop_size, input_data_format=input_data_format)

        if do_rescale:
            image = self.rescale(image=image, scale=rescale_factor, input_data_format=input_data_format)

        if do_normalize:
            image = self.normalize(image=image, mean=image_mean, std=image_std, input_data_format=input_data_format)

        return image

    def _preprocess_image(
        self,
        image: ImageInput,
        do_resize: Optional[bool] = None,
        size: Optional[Dict[str, int]] = None,
        resample: Optional[PILImageResampling] = None,
        do_center_crop: Optional[bool] = None,
        crop_size: Optional[Dict[str, int]] = None,
        do_rescale: Optional[bool] = None,
        rescale_factor: Optional[float] = None,
        do_normalize: Optional[bool] = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        data_format: Optional[Union[str, ChannelDimension]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
    ) -> np.ndarray:
        """
        Preprocesses a single image.

        The image is converted to a NumPy array, run through `_preprocess` (label reduction is always disabled
        for images) and, if `data_format` is given, converted to that channel dimension format.
        """
        image = to_numpy_array(image)
        if is_scaled_image(image) and do_rescale:
            logger.warning_once(
                "It looks like you are trying to rescale already rescaled images. If the input"
                " images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again."
            )
        if input_data_format is None:
            input_data_format = infer_channel_dimension_format(image)

        image = self._preprocess(
            image,
            do_reduce_labels=False,
            do_resize=do_resize,
            size=size,
            resample=resample,
            do_center_crop=do_center_crop,
            crop_size=crop_size,
            do_rescale=do_rescale,
            rescale_factor=rescale_factor,
            do_normalize=do_normalize,
            image_mean=image_mean,
            image_std=image_std,
            input_data_format=input_data_format,
        )
        if data_format is not None:
            image = to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format)
        return image

    def _preprocess_segmentation_map(
        self,
        segmentation_map: ImageInput,
        do_resize: Optional[bool] = None,
        size: Optional[Dict[str, int]] = None,
        resample: Optional[PILImageResampling] = None,
        do_center_crop: Optional[bool] = None,
        crop_size: Optional[Dict[str, int]] = None,
        do_reduce_labels: Optional[bool] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
    ):
        """
        Preprocesses a single segmentation map.

        The map is converted to a NumPy array and run through `_preprocess` with rescaling and normalization
        disabled. A 2D map temporarily gets a leading axis of size 1 for the transformations. The result is cast
        to `np.int64`.
        """
        segmentation_map = to_numpy_array(segmentation_map)

        if segmentation_map.ndim == 2:
            # Give the 2D map a leading axis of size 1; it is therefore channels-first by construction.
            segmentation_map = segmentation_map[None, ...]
            added_dimension = True
            input_data_format = ChannelDimension.FIRST
        else:
            added_dimension = False
            if input_data_format is None:
                input_data_format = infer_channel_dimension_format(segmentation_map, num_channels=1)

        segmentation_map = self._preprocess(
            image=segmentation_map,
            do_reduce_labels=do_reduce_labels,
            do_resize=do_resize,
            resample=resample,
            size=size,
            do_center_crop=do_center_crop,
            crop_size=crop_size,
            do_normalize=False,
            do_rescale=False,
            # Forward the format resolved above rather than assuming channels-first, so that maps
            # which already carry a channel axis are handled according to their actual layout.
            input_data_format=input_data_format,
        )

        if added_dimension:
            # Drop the temporary leading axis again.
            segmentation_map = np.squeeze(segmentation_map, axis=0)
        segmentation_map = segmentation_map.astype(np.int64)
        return segmentation_map

    def __call__(self, images, segmentation_maps=None, **kwargs):
        """
        Forward to the base-class `__call__`, accepting `segmentation_maps` as a second positional argument and
        passing it on as a keyword argument.
        """
        return super().__call__(images, segmentation_maps=segmentation_maps, **kwargs)

    def preprocess(
        self,
        images: ImageInput,
        segmentation_maps: Optional[ImageInput] = None,
        do_resize: Optional[bool] = None,
        size: Optional[Dict[str, int]] = None,
        resample: Optional[PILImageResampling] = None,
        do_center_crop: Optional[bool] = None,
        crop_size: Optional[Dict[str, int]] = None,
        do_rescale: Optional[bool] = None,
        rescale_factor: Optional[float] = None,
        do_normalize: Optional[bool] = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_reduce_labels: Optional[bool] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        data_format: ChannelDimension = ChannelDimension.FIRST,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs,
    ) -> BatchFeature:
        """
        Preprocess an image or batch of images.

        Args:
            images (`ImageInput`):
                Image to preprocess. Expects a single or batch of images with pixel values ranging from 0 to 255. If
                passing in images with pixel values between 0 and 1, set `do_rescale=False`.
            segmentation_maps (`ImageInput`, *optional*):
                Segmentation map, or batch of segmentation maps, to preprocess alongside the images. They are
                resized, cropped and (optionally) label-reduced, but never rescaled or normalized.
            do_resize (`bool`, *optional*, defaults to `self.do_resize`):
                Whether to resize the image.
            size (`Dict[str, int]`, *optional*, defaults to `self.size`):
                Size of the image after resizing.
            resample (`int`, *optional*, defaults to `self.resample`):
                Resampling filter to use if resizing the image. This can be one of the enum `PILImageResampling`. Only
                has an effect if `do_resize` is set to `True`.
            do_center_crop (`bool`, *optional*, defaults to `self.do_center_crop`):
                Whether to center crop the image.
            crop_size (`Dict[str, int]`, *optional*, defaults to `self.crop_size`):
                Size of the image after center crop. If one edge of the image is smaller than `crop_size`, it will be
                padded with zeros and then cropped.
            do_rescale (`bool`, *optional*, defaults to `self.do_rescale`):
                Whether to rescale the image values between [0 - 1].
            rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`):
                Rescale factor to rescale the image by if `do_rescale` is set to `True`.
            do_normalize (`bool`, *optional*, defaults to `self.do_normalize`):
                Whether to normalize the image.
            image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`):
                Image mean.
            image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`):
                Image standard deviation.
            do_reduce_labels (`bool`, *optional*, defaults to `self.do_reduce_labels`):
                Whether or not to reduce all label values of segmentation maps by 1. Usually used for datasets where 0
                is used for background, and background itself is not included in all classes of a dataset (e.g.
                ADE20k). The background label will be replaced by 255.
            return_tensors (`str` or `TensorType`, *optional*):
                The type of tensors to return. Can be one of:
                    - Unset: Return a list of `np.ndarray`.
                    - `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`.
                    - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`.
                    - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`.
                    - `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`.
            data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`):
                The channel dimension format for the output image. Can be one of:
                    - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                    - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                    - Unset: Use the channel dimension format of the input image.
            input_data_format (`ChannelDimension` or `str`, *optional*):
                The channel dimension format for the input image. If unset, the channel dimension format is inferred
                from the input image. Can be one of:
                    - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                    - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                    - `"none"` or `ChannelDimension.NONE`: image in (height, width) format.

        Returns:
            `BatchFeature`: Holds the processed images under `"pixel_values"` and, when `segmentation_maps` is
            given, the processed maps under `"labels"`.

        Raises:
            ValueError: If the images or segmentation maps have an unsupported type, or if an enabled step is
                missing the argument(s) it needs.
        """
        # Per-call arguments take precedence; anything left as `None` falls back to the instance setting.
        do_resize = do_resize if do_resize is not None else self.do_resize
        size = size if size is not None else self.size
        size = get_size_dict(size, default_to_square=True, param_name="size")
        resample = resample if resample is not None else self.resample
        do_center_crop = do_center_crop if do_center_crop is not None else self.do_center_crop
        crop_size = crop_size if crop_size is not None else self.crop_size
        crop_size = get_size_dict(crop_size, default_to_square=True, param_name="crop_size")
        do_rescale = do_rescale if do_rescale is not None else self.do_rescale
        rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor
        do_normalize = do_normalize if do_normalize is not None else self.do_normalize
        image_mean = image_mean if image_mean is not None else self.image_mean
        image_std = image_std if image_std is not None else self.image_std
        do_reduce_labels = do_reduce_labels if do_reduce_labels is not None else self.do_reduce_labels

        images = make_list_of_images(images)
        if segmentation_maps is not None:
            segmentation_maps = make_list_of_images(segmentation_maps, expected_ndims=2)

        if not valid_images(images):
            raise ValueError(
                "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        if segmentation_maps is not None and not valid_images(segmentation_maps):
            raise ValueError(
                "Invalid segmentation map type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        # Parenthesised on purpose: `and` binds tighter than `or`, so without the parentheses a missing
        # `resample` would raise even when resizing is disabled.
        if do_resize and (size is None or resample is None):
            raise ValueError("Size and resample must be specified if do_resize is True.")

        if do_center_crop and crop_size is None:
            raise ValueError("Crop size must be specified if do_center_crop is True.")

        if do_rescale and rescale_factor is None:
            raise ValueError("Rescale factor must be specified if do_rescale is True.")

        if do_normalize and (image_mean is None or image_std is None):
            raise ValueError("Image mean and std must be specified if do_normalize is True.")

        images = [
            self._preprocess_image(
                image=img,
                do_resize=do_resize,
                do_center_crop=do_center_crop,
                do_rescale=do_rescale,
                do_normalize=do_normalize,
                resample=resample,
                size=size,
                rescale_factor=rescale_factor,
                crop_size=crop_size,
                image_mean=image_mean,
                image_std=image_std,
                data_format=data_format,
                input_data_format=input_data_format,
            )
            for img in images
        ]

        data = {"pixel_values": images}

        if segmentation_maps is not None:
            # `input_data_format` describes the images, not the maps, so it is not forwarded here.
            segmentation_maps = [
                self._preprocess_segmentation_map(
                    segmentation_map=segmentation_map,
                    do_reduce_labels=do_reduce_labels,
                    do_resize=do_resize,
                    resample=resample,
                    size=size,
                    do_center_crop=do_center_crop,
                    crop_size=crop_size,
                )
                for segmentation_map in segmentation_maps
            ]
            data["labels"] = segmentation_maps

        return BatchFeature(data=data, tensor_type=return_tensors)

    def post_process_semantic_segmentation(self, outputs, target_sizes: Optional[List[Tuple]] = None):
        """
        Converts the output of [`BeitForSemanticSegmentation`] into semantic segmentation maps. Only supports PyTorch.

        Args:
            outputs ([`BeitForSemanticSegmentation`]):
                Raw outputs of the model.
            target_sizes (`List[Tuple]` of length `batch_size`, *optional*):
                List of tuples corresponding to the requested final size (height, width) of each prediction. If unset,
                predictions will not be resized. A `torch.Tensor` of sizes is also accepted.

        Returns:
            semantic_segmentation: `List[torch.Tensor]` of length `batch_size`, where each item is a semantic
            segmentation map of shape (height, width) corresponding to the target_sizes entry (if `target_sizes` is
            specified). Each entry of each `torch.Tensor` correspond to a semantic class id.

        Raises:
            ValueError: If `target_sizes` is given and its length differs from the batch dimension of the logits.
        """
        logits = outputs.logits

        if target_sizes is None:
            # No resizing requested: take the arg-max over the class dimension and split the batch
            # into one (height, width) tensor per sample.
            return list(logits.argmax(dim=1))

        if len(logits) != len(target_sizes):
            raise ValueError("Make sure that you pass in as many target sizes as the batch dimension of the logits")

        if is_torch_tensor(target_sizes):
            # `tolist()` works for tensors on any device and yields plain lists of Python numbers,
            # which `interpolate` accepts as `size`.
            target_sizes = target_sizes.tolist()

        semantic_segmentation = []
        for sample_logits, target_size in zip(logits, target_sizes):
            # Resize the logits of one sample to its target size, then pick the best class per pixel.
            resized_logits = torch.nn.functional.interpolate(
                sample_logits.unsqueeze(dim=0), size=target_size, mode="bilinear", align_corners=False
            )
            semantic_segmentation.append(resized_logits[0].argmax(dim=0))

        return semantic_segmentation
|
|