|
|
|
import argparse |
|
import re |
|
from typing import Dict |
|
|
|
from datasets import Audio, Dataset, load_dataset, load_metric |
|
from transformers import AutoFeatureExtractor, pipeline |
|
from transformers import Wav2Vec2CTCTokenizer |
|
|
|
class Wav2Vec2WordpieceTokenizer(Wav2Vec2CTCTokenizer):
    """CTC tokenizer that breaks every whitespace-separated word into vocabulary pieces.

    Each word is segmented from its right edge towards its left edge: at every
    step the longest vocabulary entry ending at the current position becomes
    the next piece. Words are joined with ``word_delimiter_token``.
    """

    def __init__(
        self,
        vocab_file,
        bos_token="<s>",
        eos_token="</s>",
        unk_token="<unk>",
        pad_token="<pad>",
        word_delimiter_token="|",
        do_lower_case=False,
        **kwargs
    ):
        """Forward every argument to the parent tokenizer, then build the trie.

        After the parent constructor has run, ``_create_trie`` is called with
        ``all_special_tokens_extended``.
        """
        super().__init__(
            vocab_file=vocab_file,
            bos_token=bos_token,
            eos_token=eos_token,
            unk_token=unk_token,
            pad_token=pad_token,
            word_delimiter_token=word_delimiter_token,
            do_lower_case=do_lower_case,
            **kwargs,
        )

        self._create_trie(self.all_special_tokens_extended)

    def _split_right_to_left(self, word):
        """Return the vocabulary pieces of ``word`` in reading order.

        Starting at the end of the word, the longest substring that ends at the
        current position and is present in ``self.encoder`` is consumed. When no
        substring matches (not even the single last character), one character
        is consumed and ``self.unk_token`` is emitted in its place.
        """
        vocab = self.encoder
        collected = []  # pieces gathered last-to-first, reversed before returning
        right = len(word)
        while right > 0:
            # The smallest matching left index yields the longest piece ending at `right`.
            left = next(
                (idx for idx in range(right) if word[idx:right] in vocab),
                None,
            )
            if left is None:
                collected.append(self.unk_token)
                right -= 1
            else:
                collected.append(word[left:right])
                right = left
        collected.reverse()
        return collected

    def _tokenize(self, text, **kwargs):
        """
        Converts a string in a sequence of tokens (string), using the tokenizer.

        The text is split on whitespace. A handful of fixed words are cut after
        their first two characters; every other word goes through the
        right-to-left vocabulary matching. ``word_delimiter_token`` is inserted
        between consecutive words.
        """
        fixed_two_char_prefix_words = {"gia", "qui", "quy", "que", "qua"}
        tokens = []
        for position, word in enumerate(text.split()):
            if word in fixed_two_char_prefix_words:
                pieces = [word[:2], word[2:]]
            else:
                pieces = self._split_right_to_left(word)

            if position > 0:
                tokens.append(self.word_delimiter_token)
            tokens.extend(pieces)
        return tokens

    def decode_ids(
        self,
        token_ids,
        skip_special_tokens=False,
        clean_up_tokenization_spaces=True,
        group_tokens: bool = True,
        spaces_between_special_tokens: bool = False,
    ) -> str:
        """Decode ``token_ids`` by delegating to ``decode`` with the given options."""
        decode_options = {
            "skip_special_tokens": skip_special_tokens,
            "clean_up_tokenization_spaces": clean_up_tokenization_spaces,
            "group_tokens": group_tokens,
            "spaces_between_special_tokens": spaces_between_special_tokens,
        }
        return self.decode(token_ids, **decode_options)
|
|
|
def log_results(result: Dataset, args: argparse.Namespace):
    """Compute WER and CER for ``result``, print them and write them to disk.

    Original template note: DO NOT CHANGE the way the metrics are computed.

    Args:
        result: Dataset whose ``"target"`` and ``"prediction"`` columns hold the
            reference and predicted transcripts.
        args: Parsed command-line arguments. ``dataset``, ``config`` and
            ``split`` are combined into the output file prefix; ``log_outputs``
            decides whether per-example logs are written.

    Side effects:
        Writes ``<dataset_id>_eval_results.txt`` in the working directory and,
        when ``args.log_outputs`` is truthy, ``log_<dataset_id>_predictions.txt``
        and ``log_<dataset_id>_targets.txt``.
    """
    dataset_id = "_".join(args.dataset.split("/") + [args.config, args.split])

    # Load the metrics.
    wer = load_metric("wer")
    cer = load_metric("cer")

    # Compute the metrics.
    wer_result = wer.compute(references=result["target"], predictions=result["prediction"])
    cer_result = cer.compute(references=result["target"], predictions=result["prediction"])

    # Print and persist the summary.
    result_str = f"WER: {wer_result}\n" f"CER: {cer_result}"
    print(result_str)

    # Explicit UTF-8 so non-ASCII transcripts do not depend on the platform default.
    with open(f"{dataset_id}_eval_results.txt", "w", encoding="utf-8") as f:
        f.write(result_str)

    # `--log_outputs` is a store_true flag, so it is False (never None) when
    # absent; a truthiness test is required for the flag to have any effect.
    if args.log_outputs:
        pred_file = f"log_{dataset_id}_predictions.txt"
        target_file = f"log_{dataset_id}_targets.txt"

        with open(pred_file, "w", encoding="utf-8") as p, open(target_file, "w", encoding="utf-8") as t:

            # Each example is written as two lines: its index, then the text.
            def write_to_file(batch, i):
                p.write(f"{i}" + "\n")
                p.write(batch["prediction"] + "\n")
                t.write(f"{i}" + "\n")
                t.write(batch["target"] + "\n")

            result.map(write_to_file, with_indices=True)
