Spaces:
Sleeping
Sleeping
import json | |
import math | |
import os | |
import numpy as np | |
from torch.utils.data import Dataset | |
from text import text_to_sequence | |
from utils.tools import pad_1D, pad_2D | |
class Dataset(Dataset): | |
def __init__( | |
self, filename, preprocess_config, train_config, sort=False, drop_last=False | |
): | |
self.dataset_name = preprocess_config["dataset"] | |
self.preprocessed_path = preprocess_config["path"]["preprocessed_path"] | |
self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"] | |
self.batch_size = train_config["optimizer"]["batch_size"] | |
self.basename, self.speaker, self.text, self.raw_text, self.emotion = self.process_meta( | |
filename | |
) | |
with open(os.path.join(self.preprocessed_path, "speakers.json")) as f: | |
self.speaker_map = json.load(f) | |
with open(os.path.join(self.preprocessed_path, "emotions.json")) as f: | |
self.emotion_map = json.load(f) | |
self.sort = sort | |
self.drop_last = drop_last | |
def __len__(self): | |
return len(self.text) | |
def __getitem__(self, idx): | |
basename = self.basename[idx] | |
speaker = self.speaker[idx] | |
emotion = self.emotion[idx] | |
speaker_id = self.speaker_map[speaker] | |
emotion_id = self.emotion_map[emotion] | |
raw_text = self.raw_text[idx] | |
phone = np.array(text_to_sequence(self.text[idx], self.cleaners)) | |
mel_path = os.path.join( | |
self.preprocessed_path, | |
"mel", | |
"{}-mel-{}.npy".format(speaker, basename), | |
) | |
mel = np.load(mel_path) | |
pitch_path = os.path.join( | |
self.preprocessed_path, | |
"pitch", | |
"{}-pitch-{}.npy".format(speaker, basename), | |
) | |
pitch = np.load(pitch_path) | |
energy_path = os.path.join( | |
self.preprocessed_path, | |
"energy", | |
"{}-energy-{}.npy".format(speaker, basename), | |
) | |
energy = np.load(energy_path) | |
duration_path = os.path.join( | |
self.preprocessed_path, | |
"duration", | |
"{}-duration-{}.npy".format(speaker, basename), | |
) | |
duration = np.load(duration_path) | |
sample = { | |
"id": basename, | |
"speaker": speaker_id, | |
"emotion": emotion_id, | |
"text": phone, | |
"raw_text": raw_text, | |
"mel": mel, | |
"pitch": pitch, | |
"energy": energy, | |
"duration": duration, | |
} | |
return sample | |
def process_meta(self, filename): | |
with open( | |
os.path.join(self.preprocessed_path, filename), "r", encoding="utf-8" | |
) as f: | |
name = [] | |
speaker = [] | |
emotion = [] | |
text = [] | |
raw_text = [] | |
for line in f.readlines(): | |
n, s, t, r, e = line.strip("\n").split("|") | |
name.append(n) | |
speaker.append(s) | |
text.append(t) | |
raw_text.append(r) | |
emotion.append(e) | |
return name, speaker, text, raw_text, emotion | |
def reprocess(self, data, idxs): | |
ids = [data[idx]["id"] for idx in idxs] | |
speakers = [data[idx]["speaker"] for idx in idxs] | |
emotions = [data[idx]["emotion"] for idx in idxs] | |
texts = [data[idx]["text"] for idx in idxs] | |
raw_texts = [data[idx]["raw_text"] for idx in idxs] | |
mels = [data[idx]["mel"] for idx in idxs] | |
pitches = [data[idx]["pitch"] for idx in idxs] | |
energies = [data[idx]["energy"] for idx in idxs] | |
durations = [data[idx]["duration"] for idx in idxs] | |
text_lens = np.array([text.shape[0] for text in texts]) | |
mel_lens = np.array([mel.shape[0] for mel in mels]) | |
speakers = np.array(speakers) | |
emotions = np.array(emotions) | |
texts = pad_1D(texts) | |
mels = pad_2D(mels) | |
pitches = pad_1D(pitches) | |
energies = pad_1D(energies) | |
durations = pad_1D(durations) | |
return ( | |
ids, | |
raw_texts, | |
speakers, | |
texts, | |
text_lens, | |
max(text_lens), | |
emotions, | |
mels, | |
mel_lens, | |
max(mel_lens), | |
pitches, | |
energies, | |
durations, | |
) | |
def collate_fn(self, data): | |
data_size = len(data) | |
if self.sort: | |
len_arr = np.array([d["text"].shape[0] for d in data]) | |
idx_arr = np.argsort(-len_arr) | |
else: | |
idx_arr = np.arange(data_size) | |
tail = idx_arr[len(idx_arr) - (len(idx_arr) % self.batch_size):] | |
idx_arr = idx_arr[: len(idx_arr) - (len(idx_arr) % self.batch_size)] | |
idx_arr = idx_arr.reshape((-1, self.batch_size)).tolist() | |
if not self.drop_last and len(tail) > 0: | |
idx_arr += [tail.tolist()] | |
output = list() | |
for idx in idx_arr: | |
output.append(self.reprocess(data, idx)) | |
return output | |
class TextDataset(Dataset): | |
def __init__(self, filepath, preprocess_config): | |
self.cleaners = preprocess_config["preprocessing"]["text"]["text_cleaners"] | |
self.basename, self.speaker, self.text, self.raw_text, self.emotion = self.process_meta( | |
filepath | |
) | |
with open( | |
os.path.join( | |
preprocess_config["path"]["preprocessed_path"], "speakers.json" | |
) | |
) as f: | |
self.speaker_map = json.load(f) | |
with open(os.path.join( | |
preprocess_config["path"]["preprocessed"], "emotions.json" | |
) | |
) as f: | |
self.emotion_map = json.load(f) | |
def __len__(self): | |
return len(self.text) | |
def __getitem__(self, idx): | |
basename = self.basename[idx] | |
speaker = self.speaker[idx] | |
speaker_id = self.speaker_map[speaker] | |
raw_text = self.raw_text[idx] | |
emotion = self.emotion[idx] | |
phone = np.array(text_to_sequence(self.text[idx], self.cleaners)) | |
return (basename, speaker_id, phone, raw_text, emotion) | |
def process_meta(self, filename): | |
with open(filename, "r", encoding="utf-8") as f: | |
name = [] | |
speaker = [] | |
text = [] | |
raw_text = [] | |
emotion = [] | |
for line in f.readlines(): | |
n, s, t, r, e = line.strip("\n").split("|") | |
name.append(n) | |
speaker.append(s) | |
text.append(t) | |
raw_text.append(r) | |
emotion.append(e) | |
return name, speaker, text, raw_text, emotion | |
def collate_fn(self, data): | |
ids = [d[0] for d in data] | |
speakers = np.array([d[1] for d in data]) | |
texts = [d[2] for d in data] | |
raw_texts = [d[3] for d in data] | |
emotions = [d[4] for d in data] | |
text_lens = np.array([text.shape[0] for text in texts]) | |
texts = pad_1D(texts) | |
return ids, raw_texts, speakers, texts, emotions, text_lens, max(text_lens) | |
if __name__ == "__main__": | |
# Test | |
import torch | |
import yaml | |
from torch.utils.data import DataLoader | |
from utils.utils import to_device | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
preprocess_config = yaml.load( | |
open("./config/LJSpeech/preprocess.yaml", "r"), Loader=yaml.FullLoader | |
) | |
train_config = yaml.load( | |
open("./config/LJSpeech/train.yaml", "r"), Loader=yaml.FullLoader | |
) | |
train_dataset = Dataset( | |
"train.txt", preprocess_config, train_config, sort=True, drop_last=True | |
) | |
val_dataset = Dataset( | |
"val.txt", preprocess_config, train_config, sort=False, drop_last=False | |
) | |
train_loader = DataLoader( | |
train_dataset, | |
batch_size=train_config["optimizer"]["batch_size"] * 4, | |
shuffle=True, | |
collate_fn=train_dataset.collate_fn, | |
) | |
val_loader = DataLoader( | |
val_dataset, | |
batch_size=train_config["optimizer"]["batch_size"], | |
shuffle=False, | |
collate_fn=val_dataset.collate_fn, | |
) | |
n_batch = 0 | |
for batchs in train_loader: | |
for batch in batchs: | |
to_device(batch, device) | |
n_batch += 1 | |
print( | |
"Training set with size {} is composed of {} batches.".format( | |
len(train_dataset), n_batch | |
) | |
) | |
n_batch = 0 | |
for batchs in val_loader: | |
for batch in batchs: | |
to_device(batch, device) | |
n_batch += 1 | |
print( | |
"Validation set with size {} is composed of {} batches.".format( | |
len(val_dataset), n_batch | |
) | |
) | |