|
import json
|
|
from collections import defaultdict
|
|
from random import shuffle
|
|
from typing import Optional
|
|
|
|
from tqdm import tqdm
|
|
import click
|
|
from text.cleaner import clean_text_bert
|
|
import os
|
|
import torch
|
|
from text.symbols import symbols, num_languages, num_tones
|
|
|
|
@click.command()
|
|
@click.option(
|
|
"--metadata",
|
|
default="data/example/metadata.list",
|
|
type=click.Path(exists=True, file_okay=True, dir_okay=False),
|
|
)
|
|
@click.option("--cleaned-path", default=None)
|
|
@click.option("--train-path", default=None)
|
|
@click.option("--val-path", default=None)
|
|
@click.option(
|
|
"--config_path",
|
|
default="configs/config.json",
|
|
type=click.Path(exists=True, file_okay=True, dir_okay=False),
|
|
)
|
|
@click.option("--val-per-spk", default=4)
|
|
@click.option("--max-val-total", default=8)
|
|
@click.option("--clean/--no-clean", default=True)
|
|
def main(
|
|
metadata: str,
|
|
cleaned_path: Optional[str],
|
|
train_path: str,
|
|
val_path: str,
|
|
config_path: str,
|
|
val_per_spk: int,
|
|
max_val_total: int,
|
|
clean: bool,
|
|
):
|
|
if train_path is None:
|
|
train_path = os.path.join(os.path.dirname(metadata), 'train.list')
|
|
if val_path is None:
|
|
val_path = os.path.join(os.path.dirname(metadata), 'val.list')
|
|
out_config_path = os.path.join(os.path.dirname(metadata), 'config.json')
|
|
|
|
if cleaned_path is None:
|
|
cleaned_path = metadata + ".cleaned"
|
|
|
|
if clean:
|
|
out_file = open(cleaned_path, "w", encoding="utf-8")
|
|
new_symbols = []
|
|
for line in tqdm(open(metadata, encoding="utf-8").readlines()):
|
|
try:
|
|
utt, spk, language, text = line.strip().split("|")
|
|
norm_text, phones, tones, word2ph, bert = clean_text_bert(text, language, device='cuda:0')
|
|
for ph in phones:
|
|
if ph not in symbols and ph not in new_symbols:
|
|
new_symbols.append(ph)
|
|
print('update!, now symbols:')
|
|
print(new_symbols)
|
|
with open(f'{language}_symbol.txt', 'w') as f:
|
|
f.write(f'{new_symbols}')
|
|
|
|
assert len(phones) == len(tones)
|
|
assert len(phones) == sum(word2ph)
|
|
out_file.write(
|
|
"{}|{}|{}|{}|{}|{}|{}\n".format(
|
|
utt,
|
|
spk,
|
|
language,
|
|
norm_text,
|
|
" ".join(phones),
|
|
" ".join([str(i) for i in tones]),
|
|
" ".join([str(i) for i in word2ph]),
|
|
)
|
|
)
|
|
bert_path = utt.replace(".wav", ".bert.pt")
|
|
os.makedirs(os.path.dirname(bert_path), exist_ok=True)
|
|
torch.save(bert.cpu(), bert_path)
|
|
except Exception as error:
|
|
print("err!", line, error)
|
|
|
|
out_file.close()
|
|
|
|
metadata = cleaned_path
|
|
|
|
spk_utt_map = defaultdict(list)
|
|
spk_id_map = {}
|
|
current_sid = 0
|
|
|
|
with open(metadata, encoding="utf-8") as f:
|
|
for line in f.readlines():
|
|
utt, spk, language, text, phones, tones, word2ph = line.strip().split("|")
|
|
spk_utt_map[spk].append(line)
|
|
|
|
if spk not in spk_id_map.keys():
|
|
spk_id_map[spk] = current_sid
|
|
current_sid += 1
|
|
|
|
train_list = []
|
|
val_list = []
|
|
|
|
for spk, utts in spk_utt_map.items():
|
|
shuffle(utts)
|
|
val_list += utts[:val_per_spk]
|
|
train_list += utts[val_per_spk:]
|
|
|
|
if len(val_list) > max_val_total:
|
|
train_list += val_list[max_val_total:]
|
|
val_list = val_list[:max_val_total]
|
|
|
|
with open(train_path, "w", encoding="utf-8") as f:
|
|
for line in train_list:
|
|
f.write(line)
|
|
|
|
with open(val_path, "w", encoding="utf-8") as f:
|
|
for line in val_list:
|
|
f.write(line)
|
|
|
|
config = json.load(open(config_path, encoding="utf-8"))
|
|
config["data"]["spk2id"] = spk_id_map
|
|
|
|
config["data"]["training_files"] = train_path
|
|
config["data"]["validation_files"] = val_path
|
|
config["data"]["n_speakers"] = len(spk_id_map)
|
|
config["num_languages"] = num_languages
|
|
config["num_tones"] = num_tones
|
|
config["symbols"] = symbols
|
|
|
|
with open(out_config_path, "w", encoding="utf-8") as f:
|
|
json.dump(config, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|