Spaces:
Runtime error
Runtime error
# coding=utf-8 | |
# Copyright 2018 The HuggingFace Inc. team. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import collections | |
import csv | |
import importlib | |
import json | |
import os | |
import pickle | |
import sys | |
import traceback | |
import types | |
import warnings | |
from abc import ABC, abstractmethod | |
from collections import UserDict | |
from contextlib import contextmanager | |
from os.path import abspath, exists | |
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union | |
from ..dynamic_module_utils import custom_object_save | |
from ..feature_extraction_utils import PreTrainedFeatureExtractor | |
from ..image_processing_utils import BaseImageProcessor | |
from ..modelcard import ModelCard | |
from ..models.auto.configuration_auto import AutoConfig | |
from ..tokenization_utils import PreTrainedTokenizer | |
from ..utils import ModelOutput, add_end_docstrings, infer_framework, is_tf_available, is_torch_available, logging | |
GenericTensor = Union[List["GenericTensor"], "torch.Tensor", "tf.Tensor"] | |
if is_tf_available(): | |
import tensorflow as tf | |
from ..models.auto.modeling_tf_auto import TFAutoModel | |
if is_torch_available(): | |
import torch | |
from torch.utils.data import DataLoader, Dataset | |
from ..models.auto.modeling_auto import AutoModel | |
# Re-export for backward compatibility | |
from .pt_utils import KeyDataset | |
else: | |
Dataset = None | |
KeyDataset = None | |
if TYPE_CHECKING: | |
from ..modeling_tf_utils import TFPreTrainedModel | |
from ..modeling_utils import PreTrainedModel | |
logger = logging.get_logger(__name__) | |
def no_collate_fn(items):
    """
    Collate function for batches of exactly one element: unwrap the batch and return that element as-is.

    Args:
        items: The batch to collate; it must contain exactly one element.

    Returns:
        The single element of `items`.

    Raises:
        `ValueError`: If `items` does not contain exactly one element.
    """
    if len(items) == 1:
        return items[0]
    raise ValueError("This collate_fn is meant to be used with batch_size=1")
def _pad(items, key, padding_value, padding_side): | |
batch_size = len(items) | |
if isinstance(items[0][key], torch.Tensor): | |
# Others include `attention_mask` etc... | |
shape = items[0][key].shape | |
dim = len(shape) | |
if key in ["pixel_values", "image"]: | |
# This is probable image so padding shouldn't be necessary | |
# B, C, H, W | |
return torch.cat([item[key] for item in items], dim=0) | |
elif dim == 4 and key == "input_features": | |
# this is probably a mel spectrogram batched | |
return torch.cat([item[key] for item in items], dim=0) | |
max_length = max(item[key].shape[1] for item in items) | |
min_length = min(item[key].shape[1] for item in items) | |
dtype = items[0][key].dtype | |
if dim == 2: | |
if max_length == min_length: | |
# Bypass for `ImageGPT` which doesn't provide a padding value, yet | |
# we can consistently pad since the size should be matching | |
return torch.cat([item[key] for item in items], dim=0) | |
tensor = torch.zeros((batch_size, max_length), dtype=dtype) + padding_value | |
elif dim == 3: | |
tensor = torch.zeros((batch_size, max_length, shape[-1]), dtype=dtype) + padding_value | |
elif dim == 4: | |
tensor = torch.zeros((batch_size, max_length, shape[-2], shape[-1]), dtype=dtype) + padding_value | |
for i, item in enumerate(items): | |
if dim == 2: | |
if padding_side == "left": | |
tensor[i, -len(item[key][0]) :] = item[key][0].clone() | |
else: | |
tensor[i, : len(item[key][0])] = item[key][0].clone() | |
elif dim == 3: | |
if padding_side == "left": | |
tensor[i, -len(item[key][0]) :, :] = item[key][0].clone() | |
else: | |
tensor[i, : len(item[key][0]), :] = item[key][0].clone() | |
elif dim == 4: | |
if padding_side == "left": | |
tensor[i, -len(item[key][0]) :, :, :] = item[key][0].clone() | |
else: | |
tensor[i, : len(item[key][0]), :, :] = item[key][0].clone() | |
return tensor | |
else: | |
return [item[key] for item in items] | |
def pad_collate_fn(tokenizer, feature_extractor):
    """
    Build a collate function that pads and batches the dict-like items produced by a pipeline.

    Args:
        tokenizer: Tokenizer providing `pad_token_id` and `padding_side`, or `None`.
        feature_extractor: Feature extractor optionally providing `padding_value` and `padding_side`, or `None`.

    Returns:
        A function taking a list of items (all sharing the same keys) and returning a dict mapping each key to the
        batched value produced by `_pad`.

    Raises:
        `ValueError`: If both `tokenizer` and `feature_extractor` are `None`, if the tokenizer has no
        `pad_token_id`, or if the tokenizer and the feature extractor disagree on the padding side.
    """
    if tokenizer is None and feature_extractor is None:
        raise ValueError("Pipeline without tokenizer or feature_extractor cannot do batching")

    # Every value is initialised up front so that `inner` never hits an unbound name when a key relies on a
    # processor that was not supplied (e.g. `pixel_values` in a tokenizer-only pipeline).
    t_padding_value = None
    t_padding_side = None
    f_padding_value = None
    f_padding_side = None

    if tokenizer is not None:
        if tokenizer.pad_token_id is None:
            raise ValueError(
                "Pipeline with tokenizer without pad_token cannot do batching. You can try to set it with "
                "`pipe.tokenizer.pad_token_id = model.config.eos_token_id`."
            )
        t_padding_value = tokenizer.pad_token_id
        t_padding_side = tokenizer.padding_side
    if feature_extractor is not None:
        # Feature extractor can be images, where no padding is expected
        f_padding_value = getattr(feature_extractor, "padding_value", None)
        f_padding_side = getattr(feature_extractor, "padding_side", None)

    if t_padding_side is not None and f_padding_side is not None and t_padding_side != f_padding_side:
        raise ValueError(
            f"The feature extractor, and tokenizer don't agree on padding side {t_padding_side} != {f_padding_side}"
        )

    # Default to right padding; the feature extractor's side wins over the tokenizer's when both are set
    # (they are guaranteed to be equal in that case).
    padding_side = "right"
    if t_padding_side is not None:
        padding_side = t_padding_side
    if f_padding_side is not None:
        padding_side = f_padding_side

    def inner(items):
        keys = set(items[0].keys())
        for item in items:
            if set(item.keys()) != keys:
                raise ValueError(
                    f"The elements of the batch contain different keys. Cannot batch them ({set(item.keys())} !="
                    f" {keys})"
                )
        # input_values, input_pixels, input_ids, ...
        padded = {}
        for key in keys:
            if key in {"input_ids"}:
                # ImageGPT uses a feature extractor
                if tokenizer is None and feature_extractor is not None:
                    _padding_value = f_padding_value
                else:
                    _padding_value = t_padding_value
            elif key in {"input_values", "pixel_values", "input_features"}:
                _padding_value = f_padding_value
            elif key in {"p_mask", "special_tokens_mask"}:
                _padding_value = 1
            else:
                # `attention_mask`, `token_type_ids` and any other (possibly user provided) key are padded with 0
                _padding_value = 0
            padded[key] = _pad(items, key, _padding_value, padding_side)
        return padded

    return inner
