# Copyright 2020 Nagoya University (Tomoki Hayashi) # Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0) """Fastspeech2 related modules for ESPnet2.""" import logging from typing import Dict from typing import Sequence from typing import Tuple import torch import torch.nn.functional as F from typeguard import check_argument_types from espnet.nets.pytorch_backend.conformer.encoder import ( Encoder as ConformerEncoder, # noqa: H301 ) from espnet.nets.pytorch_backend.fastspeech.duration_predictor import DurationPredictor from espnet.nets.pytorch_backend.fastspeech.duration_predictor import ( DurationPredictorLoss, # noqa: H301 ) from espnet.nets.pytorch_backend.fastspeech.length_regulator import LengthRegulator from espnet.nets.pytorch_backend.nets_utils import make_non_pad_mask from espnet.nets.pytorch_backend.nets_utils import make_pad_mask from espnet.nets.pytorch_backend.tacotron2.decoder import Postnet from espnet.nets.pytorch_backend.transformer.embedding import PositionalEncoding from espnet.nets.pytorch_backend.transformer.embedding import ScaledPositionalEncoding from espnet.nets.pytorch_backend.transformer.encoder import ( Encoder as TransformerEncoder, # noqa: H301 ) from espnet2.torch_utils.device_funcs import force_gatherable from espnet2.torch_utils.initialize import initialize from espnet2.tts.abs_tts import AbsTTS from espnet2.tts.gst.style_encoder import StyleEncoder from espnet2.tts.variance_predictor import VariancePredictor class FastSpeech2(AbsTTS): """FastSpeech2 module. This is a module of FastSpeech2 described in `FastSpeech 2: Fast and High-Quality End-to-End Text to Speech`_. Instead of quantized pitch and energy, we use token-averaged value introduced in `FastPitch: Parallel Text-to-speech with Pitch Prediction`_. .. _`FastSpeech 2: Fast and High-Quality End-to-End Text to Speech`: https://arxiv.org/abs/2006.04558 .. _`FastPitch: Parallel Text-to-speech with Pitch Prediction`: https://arxiv.org/abs/2006.06873 """ def __init__( self, # network structure related idim: int, odim: int, adim: int = 384, aheads: int = 4, elayers: int = 6, eunits: int = 1536, dlayers: int = 6, dunits: int = 1536, postnet_layers: int = 5, postnet_chans: int = 512, postnet_filts: int = 5, positionwise_layer_type: str = "conv1d", positionwise_conv_kernel_size: int = 1, use_scaled_pos_enc: bool = True, use_batch_norm: bool = True, encoder_normalize_before: bool = True, decoder_normalize_before: bool = True, encoder_concat_after: bool = False, decoder_concat_after: bool = False, reduction_factor: int = 1, encoder_type: str = "transformer", decoder_type: str = "transformer", # only for conformer conformer_rel_pos_type: str = "legacy", conformer_pos_enc_layer_type: str = "rel_pos", conformer_self_attn_layer_type: str = "rel_selfattn", conformer_activation_type: str = "swish", use_macaron_style_in_conformer: bool = True, use_cnn_in_conformer: bool = True, zero_triu: bool = False, conformer_enc_kernel_size: int = 7, conformer_dec_kernel_size: int = 31, # duration predictor duration_predictor_layers: int = 2, duration_predictor_chans: int = 384, duration_predictor_kernel_size: int = 3, # energy predictor energy_predictor_layers: int = 2, energy_predictor_chans: int = 384, energy_predictor_kernel_size: int = 3, energy_predictor_dropout: float = 0.5, energy_embed_kernel_size: int = 9, energy_embed_dropout: float = 0.5, stop_gradient_from_energy_predictor: bool = False, # pitch predictor pitch_predictor_layers: int = 2, pitch_predictor_chans: int = 384, pitch_predictor_kernel_size: int = 3, pitch_predictor_dropout: float = 0.5, pitch_embed_kernel_size: int = 9, pitch_embed_dropout: float = 0.5, stop_gradient_from_pitch_predictor: bool = False, # pretrained spk emb spk_embed_dim: int = None, spk_embed_integration_type: str = "add", # GST use_gst: bool = False, gst_tokens: int = 10, gst_heads: int = 4, gst_conv_layers: int = 6, gst_conv_chans_list: Sequence[int] = (32, 32, 64, 64, 128, 128), gst_conv_kernel_size: int = 3, gst_conv_stride: int = 2, gst_gru_layers: int = 1, gst_gru_units: int = 128, # training related transformer_enc_dropout_rate: float = 0.1, transformer_enc_positional_dropout_rate: float = 0.1, transformer_enc_attn_dropout_rate: float = 0.1, transformer_dec_dropout_rate: float = 0.1, transformer_dec_positional_dropout_rate: float = 0.1, transformer_dec_attn_dropout_rate: float = 0.1, duration_predictor_dropout_rate: float = 0.1, postnet_dropout_rate: float = 0.5, init_type: str = "xavier_uniform", init_enc_alpha: float = 1.0, init_dec_alpha: float = 1.0, use_masking: bool = False, use_weighted_masking: bool = False, ): """Initialize FastSpeech2 module.""" assert check_argument_types() super().__init__() # store hyperparameters self.idim = idim self.odim = odim self.eos = idim - 1 self.reduction_factor = reduction_factor self.encoder_type = encoder_type self.decoder_type = decoder_type self.stop_gradient_from_pitch_predictor = stop_gradient_from_pitch_predictor self.stop_gradient_from_energy_predictor = stop_gradient_from_energy_predictor self.use_scaled_pos_enc = use_scaled_pos_enc self.use_gst = use_gst self.spk_embed_dim = spk_embed_dim if self.spk_embed_dim is not None: self.spk_embed_integration_type = spk_embed_integration_type # use idx 0 as padding idx self.padding_idx = 0 # get positional encoding class pos_enc_class = ( ScaledPositionalEncoding if self.use_scaled_pos_enc else PositionalEncoding ) # check relative positional encoding compatibility if "conformer" in [encoder_type, decoder_type]: if conformer_rel_pos_type == "legacy": if conformer_pos_enc_layer_type == "rel_pos": conformer_pos_enc_layer_type = "legacy_rel_pos" logging.warning( "Fallback to conformer_pos_enc_layer_type = 'legacy_rel_pos' " "due to the compatibility. If you want to use the new one, " "please use conformer_pos_enc_layer_type = 'latest'." ) if conformer_self_attn_layer_type == "rel_selfattn": conformer_self_attn_layer_type = "legacy_rel_selfattn" logging.warning( "Fallback to " "conformer_self_attn_layer_type = 'legacy_rel_selfattn' " "due to the compatibility. If you want to use the new one, " "please use conformer_pos_enc_layer_type = 'latest'." ) elif conformer_rel_pos_type == "latest": assert conformer_pos_enc_layer_type != "legacy_rel_pos" assert conformer_self_attn_layer_type != "legacy_rel_selfattn" else: raise ValueError(f"Unknown rel_pos_type: {conformer_rel_pos_type}") # define encoder encoder_input_layer = torch.nn.Embedding( num_embeddings=idim, embedding_dim=adim, padding_idx=self.padding_idx ) if encoder_type == "transformer": self.encoder = TransformerEncoder( idim=idim, attention_dim=adim, attention_heads=aheads, linear_units=eunits, num_blocks=elayers, input_layer=encoder_input_layer, dropout_rate=transformer_enc_dropout_rate, positional_dropout_rate=transformer_enc_positional_dropout_rate, attention_dropout_rate=transformer_enc_attn_dropout_rate, pos_enc_class=pos_enc_class, normalize_before=encoder_normalize_before, concat_after=encoder_concat_after, positionwise_layer_type=positionwise_layer_type, positionwise_conv_kernel_size=positionwise_conv_kernel_size, ) elif encoder_type == "conformer": self.encoder = ConformerEncoder( idim=idim, attention_dim=adim, attention_heads=aheads, linear_units=eunits, num_blocks=elayers, input_layer=encoder_input_layer, dropout_rate=transformer_enc_dropout_rate, positional_dropout_rate=transformer_enc_positional_dropout_rate, attention_dropout_rate=transformer_enc_attn_dropout_rate, normalize_before=encoder_normalize_before, concat_after=encoder_concat_after, positionwise_layer_type=positionwise_layer_type, positionwise_conv_kernel_size=positionwise_conv_kernel_size, macaron_style=use_macaron_style_in_conformer, pos_enc_layer_type=conformer_pos_enc_layer_type, selfattention_layer_type=conformer_self_attn_layer_type, activation_type=conformer_activation_type, use_cnn_module=use_cnn_in_conformer, cnn_module_kernel=conformer_enc_kernel_size, zero_triu=zero_triu, ) else: raise ValueError(f"{encoder_type} is not supported.") # define GST if self.use_gst: self.gst = StyleEncoder( idim=odim, # the input is mel-spectrogram gst_tokens=gst_tokens, gst_token_dim=adim, gst_heads=gst_heads, conv_layers=gst_conv_layers, conv_chans_list=gst_conv_chans_list, conv_kernel_size=gst_conv_kernel_size, conv_stride=gst_conv_stride, gru_layers=gst_gru_layers, gru_units=gst_gru_units, ) # define additional projection for speaker embedding if self.spk_embed_dim is not None: if self.spk_embed_integration_type == "add": self.projection = torch.nn.Linear(self.spk_embed_dim, adim) else: self.projection = torch.nn.Linear(adim + self.spk_embed_dim, adim) # define duration predictor self.duration_predictor = DurationPredictor( idim=adim, n_layers=duration_predictor_layers, n_chans=duration_predictor_chans, kernel_size=duration_predictor_kernel_size, dropout_rate=duration_predictor_dropout_rate, ) # define pitch predictor self.pitch_predictor = VariancePredictor( idim=adim, n_layers=pitch_predictor_layers, n_chans=pitch_predictor_chans, kernel_size=pitch_predictor_kernel_size, dropout_rate=pitch_predictor_dropout, ) # NOTE(kan-bayashi): We use continuous pitch + FastPitch style avg self.pitch_embed = torch.nn.Sequential( torch.nn.Conv1d( in_channels=1, out_channels=adim, kernel_size=pitch_embed_kernel_size, padding=(pitch_embed_kernel_size - 1) // 2, ), torch.nn.Dropout(pitch_embed_dropout), ) # define energy predictor self.energy_predictor = VariancePredictor( idim=adim, n_layers=energy_predictor_layers, n_chans=energy_predictor_chans, kernel_size=energy_predictor_kernel_size, dropout_rate=energy_predictor_dropout, ) # NOTE(kan-bayashi): We use continuous enegy + FastPitch style avg self.energy_embed = torch.nn.Sequential( torch.nn.Conv1d( in_channels=1, out_channels=adim, kernel_size=energy_embed_kernel_size, padding=(energy_embed_kernel_size - 1) // 2, ), torch.nn.Dropout(energy_embed_dropout), ) # define length regulator self.length_regulator = LengthRegulator() # define decoder # NOTE: we use encoder as decoder # because fastspeech's decoder is the same as encoder if decoder_type == "transformer": self.decoder = TransformerEncoder( idim=0, attention_dim=adim, attention_heads=aheads, linear_units=dunits, num_blocks=dlayers, input_layer=None, dropout_rate=transformer_dec_dropout_rate, positional_dropout_rate=transformer_dec_positional_dropout_rate, attention_dropout_rate=transformer_dec_attn_dropout_rate, pos_enc_class=pos_enc_class, normalize_before=decoder_normalize_before, concat_after=decoder_concat_after, positionwise_layer_type=positionwise_layer_type, positionwise_conv_kernel_size=positionwise_conv_kernel_size, ) elif decoder_type == "conformer": self.decoder = ConformerEncoder( idim=0, attention_dim=adim, attention_heads=aheads, linear_units=dunits, num_blocks=dlayers, input_layer=None, dropout_rate=transformer_dec_dropout_rate, positional_dropout_rate=transformer_dec_positional_dropout_rate, attention_dropout_rate=transformer_dec_attn_dropout_rate, normalize_before=decoder_normalize_before, concat_after=decoder_concat_after, positionwise_layer_type=positionwise_layer_type, positionwise_conv_kernel_size=positionwise_conv_kernel_size, macaron_style=use_macaron_style_in_conformer, pos_enc_layer_type=conformer_pos_enc_layer_type, selfattention_layer_type=conformer_self_attn_layer_type, activation_type=conformer_activation_type, use_cnn_module=use_cnn_in_conformer, cnn_module_kernel=conformer_dec_kernel_size, ) else: raise ValueError(f"{decoder_type} is not supported.") # define final projection self.feat_out = torch.nn.Linear(adim, odim * reduction_factor) # define postnet self.postnet = ( None if postnet_layers == 0 else Postnet( idim=idim, odim=odim, n_layers=postnet_layers, n_chans=postnet_chans, n_filts=postnet_filts, use_batch_norm=use_batch_norm, dropout_rate=postnet_dropout_rate, ) ) # initialize parameters self._reset_parameters( init_type=init_type, init_enc_alpha=init_enc_alpha, init_dec_alpha=init_dec_alpha, ) # define criterions self.criterion = FastSpeech2Loss( use_masking=use_masking, use_weighted_masking=use_weighted_masking ) def forward( self, text: torch.Tensor, text_lengths: torch.Tensor, speech: torch.Tensor, speech_lengths: torch.Tensor, durations: torch.Tensor, durations_lengths: torch.Tensor, pitch: torch.Tensor, pitch_lengths: torch.Tensor, energy: torch.Tensor, energy_lengths: torch.Tensor, spembs: torch.Tensor = None, ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor], torch.Tensor]: """Calculate forward propagation. Args: text (LongTensor): Batch of padded token ids (B, Tmax). text_lengths (LongTensor): Batch of lengths of each input (B,). speech (Tensor): Batch of padded target features (B, Lmax, odim). speech_lengths (LongTensor): Batch of the lengths of each target (B,). durations (LongTensor): Batch of padded durations (B, Tmax + 1). durations_lengths (LongTensor): Batch of duration lengths (B, Tmax + 1). pitch (Tensor): Batch of padded token-averaged pitch (B, Tmax + 1, 1). pitch_lengths (LongTensor): Batch of pitch lengths (B, Tmax + 1). energy (Tensor): Batch of padded token-averaged energy (B, Tmax + 1, 1). energy_lengths (LongTensor): Batch of energy lengths (B, Tmax + 1). spembs (Tensor, optional): Batch of speaker embeddings (B, spk_embed_dim). Returns: Tensor: Loss scalar value. Dict: Statistics to be monitored. Tensor: Weight value. """ text = text[:, : text_lengths.max()] # for data-parallel speech = speech[:, : speech_lengths.max()] # for data-parallel durations = durations[:, : durations_lengths.max()] # for data-parallel pitch = pitch[:, : pitch_lengths.max()] # for data-parallel energy = energy[:, : energy_lengths.max()] # for data-parallel batch_size = text.size(0) # Add eos at the last of sequence xs = F.pad(text, [0, 1], "constant", self.padding_idx) for i, l in enumerate(text_lengths): xs[i, l] = self.eos ilens = text_lengths + 1 ys, ds, ps, es = speech, durations, pitch, energy olens = speech_lengths # forward propagation before_outs, after_outs, d_outs, p_outs, e_outs = self._forward( xs, ilens, ys, olens, ds, ps, es, spembs=spembs, is_inference=False ) # modify mod part of groundtruth if self.reduction_factor > 1: olens = olens.new([olen - olen % self.reduction_factor for olen in olens]) max_olen = max(olens) ys = ys[:, :max_olen] # calculate loss if self.postnet is None: after_outs = None # calculate loss l1_loss, duration_loss, pitch_loss, energy_loss = self.criterion( after_outs=after_outs, before_outs=before_outs, d_outs=d_outs, p_outs=p_outs, e_outs=e_outs, ys=ys, ds=ds, ps=ps, es=es, ilens=ilens, olens=olens, ) loss = l1_loss + duration_loss + pitch_loss + energy_loss stats = dict( l1_loss=l1_loss.item(), duration_loss=duration_loss.item(), pitch_loss=pitch_loss.item(), energy_loss=energy_loss.item(), loss=loss.item(), ) # report extra information if self.encoder_type == "transformer" and self.use_scaled_pos_enc: stats.update( encoder_alpha=self.encoder.embed[-1].alpha.data.item(), ) if self.decoder_type == "transformer" and self.use_scaled_pos_enc: stats.update( decoder_alpha=self.decoder.embed[-1].alpha.data.item(), ) loss, stats, weight = force_gatherable((loss, stats, batch_size), loss.device) return loss, stats, weight def _forward( self, xs: torch.Tensor, ilens: torch.Tensor, ys: torch.Tensor = None, olens: torch.Tensor = None, ds: torch.Tensor = None, ps: torch.Tensor = None, es: torch.Tensor = None, spembs: torch.Tensor = None, is_inference: bool = False, alpha: float = 1.0, ) -> Sequence[torch.Tensor]: # forward encoder x_masks = self._source_mask(ilens) hs, _ = self.encoder(xs, x_masks) # (B, Tmax, adim) # integrate with GST if self.use_gst: style_embs = self.gst(ys) hs = hs + style_embs.unsqueeze(1) # integrate speaker embedding if self.spk_embed_dim is not None: hs = self._integrate_with_spk_embed(hs, spembs) # forward duration predictor and variance predictors d_masks = make_pad_mask(ilens).to(xs.device) if self.stop_gradient_from_pitch_predictor: p_outs = self.pitch_predictor(hs.detach(), d_masks.unsqueeze(-1)) else: p_outs = self.pitch_predictor(hs, d_masks.unsqueeze(-1)) if self.stop_gradient_from_energy_predictor: e_outs = self.energy_predictor(hs.detach(), d_masks.unsqueeze(-1)) else: e_outs = self.energy_predictor(hs, d_masks.unsqueeze(-1)) if is_inference: d_outs = self.duration_predictor.inference(hs, d_masks) # (B, Tmax) # use prediction in inference p_embs = self.pitch_embed(p_outs.transpose(1, 2)).transpose(1, 2) e_embs = self.energy_embed(e_outs.transpose(1, 2)).transpose(1, 2) hs = hs + e_embs + p_embs hs = self.length_regulator(hs, d_outs, alpha) # (B, Lmax, adim) else: d_outs = self.duration_predictor(hs, d_masks) # use groundtruth in training p_embs = self.pitch_embed(ps.transpose(1, 2)).transpose(1, 2) e_embs = self.energy_embed(es.transpose(1, 2)).transpose(1, 2) hs = hs + e_embs + p_embs hs = self.length_regulator(hs, ds) # (B, Lmax, adim) # forward decoder if olens is not None and not is_inference: if self.reduction_factor > 1: olens_in = olens.new([olen // self.reduction_factor for olen in olens]) else: olens_in = olens h_masks = self._source_mask(olens_in) else: h_masks = None zs, _ = self.decoder(hs, h_masks) # (B, Lmax, adim) before_outs = self.feat_out(zs).view( zs.size(0), -1, self.odim ) # (B, Lmax, odim) # postnet -> (B, Lmax//r * r, odim) if self.postnet is None: after_outs = before_outs else: after_outs = before_outs + self.postnet( before_outs.transpose(1, 2) ).transpose(1, 2) return before_outs, after_outs, d_outs, p_outs, e_outs def inference( self, text: torch.Tensor, speech: torch.Tensor = None, spembs: torch.Tensor = None, durations: torch.Tensor = None, pitch: torch.Tensor = None, energy: torch.Tensor = None, alpha: float = 1.0, use_teacher_forcing: bool = False, ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: """Generate the sequence of features given the sequences of characters. Args: text (LongTensor): Input sequence of characters (T,). speech (Tensor, optional): Feature sequence to extract style (N, idim). spembs (Tensor, optional): Speaker embedding vector (spk_embed_dim,). durations (LongTensor, optional): Groundtruth of duration (T + 1,). pitch (Tensor, optional): Groundtruth of token-averaged pitch (T + 1, 1). energy (Tensor, optional): Groundtruth of token-averaged energy (T + 1, 1). alpha (float, optional): Alpha to control the speed. use_teacher_forcing (bool, optional): Whether to use teacher forcing. If true, groundtruth of duration, pitch and energy will be used. Returns: Tensor: Output sequence of features (L, odim). None: Dummy for compatibility. None: Dummy for compatibility. """ x, y = text, speech spemb, d, p, e = spembs, durations, pitch, energy # add eos at the last of sequence x = F.pad(x, [0, 1], "constant", self.eos) # setup batch axis ilens = torch.tensor([x.shape[0]], dtype=torch.long, device=x.device) xs, ys = x.unsqueeze(0), None if y is not None: ys = y.unsqueeze(0) if spemb is not None: spembs = spemb.unsqueeze(0) if use_teacher_forcing: # use groundtruth of duration, pitch, and energy ds, ps, es = d.unsqueeze(0), p.unsqueeze(0), e.unsqueeze(0) _, outs, *_ = self._forward( xs, ilens, ys, ds=ds, ps=ps, es=es, spembs=spembs, ) # (1, L, odim) else: _, outs, *_ = self._forward( xs, ilens, ys, spembs=spembs, is_inference=True, alpha=alpha, ) # (1, L, odim) return outs[0], None, None def _integrate_with_spk_embed( self, hs: torch.Tensor, spembs: torch.Tensor ) -> torch.Tensor: """Integrate speaker embedding with hidden states. Args: hs (Tensor): Batch of hidden state sequences (B, Tmax, adim). spembs (Tensor): Batch of speaker embeddings (B, spk_embed_dim). Returns: Tensor: Batch of integrated hidden state sequences (B, Tmax, adim). """ if self.spk_embed_integration_type == "add": # apply projection and then add to hidden states spembs = self.projection(F.normalize(spembs)) hs = hs + spembs.unsqueeze(1) elif self.spk_embed_integration_type == "concat": # concat hidden states with spk embeds and then apply projection spembs = F.normalize(spembs).unsqueeze(1).expand(-1, hs.size(1), -1) hs = self.projection(torch.cat([hs, spembs], dim=-1)) else: raise NotImplementedError("support only add or concat.") return hs def _source_mask(self, ilens: torch.Tensor) -> torch.Tensor: """Make masks for self-attention. Args: ilens (LongTensor): Batch of lengths (B,). Returns: Tensor: Mask tensor for self-attention. dtype=torch.uint8 in PyTorch 1.2- dtype=torch.bool in PyTorch 1.2+ (including 1.2) Examples: >>> ilens = [5, 3] >>> self._source_mask(ilens) tensor([[[1, 1, 1, 1, 1], [1, 1, 1, 0, 0]]], dtype=torch.uint8) """ x_masks = make_non_pad_mask(ilens).to(next(self.parameters()).device) return x_masks.unsqueeze(-2) def _reset_parameters( self, init_type: str, init_enc_alpha: float, init_dec_alpha: float ): # initialize parameters if init_type != "pytorch": initialize(self, init_type) # initialize alpha in scaled positional encoding if self.encoder_type == "transformer" and self.use_scaled_pos_enc: self.encoder.embed[-1].alpha.data = torch.tensor(init_enc_alpha) if self.decoder_type == "transformer" and self.use_scaled_pos_enc: self.decoder.embed[-1].alpha.data = torch.tensor(init_dec_alpha) class FastSpeech2Loss(torch.nn.Module): """Loss function module for FastSpeech2.""" def __init__(self, use_masking: bool = True, use_weighted_masking: bool = False): """Initialize feed-forward Transformer loss module. Args: use_masking (bool): Whether to apply masking for padded part in loss calculation. use_weighted_masking (bool): Whether to weighted masking in loss calculation. """ assert check_argument_types() super().__init__() assert (use_masking != use_weighted_masking) or not use_masking self.use_masking = use_masking self.use_weighted_masking = use_weighted_masking # define criterions reduction = "none" if self.use_weighted_masking else "mean" self.l1_criterion = torch.nn.L1Loss(reduction=reduction) self.mse_criterion = torch.nn.MSELoss(reduction=reduction) self.duration_criterion = DurationPredictorLoss(reduction=reduction) def forward( self, after_outs: torch.Tensor, before_outs: torch.Tensor, d_outs: torch.Tensor, p_outs: torch.Tensor, e_outs: torch.Tensor, ys: torch.Tensor, ds: torch.Tensor, ps: torch.Tensor, es: torch.Tensor, ilens: torch.Tensor, olens: torch.Tensor, ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: """Calculate forward propagation. Args: after_outs (Tensor): Batch of outputs after postnets (B, Lmax, odim). before_outs (Tensor): Batch of outputs before postnets (B, Lmax, odim). d_outs (LongTensor): Batch of outputs of duration predictor (B, Tmax). p_outs (Tensor): Batch of outputs of pitch predictor (B, Tmax, 1). e_outs (Tensor): Batch of outputs of energy predictor (B, Tmax, 1). ys (Tensor): Batch of target features (B, Lmax, odim). ds (LongTensor): Batch of durations (B, Tmax). ps (Tensor): Batch of target token-averaged pitch (B, Tmax, 1). es (Tensor): Batch of target token-averaged energy (B, Tmax, 1). ilens (LongTensor): Batch of the lengths of each input (B,). olens (LongTensor): Batch of the lengths of each target (B,). Returns: Tensor: L1 loss value. Tensor: Duration predictor loss value. Tensor: Pitch predictor loss value. Tensor: Energy predictor loss value. """ # apply mask to remove padded part if self.use_masking: out_masks = make_non_pad_mask(olens).unsqueeze(-1).to(ys.device) before_outs = before_outs.masked_select(out_masks) if after_outs is not None: after_outs = after_outs.masked_select(out_masks) ys = ys.masked_select(out_masks) duration_masks = make_non_pad_mask(ilens).to(ys.device) d_outs = d_outs.masked_select(duration_masks) ds = ds.masked_select(duration_masks) pitch_masks = make_non_pad_mask(ilens).unsqueeze(-1).to(ys.device) p_outs = p_outs.masked_select(pitch_masks) e_outs = e_outs.masked_select(pitch_masks) ps = ps.masked_select(pitch_masks) es = es.masked_select(pitch_masks) # calculate loss l1_loss = self.l1_criterion(before_outs, ys) if after_outs is not None: l1_loss += self.l1_criterion(after_outs, ys) duration_loss = self.duration_criterion(d_outs, ds) pitch_loss = self.mse_criterion(p_outs, ps) energy_loss = self.mse_criterion(e_outs, es) # make weighted mask and apply it if self.use_weighted_masking: out_masks = make_non_pad_mask(olens).unsqueeze(-1).to(ys.device) out_weights = out_masks.float() / out_masks.sum(dim=1, keepdim=True).float() out_weights /= ys.size(0) * ys.size(2) duration_masks = make_non_pad_mask(ilens).to(ys.device) duration_weights = ( duration_masks.float() / duration_masks.sum(dim=1, keepdim=True).float() ) duration_weights /= ds.size(0) # apply weight l1_loss = l1_loss.mul(out_weights).masked_select(out_masks).sum() duration_loss = ( duration_loss.mul(duration_weights).masked_select(duration_masks).sum() ) pitch_masks = duration_masks.unsqueeze(-1) pitch_weights = duration_weights.unsqueeze(-1) pitch_loss = pitch_loss.mul(pitch_weights).masked_select(pitch_masks).sum() energy_loss = ( energy_loss.mul(pitch_weights).masked_select(pitch_masks).sum() ) return l1_loss, duration_loss, pitch_loss, energy_loss