# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
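"""Prepare IWSLT'17 Fr-En bitext in several tokenizations: Moses
pre-tokenization followed by BPE, byte-level BPE (BBPE), raw bytes,
and/or characters.

Assumes the raw IWSLT'17 files have already been downloaded and
extracted under ``<root>/fr-en/`` (train.tags.*, IWSLT17.TED.*.xml).
"""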
import argparse
import os
import os.path as op
from collections import namedtuple
from multiprocessing import cpu_count
from typing import List, Optional

import sentencepiece as sp
from fairseq.data.encoders.byte_bpe import ByteBPE
from fairseq.data.encoders.byte_utils import byte_encode
from fairseq.data.encoders.bytes import Bytes
from fairseq.data.encoders.characters import Characters
from fairseq.data.encoders.moses_tokenizer import MosesTokenizer
from fairseq.data.encoders.sentencepiece_bpe import SentencepieceBPE


SPLITS = ["train", "valid", "test"]
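

# Extract segment text from the IWSLT XML dev/test files. Relevant lines
# look like `<seg id="1"> some text </seg>`; everything else is skipped.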
def _convert_xml(in_path: str, out_path: str):
    with open(in_path) as f, open(out_path, "w") as f_o:
        for s in f:
            ss = s.strip()
            if not ss.startswith("<seg"):
                continue
            ss = ss.replace("</seg>", "").split('">')
            assert len(ss) == 2
            f_o.write(ss[1].strip() + "\n")
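

# The raw training files (train.tags.*) interleave text with XML-like
# metadata lines; keep only the plain-text lines (those not starting with "<").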
def _convert_train(in_path: str, out_path: str):
    with open(in_path) as f, open(out_path, "w") as f_o:
        for s in f:
            ss = s.strip()
            if ss.startswith("<"):
                continue
            f_o.write(ss.strip() + "\n")
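

# Re-segment each sentence with fairseq's byte / character encoders
# (one output token per byte or per character, respectively).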
def _get_bytes(in_path: str, out_path: str):
    with open(in_path) as f, open(out_path, "w") as f_o:
        for s in f:
            f_o.write(Bytes.encode(s.strip()) + "\n")


def _get_chars(in_path: str, out_path: str):
    with open(in_path) as f, open(out_path, "w") as f_o:
        for s in f:
            f_o.write(Characters.encode(s.strip()) + "\n")
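

# fairseq encoders expect an argparse-style namespace; a namedtuple with the
# matching field names is a lightweight stand-in.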
def pretokenize(in_path: str, out_path: str, src: str, tgt: str):
    Args = namedtuple(
        "Args",
        [
            "moses_source_lang",
            "moses_target_lang",
            "moses_no_dash_splits",
            "moses_no_escape",
        ],
    )
    args = Args(
        moses_source_lang=src,
        moses_target_lang=tgt,
        moses_no_dash_splits=False,
        moses_no_escape=False,
    )
    pretokenizer = MosesTokenizer(args)
    with open(in_path) as f, open(out_path, "w") as f_o:
        for s in f:
            f_o.write(pretokenizer.encode(s.strip()) + "\n")
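

# Byte-encode the concatenated source and target training text; the result
# becomes the training corpus for the byte-level BPE (BBPE) vocabulary.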
def _convert_to_bchar(in_path_prefix: str, src: str, tgt: str, out_path: str):
    with open(out_path, "w") as f_o:
        for lang in [src, tgt]:
            with open(f"{in_path_prefix}.{lang}") as f:
                for s in f:
                    f_o.write(byte_encode(s.strip()) + "\n")
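

# Train a SentencePiece BPE model; SentencePiece writes
# `{model_prefix}.model` and `{model_prefix}.vocab`.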
def _get_bpe(in_path: str, model_prefix: str, vocab_size: int):
    arguments = [
        f"--input={in_path}",
        f"--model_prefix={model_prefix}",
        "--model_type=bpe",
        f"--vocab_size={vocab_size}",
        "--character_coverage=1.0",
        "--normalization_rule_name=identity",
        f"--num_threads={cpu_count()}",
    ]
    sp.SentencePieceTrainer.Train(" ".join(arguments))
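

# ByteBPE byte-encodes the input first (as in `_convert_to_bchar`) before
# applying the SentencePiece model, whereas SentencepieceBPE applies the
# model to the pre-tokenized text directly.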
def _apply_bbpe(model_path: str, in_path: str, out_path: str):
    Args = namedtuple("Args", ["sentencepiece_model_path"])
    args = Args(sentencepiece_model_path=model_path)
    tokenizer = ByteBPE(args)
    with open(in_path) as f, open(out_path, "w") as f_o:
        for s in f:
            f_o.write(tokenizer.encode(s.strip()) + "\n")


def _apply_bpe(model_path: str, in_path: str, out_path: str):
    Args = namedtuple("Args", ["sentencepiece_model"])
    args = Args(sentencepiece_model=model_path)
    tokenizer = SentencepieceBPE(args)
    with open(in_path) as f, open(out_path, "w") as f_o:
        for s in f:
            f_o.write(tokenizer.encode(s.strip()) + "\n")


def _concat_files(in_paths: List[str], out_path: str):
    with open(out_path, "w") as f_o:
        for p in in_paths:
            with open(p) as f:
                for r in f:
                    f_o.write(r)
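

# Full pipeline: extract bitext from the raw IWSLT'17 files, pre-tokenize
# with Moses, then optionally produce BPE, byte, character, and BBPE views
# of every split.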
def preprocess_iwslt17(
    root: str,
    src: str,
    tgt: str,
    bpe_size: Optional[int],
    need_chars: bool,
    bbpe_size: Optional[int],
    need_bytes: bool,
):
    # extract bitext
    in_root = op.join(root, f"{src}-{tgt}")
    for lang in [src, tgt]:
        _convert_train(
            op.join(in_root, f"train.tags.{src}-{tgt}.{lang}"),
            op.join(root, f"train.{lang}"),
        )
        _convert_xml(
            op.join(in_root, f"IWSLT17.TED.dev2010.{src}-{tgt}.{lang}.xml"),
            op.join(root, f"valid.{lang}"),
        )
        _convert_xml(
            op.join(in_root, f"IWSLT17.TED.tst2015.{src}-{tgt}.{lang}.xml"),
            op.join(root, f"test.{lang}"),
        )
    # pre-tokenize
    for lang in [src, tgt]:
        for split in SPLITS:
            pretokenize(
                op.join(root, f"{split}.{lang}"),
                op.join(root, f"{split}.moses.{lang}"),
                src,
                tgt,
            )
    # tokenize with BPE vocabulary
    if bpe_size is not None:
        # learn vocabulary (the function is parameterized by src/tgt, so
        # avoid hardcoding language codes here)
        concated_train_path = op.join(root, "train.all")
        _concat_files(
            [
                op.join(root, f"train.moses.{src}"),
                op.join(root, f"train.moses.{tgt}"),
            ],
            concated_train_path,
        )
        bpe_model_prefix = op.join(root, f"spm_bpe{bpe_size}")
        _get_bpe(concated_train_path, bpe_model_prefix, bpe_size)
        os.remove(concated_train_path)
        # apply
        for lang in [src, tgt]:
            for split in SPLITS:
                _apply_bpe(
                    bpe_model_prefix + ".model",
                    op.join(root, f"{split}.moses.{lang}"),
                    op.join(root, f"{split}.moses.bpe{bpe_size}.{lang}"),
                )
    # tokenize with bytes vocabulary
    if need_bytes:
        for lang in [src, tgt]:
            for split in SPLITS:
                _get_bytes(
                    op.join(root, f"{split}.moses.{lang}"),
                    op.join(root, f"{split}.moses.bytes.{lang}"),
                )
    # tokenize with characters vocabulary
    if need_chars:
        for lang in [src, tgt]:
            for split in SPLITS:
                _get_chars(
                    op.join(root, f"{split}.moses.{lang}"),
                    op.join(root, f"{split}.moses.chars.{lang}"),
                )
    # tokenize with byte-level BPE vocabulary
    if bbpe_size is not None:
        # learn vocabulary
        bchar_path = op.join(root, "train.bchar")
        _convert_to_bchar(op.join(root, "train.moses"), src, tgt, bchar_path)
        bbpe_model_prefix = op.join(root, f"spm_bbpe{bbpe_size}")
        _get_bpe(bchar_path, bbpe_model_prefix, bbpe_size)
        os.remove(bchar_path)
        # apply
        for lang in [src, tgt]:
            for split in SPLITS:
                _apply_bbpe(
                    bbpe_model_prefix + ".model",
                    op.join(root, f"{split}.moses.{lang}"),
                    op.join(root, f"{split}.moses.bbpe{bbpe_size}.{lang}"),
                )


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--root", type=str, default="data")
    parser.add_argument(
        "--bpe-vocab",
        default=None,
        type=int,
        help="Generate tokenized bitext with BPE of size K. "
        "Defaults to None (disabled).",
    )
    parser.add_argument(
        "--bbpe-vocab",
        default=None,
        type=int,
        help="Generate tokenized bitext with BBPE of size K. "
        "Defaults to None (disabled).",
    )
    parser.add_argument(
        "--byte-vocab",
        action="store_true",
        help="Generate tokenized bitext with bytes vocabulary",
    )
    parser.add_argument(
        "--char-vocab",
        action="store_true",
        help="Generate tokenized bitext with chars vocabulary",
    )
    args = parser.parse_args()

    preprocess_iwslt17(
        args.root,
        "fr",
        "en",
        args.bpe_vocab,
        args.char_vocab,
        args.bbpe_vocab,
        args.byte_vocab,
    )


if __name__ == "__main__":
    main()
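

# Example invocation (illustrative only; the script name and vocabulary
# sizes are placeholders, and the raw IWSLT'17 fr-en files must already be
# extracted under <root>/fr-en/):
#
#   python get_data.py --root data --bpe-vocab 16384 --bbpe-vocab 2048 \
#       --byte-vocab --char-vocab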