spoken-norm-taggen / data_handling.py
HuyenNguyen's picture
Duplicate from nguyenvulebinh/spoken-norm-taggen
394811b
import datasets
import model_handling
from transformers import PreTrainedTokenizerBase
from typing import Optional, Union, Any
from transformers.file_utils import PaddingStrategy
import re
import os
from tqdm import tqdm
# import time
import json
import random
import regtag
from dataclasses import dataclass
import validators
import utils
regexp = re.compile(r"\d{4}[\-/]\d{2}[\-/]\d{2}t\d{2}:\d{2}:\d{2}")
target_bias_words = set(regtag.get_general_en_word())
tokenizer = None
def get_bias_words():
regtag.augment.get_random_oov()
return list(regtag.augment.oov_dict.keys())
def check_common_phrase(word):
if validators.email(word.replace(' @', '@')):
return True
if validators.domain(word):
return True
if validators.url(word):
return True
if word in regtag.get_general_en_word():
return True
return False
@dataclass
class DataCollatorForNormSeq2Seq:
tokenizer: PreTrainedTokenizerBase
model: Optional[Any] = None
padding: Union[bool, str, PaddingStrategy] = True
max_length: Optional[int] = None
pad_to_multiple_of: Optional[int] = None
label_pad_token_id: int = -100
return_tensors: str = "pt"
def bias_phrases_extractor(self, features, max_bias_per_sample=30):
# src_ids, src_length, tgt_ids, tgt_length
phrase_candidate = []
sample_output_words = []
bias_labels = []
for sample in features:
words = []
for idx, (src_word_len, tgt_word_len) in enumerate(zip(sample['inputs_length'], sample['outputs_length'])):
src_start_idx = sum(sample['inputs_length'][:idx])
tgt_start_idx = sum(sample['outputs_length'][:idx])
word_input = self.tokenizer.decode(sample['input_ids'][src_start_idx: src_start_idx + src_word_len])
word_output = self.tokenizer.decode(sample['outputs'][tgt_start_idx: tgt_start_idx + tgt_word_len])
words.append(word_output)
if word_input != word_output and not any(map(str.isdigit, word_output)):
phrase_candidate.append(word_output)
sample_output_words.append(words)
phrase_candidate = list(set(phrase_candidate))
phrase_candidate_revised = []
phrase_candidate_common = []
raw_phrase_candidate = []
for item in phrase_candidate:
raw_item = self.tokenizer.sp_model.DecodePieces(item.split())
if check_common_phrase(raw_item):
phrase_candidate_common.append(raw_item)
else:
phrase_candidate_revised.append(item)
raw_phrase_candidate.append(raw_item)
remain_phrase = max(0, max_bias_per_sample * len(features) - len(phrase_candidate_revised))
if remain_phrase > 0:
words_candidate = list(
set(get_bias_words()) - set(raw_phrase_candidate))
random.shuffle(words_candidate)
phrase_candidate_revised += [' '.join(self.tokenizer.sp_model.EncodeAsPieces(item)[:5]) for item in
words_candidate[:remain_phrase]]
for i in range(len(features)):
sample_bias_lables = []
for w_idx, w in enumerate(sample_output_words[i]):
try:
sample_bias_lables.extend(
[phrase_candidate_revised.index(w) + 1] * features[i]['outputs_length'][w_idx])
except:
# random ignore 0 label
if random.random() < 0.5:
sample_bias_lables.extend([0] * features[i]['outputs_length'][w_idx])
else:
sample_bias_lables.extend([self.label_pad_token_id] * features[i]['outputs_length'][w_idx])
bias_labels.append(sample_bias_lables)
assert len(sample_bias_lables) == len(features[i]['outputs']), "{} vs {}".format(sample_bias_lables,
features[i]['outputs'])
# phrase_candidate_ids = [self.tokenizer.encode(item) for item in phrase_candidate]
phrase_candidate_ids = [self.tokenizer.encode(self.tokenizer.sp_model.DecodePieces(item.split())) for item in
phrase_candidate_revised]
phrase_candidate_mask = [[self.tokenizer.pad_token_id] * len(item) for item in phrase_candidate_ids]
return phrase_candidate_ids, phrase_candidate_mask, bias_labels
# pass
def encode_list_string(self, list_text):
text_tokenized = self.tokenizer(list_text)
return self.tokenizer.pad(
text_tokenized,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors='pt',
)
def __call__(self, features, return_tensors=None):
# start_time = time.time()
batch_src, batch_tgt = [], []
for item in features:
src_spans, tgt_spans = utils.make_spoken(item['text'])
batch_src.append(src_spans)
batch_tgt.append(tgt_spans)
# print("Make src-tgt {}s".format(time.time() - start_time))
# start_time = time.time()
features = preprocess_function({"src": batch_src, "tgt": batch_tgt})
# print("Make feature {}s".format(time.time() - start_time))
# start_time = time.time()
phrase_candidate_ids, phrase_candidate_mask, samples_bias_labels = self.bias_phrases_extractor(features)
# print("Make bias {}s".format(time.time() - start_time))
# start_time = time.time()
if return_tensors is None:
return_tensors = self.return_tensors
labels = [feature["outputs"] for feature in features] if "outputs" in features[0].keys() else None
spoken_labels = [feature["spoken_label"] for feature in features] if "spoken_label" in features[0].keys() else None
spoken_idx = [feature["src_spoken_idx"] for feature in features] if "src_spoken_idx" in features[0].keys() else None
word_src_lengths = [feature["inputs_length"] for feature in features] if "inputs_length" in features[0].keys() else None
word_tgt_lengths = [feature["outputs_length"] for feature in features] if "outputs_length" in features[0].keys() else None
# We have to pad the labels before calling `tokenizer.pad` as this method won't pad them and needs them of the
# same length to return tensors.
if labels is not None:
max_label_length = max(len(l) for l in labels)
max_src_length = max(len(l) for l in spoken_labels)
max_spoken_idx_length = max(len(l) for l in spoken_idx)
max_word_src_length = max(len(l) for l in word_src_lengths)
max_word_tgt_length = max(len(l) for l in word_tgt_lengths)
padding_side = self.tokenizer.padding_side
for feature, bias_labels in zip(features, samples_bias_labels):
remainder = [self.label_pad_token_id] * (max_label_length - len(feature["outputs"]))
remainder_word_tgt_length = [0] * (max_word_tgt_length - len(feature["outputs_length"]))
remainder_spoken = [self.label_pad_token_id] * (max_src_length - len(feature["spoken_label"]))
remainder_spoken_idx = [self.label_pad_token_id] * (max_spoken_idx_length - len(feature["src_spoken_idx"]))
remainder_word_src_length = [0] * (max_word_src_length - len(feature["inputs_length"]))
feature["labels"] = (
feature["outputs"] + [
self.tokenizer.eos_token_id] + remainder if padding_side == "right" else remainder + feature[
"outputs"] + [self.tokenizer.eos_token_id]
)
feature["labels_bias"] = (
bias_labels + [0] + remainder if padding_side == "right" else remainder + bias_labels + [0]
)
feature["spoken_label"] = [self.label_pad_token_id] + feature["spoken_label"] + [self.label_pad_token_id]
feature["spoken_label"] = feature["spoken_label"] + remainder_spoken if padding_side == "right" else remainder_spoken + feature["spoken_label"]
feature["src_spoken_idx"] = feature["src_spoken_idx"] + remainder_spoken_idx
feature['inputs_length'] = [1] + feature['inputs_length'] + [1]
feature['outputs_length'] = feature['outputs_length'] + [1]
feature["inputs_length"] = feature["inputs_length"] + remainder_word_src_length
feature["outputs_length"] = feature["outputs_length"] + remainder_word_tgt_length
features_inputs = [{
"input_ids": [self.tokenizer.bos_token_id] + item["input_ids"] + [self.tokenizer.eos_token_id],
"attention_mask": [self.tokenizer.pad_token_id] + item["attention_mask"] + [self.tokenizer.pad_token_id]
} for item in features]
features_inputs = self.tokenizer.pad(
features_inputs,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors=return_tensors,
)
bias_phrases_inputs = [{
"input_ids": ids,
"attention_mask": mask
} for ids, mask in zip(phrase_candidate_ids, phrase_candidate_mask)]
bias_phrases_inputs = self.tokenizer.pad(
bias_phrases_inputs,
padding=self.padding,
max_length=self.max_length,
pad_to_multiple_of=self.pad_to_multiple_of,
return_tensors=return_tensors,
)
outputs = self.tokenizer.pad({"input_ids": [feature["labels"] for feature in features]},
return_tensors=return_tensors)['input_ids']
outputs_bias = self.tokenizer.pad({"input_ids": [feature["labels_bias"] for feature in features]},
return_tensors=return_tensors)['input_ids']
spoken_label = self.tokenizer.pad({"input_ids": [feature["spoken_label"] for feature in features]},
return_tensors=return_tensors)['input_ids']
spoken_idx = self.tokenizer.pad({"input_ids": [feature["src_spoken_idx"] for feature in features]},
return_tensors=return_tensors)['input_ids'] + 1 # 1 for bos token
word_src_lengths = self.tokenizer.pad({"input_ids": [feature["inputs_length"] for feature in features]},
return_tensors=return_tensors)['input_ids']
word_tgt_lengths = self.tokenizer.pad({"input_ids": [feature["outputs_length"] for feature in features]},
return_tensors=return_tensors)['input_ids']
features = {
"input_ids": features_inputs["input_ids"],
"spoken_label": spoken_label,
"spoken_idx": spoken_idx,
"word_src_lengths": word_src_lengths,
"word_tgt_lengths": word_tgt_lengths,
"attention_mask": features_inputs["attention_mask"],
"bias_input_ids": bias_phrases_inputs["input_ids"],
"bias_attention_mask": bias_phrases_inputs["attention_mask"],
"labels": outputs,
"labels_bias": outputs_bias
}
# print("Make batch {}s".format(time.time() - start_time))
# start_time = time.time()
# prepare decoder_input_ids
if self.model is not None and hasattr(self.model, "prepare_decoder_input_ids_from_labels"):
decoder_input_ids = self.model.prepare_decoder_input_ids_from_labels(labels=features["labels"])
features["decoder_input_ids"] = decoder_input_ids
return features
# data init
def init_data(train_corpus_path='./data-bin/raw/train_raw.txt',
test_corpus_path='./data-bin/raw/valid_raw.txt'):
dataset_oov = datasets.load_dataset('text', data_files={"train": train_corpus_path,
"test": test_corpus_path})
print(dataset_oov)
return dataset_oov
def preprocess_function(batch):
global tokenizer
if tokenizer is None:
tokenizer = model_handling.init_tokenizer()
features = []
for src_words, tgt_words in zip(batch["src"], batch["tgt"]):
src_ids, pad_ids, src_lengths, tgt_ids, tgt_lengths = [], [], [], [], []
src_spoken_label = [] # 0: "O", 1: "B", 2: "I"
src_spoken_idx = []
tgt_spoken_ids = []
for idx, (src, tgt) in enumerate(zip(src_words, tgt_words)):
is_remain = False
if src == tgt:
is_remain = True
src_tokenized = tokenizer(src)
if len(src_tokenized['input_ids']) < 3:
continue
# hardcode fix tokenizer email
if validators.email(tgt):
tgt_tokenized = tokenizer(tgt.replace('@', ' @'))
else:
tgt_tokenized = tokenizer(tgt)
if len(tgt_tokenized['input_ids']) < 3:
continue
src_ids.extend(src_tokenized["input_ids"][1:-1])
if is_remain:
src_spoken_label.extend([0 if random.random() < 0.5 else -100 for _ in range(len(src_tokenized["input_ids"][1:-1]))])
if random.random() < 0.1:
# Random pick normal word for spoken norm
src_spoken_idx.append(idx)
tgt_spoken_ids.append(tgt_tokenized["input_ids"][1:-1])
else:
src_spoken_label.extend([1] + [2] * (len(src_tokenized["input_ids"][1:-1]) - 1))
src_spoken_idx.append(idx)
tgt_spoken_ids.append(tgt_tokenized["input_ids"][1:-1])
pad_ids.extend(src_tokenized["attention_mask"][1:-1])
src_lengths.append(len(src_tokenized["input_ids"]) - 2)
tgt_ids.extend(tgt_tokenized["input_ids"][1:-1])
tgt_lengths.append(len(tgt_tokenized["input_ids"]) - 2)
if len(src_ids) > 70 or len(tgt_ids) > 70:
# print("Ignore sample")
break
if len(src_ids) < 1 or len(tgt_ids) < 1:
continue
# else:
# print("ignore")
features.append({
"input_ids": src_ids,
"attention_mask": pad_ids,
"spoken_label": src_spoken_label,
"inputs_length": src_lengths,
"outputs": tgt_ids,
"outputs_length": tgt_lengths,
"src_spoken_idx": src_spoken_idx,
"tgt_spoken_ids": tgt_spoken_ids
})
return features
if __name__ == "__main__":
split_datasets = init_data()
model, model_tokenizer = model_handling.init_model()
data_collator = DataCollatorForNormSeq2Seq(model_tokenizer, model=model)
# start = time.time()
batch = data_collator([split_datasets["train"][i] for i in [random.randint(0, 900) for _ in range(0, 12)]])
print(batch)
# print("{}s".format(time.time() - start))