def infer_framework_load_model(
    model,
    config: AutoConfig,
    model_classes: Optional[Dict[str, Tuple[type]]] = None,
    task: Optional[str] = None,
    framework: Optional[str] = None,
    **model_kwargs,
):
    """
    Resolve the framework (`"pt"` or `"tf"`) for `model`, loading the model first when it is given by name.

    An already instantiated `model` is returned untouched. A `str` is treated as a checkpoint name: candidate
    classes are collected from `model_classes` and from `config.architectures`, then tried in order until one
    `from_pretrained` call succeeds. The loaded model is returned so that it does not have to be instantiated a
    second time by the pipeline.

    Args:
        model (`str`, [`PreTrainedModel`] or [`TFPreTrainedModel`]):
            The model to infer the framework from. If `str`, a checkpoint name.
        config ([`AutoConfig`]):
            The config associated with the model; its `architectures` are used to find candidate classes.
        model_classes (dictionary `str` to `type`, *optional*):
            A mapping framework (`"pt"` / `"tf"`) to a tuple of classes.
        task (`str`, *optional*):
            The task of the pipeline; forwarded to `from_pretrained` as `_from_pipeline`.
        framework (`str`, *optional*):
            `"pt"` or `"tf"` to restrict the candidate classes to one framework. When `None`, every available
            framework is considered and the result is inferred from the class of the loaded model.
        model_kwargs:
            Additional dictionary of keyword arguments passed along to the model's `from_pretrained(...,
            **model_kwargs)` function.

    Returns:
        `Tuple`: A tuple framework, model.

    Raises:
        `RuntimeError`: If neither TensorFlow nor PyTorch is available.
        `ValueError`: If no candidate class can be found, or if none of them manages to load `model`.
    """
    torch_ready = is_torch_available()
    tf_ready = is_tf_available()
    if not (tf_ready or torch_ready):
        raise RuntimeError(
            "At least one of TensorFlow 2.0 or PyTorch should be installed. "
            "To install TensorFlow 2.0, read the instructions at https://www.tensorflow.org/install/ "
            "To install PyTorch, read the instructions at https://pytorch.org/."
        )

    if isinstance(model, str):
        model_kwargs["_from_pipeline"] = task
        look_pt = torch_ready and framework in {"pt", None}
        look_tf = tf_ready and framework in {"tf", None}

        # 1. Classes explicitly registered for the task (PyTorch first, then TensorFlow).
        class_tuple = ()
        if model_classes:
            if look_pt:
                class_tuple = class_tuple + model_classes.get("pt", (AutoModel,))
            if look_tf:
                class_tuple = class_tuple + model_classes.get("tf", (TFAutoModel,))

        # 2. Classes named in the config, looked up on the top-level `transformers` module.
        if config.architectures:
            transformers_module = importlib.import_module("transformers")
            prefixes = []
            if look_pt:
                prefixes.append("")
            if look_tf:
                prefixes.append("TF")
            from_config = []
            for architecture in config.architectures:
                for prefix in prefixes:
                    candidate = getattr(transformers_module, prefix + architecture, None)
                    if candidate is not None:
                        from_config.append(candidate)
            class_tuple = class_tuple + tuple(from_config)

        if not class_tuple:
            raise ValueError(f"Pipeline cannot infer suitable model classes from {model}")

        all_traceback = {}
        for model_class in class_tuple:
            kwargs = model_kwargs.copy()
            if framework == "pt" and model.endswith(".h5"):
                kwargs["from_tf"] = True
                logger.warning(
                    "Model might be a TensorFlow model (ending with `.h5`) but TensorFlow is not available. "
                    "Trying to load the model with PyTorch."
                )
            elif framework == "tf" and model.endswith(".bin"):
                kwargs["from_pt"] = True
                logger.warning(
                    "Model might be a PyTorch model (ending with `.bin`) but PyTorch is not available. "
                    "Trying to load the model with Tensorflow."
                )

            try:
                model = model_class.from_pretrained(model, **kwargs)
                if hasattr(model, "eval"):
                    model = model.eval()
            except (OSError, ValueError):
                # Remember why this class failed and move on to the next candidate.
                all_traceback[model_class.__name__] = traceback.format_exc()
            else:
                # Stop loading on the first successful load.
                break

        # `model` is still the checkpoint name only when every candidate failed.
        if isinstance(model, str):
            error = "".join(
                f"while loading with {class_name}, an error is thrown:\n{trace}\n"
                for class_name, trace in all_traceback.items()
            )
            raise ValueError(
                f"Could not load model {model} with any of the following classes: {class_tuple}. See the original errors:\n\n{error}\n"
            )

    if framework is None:
        framework = infer_framework(model.__class__)
    return framework, model
def infer_framework_from_model(
    model,
    model_classes: Optional[Dict[str, Tuple[type]]] = None,
    task: Optional[str] = None,
    framework: Optional[str] = None,
    **model_kwargs,
):
    """
    Resolve the framework (`"pt"` or `"tf"`) for `model` without requiring the caller to provide a config.

    The config is fetched with `AutoConfig.from_pretrained` when `model` is a checkpoint name, and read from
    `model.config` otherwise; everything else is delegated to `infer_framework_load_model`.

    Args:
        model (`str`, [`PreTrainedModel`] or [`TFPreTrainedModel`]):
            The model to infer the framework from. If `str`, a checkpoint name.
        model_classes (dictionary `str` to `type`, *optional*):
            A mapping framework to class.
        task (`str`, *optional*):
            The task defining which pipeline will be returned.
        framework (`str`, *optional*):
            `"pt"` or `"tf"` when the framework is already known, `None` to infer it.
        model_kwargs:
            Additional dictionary of keyword arguments passed along to the config's and the model's
            `from_pretrained(..., **model_kwargs)` functions.

    Returns:
        `Tuple`: A tuple framework, model.
    """
    config = (
        AutoConfig.from_pretrained(model, _from_pipeline=task, **model_kwargs)
        if isinstance(model, str)
        else model.config
    )
    return infer_framework_load_model(
        model,
        config,
        model_classes=model_classes,
        _from_pipeline=task,
        task=task,
        framework=framework,
        **model_kwargs,
    )
