import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
import numpy as np
import copy
import math

import hparams as hp
import utils

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def clones(module, N):
    """Produce N identical (deep-copied) layers."""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class VarianceAdaptor(nn.Module):
    """ Variance Adaptor """

    def __init__(self):
        super(VarianceAdaptor, self).__init__()
        self.duration_predictor = VariancePredictor()
        self.length_regulator = LengthRegulator()
        self.pitch_predictor = VariancePredictor()
        self.energy_predictor = VariancePredictor()

        # Project scalar pitch/energy values into the encoder hidden space.
        self.energy_embedding_producer = Conv(
            1, hp.encoder_hidden, kernel_size=9, bias=False, padding=4)
        self.pitch_embedding_producer = Conv(
            1, hp.encoder_hidden, kernel_size=9, bias=False, padding=4)

    def forward(self, x, src_mask, mel_mask=None, duration_target=None,
                pitch_target=None, energy_target=None, max_len=None):
        log_duration_prediction = self.duration_predictor(x, src_mask)

        # Use ground-truth pitch/energy when available (teacher forcing),
        # otherwise fall back to the model's own predictions.
        pitch_prediction = self.pitch_predictor(x, src_mask)
        if pitch_target is not None:
            pitch_embedding = self.pitch_embedding_producer(pitch_target.unsqueeze(2))
        else:
            pitch_embedding = self.pitch_embedding_producer(pitch_prediction.unsqueeze(2))

        energy_prediction = self.energy_predictor(x, src_mask)
        if energy_target is not None:
            energy_embedding = self.energy_embedding_producer(energy_target.unsqueeze(2))
        else:
            energy_embedding = self.energy_embedding_producer(energy_prediction.unsqueeze(2))

        x = x + pitch_embedding + energy_embedding

        if duration_target is not None:
            x, mel_len = self.length_regulator(x, duration_target, max_len)
        else:
            # Durations are predicted in the log domain; convert back to
            # integer frame counts before expanding the hidden sequence.
            duration_rounded = torch.clamp(
                torch.round(torch.exp(log_duration_prediction) - hp.log_offset), min=0)
            x, mel_len = self.length_regulator(x, duration_rounded, max_len)
            mel_mask = utils.get_mask_from_lengths(mel_len)

        return x, log_duration_prediction, pitch_prediction, energy_prediction, mel_len, mel_mask


class LengthRegulator(nn.Module):
    """ Length Regulator """

    def __init__(self):
        super(LengthRegulator, self).__init__()

    def LR(self, x, duration, max_len):
        output = list()
        mel_len = list()
        for batch, expand_target in zip(x, duration):
            expanded = self.expand(batch, expand_target)
            output.append(expanded)
            mel_len.append(expanded.shape[0])

        if max_len is not None:
            output = utils.pad(output, max_len)
        else:
            output = utils.pad(output)

        return output, torch.LongTensor(mel_len).to(device)

    def expand(self, batch, predicted):
        # Repeat each phoneme-level hidden vector `predicted[i]` times so the
        # sequence length matches the target mel-spectrogram length.
        out = list()
        for i, vec in enumerate(batch):
            expand_size = predicted[i].item()
            out.append(vec.expand(int(expand_size), -1))
        out = torch.cat(out, 0)
        return out

    def forward(self, x, duration, max_len):
        output, mel_len = self.LR(x, duration, max_len)
        return output, mel_len


class VariancePredictor(nn.Module):
    """ Duration, Pitch and Energy Predictor """

    def __init__(self):
        super(VariancePredictor, self).__init__()

        self.input_size = hp.encoder_hidden
        self.filter_size = hp.variance_predictor_filter_size
        self.kernel = hp.variance_predictor_kernel_size
        self.conv_output_size = hp.variance_predictor_filter_size
        self.dropout = hp.variance_predictor_dropout

        self.conv_layer = nn.Sequential(OrderedDict([
            ("conv1d_1", Conv(self.input_size, self.filter_size,
                              kernel_size=self.kernel, padding=(self.kernel - 1) // 2)),
            ("relu_1", nn.ReLU()),
            ("layer_norm_1", nn.LayerNorm(self.filter_size)),
            ("dropout_1", nn.Dropout(self.dropout)),
            # 'same' padding, matching conv1d_1.
            ("conv1d_2", Conv(self.filter_size, self.filter_size,
                              kernel_size=self.kernel, padding=(self.kernel - 1) // 2)),
            ("relu_2", nn.ReLU()),
            ("layer_norm_2", nn.LayerNorm(self.filter_size)),
            ("dropout_2", nn.Dropout(self.dropout)),
        ]))

        self.linear_layer = nn.Linear(self.conv_output_size, 1)

    def forward(self, encoder_output, mask):
        out = self.conv_layer(encoder_output)
        out = self.linear_layer(out)
        out = out.squeeze(-1)

        # Zero out predictions at padded positions.
        if mask is not None:
            out = out.masked_fill(mask, 0.)

        return out


class Conv(nn.Module):
    """ Convolution Module """

    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1,
                 padding=0, dilation=1, bias=True, w_init='linear'):
        """
        :param in_channels: dimension of input
        :param out_channels: dimension of output
        :param kernel_size: size of kernel
        :param stride: size of stride
        :param padding: size of padding
        :param dilation: dilation rate
        :param bias: boolean. if True, bias is included.
        :param w_init: str. intended weight-initialization gain (currently
            unused; weights keep PyTorch's default initialization).
        """
        super(Conv, self).__init__()

        self.conv = nn.Conv1d(in_channels, out_channels,
                              kernel_size=kernel_size, stride=stride,
                              padding=padding, dilation=dilation, bias=bias)

    def forward(self, x):
        # nn.Conv1d expects (batch, channels, length); the rest of the model
        # works with (batch, length, channels), so transpose around the conv.
        x = x.contiguous().transpose(1, 2)
        x = self.conv(x)
        x = x.contiguous().transpose(1, 2)
        return x
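

# --------------------------------------------------------------------------- #
# Minimal usage sketch (illustrative only, not part of the original module).
# It assumes `hparams` defines encoder_hidden and the variance_predictor_*
# values referenced above, and that `utils.get_mask_from_lengths` / `utils.pad`
# behave as in the calls above. The teacher-forced targets are random
# placeholders, not real data.
# --------------------------------------------------------------------------- #
if __name__ == "__main__":
    batch_size, src_len = 2, 8

    adaptor = VarianceAdaptor().to(device)

    # Fake encoder output: (batch, phoneme_length, encoder_hidden).
    encoder_output = torch.randn(batch_size, src_len, hp.encoder_hidden, device=device)
    src_lens = torch.LongTensor([src_len, src_len - 2]).to(device)
    src_mask = utils.get_mask_from_lengths(src_lens)

    # Teacher-forced targets: per-phoneme durations (in frames), pitch, energy.
    duration_target = torch.randint(1, 5, (batch_size, src_len), device=device)
    pitch_target = torch.randn(batch_size, src_len, device=device)
    energy_target = torch.randn(batch_size, src_len, device=device)

    out, log_dur, pitch, energy, mel_len, mel_mask = adaptor(
        encoder_output, src_mask,
        duration_target=duration_target,
        pitch_target=pitch_target,
        energy_target=energy_target)

    print(out.shape)   # (batch, max(mel_len), encoder_hidden)
    print(mel_len)     # frames per utterance = duration_target.sum(dim=1)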