File size: 5,893 Bytes
0883aa1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import json
from tqdm import tqdm
import os
import torchaudio
from utils import audio
import csv
import random

from utils.util import has_existed
from text import _clean_text
import librosa
import soundfile as sf
from scipy.io import wavfile

from pathlib import Path
import numpy as np


def textgird_extract(
    corpus_directory,
    output_directory,
    mfa_path=os.path.join("mfa", "montreal-forced-aligner", "bin", "mfa_align"),
    lexicon=os.path.join("mfa", "lexicon", "librispeech-lexicon.txt"),
    acoustic_model_path=os.path.join(
        "mfa", "montreal-forced-aligner", "pretrained_models", "english.zip"
    ),
    jobs="8",
):
    assert os.path.exists(
        corpus_directory
    ), "Please check the directionary contains *.wav, *.lab"
    assert (
        os.path.exists(mfa_path)
        and os.path.exists(lexicon)
        and os.path.exists(acoustic_model_path)
    ), f"Please download the MFA tools to {mfa_path} firstly"
    Path(output_directory).mkdir(parents=True, exist_ok=True)
    print(f"MFA results are save in {output_directory}")
    os.system(
        f".{os.path.sep}{mfa_path} {corpus_directory} {lexicon} {acoustic_model_path} {output_directory} -j {jobs} --clean"
    )


def get_lines(file):
    lines = []
    with open(file, encoding="utf-8") as f:
        for line in tqdm(f):
            lines.append(line.strip())
    return lines


def get_uid2utt(ljspeech_path, dataset, cfg):
    index_count = 0
    total_duration = 0

    uid2utt = []
    for l in tqdm(dataset):
        items = l.split("|")
        uid = items[0]
        text = items[2]

        res = {
            "Dataset": "LJSpeech",
            "index": index_count,
            "Singer": "LJSpeech",
            "Uid": uid,
            "Text": text,
        }

        # Duration in wav files
        audio_file = os.path.join(ljspeech_path, "wavs/{}.wav".format(uid))

        res["Path"] = audio_file

        waveform, sample_rate = torchaudio.load(audio_file)
        duration = waveform.size(-1) / sample_rate
        res["Duration"] = duration

        uid2utt.append(res)

        index_count = index_count + 1
        total_duration += duration

    return uid2utt, total_duration / 3600


def split_dataset(lines, test_rate=0.05, test_size=None):
    if test_size == None:
        test_size = int(len(lines) * test_rate)
    random.shuffle(lines)

    train_set = []
    test_set = []

    for line in lines[:test_size]:
        test_set.append(line)
    for line in lines[test_size:]:
        train_set.append(line)
    return train_set, test_set


max_wav_value = 32768.0


def prepare_align(dataset, dataset_path, cfg, output_path):
    in_dir = dataset_path
    out_dir = os.path.join(output_path, dataset, cfg.raw_data)
    sampling_rate = cfg.sample_rate
    cleaners = cfg.text_cleaners
    speaker = "LJSpeech"
    with open(os.path.join(dataset_path, "metadata.csv"), encoding="utf-8") as f:
        for line in tqdm(f):
            parts = line.strip().split("|")
            base_name = parts[0]
            text = parts[2]
            text = _clean_text(text, cleaners)

            output_wav_path = os.path.join(out_dir, speaker, "{}.wav".format(base_name))
            output_lab_path = os.path.join(out_dir, speaker, "{}.lab".format(base_name))

            if os.path.exists(output_wav_path) and os.path.exists(output_lab_path):
                continue

            wav_path = os.path.join(in_dir, "wavs", "{}.wav".format(base_name))
            if os.path.exists(wav_path):
                os.makedirs(os.path.join(out_dir, speaker), exist_ok=True)
                wav, _ = librosa.load(wav_path, sampling_rate)
                wav = wav / max(abs(wav)) * max_wav_value

                wavfile.write(
                    os.path.join(out_dir, speaker, "{}.wav".format(base_name)),
                    sampling_rate,
                    wav.astype(np.int16),
                )

                with open(
                    os.path.join(out_dir, speaker, "{}.lab".format(base_name)),
                    "w",
                ) as f1:
                    f1.write(text)
    # Extract textgird with MFA
    textgird_extract(
        corpus_directory=out_dir,
        output_directory=os.path.join(output_path, dataset, "TextGrid"),
    )


def main(output_path, dataset_path, cfg):
    print("-" * 10)
    print("Dataset splits for {}...\n".format("LJSpeech"))

    dataset = "LJSpeech"

    save_dir = os.path.join(output_path, dataset)
    os.makedirs(save_dir, exist_ok=True)
    ljspeech_path = dataset_path

    train_output_file = os.path.join(save_dir, "train.json")
    test_output_file = os.path.join(save_dir, "test.json")
    singer_dict_file = os.path.join(save_dir, "singers.json")

    speaker = "LJSpeech"
    speakers = [dataset + "_" + speaker]
    singer_lut = {name: i for i, name in enumerate(sorted(speakers))}
    with open(singer_dict_file, "w") as f:
        json.dump(singer_lut, f, indent=4, ensure_ascii=False)

    if has_existed(train_output_file) and has_existed(test_output_file):
        return

    meta_file = os.path.join(ljspeech_path, "metadata.csv")
    lines = get_lines(meta_file)

    train_set, test_set = split_dataset(lines)

    res, hours = get_uid2utt(ljspeech_path, train_set, cfg)

    # Save train
    os.makedirs(save_dir, exist_ok=True)
    with open(train_output_file, "w") as f:
        json.dump(res, f, indent=4, ensure_ascii=False)

    print("Train_hours= {}".format(hours))

    res, hours = get_uid2utt(ljspeech_path, test_set, cfg)

    # Save test
    os.makedirs(save_dir, exist_ok=True)
    with open(test_output_file, "w") as f:
        json.dump(res, f, indent=4, ensure_ascii=False)

    print("Test_hours= {}".format(hours))