def get_framework(model, revision: Optional[str] = None):
    """
    Deprecated: return the framework (`"pt"` or `"tf"`) matching `model`.

    Args:
        model (`str`, [`PreTrainedModel`] or [`TFPreTrainedModel`]):
            A model instance, or a checkpoint name. A checkpoint name is loaded with the auto class of the only
            installed framework; when both frameworks are installed PyTorch is tried first and TensorFlow is the
            fallback on `OSError`.
        revision (`str`, *optional*):
            Revision forwarded to `from_pretrained` when `model` is a checkpoint name.

    Returns:
        `str`: The framework inferred from the class of the (possibly freshly loaded) model.
    """
    warnings.warn(
        "`get_framework` is deprecated and will be removed in v5, use `infer_framework_from_model` instead.",
        FutureWarning,
    )
    torch_ready = is_torch_available()
    tf_ready = is_tf_available()
    if not (tf_ready or torch_ready):
        raise RuntimeError(
            "At least one of TensorFlow 2.0 or PyTorch should be installed. "
            "To install TensorFlow 2.0, read the instructions at https://www.tensorflow.org/install/ "
            "To install PyTorch, read the instructions at https://pytorch.org/."
        )

    if isinstance(model, str):
        if not tf_ready:
            # Only PyTorch is installed.
            model = AutoModel.from_pretrained(model, revision=revision)
        elif not torch_ready:
            # Only TensorFlow is installed.
            model = TFAutoModel.from_pretrained(model, revision=revision)
        else:
            try:
                model = AutoModel.from_pretrained(model, revision=revision)
            except OSError:
                model = TFAutoModel.from_pretrained(model, revision=revision)

    return infer_framework(model.__class__)
def get_default_model_and_revision(
    targeted_task: Dict, framework: Optional[str], task_options: Optional[Any]
) -> Union[str, Tuple[str, str]]:
    """
    Pick the default model registered for a task. PyTorch is used when the framework stays ambiguous.

    Args:
        targeted_task (`Dict`):
            Dictionary describing the task; its `"default"` entry holds the default models.
        framework (`str`, None):
            "pt", "tf" or None. It is overridden when only one of the two frameworks is installed.
        task_options (`Any`, None):
            Any further value required by the task to get fully specified, for instance (SRC, TGT) languages for
            translation task.

    Returns:
        The entry registered for the selected framework in the task's default models.

    Raises:
        `ValueError`: If `task_options` has no registered defaults, or if the task needs options and none were
        given.
    """
    torch_ready = is_torch_available()
    tf_ready = is_tf_available()
    if torch_ready and not tf_ready:
        framework = "pt"
    elif tf_ready and not torch_ready:
        framework = "tf"

    defaults = targeted_task["default"]
    if task_options:
        if task_options not in defaults:
            raise ValueError(f"The task does not provide any default models for options {task_options}")
        default_models = defaults[task_options]["model"]
    elif "model" in defaults:
        default_models = targeted_task["default"]["model"]
    else:
        # XXX This error message needs to be updated to be more generic if more tasks are going to become
        # parametrized
        raise ValueError('The task defaults can\'t be correctly selected. You probably meant "translation_XX_to_YY"')

    return default_models["pt" if framework is None else framework]
class PipelineException(Exception):
    """
    Error raised by a [`Pipeline`] when handling __call__.

    The exception message is `reason`; the task and the model are kept as attributes.

    Args:
        task (`str`): The task of the pipeline.
        model (`str`): The model used by the pipeline.
        reason (`str`): The error message to display.
    """

    def __init__(self, task: str, model: str, reason: str):
        Exception.__init__(self, reason)
        self.task, self.model = task, model
class ArgumentHandler(ABC):
    """
    Base interface for handling arguments for each [`~pipelines.Pipeline`].

    Subclasses must implement `__call__`; the base class itself cannot be instantiated.
    """

    @abstractmethod
    def __call__(self, *args, **kwargs):
        """Parse the arguments supplied to a pipeline call. Must be overridden by subclasses."""
        raise NotImplementedError()
class PipelineDataFormat:
    """
    Base class for all the pipeline supported data format both for reading and writing. Supported data formats
    currently includes:

    - JSON
    - CSV
    - stdin/stdout (pipe)

    `PipelineDataFormat` also includes some utilities to work with multi-columns like mapping from datasets columns to
    pipelines keyword arguments through the `dataset_kwarg_1=dataset_column_1` format.

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.

    Raises:
        `OSError`: If `output_path` already exists and `overwrite` is `False`, or if `input_path` does not exist.
    """

    SUPPORTED_FORMATS = ["json", "csv", "pipe"]

    def __init__(
        self,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite: bool = False,
    ):
        self.output_path = output_path
        self.input_path = input_path
        # `column` is a comma separated list; without it a single empty column name is used.
        self.column = column.split(",") if column is not None else [""]
        self.is_multi_columns = len(self.column) > 1

        if self.is_multi_columns:
            # Each entry becomes a (pipeline kwarg, dataset column) pair; `name` alone maps to itself.
            self.column = [tuple(c.split("=")) if "=" in c else (c, c) for c in self.column]

        if output_path is not None and not overwrite:
            if exists(abspath(self.output_path)):
                raise OSError(f"{self.output_path} already exists on disk")

        if input_path is not None:
            if not exists(abspath(self.input_path)):
                raise OSError(f"{self.input_path} doesnt exist on disk")

    def __iter__(self):
        """Iterate over the input entries. Must be implemented by subclasses."""
        raise NotImplementedError()

    def save(self, data: Union[dict, List[dict]]):
        """
        Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`].

        Args:
            data (`dict` or list of `dict`): The data to store.
        """
        raise NotImplementedError()

    def save_binary(self, data: Union[dict, List[dict]]) -> str:
        """
        Save the provided data object as a pickle-formatted binary data on the disk.

        The file is written next to `output_path`, with its extension replaced by `pickle`.

        Args:
            data (`dict` or list of `dict`): The data to store.

        Returns:
            `str`: Path where the data has been saved.
        """
        path, _ = os.path.splitext(self.output_path)
        binary_path = os.path.extsep.join((path, "pickle"))

        with open(binary_path, "wb+") as f_output:
            pickle.dump(data, f_output)

        return binary_path

    # Declared static: the function takes no `self`, so without the decorator calling it on an instance would
    # shift every argument by one position.
    @staticmethod
    def from_str(
        format: str,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite=False,
    ) -> "PipelineDataFormat":
        """
        Creates an instance of the right subclass of [`~pipelines.PipelineDataFormat`] depending on `format`.

        Args:
            format (`str`):
                The format of the desired pipeline. Acceptable values are `"json"`, `"csv"` or `"pipe"`.
            output_path (`str`, *optional*):
                Where to save the outgoing data.
            input_path (`str`, *optional*):
                Where to look for the input data.
            column (`str`, *optional*):
                The column to read.
            overwrite (`bool`, *optional*, defaults to `False`):
                Whether or not to overwrite the `output_path`.

        Returns:
            [`~pipelines.PipelineDataFormat`]: The proper data format.

        Raises:
            `KeyError`: If `format` is not one of the supported values.
        """
        if format == "json":
            return JsonPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
        elif format == "csv":
            return CsvPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
        elif format == "pipe":
            return PipedPipelineDataFormat(output_path, input_path, column, overwrite=overwrite)
        else:
            raise KeyError(f"Unknown reader {format} (Available reader are json/csv/pipe)")
