|
|
|
import argparse |
|
import re |
|
from typing import Dict |
|
|
|
import torch |
|
from datasets import Audio, Dataset, load_dataset, load_metric |
|
|
|
from transformers import AutoFeatureExtractor, pipeline |
|
|
|
import re |
|
from num2words import num2words |
|
|
|
|
|
def log_results(result: Dataset, args: Dict[str, str]):
    """DO NOT CHANGE. This function computes and logs the result metrics.

    Computes the "wer" and "cer" metrics between ``result["target"]`` and
    ``result["prediction"]``, prints both values and writes them to
    ``<dataset_id>_eval_results.txt`` in the current working directory.
    Optionally dumps every prediction/target pair to two log files.

    Args:
        result: Dataset providing the ``"prediction"`` and ``"target"`` columns.
        args: Parsed command-line options. ``dataset``, ``config``, ``split``
            and ``log_outputs`` are read as *attributes*, so despite the
            ``Dict`` annotation the caller has to pass an object such as the
            ``argparse.Namespace`` built in the ``__main__`` section.
    """

    log_outputs = args.log_outputs
    # e.g. dataset "org/name", config "de", split "test" -> "org_name_de_test"
    dataset_id = "_".join(args.dataset.split("/") + [args.config, args.split])

    # load the two metrics by name
    wer = load_metric("wer")
    cer = load_metric("cer")

    # compute both metrics on the same target/prediction columns
    wer_result = wer.compute(references=result["target"], predictions=result["prediction"])
    cer_result = cer.compute(references=result["target"], predictions=result["prediction"])

    # print the summary and persist it next to the script's working directory
    result_str = f"WER: {wer_result}\n" f"CER: {cer_result}"
    print(result_str)

    with open(f"{dataset_id}_eval_results.txt", "w") as f:
        f.write(result_str)

    # NOTE(review): `--log_outputs` is declared with action="store_true" in this
    # file, so the value is a bool and never None; this check therefore looks
    # like it always passes. Confirm whether `if log_outputs:` was intended.
    if log_outputs is not None:
        pred_file = f"log_{dataset_id}_predictions.txt"
        target_file = f"log_{dataset_id}_targets.txt"

        with open(pred_file, "w") as p, open(target_file, "w") as t:

            # Called once per example via `result.map`; writes the example
            # index on one line followed by the text on the next line.
            def write_to_file(batch, i):
                p.write(f"{i}" + "\n")
                p.write(batch["prediction"] + "\n")
                t.write(f"{i}" + "\n")
                t.write(batch["target"] + "\n")

            # `map` is used only for its side effect of visiting every row;
            # its return value is discarded.
            result.map(write_to_file, with_indices=True)
|
|
|
|
|
def spell_num(text):
    """Spell out purely numeric tokens of ``text`` as German words.

    The text is split on whitespace. Every token that consists only of
    decimal digits is replaced by ``num2words(<number>, lang='de')``; all
    other tokens (including digits with attached punctuation such as
    ``"3."``) are kept unchanged. Tokens are re-joined with single spaces,
    so runs of whitespace collapse and leading/trailing whitespace is
    dropped.

    Args:
        text: Input string.

    Returns:
        The text with standalone numbers written out as words.
    """
    words = []
    for token in text.split():
        # isdecimal() only accepts characters that int() can parse.
        # isdigit() would also accept digit-like symbols such as "²", which
        # cannot be converted to a number and would make the call below fail.
        if token.isdecimal():
            words.append(num2words(int(token), lang='de'))
        else:
            words.append(token)

    return ' '.join(words)
|
|
|
# Characters that survive the filtering step in
# preprocess_transcript_for_corpus; every other character is removed there.
ALLOWED_CHARS = set(
    "abcdefghijklmnopqrstuvwxyz"  # lowercase basic Latin letters
    "äöü"                         # lowercase umlauts
    "0123456789"                  # digits
    " "                           # space
    ",;:.?!"                      # punctuation
)

# Matches one or more consecutive spaces and/or tabs (not newlines).
WHITESPACE_REGEX = re.compile(r'[ \t]+')
|
|
|
|
|
# Ordered (old, new) substitutions applied to the lowercased transcript:
# accented letters are reduced to their base letter, 'ß' becomes 'ss', and
# hyphen, en dash and slash become a space.
_TRANSCRIPT_REPLACEMENTS = (
    ('á', 'a'), ('à', 'a'), ('â', 'a'),
    ('ç', 'c'),
    ('é', 'e'), ('è', 'e'), ('ê', 'e'),
    ('í', 'i'), ('ì', 'i'), ('î', 'i'),
    ('ñ', 'n'),
    ('ó', 'o'), ('ò', 'o'), ('ô', 'o'),
    ('ú', 'u'), ('ù', 'u'), ('û', 'u'),
    ('ș', 's'), ('ş', 's'),
    ('ß', 'ss'),
    ('-', ' '), ('–', ' '), ('/', ' '),
)


