malmarz's picture
Training in progress, step 1000
d09a13e
import datasets
import os
_DESCRIPTION = "MGB2 speech recognition dataset AR"
_HOMEPAGE = "https://arabicspeech.org/mgb2/"
_LICENSE = "MGB-2 License agreement"
_CITATION = """@misc{https://doi.org/10.48550/arxiv.1609.05625,
doi = {10.48550/ARXIV.1609.05625},
url = {https://arxiv.org/abs/1609.05625},
author = {Ali, Ahmed and Bell, Peter and Glass, James and Messaoui, Yacine and Mubarak, Hamdy and Renals, Steve and Zhang, Yifan},
keywords = {Computation and Language (cs.CL), FOS: Computer and information sciences, FOS: Computer and information sciences},
title = {The MGB-2 Challenge: Arabic Multi-Dialect Broadcast Media Recognition},
publisher = {arXiv},
year = {2016},
copyright = {arXiv.org perpetual, non-exclusive license}
}
"""
_DATA_ARCHIVE_ROOT = "archives/"
_DATA_URL = {
"test": _DATA_ARCHIVE_ROOT + "mgb2_wav.test.tar.gz",
"dev": _DATA_ARCHIVE_ROOT + "mgb2_wav.dev.tar.gz",
"train": [_DATA_ARCHIVE_ROOT + f"mgb2_wav_{x}.train.tar.gz" for x in range(48)], # we have 48 archives
}
_TEXT_URL = {
"test": _DATA_ARCHIVE_ROOT + "mgb2_txt.test.tar.gz",
"dev": _DATA_ARCHIVE_ROOT + "mgb2_txt.dev.tar.gz",
"train": _DATA_ARCHIVE_ROOT + "mgb2_txt.train.tar.gz",
}
class MGDB2Dataset(datasets.GeneratorBasedBuilder):
def _info(self):
return datasets.DatasetInfo(
description=_DESCRIPTION,
features=datasets.Features(
{
"path": datasets.Value("string"),
"audio": datasets.Audio(sampling_rate=16_000),
"text": datasets.Value("string"),
}
),
supervised_keys=None,
homepage=_HOMEPAGE,
license=_LICENSE,
citation=_CITATION,
)
def _split_generators(self, dl_manager):
wav_archive = dl_manager.download(_DATA_URL)
txt_archive = dl_manager.download(_TEXT_URL)
test_dir = "dataset/test"
dev_dir = "dataset/dev"
train_dir = "dataset/train"
if dl_manager.is_streaming:
return [
datasets.SplitGenerator(
name=datasets.Split.TEST,
gen_kwargs={
"path_to_txt": test_dir + "/txt",
"path_to_wav": test_dir + "/wav",
"wav_files": [dl_manager.iter_archive(wav_archive['test'])],
"txt_files": dl_manager.iter_archive(txt_archive['test']),
},
),
datasets.SplitGenerator(
name=datasets.Split.VALIDATION,
gen_kwargs={
"path_to_txt": dev_dir + "/txt",
"path_to_wav": dev_dir + "/wav",
"wav_files": [dl_manager.iter_archive(wav_archive['dev'])],
"txt_files": dl_manager.iter_archive(txt_archive['dev']),
},
),
datasets.SplitGenerator(
name=datasets.Split.TRAIN,
gen_kwargs={
"path_to_txt": train_dir + "/txt",
"path_to_wav": train_dir + "/wav",
"wav_files": [dl_manager.iter_archive(a) for a in wav_archive['train']],
"txt_files": dl_manager.iter_archive(txt_archive['train']),
},
),
]
else:
return [
datasets.SplitGenerator(
name=datasets.Split.TEST,
gen_kwargs={
"path_to_txt": test_dir + "/txt",
"path_to_wav": test_dir + "/wav",
"wav_files": [dl_manager.extract(wav_archive['test'])],
"txt_files": dl_manager.extract(txt_archive['test']),
},
),
datasets.SplitGenerator(
name=datasets.Split.VALIDATION,
gen_kwargs={
"path_to_txt": dev_dir + "/txt",
"path_to_wav": dev_dir + "/wav",
"wav_files": [dl_manager.extract(wav_archive['dev'])],
"txt_files": dl_manager.extract(txt_archive['dev']),
},
),
datasets.SplitGenerator(
name=datasets.Split.TRAIN,
gen_kwargs={
"path_to_txt": train_dir + "/txt",
"path_to_wav": train_dir + "/wav",
"wav_files": [dl_manager.extract(a) for a in wav_archive['train']],
"txt_files": dl_manager.extract(txt_archive['train']),
},
),
]
def _generate_examples(self, path_to_txt, path_to_wav, wav_files, txt_files):
"""
This assumes that the text directory alphabetically precedes the wav dir
The file names for wav and text seem to match and are unique
We can use them for the dictionary matching them
"""
examples = {}
id_ = 0
# need to prepare the transcript - wave map
for item in txt_files:
if type(item) is tuple:
# iter_archive will return path and file
path, f = item
txt = f.read().decode(encoding="utf-8").strip()
else:
# extract will return path only
path = item
with open(path, encoding="utf-8") as f:
txt = f.read().strip()
if path.find(path_to_txt) > -1:
# construct the wav path
# which is used as an identifier
wav_path = os.path.split(path)[1].replace("_utf8", "").replace(".txt", ".wav").strip()
examples[wav_path] = {
"text": txt,
"path": wav_path,
}
for wf in wav_files:
for item in wf:
if type(item) is tuple:
path, f = item
wav_data = f.read()
else:
path = item
with open(path, "rb") as f:
wav_data = f.read()
if path.find(path_to_wav) > -1:
wav_path = os.path.split(path)[1].strip()
audio = {"path": path, "bytes": wav_data}
yield id_, {**examples[wav_path], "audio": audio}
id_ += 1