import os
import sys
from collections import Counter
from pathlib import Path
from typing import Callable, Dict, List, Tuple, Union

import numpy as np

from TTS.tts.datasets.dataset import *
from TTS.tts.datasets.formatters import *


def split_dataset(items, eval_split_max_size=None, eval_split_size=0.01):
    """Split a dataset into train and eval subsets, taking the speaker distribution into account in multi-speaker training.

    Args:
        items (List[Dict]):
            A list of samples. Each sample is a dict holding at least a `speaker_name` field (see the
            formatters in `TTS.tts.datasets.formatters` for the full sample layout).

        eval_split_max_size (int):
            Maximum number of samples to be used for evaluation in a proportion split. Defaults to None (disabled).

        eval_split_size (float):
            If between 0.0 and 1.0, represents the proportion of the dataset to include in the evaluation set.
            If > 1, represents the absolute number of evaluation samples. Defaults to 0.01 (1%).
    """
    speakers = [item["speaker_name"] for item in items]
    is_multi_speaker = len(set(speakers)) > 1
    if eval_split_size > 1:
        eval_split_size = int(eval_split_size)
    else:
        if eval_split_max_size:
            eval_split_size = min(eval_split_max_size, int(len(items) * eval_split_size))
        else:
            eval_split_size = int(len(items) * eval_split_size)
    assert (
        eval_split_size > 0
    ), " [!] You do not have enough samples for the evaluation set. You can work around this by setting the 'eval_split_size' parameter to a minimum of {}".format(
        1 / len(items)
    )
    np.random.seed(0)
    np.random.shuffle(items)
    if is_multi_speaker:
        items_eval = []
        speakers = [item["speaker_name"] for item in items]
        speaker_counter = Counter(speakers)
        # sample eval items so that every speaker keeps at least one training sample
        while len(items_eval) < eval_split_size:
            item_idx = np.random.randint(0, len(items))
            speaker_to_be_removed = items[item_idx]["speaker_name"]
            if speaker_counter[speaker_to_be_removed] > 1:
                items_eval.append(items[item_idx])
                speaker_counter[speaker_to_be_removed] -= 1
                del items[item_idx]
        return items_eval, items
    return items[:eval_split_size], items[eval_split_size:]
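
# A minimal usage sketch for `split_dataset` (the sample dicts below are
# hypothetical; in practice they come from one of the dataset formatters):
#
#   samples = [
#       {"text": "hello", "audio_file": "wavs/0001.wav", "speaker_name": "spk_1"},
#       {"text": "world", "audio_file": "wavs/0002.wav", "speaker_name": "spk_1"},
#       ...  # enough samples for a non-empty eval split
#   ]
#   eval_samples, train_samples = split_dataset(samples, eval_split_size=0.2)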


def add_extra_keys(metadata, language, dataset_name):
    for item in metadata:
        # add language name
        item["language"] = language
        # add unique audio name
        relfilepath = os.path.splitext(os.path.relpath(item["audio_file"], item["root_path"]))[0]
        audio_unique_name = f"{dataset_name}#{relfilepath}"
        item["audio_unique_name"] = audio_unique_name
    return metadata
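
# For example, a sample with root_path "/data/ljspeech" and audio_file
# "/data/ljspeech/wavs/LJ001-0001.wav" in a dataset named "ljspeech" ends up
# with audio_unique_name "ljspeech#wavs/LJ001-0001" (paths are hypothetical).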


def load_tts_samples(
    datasets: Union[List[Dict], Dict],
    eval_split=True,
    formatter: Callable = None,
    eval_split_max_size=None,
    eval_split_size=0.01,
) -> Tuple[List[List], List[List]]:
    """Parse the dataset from the datasets config, load the samples as a list and load the attention alignments if provided.
    If `formatter` is not None, apply the formatter to the samples; else pick the formatter from the available ones based
    on the dataset name.

    Args:
        datasets (List[Dict], Dict): A list of datasets or a single dataset dictionary. If multiple datasets are
            in the list, they are all merged.

        eval_split (bool, optional): If True, create an evaluation split. If an eval split is not provided
            explicitly, generate one automatically. Defaults to True.

        formatter (Callable, optional): The preprocessing function to be applied to create the list of samples. It
            must take the root_path and the meta_file name and return a list of samples in the format of
            `[[text, audio_path, speaker_id], ...]`. See the available formatters in `TTS.tts.datasets.formatters` as
            examples. Defaults to None.

        eval_split_max_size (int):
            Maximum number of samples to be used for evaluation in a proportion split. Defaults to None (disabled).

        eval_split_size (float):
            If between 0.0 and 1.0, represents the proportion of the dataset to include in the evaluation set.
            If > 1, represents the absolute number of evaluation samples. Defaults to 0.01 (1%).

    Returns:
        Tuple[List[List], List[List]]: training and evaluation splits of the dataset.
    """
    meta_data_train_all = []
    meta_data_eval_all = [] if eval_split else None
    if not isinstance(datasets, list):
        datasets = [datasets]
    for dataset in datasets:
        formatter_name = dataset["formatter"]
        dataset_name = dataset["dataset_name"]
        root_path = dataset["path"]
        meta_file_train = dataset["meta_file_train"]
        meta_file_val = dataset["meta_file_val"]
        ignored_speakers = dataset["ignored_speakers"]
        language = dataset["language"]
        # setup the right data processor
        if formatter is None:
            formatter = _get_formatter_by_name(formatter_name)
        # load train set
        meta_data_train = formatter(root_path, meta_file_train, ignored_speakers=ignored_speakers)
        assert len(meta_data_train) > 0, f" [!] No training samples found in {root_path}/{meta_file_train}"
        meta_data_train = add_extra_keys(meta_data_train, language, dataset_name)
        print(f" | > Found {len(meta_data_train)} files in {Path(root_path).resolve()}")
        # load evaluation split if set
        if eval_split:
            if meta_file_val:
                meta_data_eval = formatter(root_path, meta_file_val, ignored_speakers=ignored_speakers)
                meta_data_eval = add_extra_keys(meta_data_eval, language, dataset_name)
            else:
                eval_size_per_dataset = eval_split_max_size // len(datasets) if eval_split_max_size else None
                meta_data_eval, meta_data_train = split_dataset(meta_data_train, eval_size_per_dataset, eval_split_size)
            meta_data_eval_all += meta_data_eval
        meta_data_train_all += meta_data_train
        # load attention masks for the duration predictor training
        if dataset.get("meta_file_attn_mask"):  # dict access; the key may be absent
            meta_data = dict(load_attention_mask_meta_data(dataset["meta_file_attn_mask"]))
            for idx, ins in enumerate(meta_data_train_all):
                attn_file = meta_data[ins["audio_file"]].strip()
                meta_data_train_all[idx].update({"alignment_file": attn_file})
            if meta_data_eval_all:
                for idx, ins in enumerate(meta_data_eval_all):
                    attn_file = meta_data[ins["audio_file"]].strip()
                    meta_data_eval_all[idx].update({"alignment_file": attn_file})
        # reset the formatter for the next dataset
        formatter = None
    return meta_data_train_all, meta_data_eval_all
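
# A usage sketch for `load_tts_samples` with a single hypothetical dataset
# config; the same keys are normally populated via a dataset config object
# such as `BaseDatasetConfig` from `TTS.config`:
#
#   dataset_config = {
#       "formatter": "ljspeech",
#       "dataset_name": "ljspeech",
#       "path": "/data/LJSpeech-1.1",
#       "meta_file_train": "metadata.csv",
#       "meta_file_val": None,
#       "ignored_speakers": None,
#       "language": "en",
#       "meta_file_attn_mask": None,
#   }
#   train_samples, eval_samples = load_tts_samples(dataset_config, eval_split=True)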


def load_attention_mask_meta_data(metafile_path):
    """Load meta data file created by compute_attention_masks.py"""
    with open(metafile_path, "r", encoding="utf-8") as f:
        lines = f.readlines()

    meta_data = []
    for line in lines:
        wav_file, attn_file = line.split("|")
        meta_data.append([wav_file, attn_file])
    return meta_data
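
# The meta file is expected to hold one "wav_path|attn_file_path" pair per
# line, e.g. (hypothetical paths):
#
#   /data/wavs/LJ001-0001.wav|/data/attn_masks/LJ001-0001.npy
#   /data/wavs/LJ001-0002.wav|/data/attn_masks/LJ001-0002.npy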


def _get_formatter_by_name(name):
    """Returns the respective preprocessing function."""
    thismodule = sys.modules[__name__]
    return getattr(thismodule, name.lower())
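
# For example, `_get_formatter_by_name("ljspeech")` resolves to the `ljspeech`
# formatter pulled into this namespace by the star import from
# `TTS.tts.datasets.formatters` above.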


def find_unique_chars(data_samples, verbose=True):
    # samples are dicts with a "text" field (see the formatters)
    texts = "".join(item["text"] for item in data_samples)
    chars = set(texts)
    lower_chars = filter(lambda c: c.islower(), chars)
    chars_force_lower = [c.lower() for c in chars]
    chars_force_lower = set(chars_force_lower)

    if verbose:
        print(f" > Number of unique characters: {len(chars)}")
        print(f" > Unique characters: {''.join(sorted(chars))}")
        print(f" > Unique lower characters: {''.join(sorted(lower_chars))}")
        print(f" > Unique all forced to lower characters: {''.join(sorted(chars_force_lower))}")
    return chars_force_lower
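
# A usage sketch for `find_unique_chars`, e.g. to audit the character set of a
# dataset before defining the model characters (continuing the hypothetical
# `dataset_config` above):
#
#   train_samples, _ = load_tts_samples(dataset_config, eval_split=False)
#   lowercase_chars = find_unique_chars(train_samples)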