# Copyright (c) 2022 Ximalaya Inc. (authors: Yuguang Yang) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Modified from Squeezeformer(https://github.com/kssteven418/Squeezeformer) # Squeezeformer(https://github.com/upskyy/Squeezeformer) # NeMo(https://github.com/NVIDIA/NeMo) """DepthwiseConv2dSubsampling4 and TimeReductionLayer definition.""" import torch import torch.nn as nn import torch.nn.functional as F from wenet.transformer.subsampling import BaseSubsampling from typing import Tuple from wenet.squeezeformer.conv2d import Conv2dValid class DepthwiseConv2dSubsampling4(BaseSubsampling): """Depthwise Convolutional 2D subsampling (to 1/4 length). Args: idim (int): Input dimension. odim (int): Output dimension. pos_enc_class (nn.Module): position encoding class. dw_stride (int): Whether do depthwise convolution. input_size (int): filter bank dimension. """ def __init__(self, idim: int, odim: int, pos_enc_class: torch.nn.Module, dw_stride: bool = False, input_size: int = 80, input_dropout_rate: float = 0.1, init_weights: bool = True): super(DepthwiseConv2dSubsampling4, self).__init__() self.idim = idim self.odim = odim self.pw_conv = nn.Conv2d(in_channels=idim, out_channels=odim, kernel_size=3, stride=2) self.act1 = nn.ReLU() self.dw_conv = nn.Conv2d(in_channels=odim, out_channels=odim, kernel_size=3, stride=2, groups=odim if dw_stride else 1) self.act2 = nn.ReLU() self.pos_enc = pos_enc_class self.input_proj = nn.Sequential( nn.Linear(odim * (((input_size - 1) // 2 - 1) // 2), odim), nn.Dropout(p=input_dropout_rate), ) if init_weights: linear_max = (odim * input_size / 4)**-0.5 torch.nn.init.uniform_(self.input_proj.state_dict()['0.weight'], -linear_max, linear_max) torch.nn.init.uniform_(self.input_proj.state_dict()['0.bias'], -linear_max, linear_max) self.subsampling_rate = 4 # 6 = (3 - 1) * 1 + (3 - 1) * 2 self.right_context = 6 def forward( self, x: torch.Tensor, x_mask: torch.Tensor, offset: int = 0 ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: x = x.unsqueeze(1) # (b, c=1, t, f) x = self.pw_conv(x) x = self.act1(x) x = self.dw_conv(x) x = self.act2(x) b, c, t, f = x.size() x = x.permute(0, 2, 1, 3) x = x.contiguous().view(b, t, c * f) x, pos_emb = self.pos_enc(x, offset) x = self.input_proj(x) return x, pos_emb, x_mask[:, :, :-2:2][:, :, :-2:2] class TimeReductionLayer1D(nn.Module): """ Modified NeMo, Squeezeformer Time Reduction procedure. Downsamples the audio by `stride` in the time dimension. Args: channel (int): input dimension of MultiheadAttentionMechanism and PositionwiseFeedForward out_dim (int): Output dimension of the module. kernel_size (int): Conv kernel size for depthwise convolution in convolution module stride (int): Downsampling factor in time dimension. """ def __init__(self, channel: int, out_dim: int, kernel_size: int = 5, stride: int = 2): super(TimeReductionLayer1D, self).__init__() self.channel = channel self.out_dim = out_dim self.kernel_size = kernel_size self.stride = stride self.padding = max(0, self.kernel_size - self.stride) self.dw_conv = nn.Conv1d( in_channels=channel, out_channels=channel, kernel_size=kernel_size, stride=stride, padding=self.padding, groups=channel, ) self.pw_conv = nn.Conv1d( in_channels=channel, out_channels=out_dim, kernel_size=1, stride=1, padding=0, groups=1, ) self.init_weights() def init_weights(self): dw_max = self.kernel_size**-0.5 pw_max = self.channel**-0.5 torch.nn.init.uniform_(self.dw_conv.weight, -dw_max, dw_max) torch.nn.init.uniform_(self.dw_conv.bias, -dw_max, dw_max) torch.nn.init.uniform_(self.pw_conv.weight, -pw_max, pw_max) torch.nn.init.uniform_(self.pw_conv.bias, -pw_max, pw_max) def forward( self, xs, xs_lens: torch.Tensor, mask: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool), mask_pad: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool), ): xs = xs.transpose(1, 2) # [B, C, T] xs = xs.masked_fill(mask_pad.eq(0), 0.0) xs = self.dw_conv(xs) xs = self.pw_conv(xs) xs = xs.transpose(1, 2) # [B, T, C] B, T, D = xs.size() mask = mask[:, ::self.stride, ::self.stride] mask_pad = mask_pad[:, :, ::self.stride] L = mask_pad.size(-1) # For JIT exporting, we remove F.pad operator. if L - T < 0: xs = xs[:, :L - T, :].contiguous() else: dummy_pad = torch.zeros(B, L - T, D, device=xs.device) xs = torch.cat([xs, dummy_pad], dim=1) xs_lens = torch.div(xs_lens + 1, 2, rounding_mode='trunc') return xs, xs_lens, mask, mask_pad class TimeReductionLayer2D(nn.Module): def __init__(self, kernel_size: int = 5, stride: int = 2, encoder_dim: int = 256): super(TimeReductionLayer2D, self).__init__() self.encoder_dim = encoder_dim self.kernel_size = kernel_size self.dw_conv = Conv2dValid(in_channels=encoder_dim, out_channels=encoder_dim, kernel_size=(kernel_size, 1), stride=stride, valid_trigy=True) self.pw_conv = Conv2dValid( in_channels=encoder_dim, out_channels=encoder_dim, kernel_size=1, stride=1, valid_trigx=False, valid_trigy=False, ) self.kernel_size = kernel_size self.stride = stride self.init_weights() def init_weights(self): dw_max = self.kernel_size**-0.5 pw_max = self.encoder_dim**-0.5 torch.nn.init.uniform_(self.dw_conv.weight, -dw_max, dw_max) torch.nn.init.uniform_(self.dw_conv.bias, -dw_max, dw_max) torch.nn.init.uniform_(self.pw_conv.weight, -pw_max, pw_max) torch.nn.init.uniform_(self.pw_conv.bias, -pw_max, pw_max) def forward( self, xs: torch.Tensor, xs_lens: torch.Tensor, mask: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool), mask_pad: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool), ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]: xs = xs.masked_fill(mask_pad.transpose(1, 2).eq(0), 0.0) xs = xs.unsqueeze(2) padding1 = self.kernel_size - self.stride xs = F.pad(xs, (0, 0, 0, 0, 0, padding1, 0, 0), mode='constant', value=0.) xs = self.dw_conv(xs.permute(0, 3, 1, 2)) xs = self.pw_conv(xs).permute(0, 3, 2, 1).squeeze(1).contiguous() tmp_length = xs.size(1) xs_lens = torch.div(xs_lens + 1, 2, rounding_mode='trunc') padding2 = max(0, (xs_lens.max() - tmp_length).data.item()) batch_size, hidden = xs.size(0), xs.size(-1) dummy_pad = torch.zeros(batch_size, padding2, hidden, device=xs.device) xs = torch.cat([xs, dummy_pad], dim=1) mask = mask[:, ::2, ::2] mask_pad = mask_pad[:, :, ::2] return xs, xs_lens, mask, mask_pad class TimeReductionLayerStream(nn.Module): """ Squeezeformer Time Reduction procedure. Downsamples the audio by `stride` in the time dimension. Args: channel (int): input dimension of MultiheadAttentionMechanism and PositionwiseFeedForward out_dim (int): Output dimension of the module. kernel_size (int): Conv kernel size for depthwise convolution in convolution module stride (int): Downsampling factor in time dimension. """ def __init__(self, channel: int, out_dim: int, kernel_size: int = 1, stride: int = 2): super(TimeReductionLayerStream, self).__init__() self.channel = channel self.out_dim = out_dim self.kernel_size = kernel_size self.stride = stride self.dw_conv = nn.Conv1d( in_channels=channel, out_channels=channel, kernel_size=kernel_size, stride=stride, padding=0, groups=channel, ) self.pw_conv = nn.Conv1d( in_channels=channel, out_channels=out_dim, kernel_size=1, stride=1, padding=0, groups=1, ) self.init_weights() def init_weights(self): dw_max = self.kernel_size**-0.5 pw_max = self.channel**-0.5 torch.nn.init.uniform_(self.dw_conv.weight, -dw_max, dw_max) torch.nn.init.uniform_(self.dw_conv.bias, -dw_max, dw_max) torch.nn.init.uniform_(self.pw_conv.weight, -pw_max, pw_max) torch.nn.init.uniform_(self.pw_conv.bias, -pw_max, pw_max) def forward( self, xs, xs_lens: torch.Tensor, mask: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool), mask_pad: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool), ): xs = xs.transpose(1, 2) # [B, C, T] xs = xs.masked_fill(mask_pad.eq(0), 0.0) xs = self.dw_conv(xs) xs = self.pw_conv(xs) xs = xs.transpose(1, 2) # [B, T, C] B, T, D = xs.size() mask = mask[:, ::self.stride, ::self.stride] mask_pad = mask_pad[:, :, ::self.stride] L = mask_pad.size(-1) # For JIT exporting, we remove F.pad operator. if L - T < 0: xs = xs[:, :L - T, :].contiguous() else: dummy_pad = torch.zeros(B, L - T, D, device=xs.device) xs = torch.cat([xs, dummy_pad], dim=1) xs_lens = torch.div(xs_lens + 1, 2, rounding_mode='trunc') return xs, xs_lens, mask, mask_pad