# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import csv
import json
import os
import random
import subprocess
from pathlib import Path

import librosa
import numpy as np
import soundfile as sf
import torchaudio
from scipy.io import wavfile
from tqdm import tqdm

from text import _clean_text
from utils import audio
from utils.util import has_existed


def textgird_extract(
    corpus_directory,
    output_directory,
    mfa_path=os.path.join("mfa", "montreal-forced-aligner", "bin", "mfa_align"),
    lexicon=os.path.join("mfa", "lexicon", "librispeech-lexicon.txt"),
    acoustic_model_path=os.path.join(
        "mfa", "montreal-forced-aligner", "pretrained_models", "english.zip"
    ),
    jobs="8",
):
    """Run the Montreal Forced Aligner over a corpus of ``*.wav`` / ``*.lab`` pairs.

    Args:
        corpus_directory: Directory holding the audio and transcript files.
        output_directory: Directory that receives the aligner output; created
            (with parents) when missing.
        mfa_path: Path to the ``mfa_align`` executable.
        lexicon: Path to the pronunciation lexicon.
        acoustic_model_path: Path to the pretrained acoustic model archive.
        jobs: Number of parallel jobs, forwarded to the aligner as ``-j``.

    Raises:
        AssertionError: If the corpus directory or any of the MFA resources
            does not exist.
    """
    assert os.path.exists(
        corpus_directory
    ), "Please check the directionary contains *.wav, *.lab"
    assert (
        os.path.exists(mfa_path)
        and os.path.exists(lexicon)
        and os.path.exists(acoustic_model_path)
    ), f"Please download the MFA tools to {mfa_path} firstly"

    Path(output_directory).mkdir(parents=True, exist_ok=True)
    print(f"MFA results are save in {output_directory}")

    # An argument list (no shell) keeps paths with spaces or shell
    # metacharacters intact. Joining with "." yields "./<mfa_path>" for a
    # relative path and leaves an absolute path untouched, so the executable
    # that is run is the same one the assertion above checked.
    command = [
        os.path.join(os.path.curdir, mfa_path),
        str(corpus_directory),
        str(lexicon),
        str(acoustic_model_path),
        str(output_directory),
        "-j",
        str(jobs),
        "--clean",
    ]
    # The alignment step has always been best-effort (the exit status was
    # never inspected), so failures are reported instead of raised.
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        print(f"Failed to launch MFA ({command[0]}): {err}")
        return
    if completed.returncode != 0:
        print(f"MFA exited with status {completed.returncode}")


def get_lines(file):
    """Return every line of the UTF-8 text file *file*, stripped of whitespace."""
    with open(file, encoding="utf-8") as f:
        return [line.strip() for line in tqdm(f)]


def get_uid2utt(ljspeech_path, dataset, cfg):
    """Build the utterance metadata records for a list of metadata lines.

    Args:
        ljspeech_path: Dataset root; audio is read from ``<root>/wavs/<uid>.wav``.
        dataset: Iterable of ``uid|...|text`` lines (field 0 is the utterance
            id, field 2 the text). Blank lines are ignored.
        cfg: Unused here; kept so the signature stays unchanged.

    Returns:
        A tuple ``(records, hours)`` where ``records`` is a list of dicts with
        the keys ``Dataset``, ``index``, ``Singer``, ``Uid``, ``Text``,
        ``Path`` and ``Duration`` (seconds), and ``hours`` is the summed
        duration divided by 3600.
    """
    total_duration = 0
    uid2utt = []

    for entry in tqdm(dataset):
        if not entry:
            # A blank line carries no utterance and would fail the field lookup.
            continue

        items = entry.split("|")
        uid = items[0]
        text = items[2]

        # Duration in wav files
        audio_file = os.path.join(ljspeech_path, "wavs", "{}.wav".format(uid))
        waveform, sample_rate = torchaudio.load(audio_file)
        duration = waveform.size(-1) / sample_rate

        uid2utt.append(
            {
                "Dataset": "LJSpeech",
                # Position among the records actually emitted.
                "index": len(uid2utt),
                "Singer": "LJSpeech",
                "Uid": uid,
                "Text": text,
                "Path": audio_file,
                "Duration": duration,
            }
        )
        total_duration += duration

    return uid2utt, total_duration / 3600


def split_dataset(lines, test_rate=0.05, test_size=None):
    """Randomly split *lines* into a train list and a test list.

    Args:
        lines: Sequence of items to split. It is not modified; a shuffled copy
            is split instead (the split itself is the same one an in-place
            shuffle would give for the same ``random`` state).
        test_rate: Fraction of items used for the test set when *test_size*
            is not given.
        test_size: Explicit number of test items; overrides *test_rate*.

    Returns:
        A tuple ``(train_set, test_set)`` of lists.
    """
    if test_size is None:
        test_size = int(len(lines) * test_rate)

    shuffled = list(lines)
    random.shuffle(shuffled)

    return shuffled[test_size:], shuffled[:test_size]


max_wav_value = 32768.0


