|
import copy |
|
import warnings |
|
from typing import Any, Dict, List, Optional, Union |
|
|
|
import torch |
|
import torchvision.transforms.functional as F |
|
from transformers import BatchEncoding |
|
from transformers.processing_utils import ProcessorMixin |
|
|
|
|
|
def to_tensor(x): |
|
""" |
|
Convert a nested structure of numpy arrays or tensors (including lists and tuples of them) |
|
into a tensor. Assumes that all nested structures can be converted into a tensor directly. |
|
|
|
:param x: Nested structure containing numpy arrays, tensors, lists, or tuples |
|
:return: torch.Tensor |
|
""" |
|
with warnings.catch_warnings(): |
|
|
|
warnings.filterwarnings( |
|
"error", |
|
category=UserWarning, |
|
message=".*Creating a tensor from a list of numpy.ndarrays is extremely slow.*", |
|
) |
|
try: |
|
return torch.Tensor(x) |
|
except Exception: |
|
if isinstance(x, list): |
|
return torch.stack([to_tensor(item) for item in x]) |
|
else: |
|
raise TypeError("Unsupported type for conversion to tensor") |
|
|
|
|
|
def truncate( |
|
encoding: Dict[str, List[List[Any]]], max_length: int, truncation_side: str = "right", preserve: bool = False |
|
) -> Dict[str, List[List[Any]]]: |
|
""" |
|
Truncate the sequences in the encoding to the specified maximum length. |
|
|
|
This function is designed to process batch of sequences represented in the encoding dictionary. |
|
Depending on the chosen strategy, sequences are either truncated with loss of residual data or with preservation |
|
and incorporation of residual data into the batch. |
|
|
|
Args: |
|
encoding (`Mapping`): |
|
A dictionary where each key-value pair consists of a feature name and its corresponding batch of sequences. |
|
The sequences are expected to be lists. |
|
max_length (`int`): |
|
The maximum allowable length for the sequences. |
|
truncation_side (`str`, **optional**): |
|
The strategy to use for truncation. Can be `"left"` or `"right"`. Defaults to `"right"`. |
|
preserve (`bool`, **optional**): |
|
Whether to preserve the residual data by adding them as new sequences in the batch. Defaults to `False`. |
|
|
|
Returns: |
|
`Dict[str, List[List[Any]]]`: |
|
A dictionary with the same keys as the input `encoding`, containing the truncated batch of sequences. |
|
If `preserve` is set to `True`, the batch size may increase due to the addition of new sequences formed |
|
from the residual data. |
|
|
|
Example: |
|
|
|
>>> encoding = {'feature1': [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]} |
|
>>> truncate(encoding, 3, preserve=False) |
|
{'feature1': [[1, 2, 3], [6, 7, 8]]} |
|
|
|
>>> truncate(encoding, 3, preserve=True) |
|
{'feature1': [[1, 2, 3], [4, 5], [6, 7, 8], [9, 10]]} |
|
""" |
|
truncated_encoding = {} |
|
|
|
for key, sequences in encoding.items(): |
|
if not all(isinstance(seq, list) for seq in sequences): |
|
raise TypeError(f"All sequences under key {key} should be of type list.") |
|
|
|
truncated_sequences = [] |
|
|
|
for seq in sequences: |
|
if len(seq) <= max_length: |
|
truncated_sequences.append(seq) |
|
continue |
|
|
|
if preserve: |
|
if truncation_side == "right": |
|
truncated_sequences.extend([seq[i : i + max_length] for i in range(0, len(seq), max_length)]) |
|
elif truncation_side == "left": |
|
n = len(seq) // max_length + int(len(seq) % max_length > 0) |
|
low, high = len(seq) - n * max_length, len(seq) |
|
truncated_sequences.extend( |
|
[seq[max(0, i - max_length) : i] for i in range(high, low, -max_length)] |
|
) |
|
else: |
|
raise ValueError(f"Invalid truncation_side: {truncation_side}") |
|
else: |
|
if truncation_side == "right": |
|
truncated_sequences.append(seq[:max_length]) |
|
elif truncation_side == "left": |
|
truncated_sequences.append(seq[-max_length:]) |
|
|
|
truncated_encoding[key] = truncated_sequences |
|
|
|
return truncated_encoding |
|
|
|
|
|
def pad(encoding: Dict[str, List[List[Any]]], target_length: int) -> Dict[str, List[List[Any]]]: |
|
""" |
|
Pad the sequences in the encoding to the specified maximum length. |
|
|
|
This function is designed to process batch of sequences represented in the encoding dictionary. |
|
The padding value is set to be the first element in the sequence. |
|
|
|
Args: |
|
encoding (`Mapping`): |
|
A dictionary where each key-value pair consists of a feature name and its corresponding batch of sequences. |
|
The sequences are expected to be lists. |
|
target_length (`int`): |
|
The desired length for the sequences. |
|
|
|
Returns: |
|
`Dict[str, List[List[Any]]]`: |
|
A dictionary with the same keys as the input `encoding`, containing the padded batch of sequences. |
|
An additional key `attention_mask` is added to the dictionary to indicate the positions of the non-padding |
|
elements with 1s and the padding elements with 0s. If the input `encoding` already contains an |
|
`attention_mask` key, the corresponding mask will be updated such that the original masking is preserved, |
|
and the newly added padding elements will be masked with 0s. In other words, the resulting |
|
`attention_mask` is a logical "AND" between the provided mask and the mask created due to padding, ensuring |
|
that any element masked originally remains masked. |
|
|
|
Example: |
|
|
|
>>> encoding = {'feature1': [[1, 2], [3, 4, 5]]} |
|
>>> pad(encoding, 4) |
|
{'feature1': [[1, 2, 1, 1], [3, 4, 5, 3]], 'attention_mask': [[1, 1, 0, 0], [1, 1, 1, 0]]} |
|
|
|
>>> encoding = {'feature1': [[1, 2], [3, 4, 5]], "attention_mask": [[1, 0], [0, 1, 1]]} |
|
>>> pad(encoding, 4) |
|
{'feature1': [[1, 2, 1, 1], [3, 4, 5, 3]], 'attention_mask': [[1, 0, 0, 0], [0, 1, 1, 0]]} |
|
""" |
|
padded_encoding = {} |
|
|
|
for key, sequences in encoding.items(): |
|
if not all(isinstance(seq, (list, torch.Tensor)) for seq in sequences): |
|
raise TypeError(f"All sequences under key {key} should be of type list or tensor.") |
|
if key == "attention_mask": |
|
continue |
|
|
|
padded_sequences = [] |
|
pad_mask = [] |
|
|
|
for seq in sequences: |
|
pad_len = target_length - len(seq) |
|
padded_seq = list(seq) + [seq[0]] * max(0, pad_len) |
|
mask = [1] * len(seq) + [0] * max(0, pad_len) |
|
|
|
padded_sequences.append(padded_seq) |
|
pad_mask.append(mask) |
|
|
|
padded_encoding[key] = padded_sequences |
|
|
|
if "attention_mask" in encoding: |
|
padded_encoding["attention_mask"] = [ |
|
[a * (b[i] if i < len(b) else 0) for i, a in enumerate(row)] |
|
for row, b in zip(pad_mask, encoding["attention_mask"]) |
|
] |
|
else: |
|
padded_encoding["attention_mask"] = pad_mask |
|
|
|
return padded_encoding |
|
|
|
|
|
class JatProcessor(ProcessorMixin): |
|
r""" |
|
JAT processor which wraps a CLIP image processor and a BERT tokenizer into a single processor. |
|
|
|
[`JatProcessor`] offers all the functionalities of [`CLIPImageProcessor`] and [`BertTokenizerFast`]. See the |
|
[`~JatProcessor.__call__`] and [`~JatProcessor.decode`] for more information. |
|
|
|
Args: |
|
image_processor ([`AutoImageProcessor`]): |
|
The image processor is a required input. |
|
tokenizer ([`AutoTokenizer`]): |
|
The tokenizer is a required input. |
|
""" |
|
attributes = ["image_processor", "tokenizer"] |
|
image_processor_class = "AutoImageProcessor" |
|
tokenizer_class = "AutoTokenizer" |
|
|
|
DONT_TRUNCATE_OR_PAD = {"pixel_values"} |
|
|
|
def __init__(self, image_processor, tokenizer): |
|
super().__init__(image_processor, tokenizer) |
|
self.current_processor = self.image_processor |
|
|
|
def _truncate_and_pad( |
|
self, |
|
encoding: dict, |
|
padding: Union[bool, str], |
|
truncation: Union[bool, str], |
|
truncation_side: str = "right", |
|
max_length: Optional[int] = None, |
|
) -> dict: |
|
|
|
if max_length is None: |
|
max_length = self.tokenizer.model_max_length |
|
|
|
|
|
excluded = {key: value for key, value in encoding.items() if key in self.DONT_TRUNCATE_OR_PAD} |
|
encoding = {key: value for key, value in encoding.items() if key not in self.DONT_TRUNCATE_OR_PAD} |
|
|
|
|
|
if truncation in [True, "lossy"]: |
|
encoding = truncate(encoding, max_length, truncation_side, preserve=False) |
|
elif truncation == "preserve": |
|
encoding = truncate(encoding, max_length, truncation_side, preserve=True) |
|
elif truncation in [False, "do_not_truncate"]: |
|
pass |
|
else: |
|
raise ValueError("Invalid truncation strategy:" + str(truncation)) |
|
|
|
|
|
if padding in [True, "longest"]: |
|
target_length = max(len(seq) for sequences in encoding.values() for seq in sequences) |
|
encoding = pad(encoding, target_length) |
|
elif padding == "max_length": |
|
encoding = pad(encoding, max_length) |
|
elif padding in [False, "do_not_pad"]: |
|
pass |
|
else: |
|
raise ValueError("Invalid padding strategy:" + str(padding)) |
|
|
|
|
|
encoding.update(excluded) |
|
|
|
|
|
|
|
if "image_observations" in encoding: |
|
encoding["image_observations"] = to_tensor(encoding["image_observations"]) |
|
|
|
return encoding |
|
|
|
def __call__( |
|
self, |
|
text=None, |
|
images=None, |
|
continuous_observations=None, |
|
discrete_observations=None, |
|
text_observations=None, |
|
image_observations=None, |
|
continuous_actions=None, |
|
discrete_actions=None, |
|
rewards=None, |
|
return_tensors=None, |
|
**kwargs, |
|
): |
|
""" |
|
Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text` |
|
and `kwargs` arguments to BertTokenizerFast's [`~BertTokenizerFast.__call__`] if `text` is not `None` to encode |
|
the text. To prepare the image(s), this method forwards the `images` and `kwrags` arguments to |
|
CLIPImageProcessor's [`~CLIPImageProcessor.__call__`] if `images` is not `None`. Please refer to the doctsring |
|
of the above two methods for more information. |
|
|
|
Args: |
|
text (`str`, `List[str]`, `List[List[str]]`): |
|
The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings |
|
(pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set |
|
`is_split_into_words=True` (to lift the ambiguity with a batch of sequences). |
|
images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, |
|
`List[np.ndarray]`, `List[torch.Tensor]`): |
|
The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch |
|
tensor. In case of a NumPy array/PyTorch tensor, each image should be of shape (C, H, W), where C is a |
|
number of channels, H and W are image height and width. |
|
continuous_observations (`List[List[List[float]]]`): |
|
The continuous observations or batch of continuous observations to be encoded. |
|
discrete_observations (`List[List[List[int]]]`): |
|
The discrete observations or batch of discrete observations to be encoded. |
|
text_observations (`List[List[str]]`): |
|
The text observations or batch of text observations to be encoded. |
|
image_observations (`List[List[PIL.Image.Image]]`, `List[List[np.ndarray]]`, `List[List[torch.Tensor]]`): |
|
The image observations or batch of image observations to be encoded. |
|
continuous_actions (`List[List[List[float]]]`): |
|
The continuous actions or batch of continuous actions to be encoded. |
|
discrete_actions (``List[List[int]]`): |
|
The discrete actions or batch of discrete actions to be encoded. |
|
rewards (``List[List[float]]`): |
|
The rewards or batch of rewards to be encoded. |
|
return_tensors (`str` or [`~utils.TensorType`], *optional*): |
|
If set, will return tensors of a particular framework. Acceptable values are: |
|
|
|
- `'tf'`: Return TensorFlow `tf.constant` objects. |
|
- `'pt'`: Return PyTorch `torch.Tensor` objects. |
|
- `'np'`: Return NumPy `np.ndarray` objects. |
|
- `'jax'`: Return JAX `jnp.ndarray` objects. |
|
|
|
Returns: |
|
[`BatchEncoding`]: A [`BatchEncoding`] with the following fields: |
|
|
|
- **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`. |
|
- **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when |
|
`return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not |
|
`None`). |
|
- **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`. |
|
""" |
|
|
|
padding = kwargs.pop("padding", False) |
|
truncation = kwargs.pop("truncation", False) |
|
truncation_side = kwargs.pop("truncation_side", "right") |
|
max_length = kwargs.pop("max_length", None) |
|
|
|
|
|
if text is not None and not isinstance(text, list): |
|
text = [text] |
|
|
|
encoding = {} |
|
if text is not None: |
|
encoding["input_ids"] = self.tokenizer(text, **kwargs)["input_ids"] |
|
if images is not None: |
|
encoding["pixel_values"] = self.image_processor(images, **kwargs).pixel_values |
|
if continuous_observations is not None: |
|
encoding["continuous_observations"] = copy.deepcopy(continuous_observations) |
|
if discrete_observations is not None: |
|
encoding["discrete_observations"] = copy.deepcopy(discrete_observations) |
|
if text_observations is not None: |
|
if "discrete_observations" not in encoding: |
|
raise ValueError("discrete_observations must be provided if text_observations is provided") |
|
for batch_idx, sequence in enumerate(text_observations): |
|
encoded_text = self.tokenizer(sequence, max_length=64, padding="max_length")["input_ids"] |
|
for timestep, text_tokens in enumerate(encoded_text): |
|
encoding["discrete_observations"][batch_idx][timestep].extend(text_tokens) |
|
if image_observations is not None: |
|
image_observations = [[(F.to_tensor(im) - 0.5) / 0.5 for im in ep] for ep in image_observations] |
|
encoding["image_observations"] = image_observations |
|
if continuous_actions is not None: |
|
encoding["continuous_actions"] = copy.deepcopy(continuous_actions) |
|
if discrete_actions is not None: |
|
encoding["discrete_actions"] = copy.deepcopy(discrete_actions) |
|
|
|
if rewards is not None: |
|
encoding["rewards"] = [[float(r) for r in ep] for ep in rewards] |
|
|
|
|
|
if text is not None and images is not None: |
|
if max_length is None: |
|
max_length = self.tokenizer.model_max_length |
|
max_length -= (224 // 16) ** 2 |
|
elif ( |
|
continuous_observations is not None |
|
or discrete_observations is not None |
|
or text_observations is not None |
|
or image_observations is not None |
|
): |
|
if max_length is None: |
|
max_length = self.tokenizer.model_max_length |
|
max_length //= 2 |
|
|
|
encoding = self._truncate_and_pad(encoding, padding, truncation, truncation_side, max_length) |
|
|
|
return BatchEncoding(encoding, tensor_type=return_tensors) |
|
|
|
def batch_decode(self, *args, **kwargs): |
|
""" |
|
This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please |
|
refer to the docstring of this method for more information. |
|
""" |
|
return self.tokenizer.batch_decode(*args, **kwargs) |
|
|
|
def decode(self, *args, **kwargs): |
|
""" |
|
This method forwards all its arguments to BertTokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to |
|
the docstring of this method for more information. |
|
""" |
|
return self.tokenizer.decode(*args, **kwargs) |
|
|
|
def pad(self, *args, **kwargs): |
|
inputs = args[0] |
|
keys = [key for key in inputs[0].keys() if inputs[0][key] is not None] |
|
inputs = {key: [arg[key] for arg in inputs] for key in keys} |
|
elmt = next(iter(inputs.values())) |
|
if isinstance(elmt[0], torch.Tensor) and not isinstance(elmt, torch.Tensor): |
|
encoding = {key: torch.stack(inputs[key]) for key in inputs.keys()} |
|
else: |
|
encoding = self._truncate_and_pad( |
|
inputs, padding=kwargs.get("padding", False), truncation=False, max_length=kwargs.get("max_length") |
|
) |
|
|
|
return BatchEncoding(encoding, tensor_type=kwargs.get("return_tensors")) |
|
|
|
@property |
|
def model_input_names(self): |
|
return [ |
|
"input_ids", |
|
"attention_mask", |
|
"pixel_values", |
|
"continuous_observations", |
|
"discrete_observations", |
|
"image_observations", |
|
"continuous_actions", |
|
"discrete_actions", |
|
"rewards", |
|
] |
|
|
|
|
|
JatProcessor.register_for_auto_class("AutoProcessor") |
|
|