import collections |
|
import csv |
|
import importlib |
|
import json |
|
import os |
|
import pickle |
|
import sys |
|
import traceback |
|
import types |
|
import warnings |
|
from abc import ABC, abstractmethod |
|
from collections import UserDict |
|
from contextlib import contextmanager |
|
from os.path import abspath, exists |
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union |
|
|
|
from ..dynamic_module_utils import custom_object_save |
|
from ..feature_extraction_utils import PreTrainedFeatureExtractor |
|
from ..image_processing_utils import BaseImageProcessor |
|
from ..modelcard import ModelCard |
|
from ..models.auto.configuration_auto import AutoConfig |
|
from ..tokenization_utils import PreTrainedTokenizer |
|
from ..utils import ModelOutput, add_end_docstrings, infer_framework, is_tf_available, is_torch_available, logging |
|
|
|
|
|
GenericTensor = Union[List["GenericTensor"], "torch.Tensor", "tf.Tensor"] |
|
|
|
if is_tf_available(): |
|
import tensorflow as tf |
|
|
|
from ..models.auto.modeling_tf_auto import TFAutoModel |
|
|
|
if is_torch_available(): |
|
import torch |
|
from torch.utils.data import DataLoader, Dataset |
|
|
|
from ..models.auto.modeling_auto import AutoModel |
|
|
|
|
|
from .pt_utils import KeyDataset |
|
else: |
|
Dataset = None |
|
KeyDataset = None |
|
|
|
if TYPE_CHECKING: |
|
from ..modeling_tf_utils import TFPreTrainedModel |
|
from ..modeling_utils import PreTrainedModel |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
|
|
def no_collate_fn(items): |
|
if len(items) != 1: |
|
raise ValueError("This collate_fn is meant to be used with batch_size=1") |
|
return items[0] |
|
|
|
|
|
def _pad(items, key, padding_value, padding_side): |
|
batch_size = len(items) |
|
if isinstance(items[0][key], torch.Tensor): |
|
|
|
shape = items[0][key].shape |
|
dim = len(shape) |
|
if key in ["pixel_values", "image"]: |
|
|
|
|
|
return torch.cat([item[key] for item in items], dim=0) |
|
elif dim == 4 and key == "input_features": |
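# 4-D input features (e.g. batched spectrograms) are likewise assumed to share a shape and are concatenated directly.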
|
|
|
return torch.cat([item[key] for item in items], dim=0) |
|
max_length = max(item[key].shape[1] for item in items) |
|
min_length = min(item[key].shape[1] for item in items) |
|
dtype = items[0][key].dtype |
|
|
|
if dim == 2: |
|
if max_length == min_length: |
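# Every sequence already has the same length, so no padding is needed.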
|
|
|
|
|
return torch.cat([item[key] for item in items], dim=0) |
|
tensor = torch.zeros((batch_size, max_length), dtype=dtype) + padding_value |
|
elif dim == 3: |
|
tensor = torch.zeros((batch_size, max_length, shape[-1]), dtype=dtype) + padding_value |
|
elif dim == 4: |
|
tensor = torch.zeros((batch_size, max_length, shape[-2], shape[-1]), dtype=dtype) + padding_value |
|
|
|
for i, item in enumerate(items): |
|
if dim == 2: |
|
if padding_side == "left": |
|
tensor[i, -len(item[key][0]) :] = item[key][0].clone() |
|
else: |
|
tensor[i, : len(item[key][0])] = item[key][0].clone() |
|
elif dim == 3: |
|
if padding_side == "left": |
|
tensor[i, -len(item[key][0]) :, :] = item[key][0].clone() |
|
else: |
|
tensor[i, : len(item[key][0]), :] = item[key][0].clone() |
|
elif dim == 4: |
|
if padding_side == "left": |
|
tensor[i, -len(item[key][0]) :, :, :] = item[key][0].clone() |
|
else: |
|
tensor[i, : len(item[key][0]), :, :] = item[key][0].clone() |
|
|
|
return tensor |
|
else: |
|
return [item[key] for item in items] |
|
|
|
|
|
def pad_collate_fn(tokenizer, feature_extractor): |
|
|
|
t_padding_side = None |
|
|
|
f_padding_side = None |
|
if tokenizer is None and feature_extractor is None: |
|
raise ValueError("Pipeline without tokenizer or feature_extractor cannot do batching") |
|
if tokenizer is not None: |
|
if tokenizer.pad_token_id is None: |
|
raise ValueError( |
|
"Pipeline with tokenizer without pad_token cannot do batching. You can try to set it with " |
|
"`pipe.tokenizer.pad_token_id = model.config.eos_token_id`." |
|
) |
|
else: |
|
t_padding_value = tokenizer.pad_token_id |
|
t_padding_side = tokenizer.padding_side |
|
if feature_extractor is not None: |
|
|
|
f_padding_value = getattr(feature_extractor, "padding_value", None) |
|
f_padding_side = getattr(feature_extractor, "padding_side", None) |
|
|
|
if t_padding_side is not None and f_padding_side is not None and t_padding_side != f_padding_side: |
|
raise ValueError( |
|
f"The feature extractor, and tokenizer don't agree on padding side {t_padding_side} != {f_padding_side}" |
|
) |
|
padding_side = "right" |
|
if t_padding_side is not None: |
|
padding_side = t_padding_side |
|
if f_padding_side is not None: |
|
padding_side = f_padding_side |
|
|
|
def inner(items): |
|
keys = set(items[0].keys()) |
|
for item in items: |
|
if set(item.keys()) != keys: |
|
raise ValueError( |
|
f"The elements of the batch contain different keys. Cannot batch them ({set(item.keys())} !=" |
|
f" {keys})" |
|
) |
|
|
|
padded = {} |
|
for key in keys: |
|
if key in {"input_ids"}: |
|
|
|
if tokenizer is None and feature_extractor is not None: |
|
_padding_value = f_padding_value |
|
else: |
|
_padding_value = t_padding_value |
|
elif key in {"input_values", "pixel_values", "input_features"}: |
|
_padding_value = f_padding_value |
|
elif key in {"p_mask", "special_tokens_mask"}: |
|
_padding_value = 1 |
|
elif key in {"attention_mask", "token_type_ids"}: |
|
_padding_value = 0 |
|
else: |
|
|
|
_padding_value = 0 |
|
padded[key] = _pad(items, key, _padding_value, padding_side) |
|
return padded |
|
|
|
return inner |
|
|
|
|
|
def infer_framework_load_model( |
|
model, |
|
config: AutoConfig, |
|
model_classes: Optional[Dict[str, Tuple[type]]] = None, |
|
task: Optional[str] = None, |
|
framework: Optional[str] = None, |
|
**model_kwargs, |
|
): |
|
""" |
|
Select framework (TensorFlow or PyTorch) to use from the `model` passed. Returns a tuple (framework, model). |
|
|
|
If `model` is instantiated, this function will just infer the framework from the model class. Otherwise `model` is |
|
actually a checkpoint name and this method will try to instantiate it using `model_classes`. Since we don't want to |
|
instantiate the model twice, this model is returned for use by the pipeline. |
|
|
|
If both frameworks are installed and available for `model`, PyTorch is selected. |
|
|
|
Args: |
|
model (`str`, [`PreTrainedModel`] or [`TFPreTrainedModel`]): |
|
The model to infer the framework from. If `str`, a checkpoint name.
|
config ([`AutoConfig`]): |
|
The config associated with the model to help using the correct class |
|
model_classes (dictionary `str` to `type`, *optional*): |
|
A mapping from framework name (`"pt"` or `"tf"`) to a tuple of model classes.
|
task (`str`): |
|
The task defining which pipeline will be returned. |
|
model_kwargs: |
|
Additional dictionary of keyword arguments passed along to the model's `from_pretrained(..., |
|
**model_kwargs)` function. |
|
|
|
Returns: |
|
`Tuple`: A tuple (framework, model).
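Example (a minimal sketch; the checkpoint name below is only illustrative):

```python
from transformers import AutoConfig
from transformers.pipelines.base import infer_framework_load_model

config = AutoConfig.from_pretrained("distilbert-base-uncased")
framework, model = infer_framework_load_model("distilbert-base-uncased", config)
# `framework` is "pt" or "tf" depending on the available backend, `model` is instantiated once.
```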
|
""" |
|
if not is_tf_available() and not is_torch_available(): |
|
raise RuntimeError( |
|
"At least one of TensorFlow 2.0 or PyTorch should be installed. " |
|
"To install TensorFlow 2.0, read the instructions at https://www.tensorflow.org/install/ " |
|
"To install PyTorch, read the instructions at https://pytorch.org/." |
|
) |
|
if isinstance(model, str): |
|
model_kwargs["_from_pipeline"] = task |
|
class_tuple = () |
|
look_pt = is_torch_available() and framework in {"pt", None} |
|
look_tf = is_tf_available() and framework in {"tf", None} |
|
if model_classes: |
|
if look_pt: |
|
class_tuple = class_tuple + model_classes.get("pt", (AutoModel,)) |
|
if look_tf: |
|
class_tuple = class_tuple + model_classes.get("tf", (TFAutoModel,)) |
|
if config.architectures: |
|
classes = [] |
|
for architecture in config.architectures: |
|
transformers_module = importlib.import_module("transformers") |
|
if look_pt: |
|
_class = getattr(transformers_module, architecture, None) |
|
if _class is not None: |
|
classes.append(_class) |
|
if look_tf: |
|
_class = getattr(transformers_module, f"TF{architecture}", None) |
|
if _class is not None: |
|
classes.append(_class) |
|
class_tuple = class_tuple + tuple(classes) |
|
|
|
if len(class_tuple) == 0: |
|
raise ValueError(f"Pipeline cannot infer suitable model classes from {model}") |
|
|
|
all_traceback = {} |
|
for model_class in class_tuple: |
|
kwargs = model_kwargs.copy() |
|
if framework == "pt" and model.endswith(".h5"): |
|
kwargs["from_tf"] = True |
|
logger.warning( |
|
"Model might be a TensorFlow model (ending with `.h5`) but TensorFlow is not available. " |
|
"Trying to load the model with PyTorch." |
|
) |
|
elif framework == "tf" and model.endswith(".bin"): |
|
kwargs["from_pt"] = True |
|
logger.warning( |
|
"Model might be a PyTorch model (ending with `.bin`) but PyTorch is not available. " |
|
"Trying to load the model with Tensorflow." |
|
) |
|
|
|
try: |
|
model = model_class.from_pretrained(model, **kwargs) |
|
if hasattr(model, "eval"): |
|
model = model.eval() |
|
|
|
break |
|
except (OSError, ValueError): |
|
all_traceback[model_class.__name__] = traceback.format_exc() |
|
continue |
|
|
|
if isinstance(model, str): |
|
error = "" |
|
for class_name, trace in all_traceback.items(): |
|
error += f"while loading with {class_name}, an error is thrown:\n{trace}\n" |
|
raise ValueError( |
|
f"Could not load model {model} with any of the following classes: {class_tuple}. See the original errors:\n\n{error}\n" |
|
) |
|
|
|
if framework is None: |
|
framework = infer_framework(model.__class__) |
|
return framework, model |
|
|
|
|
|
def infer_framework_from_model( |
|
model, |
|
model_classes: Optional[Dict[str, Tuple[type]]] = None, |
|
task: Optional[str] = None, |
|
framework: Optional[str] = None, |
|
**model_kwargs, |
|
): |
|
""" |
|
Select framework (TensorFlow or PyTorch) to use from the `model` passed. Returns a tuple (framework, model). |
|
|
|
If `model` is instantiated, this function will just infer the framework from the model class. Otherwise `model` is |
|
actually a checkpoint name and this method will try to instantiate it using `model_classes`. Since we don't want to |
|
instantiate the model twice, this model is returned for use by the pipeline. |
|
|
|
If both frameworks are installed and available for `model`, PyTorch is selected. |
|
|
|
Args: |
|
model (`str`, [`PreTrainedModel`] or [`TFPreTrainedModel`]): |
|
The model to infer the framework from. If `str`, a checkpoint name.
|
model_classes (dictionary `str` to `type`, *optional*): |
|
A mapping from framework name (`"pt"` or `"tf"`) to a tuple of model classes.
|
task (`str`): |
|
The task defining which pipeline will be returned. |
|
model_kwargs: |
|
Additional dictionary of keyword arguments passed along to the model's `from_pretrained(..., |
|
**model_kwargs)` function. |
|
|
|
Returns: |
|
`Tuple`: A tuple (framework, model).
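Example (a minimal sketch; the checkpoint name below is only illustrative):

```python
from transformers import AutoModelForSequenceClassification
from transformers.pipelines.base import infer_framework_from_model

model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
framework, model = infer_framework_from_model(model)  # "pt" for a PyTorch model
```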
|
""" |
|
if isinstance(model, str): |
|
config = AutoConfig.from_pretrained(model, _from_pipeline=task, **model_kwargs) |
|
else: |
|
config = model.config |
|
return infer_framework_load_model( |
|
model, config, model_classes=model_classes, _from_pipeline=task, task=task, framework=framework, **model_kwargs |
|
) |
|
|
|
|
|
def get_framework(model, revision: Optional[str] = None): |
|
""" |
|
Select framework (TensorFlow or PyTorch) to use. |
|
|
|
Args: |
|
model (`str`, [`PreTrainedModel`] or [`TFPreTrainedModel`]): |
|
If both frameworks are installed, picks the one corresponding to the model passed (either a model class or |
|
the model name). If no specific model is provided, defaults to using PyTorch. |
|
""" |
|
warnings.warn( |
|
"`get_framework` is deprecated and will be removed in v5, use `infer_framework_from_model` instead.", |
|
FutureWarning, |
|
) |
|
if not is_tf_available() and not is_torch_available(): |
|
raise RuntimeError( |
|
"At least one of TensorFlow 2.0 or PyTorch should be installed. " |
|
"To install TensorFlow 2.0, read the instructions at https://www.tensorflow.org/install/ " |
|
"To install PyTorch, read the instructions at https://pytorch.org/." |
|
) |
|
if isinstance(model, str): |
|
if is_torch_available() and not is_tf_available(): |
|
model = AutoModel.from_pretrained(model, revision=revision) |
|
elif is_tf_available() and not is_torch_available(): |
|
model = TFAutoModel.from_pretrained(model, revision=revision) |
|
else: |
|
try: |
|
model = AutoModel.from_pretrained(model, revision=revision) |
|
except OSError: |
|
model = TFAutoModel.from_pretrained(model, revision=revision) |
|
|
|
framework = infer_framework(model.__class__) |
|
return framework |
|
|
|
|
|
def get_default_model_and_revision( |
|
targeted_task: Dict, framework: Optional[str], task_options: Optional[Any] |
|
) -> Union[str, Tuple[str, str]]: |
|
""" |
|
Select a default model to use for a given task. Defaults to PyTorch if ambiguous.
|
|
|
Args: |
|
targeted_task (`Dict`):

Dictionary representing the given task, which should contain the default models.
|
|
|
framework (`str`, None) |
|
"pt", "tf" or None, representing a specific framework if it was specified, or None if we don't know yet. |
|
|
|
task_options (`Any`, None) |
|
Any further value required by the task to get fully specified, for instance (SRC, TGT) languages for |
|
translation task. |
|
|
|
Returns:

`str` or `Tuple[str, str]`: The default model id for this pipeline, optionally together with its revision.
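Example (a minimal sketch with an illustrative defaults dictionary):

```python
from transformers.pipelines.base import get_default_model_and_revision

targeted_task = {"default": {"model": {"pt": ("distilbert-base-uncased", "main")}}}
get_default_model_and_revision(targeted_task, framework="pt", task_options=None)
# -> ("distilbert-base-uncased", "main")
```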
|
""" |
|
if is_torch_available() and not is_tf_available(): |
|
framework = "pt" |
|
elif is_tf_available() and not is_torch_available(): |
|
framework = "tf" |
|
|
|
defaults = targeted_task["default"] |
|
if task_options: |
|
if task_options not in defaults: |
|
raise ValueError(f"The task does not provide any default models for options {task_options}") |
|
default_models = defaults[task_options]["model"] |
|
elif "model" in defaults: |
|
default_models = targeted_task["default"]["model"] |
|
else: |
|
|
|
|
|
raise ValueError('The task defaults can\'t be correctly selected. You probably meant "translation_XX_to_YY"') |
|
|
|
if framework is None: |
|
framework = "pt" |
|
|
|
return default_models[framework] |
|
|
|
|
|
class PipelineException(Exception): |
|
""" |
|
Raised by a [`Pipeline`] when handling `__call__`.
|
|
|
Args: |
|
task (`str`): The task of the pipeline. |
|
model (`str`): The model used by the pipeline. |
|
reason (`str`): The error message to display. |
|
""" |
|
|
|
def __init__(self, task: str, model: str, reason: str): |
|
super().__init__(reason) |
|
|
|
self.task = task |
|
self.model = model |
|
|
|
|
|
class ArgumentHandler(ABC): |
|
""" |
|
Base interface for handling arguments for each [`~pipelines.Pipeline`]. |
|
""" |
|
|
|
@abstractmethod |
|
def __call__(self, *args, **kwargs): |
|
raise NotImplementedError() |
|
|
|
|
|
class PipelineDataFormat: |
|
""" |
|
Base class for all the pipeline supported data formats, both for reading and writing. Supported data formats

currently include:
|
|
|
- JSON |
|
- CSV |
|
- stdin/stdout (pipe) |
|
|
|
`PipelineDataFormat` also includes some utilities to work with multi-column data, like mapping from dataset columns to

pipeline keyword arguments through the `dataset_kwarg_1=dataset_column_1` format.
|
|
|
Args: |
|
output_path (`str`): Where to save the outgoing data. |
|
input_path (`str`): Where to look for the input data. |
|
column (`str`): The column to read. |
|
overwrite (`bool`, *optional*, defaults to `False`): |
|
Whether or not to overwrite the `output_path`. |
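Example (a minimal sketch; the file paths and column names are only illustrative):

```python
from transformers.pipelines.base import PipelineDataFormat

# Map the CSV columns "question" and "context" to the pipeline keyword
# arguments of the same names, reading from samples.csv.
data_format = PipelineDataFormat.from_str(
    format="csv",
    output_path="predictions.csv",
    input_path="samples.csv",
    column="question=question,context=context",
)
```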
|
""" |
|
|
|
SUPPORTED_FORMATS = ["json", "csv", "pipe"] |
|
|
|
def __init__( |
|
self, |
|
output_path: Optional[str], |
|
input_path: Optional[str], |
|
column: Optional[str], |
|
overwrite: bool = False, |
|
): |
|
self.output_path = output_path |
|
self.input_path = input_path |
|
self.column = column.split(",") if column is not None else [""] |
|
self.is_multi_columns = len(self.column) > 1 |
|
|
|
if self.is_multi_columns: |
|
self.column = [tuple(c.split("=")) if "=" in c else (c, c) for c in self.column] |
|
|
|
if output_path is not None and not overwrite: |
|
if exists(abspath(self.output_path)): |
|
raise OSError(f"{self.output_path} already exists on disk") |
|
|
|
if input_path is not None: |
|
if not exists(abspath(self.input_path)): |
|
raise OSError(f"{self.input_path} doesnt exist on disk") |
|
|
|
@abstractmethod |
|
def __iter__(self): |
|
raise NotImplementedError() |
|
|
|
@abstractmethod |
|
def save(self, data: Union[dict, List[dict]]): |
|
""" |
|
Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`]. |
|
|
|
Args: |
|
data (`dict` or list of `dict`): The data to store. |
|
""" |
|
raise NotImplementedError() |
|
|
|
def save_binary(self, data: Union[dict, List[dict]]) -> str: |
|
""" |
|
Save the provided data object as pickle-formatted binary data on disk.
|
|
|
Args: |
|
data (`dict` or list of `dict`): The data to store. |
|
|
|
Returns: |
|
`str`: Path where the data has been saved. |
|
""" |
|
path, _ = os.path.splitext(self.output_path) |
|
binary_path = os.path.extsep.join((path, "pickle")) |
|
|
|
with open(binary_path, "wb+") as f_output: |
|
pickle.dump(data, f_output) |
|
|
|
return binary_path |
|
|
|
@staticmethod |
|
def from_str( |
|
format: str, |
|
output_path: Optional[str], |
|
input_path: Optional[str], |
|
column: Optional[str], |
|
overwrite=False, |
|
) -> "PipelineDataFormat": |
|
""" |
|
Creates an instance of the right subclass of [`~pipelines.PipelineDataFormat`] depending on `format`. |
|
|
|
Args: |
|
format (`str`): |
|
The format of the desired pipeline. Acceptable values are `"json"`, `"csv"` or `"pipe"`. |
|
output_path (`str`, *optional*): |
|
Where to save the outgoing data. |
|
input_path (`str`, *optional*): |
|
Where to look for the input data. |
|
column (`str`, *optional*): |
|
The column to read. |
|
overwrite (`bool`, *optional*, defaults to `False`): |
|
Whether or not to overwrite the `output_path`. |
|
|
|
Returns: |
|
[`~pipelines.PipelineDataFormat`]: The proper data format. |
|
""" |
|
if format == "json": |
|
return JsonPipelineDataFormat(output_path, input_path, column, overwrite=overwrite) |
|
elif format == "csv": |
|
return CsvPipelineDataFormat(output_path, input_path, column, overwrite=overwrite) |
|
elif format == "pipe": |
|
return PipedPipelineDataFormat(output_path, input_path, column, overwrite=overwrite) |
|
else: |
|
raise KeyError(f"Unknown reader {format} (Available reader are json/csv/pipe)") |
|
|
|
|
|
class CsvPipelineDataFormat(PipelineDataFormat): |
|
""" |
|
Support for pipelines using CSV data format. |
|
|
|
Args: |
|
output_path (`str`): Where to save the outgoing data. |
|
input_path (`str`): Where to look for the input data. |
|
column (`str`): The column to read. |
|
overwrite (`bool`, *optional*, defaults to `False`): |
|
Whether or not to overwrite the `output_path`. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
output_path: Optional[str], |
|
input_path: Optional[str], |
|
column: Optional[str], |
|
overwrite=False, |
|
): |
|
super().__init__(output_path, input_path, column, overwrite=overwrite) |
|
|
|
def __iter__(self): |
|
with open(self.input_path, "r") as f: |
|
reader = csv.DictReader(f) |
|
for row in reader: |
|
if self.is_multi_columns: |
|
yield {k: row[c] for k, c in self.column} |
|
else: |
|
yield row[self.column[0]] |
|
|
|
def save(self, data: List[dict]): |
|
""" |
|
Save the provided data object with the representation for the current [`~pipelines.PipelineDataFormat`]. |
|
|
|
Args: |
|
data (`List[dict]`): The data to store. |
|
""" |
|
with open(self.output_path, "w") as f: |
|
if len(data) > 0: |
|
writer = csv.DictWriter(f, list(data[0].keys())) |
|
writer.writeheader() |
|
writer.writerows(data) |
|
|
|
|
|
class JsonPipelineDataFormat(PipelineDataFormat): |
|
""" |
|
Support for pipelines using JSON file format. |
|
|
|
Args: |
|
output_path (`str`): Where to save the outgoing data. |
|
input_path (`str`): Where to look for the input data. |
|
column (`str`): The column to read. |
|
overwrite (`bool`, *optional*, defaults to `False`): |
|
Whether or not to overwrite the `output_path`. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
output_path: Optional[str], |
|
input_path: Optional[str], |
|
column: Optional[str], |
|
overwrite=False, |
|
): |
|
super().__init__(output_path, input_path, column, overwrite=overwrite) |
|
|
|
with open(input_path, "r") as f: |
|
self._entries = json.load(f) |
|
|
|
def __iter__(self): |
|
for entry in self._entries: |
|
if self.is_multi_columns: |
|
yield {k: entry[c] for k, c in self.column} |
|
else: |
|
yield entry[self.column[0]] |
|
|
|
def save(self, data: dict): |
|
""" |
|
Save the provided data object in a json file. |
|
|
|
Args: |
|
data (`dict`): The data to store. |
|
""" |
|
with open(self.output_path, "w") as f: |
|
json.dump(data, f) |
|
|
|
|
|
class PipedPipelineDataFormat(PipelineDataFormat): |
|
""" |
|
Read data from input piped to the Python process. For multi-column data, columns should be separated by \t
|
|
|
If columns are provided, then the output will be a dictionary with {column_x: value_x} |
|
|
|
Args: |
|
output_path (`str`): Where to save the outgoing data. |
|
input_path (`str`): Where to look for the input data. |
|
column (`str`): The column to read. |
|
overwrite (`bool`, *optional*, defaults to `False`): |
|
Whether or not to overwrite the `output_path`. |
|
""" |
|
|
|
def __iter__(self): |
|
for line in sys.stdin: |
|
|
|
if "\t" in line: |
|
line = line.split("\t") |
|
if self.column: |
|
|
|
yield {kwargs: l for (kwargs, _), l in zip(self.column, line)} |
|
else: |
|
yield tuple(line) |
|
|
|
|
|
else: |
|
yield line |
|
|
|
def save(self, data: dict): |
|
""" |
|
Print the data. |
|
|
|
Args: |
|
data (`dict`): The data to store. |
|
""" |
|
print(data) |
|
|
|
def save_binary(self, data: Union[dict, List[dict]]) -> str: |
|
if self.output_path is None: |
|
raise KeyError( |
|
"When using piped input on pipeline outputting large object requires an output file path. " |
|
"Please provide such output path through --output argument." |
|
) |
|
|
|
return super().save_binary(data) |
|
|
|
|
|
class _ScikitCompat(ABC): |
|
""" |
|
Interface layer for Scikit-learn and Keras compatibility.
|
""" |
|
|
|
@abstractmethod |
|
def transform(self, X): |
|
raise NotImplementedError() |
|
|
|
@abstractmethod |
|
def predict(self, X): |
|
raise NotImplementedError() |
|
|
|
|
|
PIPELINE_INIT_ARGS = r""" |
|
Arguments: |
|
model ([`PreTrainedModel`] or [`TFPreTrainedModel`]): |
|
The model that will be used by the pipeline to make predictions. This needs to be a model inheriting from |
|
[`PreTrainedModel`] for PyTorch and [`TFPreTrainedModel`] for TensorFlow. |
|
tokenizer ([`PreTrainedTokenizer`]): |
|
The tokenizer that will be used by the pipeline to encode data for the model. This object inherits from |
|
[`PreTrainedTokenizer`]. |
|
modelcard (`str` or [`ModelCard`], *optional*): |
|
Model card attributed to the model for this pipeline. |
|
framework (`str`, *optional*): |
|
The framework to use, either `"pt"` for PyTorch or `"tf"` for TensorFlow. The specified framework must be |
|
installed. |
|
|
|
If no framework is specified, will default to the one currently installed. If no framework is specified and |
|
both frameworks are installed, will default to the framework of the `model`, or to PyTorch if no model is |
|
provided. |
|
task (`str`, defaults to `""`): |
|
A task-identifier for the pipeline. |
|
num_workers (`int`, *optional*, defaults to 8): |
|
When the pipeline will use *DataLoader* (when passing a dataset, on GPU for a PyTorch model), the number of
|
workers to be used. |
|
batch_size (`int`, *optional*, defaults to 1): |
|
When the pipeline will use *DataLoader* (when passing a dataset, on GPU for a PyTorch model), the size of

the batch to use. For inference, this is not always beneficial; please read [Batching with
|
pipelines](https://huggingface.co/transformers/main_classes/pipelines.html#pipeline-batching) . |
|
args_parser ([`~pipelines.ArgumentHandler`], *optional*): |
|
Reference to the object in charge of parsing supplied pipeline parameters. |
|
device (`int`, *optional*, defaults to -1): |
|
Device ordinal for CPU/GPU support. Setting this to -1 will leverage CPU, a positive integer will run the model on
|
the associated CUDA device id. You can pass native `torch.device` or a `str` too. |
|
binary_output (`bool`, *optional*, defaults to `False`): |
|
Flag indicating if the output of the pipeline should happen in a binary format (i.e., pickle) or as raw text.
|
""" |
|
|
|
if is_torch_available(): |
|
from transformers.pipelines.pt_utils import ( |
|
PipelineChunkIterator, |
|
PipelineDataset, |
|
PipelineIterator, |
|
PipelinePackIterator, |
|
) |
|
|
|
|
|
@add_end_docstrings(PIPELINE_INIT_ARGS) |
|
class Pipeline(_ScikitCompat): |
|
""" |
|
The Pipeline class is the class from which all pipelines inherit. Refer to this class for methods shared across |
|
different pipelines. |
|
|
|
Base class implementing pipelined operations. Pipeline workflow is defined as a sequence of the following |
|
operations: |
|
|
|
Input -> Tokenization -> Model Inference -> Post-Processing (task dependent) -> Output |
|
|
|
Pipeline supports running on CPU or GPU through the device argument (see below). |
|
|
|
Some pipelines, like for instance [`FeatureExtractionPipeline`] (`'feature-extraction'`), output large tensor objects

as nested-lists. In order to avoid dumping such large structures as textual data we provide the `binary_output`

constructor argument. If set to `True`, the output will be stored in the pickle format.
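Example (a minimal sketch; the task and input below are only illustrative):

```python
from transformers import pipeline

pipe = pipeline("text-classification")
print(pipe("This library is really easy to use!"))
```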
|
""" |
|
|
|
default_input_names = None |
|
|
|
def __init__( |
|
self, |
|
model: Union["PreTrainedModel", "TFPreTrainedModel"], |
|
tokenizer: Optional[PreTrainedTokenizer] = None, |
|
feature_extractor: Optional[PreTrainedFeatureExtractor] = None, |
|
image_processor: Optional[BaseImageProcessor] = None, |
|
modelcard: Optional[ModelCard] = None, |
|
framework: Optional[str] = None, |
|
task: str = "", |
|
args_parser: ArgumentHandler = None, |
|
device: Union[int, "torch.device"] = None, |
|
torch_dtype: Optional[Union[str, "torch.dtype"]] = None, |
|
binary_output: bool = False, |
|
**kwargs, |
|
): |
|
if framework is None: |
|
framework, model = infer_framework_load_model(model, config=model.config) |
|
|
|
self.task = task |
|
self.model = model |
|
self.tokenizer = tokenizer |
|
self.feature_extractor = feature_extractor |
|
self.image_processor = image_processor |
|
self.modelcard = modelcard |
|
self.framework = framework |
|
|
|
|
|
hf_device_map = getattr(self.model, "hf_device_map", None) |
|
|
|
if hf_device_map is not None and device is not None: |
|
raise ValueError( |
|
"The model has been loaded with `accelerate` and therefore cannot be moved to a specific device. Please " |
|
"discard the `device` argument when creating your pipeline object." |
|
) |
|
|
|
|
|
if self.framework == "pt" and device is not None and not (isinstance(device, int) and device < 0): |
|
self.model.to(device) |
|
|
|
if device is None: |
|
if hf_device_map is not None: |
|
|
|
device = next(iter(hf_device_map.values())) |
|
else: |
|
device = -1 |
|
|
|
if is_torch_available() and self.framework == "pt": |
|
if isinstance(device, torch.device): |
|
self.device = device |
|
elif isinstance(device, str): |
|
self.device = torch.device(device) |
|
elif device < 0: |
|
self.device = torch.device("cpu") |
|
else: |
|
self.device = torch.device(f"cuda:{device}") |
|
else: |
|
self.device = device if device is not None else -1 |
|
self.torch_dtype = torch_dtype |
|
self.binary_output = binary_output |
|
|
|
|
|
task_specific_params = self.model.config.task_specific_params |
|
if task_specific_params is not None and task in task_specific_params: |
|
self.model.config.update(task_specific_params.get(task)) |
|
if self.model.can_generate(): |
|
self.model.generation_config.update(**task_specific_params.get(task)) |
|
|
|
self.call_count = 0 |
|
self._batch_size = kwargs.pop("batch_size", None) |
|
self._num_workers = kwargs.pop("num_workers", None) |
|
self._preprocess_params, self._forward_params, self._postprocess_params = self._sanitize_parameters(**kwargs) |
|
|
|
if self.image_processor is None and self.feature_extractor is not None: |
|
if isinstance(self.feature_extractor, BaseImageProcessor): |
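# Backward compatibility: an image processor passed as `feature_extractor` keeps working by mirroring it onto `image_processor`.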
|
|
|
|
|
|
|
self.image_processor = self.feature_extractor |
|
|
|
def save_pretrained(self, save_directory: str, safe_serialization: bool = False): |
|
""" |
|
Save the pipeline's model and tokenizer. |
|
|
|
Args: |
|
save_directory (`str`): |
|
A path to the directory where the pipeline will be saved. It will be created if it doesn't exist.
|
safe_serialization (`bool`):

Whether to save the model using `safetensors` or the traditional way for PyTorch or TensorFlow.
|
""" |
|
if os.path.isfile(save_directory): |
|
logger.error(f"Provided path ({save_directory}) should be a directory, not a file") |
|
return |
|
os.makedirs(save_directory, exist_ok=True) |
|
|
|
if hasattr(self, "_registered_impl"): |
|
|
|
pipeline_info = self._registered_impl.copy() |
|
custom_pipelines = {} |
|
for task, info in pipeline_info.items(): |
|
if info["impl"] != self.__class__: |
|
continue |
|
|
|
info = info.copy() |
|
module_name = info["impl"].__module__ |
|
last_module = module_name.split(".")[-1] |
|
|
|
info["impl"] = f"{last_module}.{info['impl'].__name__}" |
|
info["pt"] = tuple(c.__name__ for c in info["pt"]) |
|
info["tf"] = tuple(c.__name__ for c in info["tf"]) |
|
|
|
custom_pipelines[task] = info |
|
self.model.config.custom_pipelines = custom_pipelines |
|
|
|
custom_object_save(self, save_directory) |
|
|
|
self.model.save_pretrained(save_directory, safe_serialization=safe_serialization) |
|
|
|
if self.tokenizer is not None: |
|
self.tokenizer.save_pretrained(save_directory) |
|
|
|
if self.feature_extractor is not None: |
|
self.feature_extractor.save_pretrained(save_directory) |
|
|
|
if self.image_processor is not None: |
|
self.image_processor.save_pretrained(save_directory) |
|
|
|
if self.modelcard is not None: |
|
self.modelcard.save_pretrained(save_directory) |
|
|
|
def transform(self, X): |
|
""" |
|
Scikit / Keras interface to transformers' pipelines. This method will forward to __call__(). |
|
""" |
|
return self(X) |
|
|
|
def predict(self, X): |
|
""" |
|
Scikit / Keras interface to transformers' pipelines. This method will forward to __call__(). |
|
""" |
|
return self(X) |
|
|
|
@contextmanager |
|
def device_placement(self): |
|
""" |
|
Context Manager allowing tensor allocation on the user-specified device in a framework-agnostic way.
|
|
|
Returns: |
|
Context manager |
|
|
|
Examples: |
|
|
|
```python |
|
# Explicitly ask for tensor allocation on CUDA device :0 |
|
pipe = pipeline(..., device=0) |
|
with pipe.device_placement(): |
|
# Every framework specific tensor allocation will be done on the requested device
|
output = pipe(...) |
|
```""" |
|
if self.framework == "tf": |
|
with tf.device("/CPU:0" if self.device == -1 else f"/device:GPU:{self.device}"): |
|
yield |
|
else: |
|
if self.device.type == "cuda": |
|
with torch.cuda.device(self.device): |
|
yield |
|
else: |
|
yield |
|
|
|
def ensure_tensor_on_device(self, **inputs): |
|
""" |
|
Ensure PyTorch tensors are on the specified device. |
|
|
|
Args: |
|
inputs (keyword arguments that should be `torch.Tensor`, the rest is ignored): |
|
The tensors to place on `self.device`. |
|
Recursive on lists, tuples, and dict-like containers.
|
|
|
Return: |
|
`Dict[str, torch.Tensor]`: The same as `inputs` but on the proper device. |
|
""" |
|
return self._ensure_tensor_on_device(inputs, self.device) |
|
|
|
def _ensure_tensor_on_device(self, inputs, device): |
|
if isinstance(inputs, ModelOutput): |
|
return ModelOutput( |
|
{name: self._ensure_tensor_on_device(tensor, device) for name, tensor in inputs.items()} |
|
) |
|
elif isinstance(inputs, dict): |
|
return {name: self._ensure_tensor_on_device(tensor, device) for name, tensor in inputs.items()} |
|
elif isinstance(inputs, UserDict): |
|
return UserDict({name: self._ensure_tensor_on_device(tensor, device) for name, tensor in inputs.items()}) |
|
elif isinstance(inputs, list): |
|
return [self._ensure_tensor_on_device(item, device) for item in inputs] |
|
elif isinstance(inputs, tuple): |
|
return tuple([self._ensure_tensor_on_device(item, device) for item in inputs]) |
|
elif isinstance(inputs, torch.Tensor): |
|
if device == torch.device("cpu") and inputs.dtype in {torch.float16, torch.bfloat16}: |
|
inputs = inputs.float() |
|
return inputs.to(device) |
|
else: |
|
return inputs |
|
|
|
def check_model_type(self, supported_models: Union[List[str], dict]): |
|
""" |
|
Check if the model class is supported by the pipeline.
|
|
|
Args: |
|
supported_models (`List[str]` or `dict`): |
|
The list of models supported by the pipeline, or a dictionary with model class values. |
|
""" |
|
if not isinstance(supported_models, list): |
|
supported_models_names = [] |
|
for _, model_name in supported_models.items(): |
|
|
|
if isinstance(model_name, tuple): |
|
supported_models_names.extend(list(model_name)) |
|
else: |
|
supported_models_names.append(model_name) |
|
if hasattr(supported_models, "_model_mapping"): |
|
for _, model in supported_models._model_mapping._extra_content.items(): |
|
if isinstance(model, tuple):
|
supported_models_names.extend([m.__name__ for m in model]) |
|
else: |
|
supported_models_names.append(model.__name__) |
|
supported_models = supported_models_names |
|
if self.model.__class__.__name__ not in supported_models: |
|
logger.error( |
|
f"The model '{self.model.__class__.__name__}' is not supported for {self.task}. Supported models are" |
|
f" {supported_models}." |
|
) |
|
|
|
@abstractmethod |
|
def _sanitize_parameters(self, **pipeline_parameters): |
|
""" |
|
_sanitize_parameters will be called with any excessive named arguments from either `__init__` or `__call__` |
|
methods. It should return 3 dictionaries of the resolved parameters used by the various `preprocess`,

`forward` and `postprocess` methods. Do not fill the dictionaries if the caller didn't specify the kwargs. This

lets you keep defaults in function signatures, which is more "natural".
|
|
|
It is not meant to be called directly, it will be automatically called and the final parameters resolved by |
|
`__init__` and `__call__` |
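A minimal sketch of how a subclass might implement this (the `top_k` parameter is purely illustrative):

```python
def _sanitize_parameters(self, top_k=None, **kwargs):
    postprocess_params = {}
    # Only forward the parameter when the caller actually set it, so the
    # default in `postprocess` keeps applying otherwise.
    if top_k is not None:
        postprocess_params["top_k"] = top_k
    return {}, {}, postprocess_params
```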
|
""" |
|
raise NotImplementedError("_sanitize_parameters not implemented") |
|
|
|
@abstractmethod |
|
def preprocess(self, input_: Any, **preprocess_parameters: Dict) -> Dict[str, GenericTensor]: |
|
""" |
|
Preprocess will take the `input_` of a specific pipeline and return a dictionary of everything necessary for |
|
`_forward` to run properly. It should contain at least one tensor, but might have arbitrary other items. |
|
""" |
|
raise NotImplementedError("preprocess not implemented") |
|
|
|
@abstractmethod |
|
def _forward(self, input_tensors: Dict[str, GenericTensor], **forward_parameters: Dict) -> ModelOutput: |
|
""" |
|
_forward will receive the prepared dictionary from `preprocess` and run it on the model. This method might |
|
involve the GPU or the CPU and should be agnostic to it. Isolating this function is the reason for `preprocess` |
|
and `postprocess` to exist, so that the hot path (this method) can generally run as fast as possible.
|
|
|
It is not meant to be called directly, `forward` is preferred. It is basically the same but contains additional |
|
code surrounding `_forward` making sure tensors and models are on the same device, disabling the training part |
|
of the code (leading to faster inference). |
|
""" |
|
raise NotImplementedError("_forward not implemented") |
|
|
|
@abstractmethod |
|
def postprocess(self, model_outputs: ModelOutput, **postprocess_parameters: Dict) -> Any: |
|
""" |
|
Postprocess will receive the raw outputs of the `_forward` method, generally tensors, and reformat them into |
|
something more friendly. Generally it will output a list or a dict of results (containing just strings and
|
numbers). |
|
""" |
|
raise NotImplementedError("postprocess not implemented") |
|
|
|
def get_inference_context(self): |
|
return torch.no_grad |
|
|
|
def forward(self, model_inputs, **forward_params): |
|
with self.device_placement(): |
|
if self.framework == "tf": |
|
model_inputs["training"] = False |
|
model_outputs = self._forward(model_inputs, **forward_params) |
|
elif self.framework == "pt": |
|
inference_context = self.get_inference_context() |
|
with inference_context(): |
|
model_inputs = self._ensure_tensor_on_device(model_inputs, device=self.device) |
|
model_outputs = self._forward(model_inputs, **forward_params) |
|
model_outputs = self._ensure_tensor_on_device(model_outputs, device=torch.device("cpu")) |
|
else: |
|
raise ValueError(f"Framework {self.framework} is not supported") |
|
return model_outputs |
|
|
|
def get_iterator( |
|
self, inputs, num_workers: int, batch_size: int, preprocess_params, forward_params, postprocess_params |
|
): |
|
if isinstance(inputs, collections.abc.Sized): |
|
dataset = PipelineDataset(inputs, self.preprocess, preprocess_params) |
|
else: |
|
if num_workers > 1: |
|
logger.warning( |
|
"For iterable dataset using num_workers>1 is likely to result" |
|
" in errors since everything is iterable, setting `num_workers=1`" |
|
" to guarantee correctness." |
|
) |
|
num_workers = 1 |
|
dataset = PipelineIterator(inputs, self.preprocess, preprocess_params) |
|
if "TOKENIZERS_PARALLELISM" not in os.environ: |
|
logger.info("Disabling tokenizer parallelism, we're using DataLoader multithreading already") |
|
os.environ["TOKENIZERS_PARALLELISM"] = "false" |
|
|
|
feature_extractor = self.feature_extractor if self.feature_extractor is not None else self.image_processor |
|
collate_fn = no_collate_fn if batch_size == 1 else pad_collate_fn(self.tokenizer, feature_extractor) |
|
dataloader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, collate_fn=collate_fn) |
|
model_iterator = PipelineIterator(dataloader, self.forward, forward_params, loader_batch_size=batch_size) |
|
final_iterator = PipelineIterator(model_iterator, self.postprocess, postprocess_params) |
|
return final_iterator |
|
|
|
def __call__(self, inputs, *args, num_workers=None, batch_size=None, **kwargs): |
|
if args: |
|
logger.warning(f"Ignoring args : {args}") |
|
|
|
if num_workers is None: |
|
if self._num_workers is None: |
|
num_workers = 0 |
|
else: |
|
num_workers = self._num_workers |
|
if batch_size is None: |
|
if self._batch_size is None: |
|
batch_size = 1 |
|
else: |
|
batch_size = self._batch_size |
|
|
|
preprocess_params, forward_params, postprocess_params = self._sanitize_parameters(**kwargs) |
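# Fuse the parameters resolved at init time with the ones passed at call time, without mutating the stored defaults.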
|
|
|
|
|
preprocess_params = {**self._preprocess_params, **preprocess_params} |
|
forward_params = {**self._forward_params, **forward_params} |
|
postprocess_params = {**self._postprocess_params, **postprocess_params} |
|
|
|
self.call_count += 1 |
|
if self.call_count > 10 and self.framework == "pt" and self.device.type == "cuda": |
|
warnings.warn( |
|
"You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a" |
|
" dataset", |
|
UserWarning, |
|
) |
|
|
|
is_dataset = Dataset is not None and isinstance(inputs, Dataset) |
|
is_generator = isinstance(inputs, types.GeneratorType) |
|
is_list = isinstance(inputs, list) |
|
|
|
is_iterable = is_dataset or is_generator or is_list |
|
|
|
|
|
can_use_iterator = self.framework == "pt" and (is_dataset or is_generator or is_list) |
|
|
|
if is_list: |
|
if can_use_iterator: |
|
final_iterator = self.get_iterator( |
|
inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params |
|
) |
|
outputs = list(final_iterator) |
|
return outputs |
|
else: |
|
return self.run_multi(inputs, preprocess_params, forward_params, postprocess_params) |
|
elif can_use_iterator: |
|
return self.get_iterator( |
|
inputs, num_workers, batch_size, preprocess_params, forward_params, postprocess_params |
|
) |
|
elif is_iterable: |
|
return self.iterate(inputs, preprocess_params, forward_params, postprocess_params) |
|
elif self.framework == "pt" and isinstance(self, ChunkPipeline): |
|
return next( |
|
iter( |
|
self.get_iterator( |
|
[inputs], num_workers, batch_size, preprocess_params, forward_params, postprocess_params |
|
) |
|
) |
|
) |
|
else: |
|
return self.run_single(inputs, preprocess_params, forward_params, postprocess_params) |
|
|
|
def run_multi(self, inputs, preprocess_params, forward_params, postprocess_params): |
|
return [self.run_single(item, preprocess_params, forward_params, postprocess_params) for item in inputs] |
|
|
|
def run_single(self, inputs, preprocess_params, forward_params, postprocess_params): |
|
model_inputs = self.preprocess(inputs, **preprocess_params) |
|
model_outputs = self.forward(model_inputs, **forward_params) |
|
outputs = self.postprocess(model_outputs, **postprocess_params) |
|
return outputs |
|
|
|
def iterate(self, inputs, preprocess_params, forward_params, postprocess_params): |
|
|
|
|
|
for input_ in inputs: |
|
yield self.run_single(input_, preprocess_params, forward_params, postprocess_params) |
|
|
|
|
|
class ChunkPipeline(Pipeline): |
|
def run_single(self, inputs, preprocess_params, forward_params, postprocess_params): |
|
all_outputs = [] |
|
for model_inputs in self.preprocess(inputs, **preprocess_params): |
|
model_outputs = self.forward(model_inputs, **forward_params) |
|
all_outputs.append(model_outputs) |
|
outputs = self.postprocess(all_outputs, **postprocess_params) |
|
return outputs |
|
|
|
def get_iterator( |
|
self, inputs, num_workers: int, batch_size: int, preprocess_params, forward_params, postprocess_params |
|
): |
|
if "TOKENIZERS_PARALLELISM" not in os.environ: |
|
logger.info("Disabling tokenizer parallelism, we're using DataLoader multithreading already") |
|
os.environ["TOKENIZERS_PARALLELISM"] = "false" |
|
if num_workers > 1: |
|
logger.warning( |
|
"For ChunkPipeline using num_workers>0 is likely to result in errors since everything is iterable," |
|
" setting `num_workers=1` to guarantee correctness." |
|
) |
|
num_workers = 1 |
|
dataset = PipelineChunkIterator(inputs, self.preprocess, preprocess_params) |
|
|
|
|
|
feature_extractor = self.feature_extractor if self.feature_extractor is not None else self.image_processor |
|
collate_fn = no_collate_fn if batch_size == 1 else pad_collate_fn(self.tokenizer, feature_extractor) |
|
dataloader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, collate_fn=collate_fn) |
|
model_iterator = PipelinePackIterator(dataloader, self.forward, forward_params, loader_batch_size=batch_size) |
|
final_iterator = PipelineIterator(model_iterator, self.postprocess, postprocess_params) |
|
return final_iterator |
|
|
|
|
|
class PipelineRegistry: |
|
def __init__(self, supported_tasks: Dict[str, Any], task_aliases: Dict[str, str]) -> None: |
|
self.supported_tasks = supported_tasks |
|
self.task_aliases = task_aliases |
|
|
|
def get_supported_tasks(self) -> List[str]: |
|
supported_task = list(self.supported_tasks.keys()) + list(self.task_aliases.keys()) |
|
supported_task.sort() |
|
return supported_task |
|
|
|
def check_task(self, task: str) -> Tuple[str, Dict, Any]: |
|
if task in self.task_aliases: |
|
task = self.task_aliases[task] |
|
if task in self.supported_tasks: |
|
targeted_task = self.supported_tasks[task] |
|
return task, targeted_task, None |
|
|
|
if task.startswith("translation"): |
|
tokens = task.split("_") |
|
if len(tokens) == 4 and tokens[0] == "translation" and tokens[2] == "to": |
|
targeted_task = self.supported_tasks["translation"] |
|
task = "translation" |
|
return task, targeted_task, (tokens[1], tokens[3]) |
|
raise KeyError(f"Invalid translation task {task}, use 'translation_XX_to_YY' format") |
|
|
|
raise KeyError( |
|
f"Unknown task {task}, available tasks are {self.get_supported_tasks() + ['translation_XX_to_YY']}" |
|
) |
|
|
|
def register_pipeline( |
|
self, |
|
task: str, |
|
pipeline_class: type, |
|
pt_model: Optional[Union[type, Tuple[type]]] = None, |
|
tf_model: Optional[Union[type, Tuple[type]]] = None, |
|
default: Optional[Dict] = None, |
|
type: Optional[str] = None, |
|
) -> None: |
|
if task in self.supported_tasks: |
|
logger.warning(f"{task} is already registered. Overwriting pipeline for task {task}...") |
|
|
|
if pt_model is None: |
|
pt_model = () |
|
elif not isinstance(pt_model, tuple): |
|
pt_model = (pt_model,) |
|
|
|
if tf_model is None: |
|
tf_model = () |
|
elif not isinstance(tf_model, tuple): |
|
tf_model = (tf_model,) |
|
|
|
task_impl = {"impl": pipeline_class, "pt": pt_model, "tf": tf_model} |
|
|
|
if default is not None: |
|
if "model" not in default and ("pt" in default or "tf" in default): |
|
default = {"model": default} |
|
task_impl["default"] = default |
|
|
|
if type is not None: |
|
task_impl["type"] = type |
|
|
|
self.supported_tasks[task] = task_impl |
|
pipeline_class._registered_impl = {task: task_impl} |
|
|
|
def to_dict(self): |
|
return self.supported_tasks |
|
|