#!/usr/bin/env python3 import argparse import re from typing import Dict from datasets import Audio, Dataset, load_dataset, load_metric from transformers import AutoFeatureExtractor, pipeline, AutomaticSpeechRecognitionPipeline from transformers import Wav2Vec2CTCTokenizer class Wav2Vec2WordpieceTokenizer(Wav2Vec2CTCTokenizer): def __init__( self, vocab_file, bos_token="", eos_token="", unk_token="", pad_token="", word_delimiter_token="|", do_lower_case=False, **kwargs ): super().__init__( vocab_file=vocab_file, unk_token=unk_token, bos_token=bos_token, eos_token=eos_token, pad_token=pad_token, do_lower_case=do_lower_case, word_delimiter_token=word_delimiter_token, **kwargs, ) self._create_trie(self.all_special_tokens_extended) def _tokenize(self, text, **kwargs): """ Converts a string in a sequence of tokens (string), using the tokenizer. """ special_cases = set(['gia', 'qui', 'quy', 'que', 'qua']) output_tokens = [] for token_idx, token in enumerate(text.split()): if token in special_cases: sub_tokens = [token[:2], token[2:]] else: end = len(token) sub_tokens = [] while end > 0: start = 0 cur_substr = None while start < end: substr = token[start:end] if substr in self.encoder: cur_substr = substr break start += 1 if cur_substr is None: sub_tokens.insert(0, self.unk_token) end = start - 1 else: sub_tokens.insert(0, cur_substr) end = start if token_idx > 0: output_tokens.append(self.word_delimiter_token) output_tokens.extend(sub_tokens) return output_tokens def decode_ids( self, token_ids, skip_special_tokens = False, clean_up_tokenization_spaces = True, group_tokens: bool = True, spaces_between_special_tokens: bool = False, ) -> str: # For compatible with speechbrain interfaces return self.decode( token_ids, skip_special_tokens=skip_special_tokens, clean_up_tokenization_spaces=clean_up_tokenization_spaces, group_tokens=group_tokens, spaces_between_special_tokens=spaces_between_special_tokens ) def log_results(result: Dataset, args: Dict[str, str]): """DO NOT CHANGE. This function computes and logs the result metrics.""" log_outputs = args.log_outputs dataset_id = "_".join(args.dataset.split("/") + [args.config, args.split]) # load metric wer = load_metric("wer") cer = load_metric("cer") # compute metrics wer_result = wer.compute(references=result["target"], predictions=result["prediction"]) cer_result = cer.compute(references=result["target"], predictions=result["prediction"]) # print & log results result_str = f"WER: {wer_result}\n" f"CER: {cer_result}" print(result_str) with open(f"{dataset_id}_eval_results.txt", "w") as f: f.write(result_str) # log all results in text file. Possibly interesting for analysis if log_outputs is not None: pred_file = f"log_{dataset_id}_predictions.txt" target_file = f"log_{dataset_id}_targets.txt" with open(pred_file, "w") as p, open(target_file, "w") as t: # mapping function to write output def write_to_file(batch, i): p.write(f"{i}" + "\n") p.write(batch["prediction"] + "\n") t.write(f"{i}" + "\n") t.write(batch["target"] + "\n") result.map(write_to_file, with_indices=True) def normalize_text(text: str) -> str: """DO ADAPT FOR YOUR USE CASE. this function normalizes the target text.""" chars_to_ignore_regex = '[,?.!\-\;\:"“%‘”�—’…–|]' # noqa: W605 IMPORTANT: this should correspond to the chars that were ignored during training text = re.sub(chars_to_ignore_regex, "", text.lower()) # In addition, we can normalize the target text, e.g. removing new lines characters etc... # note that order is important here! token_sequences_to_ignore = ["\n\n", "\n", " ", " "] for t in token_sequences_to_ignore: text = " ".join(text.split(t)) return text def main(args): # load dataset dataset = load_dataset(args.dataset, args.config, split=args.split, use_auth_token=True) # for testing: only process the first two examples as a test dataset = dataset.select(range(10)) # load processor feature_extractor = AutoFeatureExtractor.from_pretrained(args.model_id) sampling_rate = feature_extractor.sampling_rate # load tokenizer tokenizer = Wav2Vec2WordpieceTokenizer( vocab_file = args.model_id + 'vocab.json', ) # resample audio dataset = dataset.cast_column("audio", Audio(sampling_rate=sampling_rate)) # load eval pipeline asr = pipeline( "automatic-speech-recognition", model=args.model_id, tokenizer = tokenizer ) # asr = AutomaticSpeechRecognitionPipeline( # ) # map function to decode audio def map_to_pred(batch): prediction = asr( batch["audio"]["array"], chunk_length_s=args.chunk_length_s, stride_length_s=args.stride_length_s ) batch["prediction"] = prediction["text"] batch["target"] = normalize_text(batch["sentence"]) return batch # run inference on all examples result = dataset.map(map_to_pred, remove_columns=dataset.column_names) # compute and log_results # do not change function below log_results(result, args) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--model_id", type=str, required=True, help="Model identifier. Should be loadable with 🤗 Transformers" ) parser.add_argument( "--dataset", type=str, required=True, help="Dataset name to evaluate the `model_id`. Should be loadable with 🤗 Datasets", ) parser.add_argument( "--config", type=str, required=True, help="Config of the dataset. *E.g.* `'en'` for Common Voice" ) parser.add_argument("--split", type=str, required=True, help="Split of the dataset. *E.g.* `'test'`") parser.add_argument( "--chunk_length_s", type=float, default=None, help="Chunk length in seconds. Defaults to 5 seconds." ) parser.add_argument( "--stride_length_s", type=float, default=None, help="Stride of the audio chunks. Defaults to 1 second." ) parser.add_argument( "--log_outputs", action="store_true", help="If defined, write outputs to log file for analysis." ) args = parser.parse_args() main(args)