# Copyright (c) Facebook, Inc. and its affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import unittest from typing import Dict, List import tests.utils as test_utils import torch from fairseq import utils from fairseq.data import ( Dictionary, LanguagePairDataset, TransformEosDataset, data_utils, noising, ) class TestDataNoising(unittest.TestCase): def _get_test_data_with_bpe_cont_marker(self, append_eos=True): """ Args: append_eos: if True, each input sentence in the source tokens tensor will have an EOS appended to the end. Returns: vocabs: BPE vocab with continuation markers as suffixes to denote non-end of word tokens. This is the standard BPE format used in fairseq's preprocessing. x: input tensor containing numberized source tokens, with EOS at the end if append_eos is true src_lengths: and source lengths. """ vocab = Dictionary() vocab.add_symbol("he@@") vocab.add_symbol("llo") vocab.add_symbol("how") vocab.add_symbol("are") vocab.add_symbol("y@@") vocab.add_symbol("ou") vocab.add_symbol("n@@") vocab.add_symbol("ew") vocab.add_symbol("or@@") vocab.add_symbol("k") src_tokens = [ ["he@@", "llo", "n@@", "ew", "y@@", "or@@", "k"], ["how", "are", "y@@", "ou"], ] x, src_lengths = x, src_lengths = self._convert_src_tokens_to_tensor( vocab=vocab, src_tokens=src_tokens, append_eos=append_eos ) return vocab, x, src_lengths def _get_test_data_with_bpe_end_marker(self, append_eos=True): """ Args: append_eos: if True, each input sentence in the source tokens tensor will have an EOS appended to the end. Returns: vocabs: BPE vocab with end-of-word markers as suffixes to denote tokens at the end of a word. This is an alternative to fairseq's standard preprocessing framework and is not generally supported within fairseq. x: input tensor containing numberized source tokens, with EOS at the end if append_eos is true src_lengths: and source lengths. """ vocab = Dictionary() vocab.add_symbol("he") vocab.add_symbol("llo_EOW") vocab.add_symbol("how_EOW") vocab.add_symbol("are_EOW") vocab.add_symbol("y") vocab.add_symbol("ou_EOW") vocab.add_symbol("n") vocab.add_symbol("ew_EOW") vocab.add_symbol("or") vocab.add_symbol("k_EOW") src_tokens = [ ["he", "llo_EOW", "n", "ew_EOW", "y", "or", "k_EOW"], ["how_EOW", "are_EOW", "y", "ou_EOW"], ] x, src_lengths = x, src_lengths = self._convert_src_tokens_to_tensor( vocab=vocab, src_tokens=src_tokens, append_eos=append_eos ) return vocab, x, src_lengths def _get_test_data_with_word_vocab(self, append_eos=True): """ Args: append_eos: if True, each input sentence in the source tokens tensor will have an EOS appended to the end. Returns: vocabs: word vocab x: input tensor containing numberized source tokens, with EOS at the end if append_eos is true src_lengths: and source lengths. """ vocab = Dictionary() vocab.add_symbol("hello") vocab.add_symbol("how") vocab.add_symbol("are") vocab.add_symbol("you") vocab.add_symbol("new") vocab.add_symbol("york") src_tokens = [ ["hello", "new", "york", "you"], ["how", "are", "you", "new", "york"], ] x, src_lengths = self._convert_src_tokens_to_tensor( vocab=vocab, src_tokens=src_tokens, append_eos=append_eos ) return vocab, x, src_lengths def _convert_src_tokens_to_tensor( self, vocab: Dictionary, src_tokens: List[List[str]], append_eos: bool ): src_len = [len(x) for x in src_tokens] # If we have to append EOS, we include EOS in counting src length if append_eos: src_len = [length + 1 for length in src_len] x = torch.LongTensor(len(src_tokens), max(src_len)).fill_(vocab.pad()) for i in range(len(src_tokens)): for j in range(len(src_tokens[i])): x[i][j] = vocab.index(src_tokens[i][j]) if append_eos: x[i][j + 1] = vocab.eos() x = x.transpose(1, 0) return x, torch.LongTensor(src_len) def assert_eos_at_end(self, x, x_len, eos): """Asserts last token of every sentence in x is EOS """ for i in range(len(x_len)): self.assertEqual( x[x_len[i] - 1][i], eos, ( "Expected eos (token id {eos}) at the end of sentence {i} " "but got {other} instead" ).format(i=i, eos=eos, other=x[i][-1]), ) def assert_word_dropout_correct(self, x, x_noised, x_len, l_noised): # Expect only the first word (2 bpe tokens) of the first example # was dropped out self.assertEqual(x_len[0] - 2, l_noised[0]) for i in range(l_noised[0]): self.assertEqual(x_noised[i][0], x[i + 2][0]) def test_word_dropout_with_eos(self): vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=True) with data_utils.numpy_seed(1234): noising_gen = noising.WordDropout(vocab) x_noised, l_noised = noising_gen.noising(x, x_len, 0.2) self.assert_word_dropout_correct( x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised ) self.assert_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos()) def assert_word_blanking_correct(self, x, x_noised, x_len, l_noised, unk): # Expect only the first word (2 bpe tokens) of the first example # was blanked out self.assertEqual(x_len[0], l_noised[0]) for i in range(l_noised[0]): if i < 2: self.assertEqual(x_noised[i][0], unk) else: self.assertEqual(x_noised[i][0], x[i][0]) def test_word_blank_with_eos(self): vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=True) with data_utils.numpy_seed(1234): noising_gen = noising.WordDropout(vocab) x_noised, l_noised = noising_gen.noising(x, x_len, 0.2, vocab.unk()) self.assert_word_blanking_correct( x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised, unk=vocab.unk() ) self.assert_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos()) def generate_unchanged_shuffle_map(self, length): return {i: i for i in range(length)} def assert_word_shuffle_matches_expected( self, x, x_len, max_shuffle_distance: int, vocab: Dictionary, expected_shufle_maps: List[Dict[int, int]], expect_eos_at_end: bool, bpe_end_marker=None, ): """ This verifies that with a given x, x_len, max_shuffle_distance, and vocab, we get the expected shuffle result. Args: x: Tensor of shape (T x B) = (sequence_length, batch_size) x_len: Tensor of length B = batch_size max_shuffle_distance: arg to pass to noising expected_shuffle_maps: List[mapping] where mapping is a Dict[old_index, new_index], mapping x's elements from their old positions in x to their new positions in x. expect_eos_at_end: if True, check the output to make sure there is an EOS at the end. bpe_end_marker: str denoting the BPE end token. If this is not None, we set the BPE cont token to None in the noising classes. """ bpe_cont_marker = None if bpe_end_marker is None: bpe_cont_marker = "@@" with data_utils.numpy_seed(1234): word_shuffle = noising.WordShuffle( vocab, bpe_cont_marker=bpe_cont_marker, bpe_end_marker=bpe_end_marker ) x_noised, l_noised = word_shuffle.noising( x, x_len, max_shuffle_distance=max_shuffle_distance ) # For every example, we have a different expected shuffle map. We check # that each example is shuffled as expected according to each # corresponding shuffle map. for i in range(len(expected_shufle_maps)): shuffle_map = expected_shufle_maps[i] for k, v in shuffle_map.items(): self.assertEqual(x[k][i], x_noised[v][i]) # Shuffling should not affect the length of each example for pre_shuffle_length, post_shuffle_length in zip(x_len, l_noised): self.assertEqual(pre_shuffle_length, post_shuffle_length) if expect_eos_at_end: self.assert_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos()) def test_word_shuffle_with_eos(self): vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=True) # Assert word shuffle with max shuffle distance 0 causes input to be # unchanged self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, max_shuffle_distance=0, vocab=vocab, expected_shufle_maps=[ self.generate_unchanged_shuffle_map(example_len) for example_len in x_len ], expect_eos_at_end=True, ) # Assert word shuffle with max shuffle distance 3 matches our expected # shuffle order self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, vocab=vocab, max_shuffle_distance=3, expected_shufle_maps=[ self.generate_unchanged_shuffle_map(x_len[0]), {0: 0, 1: 3, 2: 1, 3: 2}, ], expect_eos_at_end=True, ) def test_word_shuffle_with_eos_nonbpe(self): """The purpose of this is to test shuffling logic with word vocabs""" vocab, x, x_len = self._get_test_data_with_word_vocab(append_eos=True) # Assert word shuffle with max shuffle distance 0 causes input to be # unchanged self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, max_shuffle_distance=0, vocab=vocab, expected_shufle_maps=[ self.generate_unchanged_shuffle_map(example_len) for example_len in x_len ], expect_eos_at_end=True, ) # Assert word shuffle with max shuffle distance 3 matches our expected # shuffle order self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, vocab=vocab, max_shuffle_distance=3, expected_shufle_maps=[ {0: 0, 1: 1, 2: 3, 3: 2}, {0: 0, 1: 2, 2: 1, 3: 3, 4: 4}, ], expect_eos_at_end=True, ) def test_word_shuffle_without_eos(self): """Same result as word shuffle with eos except no EOS at end""" vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=False) # Assert word shuffle with max shuffle distance 0 causes input to be # unchanged self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, max_shuffle_distance=0, vocab=vocab, expected_shufle_maps=[ self.generate_unchanged_shuffle_map(example_len) for example_len in x_len ], expect_eos_at_end=False, ) # Assert word shuffle with max shuffle distance 3 matches our expected # shuffle order self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, vocab=vocab, max_shuffle_distance=3, expected_shufle_maps=[ self.generate_unchanged_shuffle_map(x_len[0]), {0: 0, 1: 3, 2: 1, 3: 2}, ], expect_eos_at_end=False, ) def test_word_shuffle_without_eos_with_bpe_end_marker(self): """Same result as word shuffle without eos except using BPE end token""" vocab, x, x_len = self._get_test_data_with_bpe_end_marker(append_eos=False) # Assert word shuffle with max shuffle distance 0 causes input to be # unchanged self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, max_shuffle_distance=0, vocab=vocab, expected_shufle_maps=[ self.generate_unchanged_shuffle_map(example_len) for example_len in x_len ], expect_eos_at_end=False, bpe_end_marker="_EOW", ) # Assert word shuffle with max shuffle distance 3 matches our expected # shuffle order self.assert_word_shuffle_matches_expected( x=x, x_len=x_len, vocab=vocab, max_shuffle_distance=3, expected_shufle_maps=[ self.generate_unchanged_shuffle_map(x_len[0]), {0: 0, 1: 3, 2: 1, 3: 2}, ], expect_eos_at_end=False, bpe_end_marker="_EOW", ) def assert_no_eos_at_end(self, x, x_len, eos): """Asserts that the last token of each sentence in x is not EOS """ for i in range(len(x_len)): self.assertNotEqual( x[x_len[i] - 1][i], eos, "Expected no eos (token id {eos}) at the end of sentence {i}.".format( eos=eos, i=i ), ) def test_word_dropout_without_eos(self): """Same result as word dropout with eos except no EOS at end""" vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=False) with data_utils.numpy_seed(1234): noising_gen = noising.WordDropout(vocab) x_noised, l_noised = noising_gen.noising(x, x_len, 0.2) self.assert_word_dropout_correct( x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised ) self.assert_no_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos()) def test_word_blank_without_eos(self): """Same result as word blank with eos except no EOS at end""" vocab, x, x_len = self._get_test_data_with_bpe_cont_marker(append_eos=False) with data_utils.numpy_seed(1234): noising_gen = noising.WordDropout(vocab) x_noised, l_noised = noising_gen.noising(x, x_len, 0.2, vocab.unk()) self.assert_word_blanking_correct( x=x, x_noised=x_noised, x_len=x_len, l_noised=l_noised, unk=vocab.unk() ) self.assert_no_eos_at_end(x=x_noised, x_len=l_noised, eos=vocab.eos()) def _get_noising_dataset_batch( self, src_tokens_no_pad, src_dict, append_eos_to_tgt=False, ): """ Constructs a NoisingDataset and the corresponding ``LanguagePairDataset(NoisingDataset(src), src)``. If *append_eos_to_tgt* is True, wrap the source dataset in :class:`TransformEosDataset` to append EOS to the clean source when using it as the target. """ src_dataset = test_utils.TestDataset(data=src_tokens_no_pad) noising_dataset = noising.NoisingDataset( src_dataset=src_dataset, src_dict=src_dict, seed=1234, max_word_shuffle_distance=3, word_dropout_prob=0.2, word_blanking_prob=0.2, noising_class=noising.UnsupervisedMTNoising, ) tgt = src_dataset language_pair_dataset = LanguagePairDataset( src=noising_dataset, tgt=tgt, src_sizes=None, src_dict=src_dict ) language_pair_dataset = TransformEosDataset( language_pair_dataset, src_dict.eos(), append_eos_to_tgt=append_eos_to_tgt, ) dataloader = torch.utils.data.DataLoader( dataset=language_pair_dataset, batch_size=2, collate_fn=language_pair_dataset.collater, ) denoising_batch_result = next(iter(dataloader)) return denoising_batch_result def test_noising_dataset_with_eos(self): src_dict, src_tokens, _ = self._get_test_data_with_bpe_cont_marker( append_eos=True ) # Format data for src_dataset src_tokens = torch.t(src_tokens) src_tokens_no_pad = [] for src_sentence in src_tokens: src_tokens_no_pad.append( utils.strip_pad(tensor=src_sentence, pad=src_dict.pad()) ) denoising_batch_result = self._get_noising_dataset_batch( src_tokens_no_pad=src_tokens_no_pad, src_dict=src_dict ) eos, pad = src_dict.eos(), src_dict.pad() # Generated noisy source as source expected_src = torch.LongTensor( [[4, 5, 10, 11, 8, 12, 13, eos], [pad, pad, pad, 6, 8, 9, 7, eos]] ) # Original clean source as target (right-padded) expected_tgt = torch.LongTensor( [[4, 5, 10, 11, 8, 12, 13, eos], [6, 7, 8, 9, eos, pad, pad, pad]] ) generated_src = denoising_batch_result["net_input"]["src_tokens"] tgt_tokens = denoising_batch_result["target"] self.assertTensorEqual(expected_src, generated_src) self.assertTensorEqual(expected_tgt, tgt_tokens) def test_noising_dataset_without_eos(self): """ Similar to test noising dataset with eos except that we have to set *append_eos_to_tgt* to ``True``. """ src_dict, src_tokens, _ = self._get_test_data_with_bpe_cont_marker( append_eos=False ) # Format data for src_dataset src_tokens = torch.t(src_tokens) src_tokens_no_pad = [] for src_sentence in src_tokens: src_tokens_no_pad.append( utils.strip_pad(tensor=src_sentence, pad=src_dict.pad()) ) denoising_batch_result = self._get_noising_dataset_batch( src_tokens_no_pad=src_tokens_no_pad, src_dict=src_dict, append_eos_to_tgt=True, ) eos, pad = src_dict.eos(), src_dict.pad() # Generated noisy source as source expected_src = torch.LongTensor( [[4, 5, 10, 11, 8, 12, 13], [pad, pad, pad, 6, 8, 9, 7]] ) # Original clean source as target (right-padded) expected_tgt = torch.LongTensor( [[4, 5, 10, 11, 8, 12, 13, eos], [6, 7, 8, 9, eos, pad, pad, pad]] ) generated_src = denoising_batch_result["net_input"]["src_tokens"] tgt_tokens = denoising_batch_result["target"] self.assertTensorEqual(expected_src, generated_src) self.assertTensorEqual(expected_tgt, tgt_tokens) def assertTensorEqual(self, t1, t2): self.assertEqual(t1.size(), t2.size(), "size mismatch") self.assertEqual(t1.ne(t2).long().sum(), 0) if __name__ == "__main__": unittest.main()