|
import json |
|
import math |
|
import os |
|
|
|
import numpy as np |
|
from torch.utils.data import Dataset |
|
|
|
from text import text_to_sequence |
|
from utils.tools import pad_1D, pad_2D |
|
|
|
|
|
class Dataset(Dataset): |
|
def __init__( |
|
self, filename, preprocess_config, train_config, sort=False, drop_last=False |
|
): |
|
self.dataset_name = preprocess_config["dataset"] |
|
self.preprocessed_path = preprocess_config["path"]["preprocessed_path"] |
|
self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"] |
|
self.batch_size = train_config["optimizer"]["batch_size"] |
|
|
|
self.basename, self.speaker, self.text, self.raw_text, self.emotion = self.process_meta( |
|
filename |
|
) |
|
with open(os.path.join(self.preprocessed_path, "speakers.json")) as f: |
|
self.speaker_map = json.load(f) |
|
with open(os.path.join(self.preprocessed_path, "emotions.json")) as f: |
|
self.emotion_map = json.load(f) |
|
|
|
self.sort = sort |
|
self.drop_last = drop_last |
|
|
|
def __len__(self): |
|
return len(self.text) |
|
|
|
def __getitem__(self, idx): |
|
basename = self.basename[idx] |
|
speaker = self.speaker[idx] |
|
emotion = self.emotion[idx] |
|
speaker_id = self.speaker_map[speaker] |
|
emotion_id = self.emotion_map[emotion] |
|
raw_text = self.raw_text[idx] |
|
phone = np.array(text_to_sequence(self.text[idx], self.cleaners)) |
|
mel_path = os.path.join( |
|
self.preprocessed_path, |
|
"mel", |
|
"{}-mel-{}.npy".format(speaker, basename), |
|
) |
|
mel = np.load(mel_path) |
|
pitch_path = os.path.join( |
|
self.preprocessed_path, |
|
"pitch", |
|
"{}-pitch-{}.npy".format(speaker, basename), |
|
) |
|
pitch = np.load(pitch_path) |
|
energy_path = os.path.join( |
|
self.preprocessed_path, |
|
"energy", |
|
"{}-energy-{}.npy".format(speaker, basename), |
|
) |
|
energy = np.load(energy_path) |
|
duration_path = os.path.join( |
|
self.preprocessed_path, |
|
"duration", |
|
"{}-duration-{}.npy".format(speaker, basename), |
|
) |
|
duration = np.load(duration_path) |
|
|
|
sample = { |
|
"id": basename, |
|
"speaker": speaker_id, |
|
"emotion": emotion_id, |
|
"text": phone, |
|
"raw_text": raw_text, |
|
"mel": mel, |
|
"pitch": pitch, |
|
"energy": energy, |
|
"duration": duration, |
|
} |
|
|
|
return sample |
|
|
|
def process_meta(self, filename): |
|
with open( |
|
os.path.join(self.preprocessed_path, filename), "r", encoding="utf-8" |
|
) as f: |
|
name = [] |
|
speaker = [] |
|
emotion = [] |
|
text = [] |
|
raw_text = [] |
|
for line in f.readlines(): |
|
n, s, t, r, e = line.strip("\n").split("|") |
|
name.append(n) |
|
speaker.append(s) |
|
text.append(t) |
|
raw_text.append(r) |
|
emotion.append(e) |
|
return name, speaker, text, raw_text, emotion |
|
|
|
def reprocess(self, data, idxs): |
|
ids = [data[idx]["id"] for idx in idxs] |
|
speakers = [data[idx]["speaker"] for idx in idxs] |
|
emotions = [data[idx]["emotion"] for idx in idxs] |
|
texts = [data[idx]["text"] for idx in idxs] |
|
raw_texts = [data[idx]["raw_text"] for idx in idxs] |
|
mels = [data[idx]["mel"] for idx in idxs] |
|
pitches = [data[idx]["pitch"] for idx in idxs] |
|
energies = [data[idx]["energy"] for idx in idxs] |
|
durations = [data[idx]["duration"] for idx in idxs] |
|
|
|
text_lens = np.array([text.shape[0] for text in texts]) |
|
mel_lens = np.array([mel.shape[0] for mel in mels]) |
|
|
|
speakers = np.array(speakers) |
|
emotions = np.array(emotions) |
|
texts = pad_1D(texts) |
|
mels = pad_2D(mels) |
|
pitches = pad_1D(pitches) |
|
energies = pad_1D(energies) |
|
durations = pad_1D(durations) |
|
|
|
return ( |
|
ids, |
|
raw_texts, |
|
speakers, |
|
texts, |
|
text_lens, |
|
max(text_lens), |
|
emotions, |
|
mels, |
|
mel_lens, |
|
max(mel_lens), |
|
pitches, |
|
energies, |
|
durations, |
|
) |
|
|
|
def collate_fn(self, data): |
|
data_size = len(data) |
|
|
|
if self.sort: |
|
len_arr = np.array([d["text"].shape[0] for d in data]) |
|
idx_arr = np.argsort(-len_arr) |
|
else: |
|
idx_arr = np.arange(data_size) |
|
|
|
tail = idx_arr[len(idx_arr) - (len(idx_arr) % self.batch_size):] |
|
idx_arr = idx_arr[: len(idx_arr) - (len(idx_arr) % self.batch_size)] |
|
idx_arr = idx_arr.reshape((-1, self.batch_size)).tolist() |
|
if not self.drop_last and len(tail) > 0: |
|
idx_arr += [tail.tolist()] |
|
|
|
output = list() |
|
for idx in idx_arr: |
|
output.append(self.reprocess(data, idx)) |
|
|
|
return output |
|
|
|
|
|
class TextDataset(Dataset): |
|
def __init__(self, filepath, preprocess_config): |
|
self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"] |
|
|
|
self.basename, self.speaker, self.text, self.raw_text, self.emotion = self.process_meta( |
|
filepath |
|
) |
|
with open( |
|
os.path.join( |
|
preprocess_config["path"]["preprocessed_path"], "speakers.json" |
|
) |
|
) as f: |
|
self.speaker_map = json.load(f) |
|
with open(os.path.join( |
|
preprocess_config["path"]["preprocessed"], "emotions.json" |
|
) |
|
) as f: |
|
self.emotion_map = json.load(f) |
|
|
|
def __len__(self): |
|
return len(self.text) |
|
|
|
def __getitem__(self, idx): |
|
basename = self.basename[idx] |
|
speaker = self.speaker[idx] |
|
speaker_id = self.speaker_map[speaker] |
|
raw_text = self.raw_text[idx] |
|
emotion = self.emotion[idx] |
|
phone = np.array(text_to_sequence(self.text[idx], self.cleaners)) |
|
|
|
return (basename, speaker_id, phone, raw_text, emotion) |
|
|
|
def process_meta(self, filename): |
|
with open(filename, "r", encoding="utf-8") as f: |
|
name = [] |
|
speaker = [] |
|
text = [] |
|
raw_text = [] |
|
emotion = [] |
|
for line in f.readlines(): |
|
n, s, t, r, e = line.strip("\n").split("|") |
|
name.append(n) |
|
speaker.append(s) |
|
text.append(t) |
|
raw_text.append(r) |
|
emotion.append(e) |
|
return name, speaker, text, raw_text, emotion |
|
|
|
def collate_fn(self, data): |
|
ids = [d[0] for d in data] |
|
speakers = np.array([d[1] for d in data]) |
|
texts = [d[2] for d in data] |
|
raw_texts = [d[3] for d in data] |
|
emotions = [d[4] for d in data] |
|
text_lens = np.array([text.shape[0] for text in texts]) |
|
|
|
texts = pad_1D(texts) |
|
|
|
return ids, raw_texts, speakers, texts, emotions, text_lens, max(text_lens) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
import torch |
|
import yaml |
|
from torch.utils.data import DataLoader |
|
from utils.utils import to_device |
|
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
preprocess_config = yaml.load( |
|
open("./config/LJSpeech/preprocess.yaml", "r"), Loader=yaml.FullLoader |
|
) |
|
train_config = yaml.load( |
|
open("./config/LJSpeech/train.yaml", "r"), Loader=yaml.FullLoader |
|
) |
|
|
|
train_dataset = Dataset( |
|
"train.txt", preprocess_config, train_config, sort=True, drop_last=True |
|
) |
|
val_dataset = Dataset( |
|
"val.txt", preprocess_config, train_config, sort=False, drop_last=False |
|
) |
|
train_loader = DataLoader( |
|
train_dataset, |
|
batch_size=train_config["optimizer"]["batch_size"] * 4, |
|
shuffle=True, |
|
collate_fn=train_dataset.collate_fn, |
|
) |
|
val_loader = DataLoader( |
|
val_dataset, |
|
batch_size=train_config["optimizer"]["batch_size"], |
|
shuffle=False, |
|
collate_fn=val_dataset.collate_fn, |
|
) |
|
|
|
n_batch = 0 |
|
for batchs in train_loader: |
|
for batch in batchs: |
|
to_device(batch, device) |
|
n_batch += 1 |
|
print( |
|
"Training set with size {} is composed of {} batches.".format( |
|
len(train_dataset), n_batch |
|
) |
|
) |
|
|
|
n_batch = 0 |
|
for batchs in val_loader: |
|
for batch in batchs: |
|
to_device(batch, device) |
|
n_batch += 1 |
|
print( |
|
"Validation set with size {} is composed of {} batches.".format( |
|
len(val_dataset), n_batch |
|
) |
|
) |
|
|