import torch
from enum import IntEnum
import numpy as np
from transformers import RobertaTokenizer, BatchEncoding, RobertaTokenizerFast
import warnings


def get_tokenizer(parent_class):
    class TokenizerClass(parent_class):
        class TaskTypes(IntEnum):
            NULL = 0
            QUERY = 1
            DOCUMENT = 2
            STS = 3
            CLUSTERING = 4
            CLASSIFICATION = 5

        def __init__(self, *args, **kwargs):
            """
            This class dynamically extends a given tokenizer class from the HF
            Transformers library (RobertaTokenizer or RobertaTokenizerFast).

            The task_type_ids are used to pass instruction information to the
            model. A task_type should either be an integer or a sequence of
            integers with the same length as the batch size.
            """
            super().__init__(*args, **kwargs)

        def __call__(self, *args, task_type: TaskTypes = None, **kwargs):
            batch_encoding = super().__call__(*args, **kwargs)
            if task_type is not None:
                batch_encoding = self._add_task_type_ids(
                    batch_encoding, task_type, kwargs.get('return_tensors')
                )
            return batch_encoding

        def _batch_encode_plus(self, *args, task_type: TaskTypes = None, **kwargs):
            batch_encoding = super()._batch_encode_plus(*args, **kwargs)
            if task_type is not None:
                batch_encoding = self._add_task_type_ids(
                    batch_encoding, task_type, kwargs.get('return_tensors')
                )
            return batch_encoding

        def _encode_plus(self, *args, task_type: TaskTypes = None, **kwargs):
            batch_encoding = super()._encode_plus(*args, **kwargs)
            if task_type is not None:
                batch_encoding = self._add_task_type_ids(
                    batch_encoding, task_type, kwargs.get('return_tensors')
                )
            return batch_encoding

        @classmethod
        def _add_task_type_ids(
            cls, batch_encoding: BatchEncoding, task_type: TaskTypes, tensor_type: str
        ):
            return BatchEncoding(
                {
                    'task_type_ids': cls._get_task_type_ids(batch_encoding, task_type),
                    **batch_encoding,
                },
                tensor_type=tensor_type,
            )

        @staticmethod
        def _get_task_type_ids(batch_encoding: BatchEncoding, task_type: TaskTypes):
            def apply_task_type(m, x):
                x = torch.tensor(x)
                assert (
                    len(x.shape) == 0 or x.shape[0] == m.shape[0]
                ), 'The shape of task_type does not match the size of the batch.'
                # Broadcast a scalar task_type across the batch, or expand a
                # per-sample sequence of task types along the sequence dimension.
                return m * x if len(x.shape) == 0 else m * x[:, None]

            if isinstance(batch_encoding['input_ids'], torch.Tensor):
                shape = batch_encoding['input_ids'].shape
                return apply_task_type(torch.ones(shape, dtype=torch.long), task_type)
            else:
                try:
                    shape = torch.tensor(batch_encoding['input_ids']).shape
                except Exception:
                    raise ValueError(
                        "Unable to create tensor, you should probably "
                        "activate truncation and/or padding with "
                        "'padding=True' 'truncation=True' to have batched "
                        "tensors with the same length."
                    )
                # Return the task_type_ids in the same container type as input_ids.
                if isinstance(batch_encoding['input_ids'], list):
                    return (
                        apply_task_type(torch.ones(shape, dtype=torch.long), task_type)
                    ).tolist()
                elif isinstance(batch_encoding['input_ids'], np.ndarray):
                    return (
                        apply_task_type(torch.ones(shape, dtype=torch.long), task_type)
                    ).numpy()
                else:
                    warnings.warn(
                        'input_ids is not a torch tensor, numpy array, or list. '
                        'Returning a torch tensor.'
                    )
                    return apply_task_type(
                        torch.ones(shape, dtype=torch.long), task_type
                    )

    return TokenizerClass


JinaTokenizer = get_tokenizer(RobertaTokenizer)
JinaTokenizerFast = get_tokenizer(RobertaTokenizerFast)
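

# Illustrative usage sketch, not part of the original module: it shows how a
# task_type is passed per call so that the returned encoding also carries
# task_type_ids. The 'roberta-base' checkpoint below is a placeholder
# assumption; in practice the tokenizer would be loaded from the checkpoint
# that ships with the model these classes are built for.
if __name__ == '__main__':
    tokenizer = JinaTokenizerFast.from_pretrained('roberta-base')

    # A single task type applies to every sample in the batch.
    encoded = tokenizer(
        ['how do I bake bread?', 'a recipe for sourdough'],
        task_type=JinaTokenizerFast.TaskTypes.QUERY,
        padding=True,
        return_tensors='pt',
    )
    # task_type_ids has the same shape as input_ids, filled with QUERY (= 1).
    print(encoded['task_type_ids'])

    # A sequence of task types, one per sample, is also accepted.
    encoded_mixed = tokenizer(
        ['how do I bake bread?', 'a recipe for sourdough'],
        task_type=[
            JinaTokenizerFast.TaskTypes.QUERY,
            JinaTokenizerFast.TaskTypes.DOCUMENT,
        ],
        padding=True,
        return_tensors='pt',
    )
    print(encoded_mixed['task_type_ids'])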