# Copyright (c) Facebook, Inc. and its affiliates. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. # # Created on 2018/12 # Author: Kaituo XU # Modified on 2019/11 by Alexandre Defossez, added support for multiple output channels # Here is the original license: # The MIT License (MIT) # # Copyright (c) 2018 Kaituo XU # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import math import torch import torch.nn as nn import torch.nn.functional as F from .utils import capture_init EPS = 1e-8 def overlap_and_add(signal, frame_step): outer_dimensions = signal.size()[:-2] frames, frame_length = signal.size()[-2:] subframe_length = math.gcd(frame_length, frame_step) # gcd=Greatest Common Divisor subframe_step = frame_step // subframe_length subframes_per_frame = frame_length // subframe_length output_size = frame_step * (frames - 1) + frame_length output_subframes = output_size // subframe_length subframe_signal = signal.view(*outer_dimensions, -1, subframe_length) frame = torch.arange(0, output_subframes, device=signal.device).unfold(0, subframes_per_frame, subframe_step) frame = frame.long() # signal may in GPU or CPU frame = frame.contiguous().view(-1) result = signal.new_zeros(*outer_dimensions, output_subframes, subframe_length) result.index_add_(-2, frame, subframe_signal) result = result.view(*outer_dimensions, -1) return result class ConvTasNet(nn.Module): @capture_init def __init__(self, sources, N=256, L=20, B=256, H=512, P=3, X=8, R=4, audio_channels=2, norm_type="gLN", causal=False, mask_nonlinear='relu', samplerate=44100, segment_length=44100 * 2 * 4): """ Args: sources: list of sources N: Number of filters in autoencoder L: Length of the filters (in samples) B: Number of channels in bottleneck 1 × 1-conv block H: Number of channels in convolutional blocks P: Kernel size in convolutional blocks X: Number of convolutional blocks in each repeat R: Number of repeats norm_type: BN, gLN, cLN causal: causal or non-causal mask_nonlinear: use which non-linear function to generate mask """ super(ConvTasNet, self).__init__() # Hyper-parameter self.sources = sources self.C = len(sources) self.N, self.L, self.B, self.H, self.P, self.X, self.R = N, L, B, H, P, X, R self.norm_type = norm_type self.causal = causal self.mask_nonlinear = mask_nonlinear self.audio_channels = audio_channels self.samplerate = samplerate self.segment_length = segment_length # Components self.encoder = Encoder(L, N, audio_channels) self.separator = TemporalConvNet( N, B, H, P, X, R, self.C, norm_type, causal, mask_nonlinear) self.decoder = Decoder(N, L, audio_channels) # init for p in self.parameters(): if p.dim() > 1: nn.init.xavier_normal_(p) def valid_length(self, length): return length def forward(self, mixture): """ Args: mixture: [M, T], M is batch size, T is #samples Returns: est_source: [M, C, T] """ mixture_w = self.encoder(mixture) est_mask = self.separator(mixture_w) est_source = self.decoder(mixture_w, est_mask) # T changed after conv1d in encoder, fix it here T_origin = mixture.size(-1) T_conv = est_source.size(-1) est_source = F.pad(est_source, (0, T_origin - T_conv)) return est_source class Encoder(nn.Module): """Estimation of the nonnegative mixture weight by a 1-D conv layer. """ def __init__(self, L, N, audio_channels): super(Encoder, self).__init__() # Hyper-parameter self.L, self.N = L, N # Components # 50% overlap self.conv1d_U = nn.Conv1d(audio_channels, N, kernel_size=L, stride=L // 2, bias=False) def forward(self, mixture): """ Args: mixture: [M, T], M is batch size, T is #samples Returns: mixture_w: [M, N, K], where K = (T-L)/(L/2)+1 = 2T/L-1 """ mixture_w = F.relu(self.conv1d_U(mixture)) # [M, N, K] return mixture_w class Decoder(nn.Module): def __init__(self, N, L, audio_channels): super(Decoder, self).__init__() # Hyper-parameter self.N, self.L = N, L self.audio_channels = audio_channels # Components self.basis_signals = nn.Linear(N, audio_channels * L, bias=False) def forward(self, mixture_w, est_mask): """ Args: mixture_w: [M, N, K] est_mask: [M, C, N, K] Returns: est_source: [M, C, T] """ # D = W * M source_w = torch.unsqueeze(mixture_w, 1) * est_mask # [M, C, N, K] source_w = torch.transpose(source_w, 2, 3) # [M, C, K, N] # S = DV est_source = self.basis_signals(source_w) # [M, C, K, ac * L] m, c, k, _ = est_source.size() est_source = est_source.view(m, c, k, self.audio_channels, -1).transpose(2, 3).contiguous() est_source = overlap_and_add(est_source, self.L // 2) # M x C x ac x T return est_source class TemporalConvNet(nn.Module): def __init__(self, N, B, H, P, X, R, C, norm_type="gLN", causal=False, mask_nonlinear='relu'): """ Args: N: Number of filters in autoencoder B: Number of channels in bottleneck 1 × 1-conv block H: Number of channels in convolutional blocks P: Kernel size in convolutional blocks X: Number of convolutional blocks in each repeat R: Number of repeats C: Number of speakers norm_type: BN, gLN, cLN causal: causal or non-causal mask_nonlinear: use which non-linear function to generate mask """ super(TemporalConvNet, self).__init__() # Hyper-parameter self.C = C self.mask_nonlinear = mask_nonlinear # Components # [M, N, K] -> [M, N, K] layer_norm = ChannelwiseLayerNorm(N) # [M, N, K] -> [M, B, K] bottleneck_conv1x1 = nn.Conv1d(N, B, 1, bias=False) # [M, B, K] -> [M, B, K] repeats = [] for r in range(R): blocks = [] for x in range(X): dilation = 2**x padding = (P - 1) * dilation if causal else (P - 1) * dilation // 2 blocks += [ TemporalBlock(B, H, P, stride=1, padding=padding, dilation=dilation, norm_type=norm_type, causal=causal) ] repeats += [nn.Sequential(*blocks)] temporal_conv_net = nn.Sequential(*repeats) # [M, B, K] -> [M, C*N, K] mask_conv1x1 = nn.Conv1d(B, C * N, 1, bias=False) # Put together self.network = nn.Sequential(layer_norm, bottleneck_conv1x1, temporal_conv_net, mask_conv1x1) def forward(self, mixture_w): """ Keep this API same with TasNet Args: mixture_w: [M, N, K], M is batch size returns: est_mask: [M, C, N, K] """ M, N, K = mixture_w.size() score = self.network(mixture_w) # [M, N, K] -> [M, C*N, K] score = score.view(M, self.C, N, K) # [M, C*N, K] -> [M, C, N, K] if self.mask_nonlinear == 'softmax': est_mask = F.softmax(score, dim=1) elif self.mask_nonlinear == 'relu': est_mask = F.relu(score) else: raise ValueError("Unsupported mask non-linear function") return est_mask class TemporalBlock(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, stride, padding, dilation, norm_type="gLN", causal=False): super(TemporalBlock, self).__init__() # [M, B, K] -> [M, H, K] conv1x1 = nn.Conv1d(in_channels, out_channels, 1, bias=False) prelu = nn.PReLU() norm = chose_norm(norm_type, out_channels) # [M, H, K] -> [M, B, K] dsconv = DepthwiseSeparableConv(out_channels, in_channels, kernel_size, stride, padding, dilation, norm_type, causal) # Put together self.net = nn.Sequential(conv1x1, prelu, norm, dsconv) def forward(self, x): """ Args: x: [M, B, K] Returns: [M, B, K] """ residual = x out = self.net(x) # TODO: when P = 3 here works fine, but when P = 2 maybe need to pad? return out + residual # look like w/o F.relu is better than w/ F.relu # return F.relu(out + residual) class DepthwiseSeparableConv(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, stride, padding, dilation, norm_type="gLN", causal=False): super(DepthwiseSeparableConv, self).__init__() # Use `groups` option to implement depthwise convolution # [M, H, K] -> [M, H, K] depthwise_conv = nn.Conv1d(in_channels, in_channels, kernel_size, stride=stride, padding=padding, dilation=dilation, groups=in_channels, bias=False) if causal: chomp = Chomp1d(padding) prelu = nn.PReLU() norm = chose_norm(norm_type, in_channels) # [M, H, K] -> [M, B, K] pointwise_conv = nn.Conv1d(in_channels, out_channels, 1, bias=False) # Put together if causal: self.net = nn.Sequential(depthwise_conv, chomp, prelu, norm, pointwise_conv) else: self.net = nn.Sequential(depthwise_conv, prelu, norm, pointwise_conv) def forward(self, x): """ Args: x: [M, H, K] Returns: result: [M, B, K] """ return self.net(x) class Chomp1d(nn.Module): """To ensure the output length is the same as the input. """ def __init__(self, chomp_size): super(Chomp1d, self).__init__() self.chomp_size = chomp_size def forward(self, x): """ Args: x: [M, H, Kpad] Returns: [M, H, K] """ return x[:, :, :-self.chomp_size].contiguous() def chose_norm(norm_type, channel_size): """The input of normlization will be (M, C, K), where M is batch size, C is channel size and K is sequence length. """ if norm_type == "gLN": return GlobalLayerNorm(channel_size) elif norm_type == "cLN": return ChannelwiseLayerNorm(channel_size) elif norm_type == "id": return nn.Identity() else: # norm_type == "BN": # Given input (M, C, K), nn.BatchNorm1d(C) will accumulate statics # along M and K, so this BN usage is right. return nn.BatchNorm1d(channel_size) # TODO: Use nn.LayerNorm to impl cLN to speed up class ChannelwiseLayerNorm(nn.Module): """Channel-wise Layer Normalization (cLN)""" def __init__(self, channel_size): super(ChannelwiseLayerNorm, self).__init__() self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.reset_parameters() def reset_parameters(self): self.gamma.data.fill_(1) self.beta.data.zero_() def forward(self, y): """ Args: y: [M, N, K], M is batch size, N is channel size, K is length Returns: cLN_y: [M, N, K] """ mean = torch.mean(y, dim=1, keepdim=True) # [M, 1, K] var = torch.var(y, dim=1, keepdim=True, unbiased=False) # [M, 1, K] cLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta return cLN_y class GlobalLayerNorm(nn.Module): """Global Layer Normalization (gLN)""" def __init__(self, channel_size): super(GlobalLayerNorm, self).__init__() self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1] self.reset_parameters() def reset_parameters(self): self.gamma.data.fill_(1) self.beta.data.zero_() def forward(self, y): """ Args: y: [M, N, K], M is batch size, N is channel size, K is length Returns: gLN_y: [M, N, K] """ # TODO: in torch 1.0, torch.mean() support dim list mean = y.mean(dim=1, keepdim=True).mean(dim=2, keepdim=True) # [M, 1, 1] var = (torch.pow(y - mean, 2)).mean(dim=1, keepdim=True).mean(dim=2, keepdim=True) gLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta return gLN_y if __name__ == "__main__": torch.manual_seed(123) M, N, L, T = 2, 3, 4, 12 K = 2 * T // L - 1 B, H, P, X, R, C, norm_type, causal = 2, 3, 3, 3, 2, 2, "gLN", False mixture = torch.randint(3, (M, T)) # test Encoder encoder = Encoder(L, N) encoder.conv1d_U.weight.data = torch.randint(2, encoder.conv1d_U.weight.size()) mixture_w = encoder(mixture) print('mixture', mixture) print('U', encoder.conv1d_U.weight) print('mixture_w', mixture_w) print('mixture_w size', mixture_w.size()) # test TemporalConvNet separator = TemporalConvNet(N, B, H, P, X, R, C, norm_type=norm_type, causal=causal) est_mask = separator(mixture_w) print('est_mask', est_mask) # test Decoder decoder = Decoder(N, L) est_mask = torch.randint(2, (B, K, C, N)) est_source = decoder(mixture_w, est_mask) print('est_source', est_source) # test Conv-TasNet conv_tasnet = ConvTasNet(N, L, B, H, P, X, R, C, norm_type=norm_type) est_source = conv_tasnet(mixture) print('est_source', est_source) print('est_source size', est_source.size())