# coding=utf-8 # Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. team. # Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import csv import dataclasses import json from dataclasses import dataclass from typing import List, Optional, Union from ...utils import is_tf_available, is_torch_available, logging logger = logging.get_logger(__name__) @dataclass class InputExample: """ A single training/test example for simple sequence classification. Args: guid: Unique id for the example. text_a: string. The untokenized text of the first sequence. For single sequence tasks, only this sequence must be specified. text_b: (Optional) string. The untokenized text of the second sequence. Only must be specified for sequence pair tasks. label: (Optional) string. The label of the example. This should be specified for train and dev examples, but not for test examples. """ guid: str text_a: str text_b: Optional[str] = None label: Optional[str] = None def to_json_string(self): """Serializes this instance to a JSON string.""" return json.dumps(dataclasses.asdict(self), indent=2) + "\n" @dataclass(frozen=True) class InputFeatures: """ A single set of features of data. Property names are the same names as the corresponding inputs to a model. Args: input_ids: Indices of input sequence tokens in the vocabulary. attention_mask: Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: Usually `1` for tokens that are NOT MASKED, `0` for MASKED (padded) tokens. token_type_ids: (Optional) Segment token indices to indicate first and second portions of the inputs. Only some models use them. label: (Optional) Label corresponding to the input. Int for classification problems, float for regression problems. """ input_ids: List[int] attention_mask: Optional[List[int]] = None token_type_ids: Optional[List[int]] = None label: Optional[Union[int, float]] = None def to_json_string(self): """Serializes this instance to a JSON string.""" return json.dumps(dataclasses.asdict(self)) + "\n" class DataProcessor: """Base class for data converters for sequence classification data sets.""" def get_example_from_tensor_dict(self, tensor_dict): """ Gets an example from a dict with tensorflow tensors. Args: tensor_dict: Keys and values should match the corresponding Glue tensorflow_dataset examples. """ raise NotImplementedError() def get_train_examples(self, data_dir): """Gets a collection of [`InputExample`] for the train set.""" raise NotImplementedError() def get_dev_examples(self, data_dir): """Gets a collection of [`InputExample`] for the dev set.""" raise NotImplementedError() def get_test_examples(self, data_dir): """Gets a collection of [`InputExample`] for the test set.""" raise NotImplementedError() def get_labels(self): """Gets the list of labels for this data set.""" raise NotImplementedError() def tfds_map(self, example): """ Some tensorflow_datasets datasets are not formatted the same way the GLUE datasets are. This method converts examples to the correct format. """ if len(self.get_labels()) > 1: example.label = self.get_labels()[int(example.label)] return example @classmethod def _read_tsv(cls, input_file, quotechar=None): """Reads a tab separated value file.""" with open(input_file, "r", encoding="utf-8-sig") as f: return list(csv.reader(f, delimiter="\t", quotechar=quotechar)) class SingleSentenceClassificationProcessor(DataProcessor): """Generic processor for a single sentence classification data set.""" def __init__(self, labels=None, examples=None, mode="classification", verbose=False): self.labels = [] if labels is None else labels self.examples = [] if examples is None else examples self.mode = mode self.verbose = verbose def __len__(self): return len(self.examples) def __getitem__(self, idx): if isinstance(idx, slice): return SingleSentenceClassificationProcessor(labels=self.labels, examples=self.examples[idx]) return self.examples[idx] @classmethod def create_from_csv( cls, file_name, split_name="", column_label=0, column_text=1, column_id=None, skip_first_row=False, **kwargs ): processor = cls(**kwargs) processor.add_examples_from_csv( file_name, split_name=split_name, column_label=column_label, column_text=column_text, column_id=column_id, skip_first_row=skip_first_row, overwrite_labels=True, overwrite_examples=True, ) return processor @classmethod def create_from_examples(cls, texts_or_text_and_labels, labels=None, **kwargs): processor = cls(**kwargs) processor.add_examples(texts_or_text_and_labels, labels=labels) return processor def add_examples_from_csv( self, file_name, split_name="", column_label=0, column_text=1, column_id=None, skip_first_row=False, overwrite_labels=False, overwrite_examples=False, ): lines = self._read_tsv(file_name) if skip_first_row: lines = lines[1:] texts = [] labels = [] ids = [] for i, line in enumerate(lines): texts.append(line[column_text]) labels.append(line[column_label]) if column_id is not None: ids.append(line[column_id]) else: guid = f"{split_name}-{i}" if split_name else str(i) ids.append(guid) return self.add_examples( texts, labels, ids, overwrite_labels=overwrite_labels, overwrite_examples=overwrite_examples ) def add_examples( self, texts_or_text_and_labels, labels=None, ids=None, overwrite_labels=False, overwrite_examples=False ): if labels is not None and len(texts_or_text_and_labels) != len(labels): raise ValueError( f"Text and labels have mismatched lengths {len(texts_or_text_and_labels)} and {len(labels)}" ) if ids is not None and len(texts_or_text_and_labels) != len(ids): raise ValueError(f"Text and ids have mismatched lengths {len(texts_or_text_and_labels)} and {len(ids)}") if ids is None: ids = [None] * len(texts_or_text_and_labels) if labels is None: labels = [None] * len(texts_or_text_and_labels) examples = [] added_labels = set() for text_or_text_and_label, label, guid in zip(texts_or_text_and_labels, labels, ids): if isinstance(text_or_text_and_label, (tuple, list)) and label is None: text, label = text_or_text_and_label else: text = text_or_text_and_label added_labels.add(label) examples.append(InputExample(guid=guid, text_a=text, text_b=None, label=label)) # Update examples if overwrite_examples: self.examples = examples else: self.examples.extend(examples) # Update labels if overwrite_labels: self.labels = list(added_labels) else: self.labels = list(set(self.labels).union(added_labels)) return self.examples def get_features( self, tokenizer, max_length=None, pad_on_left=False, pad_token=0, mask_padding_with_zero=True, return_tensors=None, ): """ Convert examples in a list of `InputFeatures` Args: tokenizer: Instance of a tokenizer that will tokenize the examples max_length: Maximum example length pad_on_left: If set to `True`, the examples will be padded on the left rather than on the right (default) pad_token: Padding token mask_padding_with_zero: If set to `True`, the attention mask will be filled by `1` for actual values and by `0` for padded values. If set to `False`, inverts it (`1` for padded values, `0` for actual values) Returns: If the `examples` input is a `tf.data.Dataset`, will return a `tf.data.Dataset` containing the task-specific features. If the input is a list of `InputExamples`, will return a list of task-specific `InputFeatures` which can be fed to the model. """ if max_length is None: max_length = tokenizer.max_len label_map = {label: i for i, label in enumerate(self.labels)} all_input_ids = [] for ex_index, example in enumerate(self.examples): if ex_index % 10000 == 0: logger.info(f"Tokenizing example {ex_index}") input_ids = tokenizer.encode( example.text_a, add_special_tokens=True, max_length=min(max_length, tokenizer.max_len), ) all_input_ids.append(input_ids) batch_length = max(len(input_ids) for input_ids in all_input_ids) features = [] for ex_index, (input_ids, example) in enumerate(zip(all_input_ids, self.examples)): if ex_index % 10000 == 0: logger.info(f"Writing example {ex_index}/{len(self.examples)}") # The mask has 1 for real tokens and 0 for padding tokens. Only real # tokens are attended to. attention_mask = [1 if mask_padding_with_zero else 0] * len(input_ids) # Zero-pad up to the sequence length. padding_length = batch_length - len(input_ids) if pad_on_left: input_ids = ([pad_token] * padding_length) + input_ids attention_mask = ([0 if mask_padding_with_zero else 1] * padding_length) + attention_mask else: input_ids = input_ids + ([pad_token] * padding_length) attention_mask = attention_mask + ([0 if mask_padding_with_zero else 1] * padding_length) if len(input_ids) != batch_length: raise ValueError(f"Error with input length {len(input_ids)} vs {batch_length}") if len(attention_mask) != batch_length: raise ValueError(f"Error with input length {len(attention_mask)} vs {batch_length}") if self.mode == "classification": label = label_map[example.label] elif self.mode == "regression": label = float(example.label) else: raise ValueError(self.mode) if ex_index < 5 and self.verbose: logger.info("*** Example ***") logger.info(f"guid: {example.guid}") logger.info(f"input_ids: {' '.join([str(x) for x in input_ids])}") logger.info(f"attention_mask: {' '.join([str(x) for x in attention_mask])}") logger.info(f"label: {example.label} (id = {label})") features.append(InputFeatures(input_ids=input_ids, attention_mask=attention_mask, label=label)) if return_tensors is None: return features elif return_tensors == "tf": if not is_tf_available(): raise RuntimeError("return_tensors set to 'tf' but TensorFlow 2.0 can't be imported") import tensorflow as tf def gen(): for ex in features: yield ({"input_ids": ex.input_ids, "attention_mask": ex.attention_mask}, ex.label) dataset = tf.data.Dataset.from_generator( gen, ({"input_ids": tf.int32, "attention_mask": tf.int32}, tf.int64), ({"input_ids": tf.TensorShape([None]), "attention_mask": tf.TensorShape([None])}, tf.TensorShape([])), ) return dataset elif return_tensors == "pt": if not is_torch_available(): raise RuntimeError("return_tensors set to 'pt' but PyTorch can't be imported") import torch from torch.utils.data import TensorDataset all_input_ids = torch.tensor([f.input_ids for f in features], dtype=torch.long) all_attention_mask = torch.tensor([f.attention_mask for f in features], dtype=torch.long) if self.mode == "classification": all_labels = torch.tensor([f.label for f in features], dtype=torch.long) elif self.mode == "regression": all_labels = torch.tensor([f.label for f in features], dtype=torch.float) dataset = TensorDataset(all_input_ids, all_attention_mask, all_labels) return dataset else: raise ValueError("return_tensors should be one of 'tf' or 'pt'")