from typing import Any, Dict, List, Union import numpy as np from ..utils import add_end_docstrings, is_torch_available, is_vision_available, logging, requires_backends from .base import PIPELINE_INIT_ARGS, Pipeline if is_vision_available(): from PIL import Image from ..image_utils import load_image if is_torch_available(): from ..models.auto.modeling_auto import ( MODEL_FOR_IMAGE_SEGMENTATION_MAPPING_NAMES, MODEL_FOR_INSTANCE_SEGMENTATION_MAPPING_NAMES, MODEL_FOR_SEMANTIC_SEGMENTATION_MAPPING_NAMES, MODEL_FOR_UNIVERSAL_SEGMENTATION_MAPPING_NAMES, ) logger = logging.get_logger(__name__) Prediction = Dict[str, Any] Predictions = List[Prediction] @add_end_docstrings(PIPELINE_INIT_ARGS) class ImageSegmentationPipeline(Pipeline): """ Image segmentation pipeline using any `AutoModelForXXXSegmentation`. This pipeline predicts masks of objects and their classes. Example: ```python >>> from transformers import pipeline >>> segmenter = pipeline(model="facebook/detr-resnet-50-panoptic") >>> segments = segmenter("https://huggingface.co/datasets/Narsil/image_dummy/raw/main/parrots.png") >>> len(segments) 2 >>> segments[0]["label"] 'bird' >>> segments[1]["label"] 'bird' >>> type(segments[0]["mask"]) # This is a black and white mask showing where is the bird on the original image. >>> segments[0]["mask"].size (768, 512) ``` This image segmentation pipeline can currently be loaded from [`pipeline`] using the following task identifier: `"image-segmentation"`. See the list of available models on [huggingface.co/models](https://huggingface.co/models?filter=image-segmentation). """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if self.framework == "tf": raise ValueError(f"The {self.__class__} is only available in PyTorch.") requires_backends(self, "vision") mapping = MODEL_FOR_IMAGE_SEGMENTATION_MAPPING_NAMES.copy() mapping.update(MODEL_FOR_SEMANTIC_SEGMENTATION_MAPPING_NAMES) mapping.update(MODEL_FOR_INSTANCE_SEGMENTATION_MAPPING_NAMES) mapping.update(MODEL_FOR_UNIVERSAL_SEGMENTATION_MAPPING_NAMES) self.check_model_type(mapping) def _sanitize_parameters(self, **kwargs): preprocess_kwargs = {} postprocess_kwargs = {} if "subtask" in kwargs: postprocess_kwargs["subtask"] = kwargs["subtask"] preprocess_kwargs["subtask"] = kwargs["subtask"] if "threshold" in kwargs: postprocess_kwargs["threshold"] = kwargs["threshold"] if "mask_threshold" in kwargs: postprocess_kwargs["mask_threshold"] = kwargs["mask_threshold"] if "overlap_mask_area_threshold" in kwargs: postprocess_kwargs["overlap_mask_area_threshold"] = kwargs["overlap_mask_area_threshold"] if "timeout" in kwargs: preprocess_kwargs["timeout"] = kwargs["timeout"] return preprocess_kwargs, {}, postprocess_kwargs def __call__(self, images, **kwargs) -> Union[Predictions, List[Prediction]]: """ Perform segmentation (detect masks & classes) in the image(s) passed as inputs. Args: images (`str`, `List[str]`, `PIL.Image` or `List[PIL.Image]`): The pipeline handles three types of images: - A string containing an HTTP(S) link pointing to an image - A string containing a local path to an image - An image loaded in PIL directly The pipeline accepts either a single image or a batch of images. Images in a batch must all be in the same format: all as HTTP(S) links, all as local paths, or all as PIL images. subtask (`str`, *optional*): Segmentation task to be performed, choose [`semantic`, `instance` and `panoptic`] depending on model capabilities. If not set, the pipeline will attempt tp resolve in the following order: `panoptic`, `instance`, `semantic`. threshold (`float`, *optional*, defaults to 0.9): Probability threshold to filter out predicted masks. mask_threshold (`float`, *optional*, defaults to 0.5): Threshold to use when turning the predicted masks into binary values. overlap_mask_area_threshold (`float`, *optional*, defaults to 0.5): Mask overlap threshold to eliminate small, disconnected segments. timeout (`float`, *optional*, defaults to None): The maximum time in seconds to wait for fetching images from the web. If None, no timeout is set and the call may block forever. Return: A dictionary or a list of dictionaries containing the result. If the input is a single image, will return a list of dictionaries, if the input is a list of several images, will return a list of list of dictionaries corresponding to each image. The dictionaries contain the mask, label and score (where applicable) of each detected object and contains the following keys: - **label** (`str`) -- The class label identified by the model. - **mask** (`PIL.Image`) -- A binary mask of the detected object as a Pil Image of shape (width, height) of the original image. Returns a mask filled with zeros if no object is found. - **score** (*optional* `float`) -- Optionally, when the model is capable of estimating a confidence of the "object" described by the label and the mask. """ return super().__call__(images, **kwargs) def preprocess(self, image, subtask=None, timeout=None): image = load_image(image, timeout=timeout) target_size = [(image.height, image.width)] if self.model.config.__class__.__name__ == "OneFormerConfig": if subtask is None: kwargs = {} else: kwargs = {"task_inputs": [subtask]} inputs = self.image_processor(images=[image], return_tensors="pt", **kwargs) inputs["task_inputs"] = self.tokenizer( inputs["task_inputs"], padding="max_length", max_length=self.model.config.task_seq_len, return_tensors=self.framework, )["input_ids"] else: inputs = self.image_processor(images=[image], return_tensors="pt") inputs["target_size"] = target_size return inputs def _forward(self, model_inputs): target_size = model_inputs.pop("target_size") model_outputs = self.model(**model_inputs) model_outputs["target_size"] = target_size return model_outputs def postprocess( self, model_outputs, subtask=None, threshold=0.9, mask_threshold=0.5, overlap_mask_area_threshold=0.5 ): fn = None if subtask in {"panoptic", None} and hasattr(self.image_processor, "post_process_panoptic_segmentation"): fn = self.image_processor.post_process_panoptic_segmentation elif subtask in {"instance", None} and hasattr(self.image_processor, "post_process_instance_segmentation"): fn = self.image_processor.post_process_instance_segmentation if fn is not None: outputs = fn( model_outputs, threshold=threshold, mask_threshold=mask_threshold, overlap_mask_area_threshold=overlap_mask_area_threshold, target_sizes=model_outputs["target_size"], )[0] annotation = [] segmentation = outputs["segmentation"] for segment in outputs["segments_info"]: mask = (segmentation == segment["id"]) * 255 mask = Image.fromarray(mask.numpy().astype(np.uint8), mode="L") label = self.model.config.id2label[segment["label_id"]] score = segment["score"] annotation.append({"score": score, "label": label, "mask": mask}) elif subtask in {"semantic", None} and hasattr(self.image_processor, "post_process_semantic_segmentation"): outputs = self.image_processor.post_process_semantic_segmentation( model_outputs, target_sizes=model_outputs["target_size"] )[0] annotation = [] segmentation = outputs.numpy() labels = np.unique(segmentation) for label in labels: mask = (segmentation == label) * 255 mask = Image.fromarray(mask.astype(np.uint8), mode="L") label = self.model.config.id2label[label] annotation.append({"score": None, "label": label, "mask": mask}) else: raise ValueError(f"Subtask {subtask} is not supported for model {type(self.model)}") return annotation