def preprocess_transcript_for_corpus(transcript):
    """Clean a transcript down to the character set in ``ALLOWED_CHARS``.

    Steps, in order: lowercase; apply the substitutions in
    ``_TRANSCRIPT_REPLACEMENTS``; collapse space/tab runs; drop every
    character not in ``ALLOWED_CHARS``; collapse space/tab runs again;
    spell out standalone numbers via ``spell_num``; replace 'ß' once more;
    strip surrounding whitespace.

    Args:
        transcript: Raw transcript string.

    Returns:
        The cleaned transcript.
    """
    cleaned = transcript.lower()
    for old, new in _TRANSCRIPT_REPLACEMENTS:
        cleaned = cleaned.replace(old, new)

    cleaned = WHITESPACE_REGEX.sub(' ', cleaned)
    # Filtering can leave adjacent spaces behind, hence the second collapse.
    cleaned = ''.join(ch for ch in cleaned if ch in ALLOWED_CHARS)
    cleaned = WHITESPACE_REGEX.sub(' ', cleaned)

    cleaned = spell_num(cleaned)
    # 'ß' was already replaced above; presumably this second pass exists
    # because the German number spelling can reintroduce it (e.g.
    # "dreißig") -- TODO confirm against num2words output.
    cleaned = cleaned.replace('ß', 'ss')

    return cleaned.strip()
|
|
|
def normalize_text(text: str) -> str:
    """DO ADAPT FOR YOUR USE CASE. this function normalizes the target text.

    Runs the text through ``preprocess_transcript_for_corpus``, lowercases
    the result, removes every character matched by the ignore pattern below
    (punctuation and quote-like symbols) and strips surrounding whitespace.

    Args:
        text: Reference transcript to normalize.

    Returns:
        The normalized transcript.
    """
    # Must operate on the `text` parameter; there is no other transcript
    # variable in scope.
    text = preprocess_transcript_for_corpus(text)

    # Raw string so that `\-`, `\;` and `\:` reach the regex engine verbatim
    # instead of being treated as (invalid) Python string escapes.
    chars_to_ignore_regex = r'[,?.!\-\;\:"“%‘”�—’…–]'
    text = re.sub(chars_to_ignore_regex, "", text.lower())

    return text.strip()
|
|
|
|
|
def main(args):
    """Transcribe a dataset split with an ASR pipeline and log the metrics.

    Loads the dataset named by ``args.dataset`` / ``args.config`` /
    ``args.split``, casts its "audio" column to the sampling rate reported
    by the model's feature extractor, transcribes every example, normalizes
    the reference sentence and passes the result to ``log_results``.

    Args:
        args: Parsed command-line options. ``args.device`` is filled in
            (and thereby mutated) when it is ``None``.
    """
    # load the requested split
    dataset = load_dataset(args.dataset, args.config, split=args.split, use_auth_token=True)

    # cast the audio column to the sampling rate the feature extractor reports
    target_rate = AutoFeatureExtractor.from_pretrained(args.model_id).sampling_rate
    dataset = dataset.cast_column("audio", Audio(sampling_rate=target_rate))

    # pick a device when none was requested: first GPU if available, else CPU
    if args.device is None:
        if torch.cuda.is_available():
            args.device = 0
        else:
            args.device = -1
    asr = pipeline("automatic-speech-recognition", model=args.model_id, device=args.device)

    def transcribe(example):
        """Attach the model prediction and the normalized target to one example."""
        output = asr(
            example["audio"]["array"],
            chunk_length_s=args.chunk_length_s,
            stride_length_s=args.stride_length_s,
        )
        example["prediction"] = output["text"]
        example["target"] = normalize_text(example["sentence"])
        return example

    # run inference on every example; the original columns are removed
    predictions = dataset.map(transcribe, remove_columns=dataset.column_names)

    # compute and log the metrics
    log_results(predictions, args)
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the evaluation options and run main().
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--model_id", type=str, required=True, help="Model identifier. Should be loadable with 🤗 Transformers"
    )
    parser.add_argument(
        "--dataset",
        type=str,
        required=True,
        help="Dataset name to evaluate the `model_id`. Should be loadable with 🤗 Datasets",
    )
    parser.add_argument(
        "--config", type=str, required=True, help="Config of the dataset. *E.g.* `'en'` for Common Voice"
    )
    parser.add_argument("--split", type=str, required=True, help="Split of the dataset. *E.g.* `'test'`")
    # The help texts below state the real default (None); the value is
    # forwarded unchanged to the pipeline call in main().
    parser.add_argument(
        "--chunk_length_s",
        type=float,
        default=None,
        help="Chunk length in seconds. Defaults to None, which is forwarded to the pipeline as-is.",
    )
    parser.add_argument(
        "--stride_length_s",
        type=float,
        default=None,
        help="Stride of the audio chunks in seconds. Defaults to None, which is forwarded to the pipeline as-is.",
    )
    parser.add_argument(
        "--log_outputs", action="store_true", help="If defined, write outputs to log file for analysis."
    )
    parser.add_argument(
        "--device",
        type=int,
        default=None,
        help=(
            "The device to run the pipeline on. -1 for CPU, 0 for the first GPU and so on. "
            "If omitted, GPU 0 is used when CUDA is available, otherwise CPU."
        ),
    )
    args = parser.parse_args()

    main(args)
|
|