|
|
|
|
|
def normalize_text(text: str) -> str:
    """DO ADAPT FOR YOUR USE CASE. Normalize the target text.

    The text is lower-cased, a fixed set of punctuation characters is removed,
    and newlines as well as runs of three or two spaces are replaced by a
    single space.

    Args:
        text: Raw reference transcript.

    Returns:
        The normalized transcript.
    """
    # Raw string: the backslash escapes are meant for the regex engine, not for
    # Python. The typographic characters are spelled as \uXXXX escapes (which
    # `re` understands) so the pattern survives any re-encoding of this file:
    # U+201C/U+201D/U+2018/U+2019 curly quotes, U+FFFD replacement character,
    # U+2014/U+2013 dashes, U+2026 ellipsis.
    chars_to_ignore_regex = (
        r'[,?.!\-\;\:"\u201c%\u2018\u201d\ufffd\u2014\u2019\u2026\u2013|]'
    )

    text = re.sub(chars_to_ignore_regex, "", text.lower())

    # Order matters: double newlines first, then single newlines, then the
    # longer space run before the shorter one.
    token_sequences_to_ignore = ["\n\n", "\n", "   ", "  "]

    for sequence in token_sequences_to_ignore:
        text = text.replace(sequence, " ")

    return text
|
|
|
|
|
def main(args):
    """Run speech recognition over a dataset split and log WER/CER.

    Args:
        args: Parsed command-line arguments. Reads ``dataset``, ``config``,
            ``split``, ``model_id``, ``chunk_length_s`` and ``stride_length_s``
            here; the whole namespace is forwarded to ``log_results``.
    """
    import os

    # Load the requested split.
    dataset = load_dataset(args.dataset, args.config, split=args.split, use_auth_token=True)

    # NOTE(review): only the first 10 examples are evaluated, so the reported
    # metrics do not cover the full split - confirm this cap is intended.
    # The bound is clamped so that splits with fewer than 10 rows do not fail.
    dataset = dataset.select(range(min(10, len(dataset))))

    # The feature extractor supplies the sampling rate the audio is resampled to.
    feature_extractor = AutoFeatureExtractor.from_pretrained(args.model_id)
    sampling_rate = feature_extractor.sampling_rate

    # Join with the path separator instead of bare string concatenation, so a
    # model directory given without a trailing "/" still resolves correctly.
    tokenizer = Wav2Vec2WordpieceTokenizer(
        vocab_file=os.path.join(args.model_id, "vocab.json"),
    )

    # Resample the audio column.
    dataset = dataset.cast_column("audio", Audio(sampling_rate=sampling_rate))

    # Build the recognition pipeline with the custom tokenizer.
    asr = pipeline(
        "automatic-speech-recognition",
        model=args.model_id,
        tokenizer=tokenizer,
    )

    def map_to_pred(batch):
        """Transcribe one example and attach the prediction and normalized target."""
        prediction = asr(
            batch["audio"]["array"],
            chunk_length_s=args.chunk_length_s,
            stride_length_s=args.stride_length_s,
        )

        batch["prediction"] = prediction["text"]
        batch["target"] = normalize_text(batch["sentence"])
        return batch

    # Run inference; the original columns are dropped from the result.
    result = dataset.map(map_to_pred, remove_columns=dataset.column_names)

    # Compute and log the metrics.
    log_results(result, args)
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the evaluation options and run `main`.
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--model_id", type=str, required=True, help="Model identifier. Should be loadable with 🤗 Transformers"
    )
    parser.add_argument(
        "--dataset",
        type=str,
        required=True,
        help="Dataset name to evaluate the `model_id`. Should be loadable with 🤗 Datasets",
    )
    parser.add_argument(
        "--config", type=str, required=True, help="Config of the dataset. *E.g.* `'en'` for Common Voice"
    )
    parser.add_argument("--split", type=str, required=True, help="Split of the dataset. *E.g.* `'test'`")
    # Both chunking options default to None and are forwarded to the pipeline
    # unchanged, so the help text no longer advertises a numeric default.
    parser.add_argument(
        "--chunk_length_s",
        type=float,
        default=None,
        help="Chunk length in seconds. Defaults to None, which is passed to the pipeline as-is.",
    )
    parser.add_argument(
        "--stride_length_s",
        type=float,
        default=None,
        help="Stride of the audio chunks in seconds. Defaults to None, which is passed to the pipeline as-is.",
    )
    parser.add_argument(
        "--log_outputs", action="store_true", help="If defined, write outputs to log file for analysis."
    )
    args = parser.parse_args()

    main(args)
|
|