class CsvPipelineDataFormat(PipelineDataFormat):
    """
    Support for pipelines using CSV data format.

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.
    """

    def __init__(
        self,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite=False,
    ):
        super().__init__(output_path, input_path, column, overwrite=overwrite)

    def __iter__(self):
        """
        Yield one entry per CSV row: a dict of the mapped columns in multi-column mode, the value of the single
        selected column otherwise.
        """
        # `newline=""` lets the csv module do its own newline handling, so that line breaks embedded in quoted
        # fields are read back correctly.
        with open(self.input_path, "r", newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                if self.is_multi_columns:
                    yield {k: row[c] for k, c in self.column}
                else:
                    yield row[self.column[0]]

    def save(self, data: List[dict]):
        """
        Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`].

        The header is taken from the keys of the first element; an empty `data` produces an empty file.

        Args:
            data (`List[dict]`): The data to store.
        """
        # Without `newline=""` the writer's `\r\n` terminator is translated a second time on platforms whose
        # line separator is `\r\n`, producing an extra blank line after every row.
        with open(self.output_path, "w", newline="") as f:
            if len(data) > 0:
                writer = csv.DictWriter(f, list(data[0].keys()))
                writer.writeheader()
                writer.writerows(data)
class JsonPipelineDataFormat(PipelineDataFormat):
    """
    Support for pipelines using JSON file format. The whole input file is parsed when the object is created.

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.
    """

    def __init__(
        self,
        output_path: Optional[str],
        input_path: Optional[str],
        column: Optional[str],
        overwrite=False,
    ):
        super().__init__(output_path, input_path, column, overwrite=overwrite)

        with open(input_path, "r") as json_file:
            self._entries = json.load(json_file)

    def __iter__(self):
        """
        Yield one item per loaded entry: a dict of the mapped columns in multi-column mode, the value of the single
        selected column otherwise.
        """
        for record in self._entries:
            if not self.is_multi_columns:
                yield record[self.column[0]]
            else:
                yield {kwarg: record[source] for kwarg, source in self.column}

    def save(self, data: dict):
        """
        Save the provided data object in a json file.

        Args:
            data (`dict`): The data to store.
        """
        with open(self.output_path, "w") as json_file:
            json.dump(data, json_file)
class PipedPipelineDataFormat(PipelineDataFormat):
    """
    Read data from piped input to the python process. For multi columns data, columns should be separated by a tab.

    If several columns are provided, then the output will be a dictionary with {column_x: value_x}

    Args:
        output_path (`str`): Where to save the outgoing data.
        input_path (`str`): Where to look for the input data.
        column (`str`): The column to read.
        overwrite (`bool`, *optional*, defaults to `False`):
            Whether or not to overwrite the `output_path`.
    """

    def __iter__(self):
        """
        Yield one item per line read from stdin: the raw line when it holds no tab, otherwise a dict keyed by the
        pipeline argument names (multi-column mode) or a tuple of the tab separated fields.
        """
        for line in sys.stdin:
            if "\t" not in line:
                # Single value, no splitting needed
                yield line
                continue

            # Split for multi-columns
            fields = line.split("\t")
            # `self.column` is never empty, so it cannot be used to tell whether a mapping was requested;
            # only the multi-column mode stores the (kwarg, column) pairs that the dict form relies on.
            if self.is_multi_columns:
                # Dictionary to map arguments
                yield {kwarg: value for (kwarg, _), value in zip(self.column, fields)}
            else:
                # No dictionary to map arguments
                yield tuple(fields)

    def save(self, data: dict):
        """
        Print the data.

        Args:
            data (`dict`): The data to store.
        """
        print(data)

    def save_binary(self, data: Union[dict, List[dict]]) -> str:
        """
        Save `data` as a pickle file through the parent implementation.

        Raises:
            `KeyError`: If no `output_path` was provided.
        """
        if self.output_path is None:
            raise KeyError(
                "When using piped input on pipeline outputting large object requires an output file path. "
                "Please provide such output path through --output argument."
            )

        return super().save_binary(data)
class _ScikitCompat(ABC): | |
""" | |
Interface layer for the Scikit and Keras compatibility. | |
""" | |
def transform(self, X): | |
raise NotImplementedError() | |
def predict(self, X): | |
raise NotImplementedError() | |
PIPELINE_INIT_ARGS = r""" | |
Arguments: | |
model ([`PreTrainedModel`] or [`TFPreTrainedModel`]): | |
The model that will be used by the pipeline to make predictions. This needs to be a model inheriting from | |
[`PreTrainedModel`] for PyTorch and [`TFPreTrainedModel`] for TensorFlow. | |
tokenizer ([`PreTrainedTokenizer`]): | |
The tokenizer that will be used by the pipeline to encode data for the model. This object inherits from | |
[`PreTrainedTokenizer`]. | |
modelcard (`str` or [`ModelCard`], *optional*): | |
Model card attributed to the model for this pipeline. | |
framework (`str`, *optional*): | |
The framework to use, either `"pt"` for PyTorch or `"tf"` for TensorFlow. The specified framework must be | |
installed. | |
If no framework is specified, will default to the one currently installed. If no framework is specified and | |
both frameworks are installed, will default to the framework of the `model`, or to PyTorch if no model is | |
provided. | |
task (`str`, defaults to `""`): | |
A task-identifier for the pipeline. | |
num_workers (`int`, *optional*, defaults to 8): | |
When the pipeline will use *DataLoader* (when passing a dataset, on GPU for a Pytorch model), the number of | |
workers to be used. | |
batch_size (`int`, *optional*, defaults to 1): | |
When the pipeline will use *DataLoader* (when passing a dataset, on GPU for a Pytorch model), the size of | |
the batch to use, for inference this is not always beneficial, please read [Batching with | |
pipelines](https://huggingface.co/transformers/main_classes/pipelines.html#pipeline-batching) . | |
args_parser ([`~pipelines.ArgumentHandler`], *optional*): | |
Reference to the object in charge of parsing supplied pipeline parameters. | |
device (`int`, *optional*, defaults to -1): | |
Device ordinal for CPU/GPU supports. Setting this to -1 will leverage CPU, a positive will run the model on | |
the associated CUDA device id. You can pass native `torch.device` or a `str` too. | |
binary_output (`bool`, *optional*, defaults to `False`): | |
Flag indicating if the output the pipeline should happen in a binary format (i.e., pickle) or as raw text. | |
""" | |
if is_torch_available(): | |
from transformers.pipelines.pt_utils import ( | |
PipelineChunkIterator, | |
PipelineDataset, | |
PipelineIterator, | |
PipelinePackIterator, | |
) | |
class Pipeline(_ScikitCompat): | |
""" | |
The Pipeline class is the class from which all pipelines inherit. Refer to this class for methods shared across | |
different pipelines. | |
Base class implementing pipelined operations. Pipeline workflow is defined as a sequence of the following | |
operations: | |
Input -> Tokenization -> Model Inference -> Post-Processing (task dependent) -> Output | |
Pipeline supports running on CPU or GPU through the device argument (see below). | |
Some pipeline, like for instance [`FeatureExtractionPipeline`] (`'feature-extraction'`) output large tensor object | |
as nested-lists. In order to avoid dumping such large structure as textual data we provide the `binary_output` | |
constructor argument. If set to `True`, the output will be stored in the pickle format. | |
""" | |
default_input_names = None | |
def __init__(
    self,
    model: Union["PreTrainedModel", "TFPreTrainedModel"],
    tokenizer: Optional[PreTrainedTokenizer] = None,
    feature_extractor: Optional[PreTrainedFeatureExtractor] = None,
    image_processor: Optional[BaseImageProcessor] = None,
    modelcard: Optional[ModelCard] = None,
    framework: Optional[str] = None,
    task: str = "",
    args_parser: ArgumentHandler = None,
    device: Union[int, "torch.device"] = None,
    torch_dtype: Optional[Union[str, "torch.dtype"]] = None,
    binary_output: bool = False,
    **kwargs,
):
    # Set up the pipeline state: resolve the framework, place the model on the requested device, apply the
    # task specific parameters of the model config and split the remaining kwargs into
    # preprocess / forward / postprocess parameters.
    if framework is None:
        framework, model = infer_framework_load_model(model, config=model.config)

    self.task = task
    self.model = model
    self.tokenizer = tokenizer
    self.feature_extractor = feature_extractor
    self.image_processor = image_processor
    self.modelcard = modelcard
    self.framework = framework

    # `accelerate` device map
    hf_device_map = getattr(self.model, "hf_device_map", None)
    dispatched_by_accelerate = hf_device_map is not None
    if dispatched_by_accelerate and device is not None:
        raise ValueError(
            "The model has been loaded with `accelerate` and therefore cannot be moved to a specific device. Please "
            "discard the `device` argument when creating your pipeline object."
        )

    # We shouldn't call `model.to()` for models loaded with accelerate
    # (a negative integer means CPU: nothing to move in that case either).
    is_negative_ordinal = isinstance(device, int) and device < 0
    if self.framework == "pt" and device is not None and not is_negative_ordinal:
        self.model.to(device)

    if device is None:
        # Take the first device used by `accelerate`, CPU (-1) otherwise.
        device = next(iter(hf_device_map.values())) if dispatched_by_accelerate else -1

    if is_torch_available() and self.framework == "pt":
        # Normalize whatever was given into a `torch.device`.
        if isinstance(device, torch.device):
            resolved_device = device
        elif isinstance(device, str):
            resolved_device = torch.device(device)
        elif device < 0:
            resolved_device = torch.device("cpu")
        else:
            resolved_device = torch.device(f"cuda:{device}")
        self.device = resolved_device
    else:
        self.device = device if device is not None else -1

    self.torch_dtype = torch_dtype
    self.binary_output = binary_output

    # Update config and generation_config with task specific parameters
    task_specific_params = self.model.config.task_specific_params
    if task_specific_params is not None and task in task_specific_params:
        params_for_task = task_specific_params.get(task)
        self.model.config.update(params_for_task)
        if self.model.can_generate():
            self.model.generation_config.update(**params_for_task)

    self.call_count = 0
    # `batch_size` and `num_workers` are kept aside; everything else goes through `_sanitize_parameters`.
    self._batch_size = kwargs.pop("batch_size", None)
    self._num_workers = kwargs.pop("num_workers", None)
    sanitized = self._sanitize_parameters(**kwargs)
    self._preprocess_params, self._forward_params, self._postprocess_params = sanitized

    # Backward compatible change, if users called
    # ImageSegmentationPipeline(.., feature_extractor=MyFeatureExtractor())
    # then we should keep working
    if (
        self.image_processor is None
        and self.feature_extractor is not None
        and isinstance(self.feature_extractor, BaseImageProcessor)
    ):
        self.image_processor = self.feature_extractor
def save_pretrained(self, save_directory: str, safe_serialization: bool = False):
    """
    Save the pipeline's model and, when present, its tokenizer, feature extractor, image processor and model card.

    Args:
        save_directory (`str`):
            A path to the directory where to save. It will be created if it doesn't exist. If the path is an
            existing file, an error is logged and nothing is saved.
        safe_serialization (`bool`, *optional*, defaults to `False`):
            Forwarded to the model's `save_pretrained`: whether to save the model using `safetensors` or the
            traditional way for PyTorch or Tensorflow.
    """
    if os.path.isfile(save_directory):
        logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
        return
    os.makedirs(save_directory, exist_ok=True)

    if hasattr(self, "_registered_impl"):
        # Add info to the config
        pipeline_info = self._registered_impl.copy()
        custom_pipelines = {}
        for task, info in pipeline_info.items():
            # Only keep the entries implemented by this very class.
            if info["impl"] == self.__class__:
                entry = info.copy()
                impl_class = entry["impl"]
                last_module = impl_class.__module__.split(".")[-1]
                # Change classes into their names/full names
                entry["impl"] = f"{last_module}.{impl_class.__name__}"
                entry["pt"] = tuple(c.__name__ for c in entry["pt"])
                entry["tf"] = tuple(c.__name__ for c in entry["tf"])
                custom_pipelines[task] = entry
        self.model.config.custom_pipelines = custom_pipelines
        # Save the pipeline custom code
        custom_object_save(self, save_directory)

    self.model.save_pretrained(save_directory, safe_serialization=safe_serialization)

    # Optional components, saved in the same order as before.
    for component in (self.tokenizer, self.feature_extractor, self.image_processor, self.modelcard):
        if component is not None:
            component.save_pretrained(save_directory)
def transform(self, X):
    """
    Scikit / Keras interface to transformers' pipelines: calls the pipeline on `X` and returns its output.
    """
    outputs = self(X)
    return outputs
def predict(self, X):
    """
    Scikit / Keras interface to transformers' pipelines: calls the pipeline on `X` and returns its output.
    """
    outputs = self(X)
    return outputs
@contextmanager
def device_placement(self):
    """
    Context Manager allowing tensor allocation on the user-specified device in framework agnostic way.

    The function body is a generator, so it must be wrapped by `contextlib.contextmanager` to be usable in a
    `with` statement (which is how `forward` uses it).

    Returns:
        Context manager

    Examples:

    ```python
    # Explicitly ask for tensor allocation on CUDA device :0
    pipe = pipeline(..., device=0)
    with pipe.device_placement():
        # Every framework specific tensor allocation will be done on the request device
        output = pipe(...)
    ```"""
    if self.framework == "tf":
        # For TensorFlow `self.device` is an integer index; -1 selects the CPU.
        with tf.device("/CPU:0" if self.device == -1 else f"/device:GPU:{self.device}"):
            yield
    else:
        # For the other frameworks `self.device` is expected to expose a `.type` attribute.
        if self.device.type == "cuda":
            with torch.cuda.device(self.device):
                yield
        else:
            # Nothing to select for non-CUDA devices: just run the managed block.
            yield
def ensure_tensor_on_device(self, **inputs):
    """
    Place the given PyTorch tensors on the pipeline's device.

    Args:
        inputs (keyword arguments that should be `torch.Tensor`, the rest is ignored):
            The values to place on `self.device`. They are collected into a dict and handed to
            `_ensure_tensor_on_device`, which performs the actual traversal.

    Return:
        `Dict[str, torch.Tensor]`: The same as `inputs` but on the proper device.
    """
    target_device = self.device
    return self._ensure_tensor_on_device(inputs, target_device)
def _ensure_tensor_on_device(self, inputs, device):
    """
    Recursively copy `inputs` with every `torch.Tensor` it contains moved to `device`.

    `ModelOutput`, `dict`, `UserDict`, `list` and `tuple` containers are rebuilt with their elements processed
    recursively; any other non-tensor value is returned untouched.

    Args:
        inputs: A tensor, a (possibly nested) container of tensors, or an arbitrary object.
        device: The target device passed to `Tensor.to`.

    Returns:
        A structure of the same shape as `inputs` whose tensors live on `device`.
    """

    def relocate(value):
        return self._ensure_tensor_on_device(value, device)

    # The type checks are kept in their original order; `ModelOutput` goes first so such inputs are
    # rebuilt as `ModelOutput` rather than being caught by one of the more generic container checks.
    if isinstance(inputs, ModelOutput):
        return ModelOutput({key: relocate(value) for key, value in inputs.items()})
    if isinstance(inputs, dict):
        return {key: relocate(value) for key, value in inputs.items()}
    if isinstance(inputs, UserDict):
        return UserDict({key: relocate(value) for key, value in inputs.items()})
    if isinstance(inputs, list):
        return [relocate(element) for element in inputs]
    if isinstance(inputs, tuple):
        return tuple(map(relocate, inputs))
    if not isinstance(inputs, torch.Tensor):
        return inputs

    # Half-precision tensors are upcast with `.float()` before being sent to the CPU.
    half_precision = {torch.float16, torch.bfloat16}
    if device == torch.device("cpu") and inputs.dtype in half_precision:
        inputs = inputs.float()
    return inputs.to(device)
def check_model_type(self, supported_models: Union[List[str], dict]):
    """
    Check if the model class is supported by the pipeline.

    Nothing is raised when the model is not supported: an error is logged instead.

    Args:
        supported_models (`List[str]` or `dict`):
            The list of models supported by the pipeline, or a dictionary with model class values.
    """
    if not isinstance(supported_models, list):  # Create from a model mapping
        supported_models_names = []
        for _, model_name in supported_models.items():
            # Mapping can now contain tuples of models for the same configuration.
            if isinstance(model_name, tuple):
                supported_models_names.extend(list(model_name))
            else:
                supported_models_names.append(model_name)
        if hasattr(supported_models, "_model_mapping"):
            for _, model in supported_models._model_mapping._extra_content.items():
                # Inspect the entry of *this* loop. Testing the leftover `model_name` of the loop above
                # mis-handled tuples and failed outright when the mapping itself was empty.
                if isinstance(model, tuple):
                    supported_models_names.extend([m.__name__ for m in model])
                else:
                    supported_models_names.append(model.__name__)
        supported_models = supported_models_names
    if self.model.__class__.__name__ not in supported_models:
        logger.error(
            f"The model '{self.model.__class__.__name__}' is not supported for {self.task}. Supported models are"
            f" {supported_models}."
        )
def _sanitize_parameters(self, **pipeline_parameters): | |
""" | |
_sanitize_parameters will be called with any excessive named arguments from either `__init__` or `__call__` | |
methods. It should return 3 dictionnaries of the resolved parameters used by the various `preprocess`, | |
`forward` and `postprocess` methods. Do not fill dictionnaries if the caller didn't specify a kwargs. This | |
let's you keep defaults in function signatures, which is more "natural". | |
It is not meant to be called directly, it will be automatically called and the final parameters resolved by | |
`__init__` and `__call__` | |
""" | |
raise NotImplementedError("_sanitize_parameters not implemented") | |
def preprocess(self, input_: Any, **preprocess_parameters: Dict) -> Dict[str, GenericTensor]:
    """
    Hook to be overridden: turn the pipeline-specific `input_` into a dictionary holding everything `_forward`
    needs to run. The dictionary should contain at least one tensor, but may carry arbitrary other items.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = "preprocess not implemented"
    raise NotImplementedError(message)
def _forward(self, input_tensors: Dict[str, GenericTensor], **forward_parameters: Dict) -> ModelOutput:
    """
    Hook to be overridden: run the model on the dictionary prepared by `preprocess`.

    The implementation may execute on GPU or CPU and should be agnostic to which one it is. Keeping this step
    separate from `preprocess` and `postprocess` is what lets the hot path run as fast as possible.

    It is not meant to be called directly, `forward` is preferred: it wraps `_forward` with the code that
    keeps tensors and model on the same device and disables the training part of the code (leading to faster
    inference).

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = "_forward not implemented"
    raise NotImplementedError(message)
def postprocess(self, model_outputs: ModelOutput, **postprocess_parameters: Dict) -> Any:
    """
    Hook to be overridden: convert the raw outputs of `_forward` (generally tensors) into something friendlier,
    typically a list or a dict of results containing just strings and numbers.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = "postprocess not implemented"
    raise NotImplementedError(message)
def get_inference_context(self):
    """
    Return the context-manager factory that `forward` enters around the PyTorch model call.

    Returns:
        `torch.no_grad` itself (not an instance); the caller is expected to call it to obtain the context.
    """
    context_factory = torch.no_grad
    return context_factory
def forward(self, model_inputs, **forward_params):
    """
    Run `_forward` inside the device-placement context, with the framework-specific setup around it.

    Args:
        model_inputs: The output of `preprocess`. For TensorFlow it is updated in place with
            `"training": False`.
        **forward_params: Extra keyword arguments passed through to `_forward`.

    Returns:
        The outputs of `_forward`; for PyTorch they are moved to the CPU first.

    Raises:
        ValueError: If `self.framework` is neither `"tf"` nor `"pt"`.
    """
    with self.device_placement():
        if self.framework == "tf":
            model_inputs["training"] = False
            return self._forward(model_inputs, **forward_params)

        if self.framework != "pt":
            raise ValueError(f"Framework {self.framework} is not supported")

        inference_context = self.get_inference_context()
        with inference_context():
            # Inputs go to the pipeline device, outputs always come back to the CPU.
            on_device_inputs = self._ensure_tensor_on_device(model_inputs, device=self.device)
            raw_outputs = self._forward(on_device_inputs, **forward_params)
            return self._ensure_tensor_on_device(raw_outputs, device=torch.device("cpu"))
def get_iterator(
    self, inputs, num_workers: int, batch_size: int, preprocess_params, forward_params, postprocess_params
):
    """
    Chain preprocessing, the model forward pass and postprocessing into one iterator over `inputs`.

    Args:
        inputs: The items to process. Sized inputs are wrapped in a `PipelineDataset`, anything else in a
            `PipelineIterator`.
        num_workers (`int`): Number of `DataLoader` workers; capped at 1 for non-sized inputs.
        batch_size (`int`): `DataLoader` batch size; a padding collate function is used when it is not 1.
        preprocess_params, forward_params, postprocess_params: Parameters of the respective stages.

    Returns:
        A `PipelineIterator` applying `postprocess` to the model outputs.
    """
    if not isinstance(inputs, collections.abc.Sized):
        if num_workers > 1:
            logger.warning(
                "For iterable dataset using num_workers>1 is likely to result"
                " in errors since everything is iterable, setting `num_workers=1`"
                " to guarantee correctness."
            )
            num_workers = 1
        dataset = PipelineIterator(inputs, self.preprocess, preprocess_params)
    else:
        dataset = PipelineDataset(inputs, self.preprocess, preprocess_params)

    # Only touch the variable when the user has not made a choice themselves.
    if "TOKENIZERS_PARALLELISM" not in os.environ:
        logger.info("Disabling tokenizer parallelism, we're using DataLoader multithreading already")
        os.environ["TOKENIZERS_PARALLELISM"] = "false"

    # TODO hack by collating feature_extractor and image_processor
    feature_extractor = self.feature_extractor
    if feature_extractor is None:
        feature_extractor = self.image_processor

    if batch_size == 1:
        collate_fn = no_collate_fn
    else:
        collate_fn = pad_collate_fn(self.tokenizer, feature_extractor)

    loader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, collate_fn=collate_fn)
    forwarded = PipelineIterator(loader, self.forward, forward_params, loader_batch_size=batch_size)
    return PipelineIterator(forwarded, self.postprocess, postprocess_params)
def __call__(self, inputs, *args, num_workers=None, batch_size=None, **kwargs):
    """
    Run the pipeline on `inputs`, picking the execution strategy from the input type and the framework.

    Args:
        inputs: A single item, a list, a generator or (with PyTorch) a `Dataset`.
        *args: Ignored; a warning is logged when any are given.
        num_workers (`int`, *optional*): Falls back to `self._num_workers`, then to `0`.
        batch_size (`int`, *optional*): Falls back to `self._batch_size`, then to `1`.
        **kwargs: Handed to `_sanitize_parameters`, which splits them into preprocess / forward /
            postprocess parameters. They override the parameters stored at construction time.

    Returns:
        A list of results for list inputs, an iterable of results for datasets and generators, and the
        single result otherwise.
    """
    if args:
        logger.warning(f"Ignoring args : {args}")

    if num_workers is None:
        num_workers = 0 if self._num_workers is None else self._num_workers
    if batch_size is None:
        batch_size = 1 if self._batch_size is None else self._batch_size

    preprocess_params, forward_params, postprocess_params = self._sanitize_parameters(**kwargs)

    # Fuse __init__ params and __call__ params without modifying the __init__ ones.
    preprocess_params = {**self._preprocess_params, **preprocess_params}
    forward_params = {**self._forward_params, **forward_params}
    postprocess_params = {**self._postprocess_params, **postprocess_params}
    stage_params = (preprocess_params, forward_params, postprocess_params)

    self.call_count += 1
    if self.call_count > 10 and self.framework == "pt" and self.device.type == "cuda":
        warnings.warn(
            "You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a"
            " dataset",
            UserWarning,
        )

    is_dataset = Dataset is not None and isinstance(inputs, Dataset)
    is_generator = isinstance(inputs, types.GeneratorType)
    is_list = isinstance(inputs, list)
    is_iterable = is_dataset or is_generator or is_list

    # TODO make the get_iterator work also for `tf` (and `flax`).
    can_use_iterator = self.framework == "pt" and is_iterable

    if is_list:
        if not can_use_iterator:
            return self.run_multi(inputs, *stage_params)
        # Lists are consumed eagerly so that the caller gets a list back.
        return list(self.get_iterator(inputs, num_workers, batch_size, *stage_params))

    if can_use_iterator:
        return self.get_iterator(inputs, num_workers, batch_size, *stage_params)

    if is_iterable:
        return self.iterate(inputs, *stage_params)

    if self.framework == "pt" and isinstance(self, ChunkPipeline):
        # A single input is wrapped in a one-element list and only the first result is returned.
        single_item_iterator = self.get_iterator([inputs], num_workers, batch_size, *stage_params)
        return next(iter(single_item_iterator))

    return self.run_single(inputs, *stage_params)
def run_multi(self, inputs, preprocess_params, forward_params, postprocess_params):
    """
    Apply `run_single` to every element of `inputs`, in order, and collect the results in a list.
    """
    results = []
    for item in inputs:
        results.append(self.run_single(item, preprocess_params, forward_params, postprocess_params))
    return results
def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
    """
    Process one input end to end: `preprocess`, then `forward`, then `postprocess`.

    Each stage receives the previous stage's result plus its own parameter dictionary expanded as keyword
    arguments. The return value is whatever `postprocess` returns.
    """
    return self.postprocess(
        self.forward(
            self.preprocess(inputs, **preprocess_params),
            **forward_params,
        ),
        **postprocess_params,
    )
def iterate(self, inputs, preprocess_params, forward_params, postprocess_params):
    """
    Lazily yield the `run_single` result of each element of `inputs`, one element at a time.
    """
    # This function should become `get_iterator` again, this is a temporary
    # easy solution.
    yield from (
        self.run_single(input_, preprocess_params, forward_params, postprocess_params) for input_ in inputs
    )
class ChunkPipeline(Pipeline):
    """
    Pipeline variant whose `preprocess` produces several model inputs for a single pipeline input.

    Every produced chunk goes through `forward` on its own, and `postprocess` receives the collected
    outputs of all chunks at once.
    """

    def run_single(self, inputs, preprocess_params, forward_params, postprocess_params):
        """
        Process one input: forward each chunk produced by `preprocess`, then postprocess the list of outputs.
        """
        all_outputs = [
            self.forward(model_inputs, **forward_params)
            for model_inputs in self.preprocess(inputs, **preprocess_params)
        ]
        return self.postprocess(all_outputs, **postprocess_params)

    def get_iterator(
        self, inputs, num_workers: int, batch_size: int, preprocess_params, forward_params, postprocess_params
    ):
        """
        Build the chunk-aware iterator chain: chunk iterator -> `DataLoader` -> forward -> postprocess.

        Args:
            inputs: The items to process, wrapped in a `PipelineChunkIterator`.
            num_workers (`int`): Number of `DataLoader` workers; values above 1 are reduced to 1.
            batch_size (`int`): `DataLoader` batch size; a padding collate function is used when it is not 1.
            preprocess_params, forward_params, postprocess_params: Parameters of the respective stages.

        Returns:
            A `PipelineIterator` applying `postprocess` to the outputs of the `PipelinePackIterator`.
        """
        # Only touch the variable when the user has not made a choice themselves.
        if "TOKENIZERS_PARALLELISM" not in os.environ:
            logger.info("Disabling tokenizer parallelism, we're using DataLoader multithreading already")
            os.environ["TOKENIZERS_PARALLELISM"] = "false"
        if num_workers > 1:
            # The message states the same threshold as the condition (a single worker is allowed).
            logger.warning(
                "For ChunkPipeline using num_workers>1 is likely to result in errors since everything is iterable,"
                " setting `num_workers=1` to guarantee correctness."
            )
            num_workers = 1
        dataset = PipelineChunkIterator(inputs, self.preprocess, preprocess_params)

        # TODO hack by collating feature_extractor and image_processor
        feature_extractor = self.feature_extractor if self.feature_extractor is not None else self.image_processor
        collate_fn = no_collate_fn if batch_size == 1 else pad_collate_fn(self.tokenizer, feature_extractor)
        dataloader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, collate_fn=collate_fn)
        model_iterator = PipelinePackIterator(dataloader, self.forward, forward_params, loader_batch_size=batch_size)
        final_iterator = PipelineIterator(model_iterator, self.postprocess, postprocess_params)
        return final_iterator
class PipelineRegistry:
    """
    Registry of pipeline tasks: maps task names to their implementation record and resolves task aliases.
    """

    def __init__(self, supported_tasks: Dict[str, Any], task_aliases: Dict[str, str]) -> None:
        """Store the task table and the alias table; both are kept by reference, not copied."""
        self.task_aliases = task_aliases
        self.supported_tasks = supported_tasks

    def get_supported_tasks(self) -> List[str]:
        """Return the sorted names of all registered tasks and aliases."""
        names = [*self.supported_tasks.keys(), *self.task_aliases.keys()]
        return sorted(names)

    def check_task(self, task: str) -> Tuple[str, Dict, Any]:
        """
        Resolve `task` (or an alias of it) to its registered implementation.

        Returns:
            A tuple `(task name, task record, options)`. `options` is `None`, except for tasks spelled
            `translation_XX_to_YY`, where it is the `(XX, YY)` pair and the task name is `"translation"`.

        Raises:
            KeyError: If the task is unknown or is a malformed translation task.
        """
        task = self.task_aliases.get(task, task)

        if task in self.supported_tasks:
            return task, self.supported_tasks[task], None

        if not task.startswith("translation"):
            raise KeyError(
                f"Unknown task {task}, available tasks are {self.get_supported_tasks() + ['translation_XX_to_YY']}"
            )

        tokens = task.split("_")
        well_formed = len(tokens) == 4 and tokens[0] == "translation" and tokens[2] == "to"
        if not well_formed:
            raise KeyError(f"Invalid translation task {task}, use 'translation_XX_to_YY' format")
        return "translation", self.supported_tasks["translation"], (tokens[1], tokens[3])

    def register_pipeline(
        self,
        task: str,
        pipeline_class: type,
        pt_model: Optional[Union[type, Tuple[type]]] = None,
        tf_model: Optional[Union[type, Tuple[type]]] = None,
        default: Optional[Dict] = None,
        type: Optional[str] = None,
    ) -> None:
        """
        Register `pipeline_class` as the implementation of `task`, replacing any existing registration.

        Args:
            task: Name under which the pipeline is registered.
            pipeline_class: The pipeline implementation; it also receives a `_registered_impl` attribute.
            pt_model, tf_model: A model class or a tuple of them; stored as tuples (empty when `None`).
            default: Optional default record. When it has a `"pt"` or `"tf"` key but no `"model"` key it is
                nested under `"model"`.
            type: Optional value stored under the `"type"` key of the record.
        """
        if task in self.supported_tasks:
            logger.warning(f"{task} is already registered. Overwriting pipeline for task {task}...")

        def as_tuple(models):
            if models is None:
                return ()
            return models if isinstance(models, tuple) else (models,)

        task_impl = {"impl": pipeline_class, "pt": as_tuple(pt_model), "tf": as_tuple(tf_model)}

        if default is not None:
            needs_wrapping = "model" not in default and ("pt" in default or "tf" in default)
            task_impl["default"] = {"model": default} if needs_wrapping else default

        if type is not None:
            task_impl["type"] = type

        self.supported_tasks[task] = task_impl
        pipeline_class._registered_impl = {task: task_impl}

    def to_dict(self):
        """Return the task table itself (the same object, not a copy)."""
        return self.supported_tasks