### credit: https://github.com/dunky11/voicesmith import math from typing import Tuple import torch import torch.nn as nn # pylint: disable=consider-using-from-import import torch.nn.functional as F from TTS.tts.layers.delightful_tts.conv_layers import Conv1dGLU, DepthWiseConv1d, PointwiseConv1d from TTS.tts.layers.delightful_tts.networks import GLUActivation def calc_same_padding(kernel_size: int) -> Tuple[int, int]: pad = kernel_size // 2 return (pad, pad - (kernel_size + 1) % 2) class Conformer(nn.Module): def __init__( self, dim: int, n_layers: int, n_heads: int, speaker_embedding_dim: int, p_dropout: float, kernel_size_conv_mod: int, lrelu_slope: float, ): """ A Transformer variant that integrates both CNNs and Transformers components. Conformer proposes a novel combination of self-attention and convolution, in which self-attention learns the global interaction while the convolutions efficiently capture the local correlations. Args: dim (int): Number of the dimensions for the model. n_layers (int): Number of model layers. n_heads (int): The number of attention heads. speaker_embedding_dim (int): Number of speaker embedding dimensions. p_dropout (float): Probabilty of dropout. kernel_size_conv_mod (int): Size of kernels for convolution modules. Inputs: inputs, mask - **inputs** (batch, time, dim): Tensor containing input vector - **encoding** (batch, time, dim): Positional embedding tensor - **mask** (batch, 1, time2) or (batch, time1, time2): Tensor containing indices to be masked Returns: - **outputs** (batch, time, dim): Tensor produced by Conformer Encoder. """ super().__init__() d_k = d_v = dim // n_heads self.layer_stack = nn.ModuleList( [ ConformerBlock( dim, n_heads, d_k, d_v, kernel_size_conv_mod=kernel_size_conv_mod, dropout=p_dropout, speaker_embedding_dim=speaker_embedding_dim, lrelu_slope=lrelu_slope, ) for _ in range(n_layers) ] ) def forward( self, x: torch.Tensor, mask: torch.Tensor, speaker_embedding: torch.Tensor, encoding: torch.Tensor, ) -> torch.Tensor: """ Shapes: - x: :math:`[B, T_src, C]` - mask: :math: `[B]` - speaker_embedding: :math: `[B, C]` - encoding: :math: `[B, T_max2, C]` """ attn_mask = mask.view((mask.shape[0], 1, 1, mask.shape[1])) for enc_layer in self.layer_stack: x = enc_layer( x, mask=mask, slf_attn_mask=attn_mask, speaker_embedding=speaker_embedding, encoding=encoding, ) return x class ConformerBlock(torch.nn.Module): def __init__( self, d_model: int, n_head: int, d_k: int, # pylint: disable=unused-argument d_v: int, # pylint: disable=unused-argument kernel_size_conv_mod: int, speaker_embedding_dim: int, dropout: float, lrelu_slope: float = 0.3, ): """ A Conformer block is composed of four modules stacked together, A feed-forward module, a self-attention module, a convolution module, and a second feed-forward module in the end. The block starts with two Feed forward modules sandwiching the Multi-Headed Self-Attention module and the Conv module. Args: d_model (int): The dimension of model n_head (int): The number of attention heads. kernel_size_conv_mod (int): Size of kernels for convolution modules. speaker_embedding_dim (int): Number of speaker embedding dimensions. emotion_embedding_dim (int): Number of emotion embedding dimensions. dropout (float): Probabilty of dropout. Inputs: inputs, mask - **inputs** (batch, time, dim): Tensor containing input vector - **encoding** (batch, time, dim): Positional embedding tensor - **slf_attn_mask** (batch, 1, 1, time1): Tensor containing indices to be masked in self attention module - **mask** (batch, 1, time2) or (batch, time1, time2): Tensor containing indices to be masked Returns: - **outputs** (batch, time, dim): Tensor produced by the Conformer Block. """ super().__init__() if isinstance(speaker_embedding_dim, int): self.conditioning = Conv1dGLU( d_model=d_model, kernel_size=kernel_size_conv_mod, padding=kernel_size_conv_mod // 2, embedding_dim=speaker_embedding_dim, ) self.ff = FeedForward(d_model=d_model, dropout=dropout, kernel_size=3, lrelu_slope=lrelu_slope) self.conformer_conv_1 = ConformerConvModule( d_model, kernel_size=kernel_size_conv_mod, dropout=dropout, lrelu_slope=lrelu_slope ) self.ln = nn.LayerNorm(d_model) self.slf_attn = ConformerMultiHeadedSelfAttention(d_model=d_model, num_heads=n_head, dropout_p=dropout) self.conformer_conv_2 = ConformerConvModule( d_model, kernel_size=kernel_size_conv_mod, dropout=dropout, lrelu_slope=lrelu_slope ) def forward( self, x: torch.Tensor, speaker_embedding: torch.Tensor, mask: torch.Tensor, slf_attn_mask: torch.Tensor, encoding: torch.Tensor, ) -> torch.Tensor: """ Shapes: - x: :math:`[B, T_src, C]` - mask: :math: `[B]` - slf_attn_mask: :math: `[B, 1, 1, T_src]` - speaker_embedding: :math: `[B, C]` - emotion_embedding: :math: `[B, C]` - encoding: :math: `[B, T_max2, C]` """ if speaker_embedding is not None: x = self.conditioning(x, embeddings=speaker_embedding) x = self.ff(x) + x x = self.conformer_conv_1(x) + x res = x x = self.ln(x) x, _ = self.slf_attn(query=x, key=x, value=x, mask=slf_attn_mask, encoding=encoding) x = x + res x = x.masked_fill(mask.unsqueeze(-1), 0) x = self.conformer_conv_2(x) + x return x class FeedForward(nn.Module): def __init__( self, d_model: int, kernel_size: int, dropout: float, lrelu_slope: float, expansion_factor: int = 4, ): """ Feed Forward module for conformer block. Args: d_model (int): The dimension of model. kernel_size (int): Size of the kernels for conv layers. dropout (float): probability of dropout. expansion_factor (int): The factor by which to project the number of channels. lrelu_slope (int): the negative slope factor for the leaky relu activation. Inputs: inputs - **inputs** (batch, time, dim): Tensor containing input vector Returns: - **outputs** (batch, time, dim): Tensor produced by the feed forward module. """ super().__init__() self.dropout = nn.Dropout(dropout) self.ln = nn.LayerNorm(d_model) self.conv_1 = nn.Conv1d( d_model, d_model * expansion_factor, kernel_size=kernel_size, padding=kernel_size // 2, ) self.act = nn.LeakyReLU(lrelu_slope) self.conv_2 = nn.Conv1d(d_model * expansion_factor, d_model, kernel_size=1) def forward(self, x: torch.Tensor) -> torch.Tensor: """ Shapes: x: :math: `[B, T, C]` """ x = self.ln(x) x = x.permute((0, 2, 1)) x = self.conv_1(x) x = x.permute((0, 2, 1)) x = self.act(x) x = self.dropout(x) x = x.permute((0, 2, 1)) x = self.conv_2(x) x = x.permute((0, 2, 1)) x = self.dropout(x) x = 0.5 * x return x class ConformerConvModule(nn.Module): def __init__( self, d_model: int, expansion_factor: int = 2, kernel_size: int = 7, dropout: float = 0.1, lrelu_slope: float = 0.3, ): """ Convolution module for conformer. Starts with a gating machanism. a pointwise convolution and a gated linear unit (GLU). This is followed by a single 1-D depthwise convolution layer. Batchnorm is deployed just after the convolution to help with training. it also contains an expansion factor to project the number of channels. Args: d_model (int): The dimension of model. expansion_factor (int): The factor by which to project the number of channels. kernel_size (int): Size of kernels for convolution modules. dropout (float): Probabilty of dropout. lrelu_slope (float): The slope coefficient for leaky relu activation. Inputs: inputs - **inputs** (batch, time, dim): Tensor containing input vector Returns: - **outputs** (batch, time, dim): Tensor produced by the conv module. """ super().__init__() inner_dim = d_model * expansion_factor self.ln_1 = nn.LayerNorm(d_model) self.conv_1 = PointwiseConv1d(d_model, inner_dim * 2) self.conv_act = GLUActivation(slope=lrelu_slope) self.depthwise = DepthWiseConv1d( inner_dim, inner_dim, kernel_size=kernel_size, padding=calc_same_padding(kernel_size)[0], ) self.ln_2 = nn.GroupNorm(1, inner_dim) self.activation = nn.LeakyReLU(lrelu_slope) self.conv_2 = PointwiseConv1d(inner_dim, d_model) self.dropout = nn.Dropout(dropout) def forward(self, x: torch.Tensor) -> torch.Tensor: """ Shapes: x: :math: `[B, T, C]` """ x = self.ln_1(x) x = x.permute(0, 2, 1) x = self.conv_1(x) x = self.conv_act(x) x = self.depthwise(x) x = self.ln_2(x) x = self.activation(x) x = self.conv_2(x) x = x.permute(0, 2, 1) x = self.dropout(x) return x class ConformerMultiHeadedSelfAttention(nn.Module): """ Conformer employ multi-headed self-attention (MHSA) while integrating an important technique from Transformer-XL, the relative sinusoidal positional encoding scheme. The relative positional encoding allows the self-attention module to generalize better on different input length and the resulting encoder is more robust to the variance of the utterance length. Conformer use prenorm residual units with dropout which helps training and regularizing deeper models. Args: d_model (int): The dimension of model num_heads (int): The number of attention heads. dropout_p (float): probability of dropout Inputs: inputs, mask - **inputs** (batch, time, dim): Tensor containing input vector - **mask** (batch, 1, time2) or (batch, time1, time2): Tensor containing indices to be masked Returns: - **outputs** (batch, time, dim): Tensor produces by relative multi headed self attention module. """ def __init__(self, d_model: int, num_heads: int, dropout_p: float): super().__init__() self.attention = RelativeMultiHeadAttention(d_model=d_model, num_heads=num_heads) self.dropout = nn.Dropout(p=dropout_p) def forward( self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, mask: torch.Tensor, encoding: torch.Tensor, ) -> Tuple[torch.Tensor, torch.Tensor]: batch_size, seq_length, _ = key.size() # pylint: disable=unused-variable encoding = encoding[:, : key.shape[1]] encoding = encoding.repeat(batch_size, 1, 1) outputs, attn = self.attention(query, key, value, pos_embedding=encoding, mask=mask) outputs = self.dropout(outputs) return outputs, attn class RelativeMultiHeadAttention(nn.Module): """ Multi-head attention with relative positional encoding. This concept was proposed in the "Transformer-XL: Attentive Language Models Beyond a Fixed-Length Context" Args: d_model (int): The dimension of model num_heads (int): The number of attention heads. Inputs: query, key, value, pos_embedding, mask - **query** (batch, time, dim): Tensor containing query vector - **key** (batch, time, dim): Tensor containing key vector - **value** (batch, time, dim): Tensor containing value vector - **pos_embedding** (batch, time, dim): Positional embedding tensor - **mask** (batch, 1, time2) or (batch, time1, time2): Tensor containing indices to be masked Returns: - **outputs**: Tensor produces by relative multi head attention module. """ def __init__( self, d_model: int = 512, num_heads: int = 16, ): super().__init__() assert d_model % num_heads == 0, "d_model % num_heads should be zero." self.d_model = d_model self.d_head = int(d_model / num_heads) self.num_heads = num_heads self.sqrt_dim = math.sqrt(d_model) self.query_proj = nn.Linear(d_model, d_model) self.key_proj = nn.Linear(d_model, d_model, bias=False) self.value_proj = nn.Linear(d_model, d_model, bias=False) self.pos_proj = nn.Linear(d_model, d_model, bias=False) self.u_bias = nn.Parameter(torch.Tensor(self.num_heads, self.d_head)) self.v_bias = nn.Parameter(torch.Tensor(self.num_heads, self.d_head)) torch.nn.init.xavier_uniform_(self.u_bias) torch.nn.init.xavier_uniform_(self.v_bias) self.out_proj = nn.Linear(d_model, d_model) def forward( self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, pos_embedding: torch.Tensor, mask: torch.Tensor, ) -> Tuple[torch.Tensor, torch.Tensor]: batch_size = query.shape[0] query = self.query_proj(query).view(batch_size, -1, self.num_heads, self.d_head) key = self.key_proj(key).view(batch_size, -1, self.num_heads, self.d_head).permute(0, 2, 1, 3) value = self.value_proj(value).view(batch_size, -1, self.num_heads, self.d_head).permute(0, 2, 1, 3) pos_embedding = self.pos_proj(pos_embedding).view(batch_size, -1, self.num_heads, self.d_head) u_bias = self.u_bias.expand_as(query) v_bias = self.v_bias.expand_as(query) a = (query + u_bias).transpose(1, 2) content_score = a @ key.transpose(2, 3) b = (query + v_bias).transpose(1, 2) pos_score = b @ pos_embedding.permute(0, 2, 3, 1) pos_score = self._relative_shift(pos_score) score = content_score + pos_score score = score * (1.0 / self.sqrt_dim) score.masked_fill_(mask, -1e9) attn = F.softmax(score, -1) context = (attn @ value).transpose(1, 2) context = context.contiguous().view(batch_size, -1, self.d_model) return self.out_proj(context), attn def _relative_shift(self, pos_score: torch.Tensor) -> torch.Tensor: # pylint: disable=no-self-use batch_size, num_heads, seq_length1, seq_length2 = pos_score.size() zeros = torch.zeros((batch_size, num_heads, seq_length1, 1), device=pos_score.device) padded_pos_score = torch.cat([zeros, pos_score], dim=-1) padded_pos_score = padded_pos_score.view(batch_size, num_heads, seq_length2 + 1, seq_length1) pos_score = padded_pos_score[:, :, 1:].view_as(pos_score) return pos_score class MultiHeadAttention(nn.Module): """ input: query --- [N, T_q, query_dim] key --- [N, T_k, key_dim] output: out --- [N, T_q, num_units] """ def __init__(self, query_dim: int, key_dim: int, num_units: int, num_heads: int): super().__init__() self.num_units = num_units self.num_heads = num_heads self.key_dim = key_dim self.W_query = nn.Linear(in_features=query_dim, out_features=num_units, bias=False) self.W_key = nn.Linear(in_features=key_dim, out_features=num_units, bias=False) self.W_value = nn.Linear(in_features=key_dim, out_features=num_units, bias=False) def forward(self, query: torch.Tensor, key: torch.Tensor) -> torch.Tensor: querys = self.W_query(query) # [N, T_q, num_units] keys = self.W_key(key) # [N, T_k, num_units] values = self.W_value(key) split_size = self.num_units // self.num_heads querys = torch.stack(torch.split(querys, split_size, dim=2), dim=0) # [h, N, T_q, num_units/h] keys = torch.stack(torch.split(keys, split_size, dim=2), dim=0) # [h, N, T_k, num_units/h] values = torch.stack(torch.split(values, split_size, dim=2), dim=0) # [h, N, T_k, num_units/h] # score = softmax(QK^T / (d_k ** 0.5)) scores = torch.matmul(querys, keys.transpose(2, 3)) # [h, N, T_q, T_k] scores = scores / (self.key_dim**0.5) scores = F.softmax(scores, dim=3) # out = score * V out = torch.matmul(scores, values) # [h, N, T_q, num_units/h] out = torch.cat(torch.split(out, 1, dim=0), dim=3).squeeze(0) # [N, T_q, num_units] return out