|
import math |
|
import torch |
|
import numpy as np |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
from typing import List, Optional, Tuple |
|
from .configuration import AVHubertConfig |
|
from fairseq.utils import index_put |
|
from fairseq.modules import LayerNorm, SamePad |
|
from fairseq.models.wav2vec.wav2vec2 import TransformerSentenceEncoderLayer |
|
from fairseq.modules.transformer_sentence_encoder import init_bert_params |
|
|
|
|
|
class TransformerEncoder(nn.Module):
    """Transformer layer stack with a convolutional positional embedding.

    The input is first (optionally) zeroed at padded positions, then a
    weight-normalised grouped ``Conv1d`` + ``SamePad`` + ``GELU`` branch is
    added to it as a positional signal. The result is passed through
    ``config.encoder_layers`` instances of ``TransformerSentenceEncoderLayer``,
    each of which may be randomly skipped while training (see ``layerdrop``).
    """

    def __init__(self, config: AVHubertConfig) -> None:
        """Build the positional convolution, the layer stack and the layer norm.

        Args:
            config: Model configuration. The fields read here are
                ``dropout``, ``encoder_embed_dim``, ``conv_pos``,
                ``conv_pos_groups``, ``encoder_ffn_embed_dim``,
                ``encoder_attention_heads``, ``attention_dropout``,
                ``activation_dropout``, ``activation_fn``,
                ``layer_norm_first``, ``encoder_layers`` and
                ``encoder_layerdrop``.
        """
        super().__init__()

        embed_dim = config.encoder_embed_dim
        kernel = config.conv_pos

        self.dropout = config.dropout
        self.embedding_dim = embed_dim

        # Positional embedding: a grouped 1-D convolution over the sequence axis.
        conv = nn.Conv1d(
            embed_dim,
            embed_dim,
            kernel_size=kernel,
            padding=kernel // 2,
            groups=config.conv_pos_groups,
        )
        # Same value as sqrt(4 * (1 - dropout) / (kernel * dim)) with dropout
        # fixed at 0 for this initialisation.
        init_std = math.sqrt(4.0 / (kernel * embed_dim))
        nn.init.normal_(conv.weight, mean=0, std=init_std)
        nn.init.constant_(conv.bias, 0)

        # Re-parameterise the kernel with weight normalisation (dim=2), then
        # chain the padding fix-up module and the activation behind it.
        conv = nn.utils.weight_norm(conv, name="weight", dim=2)
        self.pos_conv = nn.Sequential(conv, SamePad(kernel), nn.GELU())

        # Every layer is built from the same hyper-parameters.
        encoder_layers = []
        for _ in range(config.encoder_layers):
            encoder_layers.append(
                TransformerSentenceEncoderLayer(
                    embedding_dim=self.embedding_dim,
                    ffn_embedding_dim=config.encoder_ffn_embed_dim,
                    num_attention_heads=config.encoder_attention_heads,
                    dropout=self.dropout,
                    attention_dropout=config.attention_dropout,
                    activation_dropout=config.activation_dropout,
                    activation_fn=config.activation_fn,
                    layer_norm_first=config.layer_norm_first,
                )
            )
        self.layers = nn.ModuleList(encoder_layers)

        self.layer_norm_first = config.layer_norm_first
        self.layer_norm = LayerNorm(self.embedding_dim)
        self.layerdrop = config.encoder_layerdrop

        # Runs init_bert_params over this module and every sub-module.
        self.apply(init_bert_params)

    def forward(
        self,
        x: torch.Tensor,
        padding_mask: Optional[torch.Tensor] = None,
        layer: Optional[int] = None,
    ) -> Tuple[torch.Tensor, List[Tuple[torch.Tensor, torch.Tensor]]]:
        """Encode ``x`` and apply the closing layer norm when appropriate.

        Args:
            x: Input features; see ``extract_features`` for the layout notes.
            padding_mask: Optional mask forwarded to ``extract_features``.
            layer: Optional index of the layer at which to stop early.

        Returns:
            ``(features, layer_results)`` as produced by ``extract_features``.
            ``features`` is additionally passed through ``self.layer_norm``
            only when ``layer_norm_first`` is set and ``layer`` is ``None``.
        """
        features, per_layer = self.extract_features(x, padding_mask, layer)

        # In the pre-norm configuration the final normalisation happens here,
        # but only for a full pass through the stack (no early-exit layer).
        needs_final_norm = self.layer_norm_first and layer is None
        if needs_final_norm:
            features = self.layer_norm(features)

        return features, per_layer

    def extract_features(
        self,
        x: torch.Tensor,
        padding_mask: Optional[torch.Tensor] = None,
        tgt_layer: Optional[int] = None,
    ) -> Tuple[torch.Tensor, List[Tuple[torch.Tensor, torch.Tensor]]]:
        """Run the positional convolution and the transformer layers.

        Args:
            x: 3-D input whose last axis has size ``embedding_dim`` (it is
                moved to the channel axis of the ``Conv1d``).
                Presumably (batch, time, channels) - TODO confirm with callers.
            padding_mask: Optional mask; where given, ``index_put`` writes 0
                into ``x`` at the masked positions, and the mask is also
                handed to every layer as ``self_attn_padding_mask``.
                NOTE(review): ``index_put`` may modify ``x`` in place - verify.
            tgt_layer: Optional 0-based layer index. When set, the loop stops
                after that index and per-layer outputs are collected.

        Returns:
            A pair ``(features, layer_results)``. ``features`` has the same
            axis order as the input. ``layer_results`` is empty unless
            ``tgt_layer`` is given; it then holds the ``(output, second
            return value)`` pair of every layer that actually ran.
        """
        if padding_mask is not None:
            x = index_put(x, padding_mask, 0)

        # Conv1d wants the channel axis second, so swap the last two axes on
        # the way in and swap them back on the way out.
        positional = self.pos_conv(x.transpose(1, 2)).transpose(1, 2)
        x = x + positional

        if not self.layer_norm_first:
            x = self.layer_norm(x)

        x = F.dropout(x, p=self.dropout, training=self.training)

        # The layers are fed with the first two axes swapped
        # (presumably time-major, T x B x C - TODO confirm).
        hidden = x.transpose(0, 1)

        collect = tgt_layer is not None
        layer_results: List[Tuple[torch.Tensor, torch.Tensor]] = []
        for idx, block in enumerate(self.layers):
            # One draw per layer, taken regardless of train/eval mode.
            draw = np.random.random()
            # While training, a layer is skipped whenever the draw does not
            # exceed ``layerdrop``; in eval mode every layer runs.
            if not self.training or draw > self.layerdrop:
                hidden, extra = block(
                    hidden, self_attn_padding_mask=padding_mask, need_weights=False
                )
                if collect:
                    layer_results.append((hidden, extra))
            # Early exit: the current hidden state is what gets returned. If
            # this very layer was skipped, that is the output of the last
            # layer that did run.
            if idx == tgt_layer:
                break

        # Undo the axis swap so the result matches the input layout.
        return hidden.transpose(0, 1), layer_results
|
|