from typing import Any, Dict, List, Union

import numpy as np

from ..utils import add_end_docstrings, is_torch_available, is_vision_available, logging, requires_backends
from .base import PIPELINE_INIT_ARGS, Pipeline


if is_vision_available():
    from PIL import Image

    from ..image_utils import load_image

if is_torch_available():
    from ..models.auto.modeling_auto import (
        MODEL_FOR_IMAGE_SEGMENTATION_MAPPING_NAMES,
        MODEL_FOR_INSTANCE_SEGMENTATION_MAPPING_NAMES,
        MODEL_FOR_SEMANTIC_SEGMENTATION_MAPPING_NAMES,
        MODEL_FOR_UNIVERSAL_SEGMENTATION_MAPPING_NAMES,
    )

logger = logging.get_logger(__name__)


Prediction = Dict[str, Any]
Predictions = List[Prediction]
@add_end_docstrings(PIPELINE_INIT_ARGS)
class ImageSegmentationPipeline(Pipeline):
    """
    Image segmentation pipeline using any `AutoModelForXXXSegmentation`. This pipeline predicts masks of objects and
    their classes.

    Example:

    ```python
    >>> from transformers import pipeline

    >>> segmenter = pipeline(model="facebook/detr-resnet-50-panoptic")
    >>> segments = segmenter("https://huggingface.co/datasets/Narsil/image_dummy/raw/main/parrots.png")
    >>> len(segments)
    2

    >>> segments[0]["label"]
    'bird'

    >>> segments[1]["label"]
    'bird'

    >>> type(segments[0]["mask"])  # This is a black and white mask showing where the bird is on the original image.
    <class 'PIL.Image.Image'>

    >>> segments[0]["mask"].size
    (768, 512)
    ```

    This image segmentation pipeline can currently be loaded from [`pipeline`] using the following task identifier:
    `"image-segmentation"`.

    See the list of available models on
    [huggingface.co/models](https://huggingface.co/models?filter=image-segmentation).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.framework == "tf":
            raise ValueError(f"The {self.__class__} is only available in PyTorch.")

        requires_backends(self, "vision")
        mapping = MODEL_FOR_IMAGE_SEGMENTATION_MAPPING_NAMES.copy()
        mapping.update(MODEL_FOR_SEMANTIC_SEGMENTATION_MAPPING_NAMES)
        mapping.update(MODEL_FOR_INSTANCE_SEGMENTATION_MAPPING_NAMES)
        mapping.update(MODEL_FOR_UNIVERSAL_SEGMENTATION_MAPPING_NAMES)
        self.check_model_type(mapping)

    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        postprocess_kwargs = {}
        if "subtask" in kwargs:
            postprocess_kwargs["subtask"] = kwargs["subtask"]
            preprocess_kwargs["subtask"] = kwargs["subtask"]
        if "threshold" in kwargs:
            postprocess_kwargs["threshold"] = kwargs["threshold"]
        if "mask_threshold" in kwargs:
            postprocess_kwargs["mask_threshold"] = kwargs["mask_threshold"]
        if "overlap_mask_area_threshold" in kwargs:
            postprocess_kwargs["overlap_mask_area_threshold"] = kwargs["overlap_mask_area_threshold"]
        if "timeout" in kwargs:
            preprocess_kwargs["timeout"] = kwargs["timeout"]

        return preprocess_kwargs, {}, postprocess_kwargs
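    # Routing illustration (hypothetical call): segmenter(image, subtask="panoptic",
    # threshold=0.8, timeout=5.0) yields preprocess_kwargs == {"subtask": "panoptic",
    # "timeout": 5.0} and postprocess_kwargs == {"subtask": "panoptic",
    # "threshold": 0.8}; the empty middle dict means _forward receives no extra kwargs.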

    def __call__(self, images, **kwargs) -> Union[Predictions, List[Prediction]]:
        """
        Perform segmentation (detect masks & classes) in the image(s) passed as inputs.

        Args:
            images (`str`, `List[str]`, `PIL.Image` or `List[PIL.Image]`):
                The pipeline handles three types of images:

                - A string containing an HTTP(S) link pointing to an image
                - A string containing a local path to an image
                - An image loaded in PIL directly

                The pipeline accepts either a single image or a batch of images. Images in a batch must all be in the
                same format: all as HTTP(S) links, all as local paths, or all as PIL images.
            subtask (`str`, *optional*):
                Segmentation task to be performed. Choose from `semantic`, `instance` or `panoptic` depending on model
                capabilities. If not set, the pipeline will attempt to resolve in the following order:
                `panoptic`, `instance`, `semantic`.
            threshold (`float`, *optional*, defaults to 0.9):
                Probability threshold to filter out predicted masks.
            mask_threshold (`float`, *optional*, defaults to 0.5):
                Threshold to use when turning the predicted masks into binary values.
            overlap_mask_area_threshold (`float`, *optional*, defaults to 0.5):
                Mask overlap threshold to eliminate small, disconnected segments.
            timeout (`float`, *optional*, defaults to None):
                The maximum time in seconds to wait for fetching images from the web. If None, no timeout is set and
                the call may block forever.

        Return:
            A list of dictionaries, or a list of lists of dictionaries. If the input is a single image, returns a
            list of dictionaries; if the input is a list of several images, returns a list of lists of dictionaries,
            one inner list per image.

            Each dictionary contains the mask, label and score (where applicable) of a detected segment, under
            the following keys:

            - **label** (`str`) -- The class label identified by the model.
            - **mask** (`PIL.Image`) -- A binary mask of the detected object as a PIL Image of shape (width, height) of
              the original image. Returns a mask filled with zeros if no object is found.
            - **score** (*optional* `float`) -- Optionally, when the model is capable of estimating a confidence of the
              "object" described by the label and the mask.
        """
        return super().__call__(images, **kwargs)
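    # A usage sketch for the parameters above (values are examples only; the URL
    # is the one from the class docstring):
    #
    #     segments = segmenter(
    #         "https://huggingface.co/datasets/Narsil/image_dummy/raw/main/parrots.png",
    #         subtask="panoptic",
    #         threshold=0.9,
    #         overlap_mask_area_threshold=0.5,
    #     )
    #     for segment in segments:
    #         print(segment["label"], segment["score"], segment["mask"].size)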

    def preprocess(self, image, subtask=None, timeout=None):
        image = load_image(image, timeout=timeout)
        # Keep the original (height, width) so masks can be resized back in postprocessing.
        target_size = [(image.height, image.width)]
        if self.model.config.__class__.__name__ == "OneFormerConfig":
            # OneFormer is additionally conditioned on a text prompt naming the
            # subtask (e.g. "panoptic"), which must be tokenized.
            if subtask is None:
                kwargs = {}
            else:
                kwargs = {"task_inputs": [subtask]}
            inputs = self.image_processor(images=[image], return_tensors="pt", **kwargs)
            inputs["task_inputs"] = self.tokenizer(
                inputs["task_inputs"],
                padding="max_length",
                max_length=self.model.config.task_seq_len,
                return_tensors=self.framework,
            )["input_ids"]
        else:
            inputs = self.image_processor(images=[image], return_tensors="pt")
        inputs["target_size"] = target_size
        return inputs
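    # At this point `inputs` roughly holds {"pixel_values": ..., "target_size": [(H, W)]},
    # plus a tokenized "task_inputs" entry for OneFormer checkpoints. (Exact keys
    # depend on the image processor in use.)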

    def _forward(self, model_inputs):
        # `target_size` is bookkeeping for postprocessing, not a model argument:
        # pop it before the forward pass and re-attach it to the outputs.
        target_size = model_inputs.pop("target_size")
        model_outputs = self.model(**model_inputs)
        model_outputs["target_size"] = target_size
        return model_outputs

    def postprocess(
        self, model_outputs, subtask=None, threshold=0.9, mask_threshold=0.5, overlap_mask_area_threshold=0.5
    ):
        fn = None
        # Prefer panoptic, then instance, then semantic post-processing, depending
        # on what the image processor supports.
        if subtask in {"panoptic", None} and hasattr(self.image_processor, "post_process_panoptic_segmentation"):
            fn = self.image_processor.post_process_panoptic_segmentation
        elif subtask in {"instance", None} and hasattr(self.image_processor, "post_process_instance_segmentation"):
            fn = self.image_processor.post_process_instance_segmentation

        if fn is not None:
            outputs = fn(
                model_outputs,
                threshold=threshold,
                mask_threshold=mask_threshold,
                overlap_mask_area_threshold=overlap_mask_area_threshold,
                target_sizes=model_outputs["target_size"],
            )[0]

            annotation = []
            segmentation = outputs["segmentation"]

            # Panoptic/instance outputs: one binary mask, label and score per segment.
            for segment in outputs["segments_info"]:
                mask = (segmentation == segment["id"]) * 255
                mask = Image.fromarray(mask.numpy().astype(np.uint8), mode="L")
                label = self.model.config.id2label[segment["label_id"]]
                score = segment["score"]
                annotation.append({"score": score, "label": label, "mask": mask})

        elif subtask in {"semantic", None} and hasattr(self.image_processor, "post_process_semantic_segmentation"):
            outputs = self.image_processor.post_process_semantic_segmentation(
                model_outputs, target_sizes=model_outputs["target_size"]
            )[0]

            annotation = []
            segmentation = outputs.numpy()
            labels = np.unique(segmentation)

            # Semantic outputs: one mask per class id present in the image; no per-segment score.
            for label in labels:
                mask = (segmentation == label) * 255
                mask = Image.fromarray(mask.astype(np.uint8), mode="L")
                label = self.model.config.id2label[label]
                annotation.append({"score": None, "label": label, "mask": mask})
        else:
            raise ValueError(f"Subtask {subtask} is not supported for model {type(self.model)}")
        return annotation
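    # A sketch of recombining the returned per-segment masks into a single integer
    # label map (hypothetical helper, not part of the pipeline; relies only on the
    # masks built above, where 255 marks segment pixels):
    #
    #     import numpy as np
    #
    #     def to_label_map(annotation, width, height):
    #         label_map = np.zeros((height, width), dtype=np.int64)
    #         for idx, pred in enumerate(annotation, start=1):
    #             label_map[np.array(pred["mask"]) > 0] = idx
    #         return label_map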