def _to_int16_pcm(wav):
    """Peak-normalise a float waveform and convert it to 16-bit PCM samples."""
    wav = np.asarray(wav)
    # Silent or empty clips have no usable peak; leave them unscaled rather
    # than dividing by zero.
    peak = np.max(np.abs(wav)) if wav.size else 0.0
    if peak > 0:
        wav = wav / peak * max_wav_value
    # A positive peak scales to exactly max_wav_value, one past the largest
    # int16 value, so clamp before the cast.
    return np.clip(wav, -max_wav_value, max_wav_value - 1).astype(np.int16)


def prepare_align(dataset, dataset_path, cfg, output_path):
    """Write resampled audio and cleaned transcripts, then run forced alignment.

    For every ``metadata.csv`` entry whose source wav exists, a normalised
    16-bit wav and a ``.lab`` transcript are written to
    ``<output_path>/<dataset>/<cfg.raw_data>/LJSpeech/``. Entries whose two
    output files already exist are skipped. Afterwards ``textgird_extract`` is
    run on that corpus with ``<output_path>/<dataset>/TextGrid`` as output.

    Args:
        dataset: Dataset name used as an output sub-directory.
        dataset_path: Dataset root containing ``metadata.csv`` and ``wavs/``.
        cfg: Configuration providing ``raw_data``, ``sample_rate`` and
            ``text_cleaners``.
        output_path: Root directory for all generated files.
    """
    in_dir = dataset_path
    out_dir = os.path.join(output_path, dataset, cfg.raw_data)
    sampling_rate = cfg.sample_rate
    cleaners = cfg.text_cleaners
    speaker = "LJSpeech"
    speaker_dir = os.path.join(out_dir, speaker)

    with open(os.path.join(dataset_path, "metadata.csv"), encoding="utf-8") as f:
        for line in tqdm(f):
            line = line.strip()
            if not line:
                # Nothing to align for a blank line.
                continue

            parts = line.split("|")
            base_name = parts[0]

            output_wav_path = os.path.join(speaker_dir, "{}.wav".format(base_name))
            output_lab_path = os.path.join(speaker_dir, "{}.lab".format(base_name))
            if os.path.exists(output_wav_path) and os.path.exists(output_lab_path):
                continue

            wav_path = os.path.join(in_dir, "wavs", "{}.wav".format(base_name))
            if not os.path.exists(wav_path):
                continue

            # Cleaning is deferred until the entry is known to need processing.
            text = _clean_text(parts[2], cleaners)

            os.makedirs(speaker_dir, exist_ok=True)
            # ``sr`` is keyword-only in recent librosa releases.
            wav, _ = librosa.load(wav_path, sr=sampling_rate)
            wavfile.write(output_wav_path, sampling_rate, _to_int16_pcm(wav))
            # The transcript encoding is pinned to UTF-8 so it does not depend
            # on the platform locale.
            with open(output_lab_path, "w", encoding="utf-8") as f1:
                f1.write(text)

    # Extract textgird with MFA
    textgird_extract(
        corpus_directory=out_dir,
        output_directory=os.path.join(output_path, dataset, "TextGrid"),
    )


def _write_json(obj, path):
    """Serialise *obj* to *path* as indented UTF-8 JSON."""
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must not depend on the platform default.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=4, ensure_ascii=False)


def main(output_path, dataset_path, cfg):
    """Create the LJSpeech train/test metadata files.

    Writes ``singers.json`` on every call, and ``train.json`` and
    ``test.json`` into ``<output_path>/LJSpeech``. The split is skipped when
    ``has_existed`` reports both the train and the test file (presumably an
    existence check; see ``utils.util``).

    Args:
        output_path: Root directory for the generated metadata.
        dataset_path: Dataset root containing ``metadata.csv`` and ``wavs/``.
        cfg: Configuration object, forwarded to ``get_uid2utt``.
    """
    print("-" * 10)
    print("Dataset splits for {}...\n".format("LJSpeech"))

    dataset = "LJSpeech"

    save_dir = os.path.join(output_path, dataset)
    os.makedirs(save_dir, exist_ok=True)
    ljspeech_path = dataset_path

    train_output_file = os.path.join(save_dir, "train.json")
    test_output_file = os.path.join(save_dir, "test.json")
    singer_dict_file = os.path.join(save_dir, "singers.json")

    speaker = "LJSpeech"
    speakers = [dataset + "_" + speaker]
    singer_lut = {name: i for i, name in enumerate(sorted(speakers))}
    _write_json(singer_lut, singer_dict_file)

    if has_existed(train_output_file) and has_existed(test_output_file):
        return

    meta_file = os.path.join(ljspeech_path, "metadata.csv")
    lines = get_lines(meta_file)

    train_set, test_set = split_dataset(lines)

    # Save train
    res, hours = get_uid2utt(ljspeech_path, train_set, cfg)
    _write_json(res, train_output_file)
    print("Train_hours= {}".format(hours))

    # Save test
    res, hours = get_uid2utt(ljspeech_path, test_set, cfg)
    _write_json(res, test_output_file)
    print("Test_hours= {}".format(hours))