import os import json import copy import math from collections import OrderedDict import torch import torch.nn as nn import numpy as np import torch.nn.functional as F from utils.tools import get_mask_from_lengths, pad device = torch.device("cuda" if torch.cuda.is_available() else "cpu") class VarianceAdaptor(nn.Module): """Variance Adaptor""" def __init__(self, preprocess_config, model_config): super(VarianceAdaptor, self).__init__() self.duration_predictor = VariancePredictor(model_config) self.length_regulator = LengthRegulator() self.pitch_predictor = VariancePredictor(model_config) self.energy_predictor = VariancePredictor(model_config) self.pitch_feature_level = preprocess_config["preprocessing"]["pitch"][ "feature" ] self.energy_feature_level = preprocess_config["preprocessing"]["energy"][ "feature" ] assert self.pitch_feature_level in ["phoneme_level", "frame_level"] assert self.energy_feature_level in ["phoneme_level", "frame_level"] pitch_quantization = model_config["variance_embedding"]["pitch_quantization"] energy_quantization = model_config["variance_embedding"]["energy_quantization"] n_bins = model_config["variance_embedding"]["n_bins"] assert pitch_quantization in ["linear", "log"] assert energy_quantization in ["linear", "log"] with open( os.path.join(preprocess_config["path"]["preprocessed_path"], "stats.json") ) as f: stats = json.load(f) pitch_min, pitch_max = stats["pitch"][:2] energy_min, energy_max = stats["energy"][:2] if pitch_quantization == "log": self.pitch_bins = nn.Parameter( torch.exp( torch.linspace(np.log(pitch_min), np.log(pitch_max), n_bins - 1) ), requires_grad=False, ) else: self.pitch_bins = nn.Parameter( torch.linspace(pitch_min, pitch_max, n_bins - 1), requires_grad=False, ) if energy_quantization == "log": self.energy_bins = nn.Parameter( torch.exp( torch.linspace(np.log(energy_min), np.log(energy_max), n_bins - 1) ), requires_grad=False, ) else: self.energy_bins = nn.Parameter( torch.linspace(energy_min, energy_max, n_bins - 1), requires_grad=False, ) self.pitch_embedding = nn.Embedding( n_bins, model_config["transformer"]["encoder_hidden"] ) self.energy_embedding = nn.Embedding( n_bins, model_config["transformer"]["encoder_hidden"] ) def get_pitch_embedding(self, x, target, mask, control): prediction = self.pitch_predictor(x, mask) if target is not None: embedding = self.pitch_embedding(torch.bucketize(target, self.pitch_bins)) else: prediction = prediction * control embedding = self.pitch_embedding( torch.bucketize(prediction, self.pitch_bins) ) return prediction, embedding def get_energy_embedding(self, x, target, mask, control): prediction = self.energy_predictor(x, mask) if target is not None: embedding = self.energy_embedding(torch.bucketize(target, self.energy_bins)) else: prediction = prediction * control embedding = self.energy_embedding( torch.bucketize(prediction, self.energy_bins) ) return prediction, embedding def forward( self, x, src_mask, mel_mask=None, max_len=None, pitch_target=None, energy_target=None, duration_target=None, p_control=1.0, e_control=1.0, d_control=1.0, ): log_duration_prediction = self.duration_predictor(x, src_mask) if self.pitch_feature_level == "phoneme_level": pitch_prediction, pitch_embedding = self.get_pitch_embedding( x, pitch_target, src_mask, p_control ) x = x + pitch_embedding if self.energy_feature_level == "phoneme_level": energy_prediction, energy_embedding = self.get_energy_embedding( x, energy_target, src_mask, p_control ) x = x + energy_embedding if duration_target is not None: x, mel_len = self.length_regulator(x, duration_target, max_len) duration_rounded = duration_target else: duration_rounded = torch.clamp( (torch.round(torch.exp(log_duration_prediction) - 1) * d_control), min=0, ) x, mel_len = self.length_regulator(x, duration_rounded, max_len) mel_mask = get_mask_from_lengths(mel_len) if self.pitch_feature_level == "frame_level": pitch_prediction, pitch_embedding = self.get_pitch_embedding( x, pitch_target, mel_mask, p_control ) x = x + pitch_embedding if self.energy_feature_level == "frame_level": energy_prediction, energy_embedding = self.get_energy_embedding( x, energy_target, mel_mask, p_control ) x = x + energy_embedding return ( x, pitch_prediction, energy_prediction, log_duration_prediction, duration_rounded, mel_len, mel_mask, ) class LengthRegulator(nn.Module): """Length Regulator""" def __init__(self): super(LengthRegulator, self).__init__() def LR(self, x, duration, max_len): output = list() mel_len = list() for batch, expand_target in zip(x, duration): expanded = self.expand(batch, expand_target) output.append(expanded) mel_len.append(expanded.shape[0]) if max_len is not None: output = pad(output, max_len) else: output = pad(output) return output, torch.LongTensor(mel_len).to(device) def expand(self, batch, predicted): out = list() for i, vec in enumerate(batch): expand_size = predicted[i].item() out.append(vec.expand(max(int(expand_size), 0), -1)) out = torch.cat(out, 0) return out def forward(self, x, duration, max_len): output, mel_len = self.LR(x, duration, max_len) return output, mel_len class VariancePredictor(nn.Module): """Duration, Pitch and Energy Predictor""" def __init__(self, model_config): super(VariancePredictor, self).__init__() self.input_size = model_config["transformer"]["encoder_hidden"] self.filter_size = model_config["variance_predictor"]["filter_size"] self.kernel = model_config["variance_predictor"]["kernel_size"] self.conv_output_size = model_config["variance_predictor"]["filter_size"] self.dropout = model_config["variance_predictor"]["dropout"] self.conv_layer = nn.Sequential( OrderedDict( [ ( "conv1d_1", Conv( self.input_size, self.filter_size, kernel_size=self.kernel, padding=(self.kernel - 1) // 2, ), ), ("relu_1", nn.ReLU()), ("layer_norm_1", nn.LayerNorm(self.filter_size)), ("dropout_1", nn.Dropout(self.dropout)), ( "conv1d_2", Conv( self.filter_size, self.filter_size, kernel_size=self.kernel, padding=1, ), ), ("relu_2", nn.ReLU()), ("layer_norm_2", nn.LayerNorm(self.filter_size)), ("dropout_2", nn.Dropout(self.dropout)), ] ) ) self.linear_layer = nn.Linear(self.conv_output_size, 1) def forward(self, encoder_output, mask): out = self.conv_layer(encoder_output) out = self.linear_layer(out) out = out.squeeze(-1) if mask is not None: out = out.masked_fill(mask, 0.0) return out class Conv(nn.Module): """ Convolution Module """ def __init__( self, in_channels, out_channels, kernel_size=1, stride=1, padding=0, dilation=1, bias=True, w_init="linear", ): """ :param in_channels: dimension of input :param out_channels: dimension of output :param kernel_size: size of kernel :param stride: size of stride :param padding: size of padding :param dilation: dilation rate :param bias: boolean. if True, bias is included. :param w_init: str. weight inits with xavier initialization. """ super(Conv, self).__init__() self.conv = nn.Conv1d( in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding, dilation=dilation, bias=bias, ) def forward(self, x): x = x.contiguous().transpose(1, 2) x = self.conv(x) x = x.contiguous().transpose(1, 2) return x