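# NOTE: the next line is residue from the web file viewer this file was copied from; it is not code.
# akhaliq3 / spaces demo / 5019931 / raw / history blame / No virus / 17.4 kB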
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchlibrosa.stft import ISTFT, STFT, magphase
from bytesep.models.pytorch_modules import Base, Subband, act, init_bn, init_layer
class ConvBlockRes(nn.Module):
    r"""Residual block made of two (BatchNorm -> activation -> Conv2d) stages.

    The block output is the sum of the two-stage convolutional path and a
    skip path. The skip path is the unchanged input when ``in_channels ==
    out_channels``; otherwise it is a 1x1 convolution that changes the
    number of channels so the two paths can be added.
    """

    def __init__(self, in_channels, out_channels, kernel_size, activation, momentum):
        r"""Build the layers and initialise their weights.

        Args:
            in_channels: number of channels of the input feature map
            out_channels: number of channels produced by the block
            kernel_size: two-element sequence, kernel size of both convolutions
            activation: identifier forwarded to the external ``act`` helper
            momentum: momentum of the two BatchNorm2d layers
        """
        super().__init__()

        self.activation = activation

        # Half the kernel size on each axis; for odd kernels with stride 1
        # this keeps the spatial size unchanged.
        padding = [kernel_size[axis] // 2 for axis in (0, 1)]

        # Normalisation is applied *before* each convolution, so bn1 sees
        # in_channels and bn2 sees the out_channels produced by conv1.
        self.bn1 = nn.BatchNorm2d(in_channels, momentum=momentum)
        self.bn2 = nn.BatchNorm2d(out_channels, momentum=momentum)

        # Both convolutions share every setting except the input width.
        conv_settings = dict(
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=(1, 1),
            dilation=(1, 1),
            padding=padding,
            bias=False,
        )
        self.conv1 = nn.Conv2d(in_channels=in_channels, **conv_settings)
        self.conv2 = nn.Conv2d(in_channels=out_channels, **conv_settings)

        # A projection on the skip path is only needed when the channel
        # count changes; `self.shortcut` does not exist otherwise.
        self.is_shortcut = in_channels != out_channels
        if self.is_shortcut:
            self.shortcut = nn.Conv2d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=(1, 1),
                stride=(1, 1),
                padding=(0, 0),
            )

        self.init_weights()

    def init_weights(self):
        r"""Initialise all layers with the external ``init_bn`` / ``init_layer`` helpers."""
        for batch_norm in (self.bn1, self.bn2):
            init_bn(batch_norm)

        for conv in (self.conv1, self.conv2):
            init_layer(conv)

        if self.is_shortcut:
            init_layer(self.shortcut)

    def forward(self, x):
        r"""Apply the residual block.

        Args:
            x: input feature map with ``in_channels`` channels on dim 1

        Returns:
            The sum of the skip path and the convolutional path.
        """
        hidden = self.conv1(act(self.bn1(x), self.activation))
        hidden = self.conv2(act(self.bn2(hidden), self.activation))

        skip = self.shortcut(x) if self.is_shortcut else x
        return skip + hidden
class EncoderBlockRes4B(nn.Module):
    r"""Encoder stage: four ConvBlockRes blocks followed by average pooling.

    Each ConvBlockRes holds two convolutions, so the stage contains 8
    convolutional layers in total.
    """

    def __init__(
        self, in_channels, out_channels, kernel_size, downsample, activation, momentum
    ):
        r"""Build the four residual blocks.

        Args:
            in_channels: channels of the input feature map
            out_channels: channels produced by every residual block
            kernel_size: kernel size passed to every residual block
            downsample: kernel size handed to ``F.avg_pool2d`` in ``forward``
            activation: activation identifier passed to every residual block
            momentum: BatchNorm momentum passed to every residual block
        """
        super().__init__()

        # Only the first block changes the channel count; the remaining
        # three map out_channels -> out_channels.
        common = (out_channels, kernel_size, activation, momentum)
        self.conv_block1 = ConvBlockRes(in_channels, *common)
        self.conv_block2 = ConvBlockRes(out_channels, *common)
        self.conv_block3 = ConvBlockRes(out_channels, *common)
        self.conv_block4 = ConvBlockRes(out_channels, *common)

        self.downsample = downsample

    def forward(self, x):
        r"""Run the stage.

        Args:
            x: input feature map

        Returns:
            A ``(pooled, unpooled)`` tuple: the average-pooled output and the
            output of the last residual block before pooling. Callers in
            this file feed the latter to the decoder as the tensor to
            concatenate.
        """
        features = x
        for block in (
            self.conv_block1,
            self.conv_block2,
            self.conv_block3,
            self.conv_block4,
        ):
            features = block(features)

        pooled = F.avg_pool2d(features, kernel_size=self.downsample)
        return pooled, features
class DecoderBlockRes4B(nn.Module):
    r"""Decoder stage: one transposed convolution, then four ConvBlockRes blocks.

    The transposed convolution upsamples the input, the result is
    concatenated with a skip tensor along the channel dimension, and four
    residual blocks (8 convolutional layers) process the concatenation.
    """

    def __init__(
        self, in_channels, out_channels, kernel_size, upsample, activation, momentum
    ):
        r"""Build the upsampling layer and the residual blocks.

        Args:
            in_channels: channels of the tensor to be upsampled
            out_channels: channels produced by the stage
            kernel_size: kernel size of the residual blocks
            upsample: used as both kernel size and stride of the
                transposed convolution
            activation: activation identifier forwarded to ``act`` and to
                the residual blocks
            momentum: BatchNorm momentum
        """
        super().__init__()

        self.kernel_size = kernel_size
        self.stride = upsample
        self.activation = activation

        # Kernel size equals the stride, so the transposed convolution
        # scales each spatial axis by `upsample` without overlapping windows.
        self.conv1 = torch.nn.ConvTranspose2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=self.stride,
            stride=self.stride,
            padding=(0, 0),
            bias=False,
            dilation=(1, 1),
        )

        # Applied to the input before conv1, hence in_channels.
        self.bn1 = nn.BatchNorm2d(in_channels, momentum=momentum)

        # The first residual block consumes the upsampled tensor concatenated
        # with the skip tensor, i.e. twice out_channels (this assumes the skip
        # tensor carries out_channels channels).
        common = (out_channels, kernel_size, activation, momentum)
        self.conv_block2 = ConvBlockRes(out_channels * 2, *common)
        self.conv_block3 = ConvBlockRes(out_channels, *common)
        self.conv_block4 = ConvBlockRes(out_channels, *common)
        self.conv_block5 = ConvBlockRes(out_channels, *common)

        self.init_weights()

    def init_weights(self):
        r"""Initialise the layers owned directly by this stage.

        The residual blocks initialise themselves in their own constructors.
        """
        init_bn(self.bn1)
        init_layer(self.conv1)

    def forward(self, input_tensor, concat_tensor):
        r"""Upsample ``input_tensor`` and fuse it with ``concat_tensor``.

        Args:
            input_tensor: feature map coming from the deeper stage
            concat_tensor: skip feature map; must match the upsampled tensor
                on every dimension except dim 1 for ``torch.cat`` to succeed

        Returns:
            Output of the last residual block.
        """
        upsampled = self.conv1(act(self.bn1(input_tensor), self.activation))
        fused = torch.cat((upsampled, concat_tensor), dim=1)

        for block in (
            self.conv_block2,
            self.conv_block3,
            self.conv_block4,
            self.conv_block5,
        ):
            fused = block(fused)

        return fused
class ResUNet143_DecouplePlus(nn.Module, Base):
    r"""Residual UNet operating on subband-split magnitude spectrograms.

    The network predicts, for every target source and input channel, K = 4
    feature maps (magnitude mask, real/imaginary parts of a phase mask and a
    linear magnitude residual) which are combined with the mixture STFT and
    inverted back to waveforms.

    ``wav_to_spectrogram_phase`` is not defined in this class; it is
    presumably provided by the ``Base`` mixin -- confirm against
    ``bytesep.models.pytorch_modules``.
    """

    def __init__(self, input_channels, target_sources_num):
        r"""Build the STFT front end, encoder, bottleneck, decoder and output head.

        Args:
            input_channels: number of audio channels of the mixture
            target_sources_num: number of sources to separate
        """
        super().__init__()

        self.input_channels = input_channels
        self.target_sources_num = target_sources_num

        window_size = 2048
        hop_size = 441
        center = True
        pad_mode = "reflect"
        window = "hann"
        activation = "relu"
        momentum = 0.01

        self.subbands_num = 4
        self.K = 4  # outputs: |M|, cos∠M, sin∠M, |M2|

        # Original note: this number equals 2^{#encoder_blocks}.
        # NOTE(review): encoder_block6 pools with (1, 2), so per the shape
        # comments in forward() the time axis is only reduced by 2^5; padding
        # to a multiple of 2^6 is therefore more than strictly needed. Left
        # unchanged because altering it changes the padded input.
        self.downsample_ratio = 2 ** 6

        # The forward and inverse transforms share every setting.
        stft_settings = dict(
            n_fft=window_size,
            hop_length=hop_size,
            win_length=window_size,
            window=window,
            center=center,
            pad_mode=pad_mode,
            freeze_parameters=True,
        )
        self.stft = STFT(**stft_settings)
        self.istft = ISTFT(**stft_settings)

        # One normalisation channel per frequency bin (n_fft // 2 + 1).
        self.bn0 = nn.BatchNorm2d(window_size // 2 + 1, momentum=momentum)

        self.subband = Subband(subbands_num=self.subbands_num)

        def _encoder(in_ch, out_ch, downsample):
            # Every encoder-style stage uses a 3x3 kernel and the shared
            # activation / momentum settings.
            return EncoderBlockRes4B(
                in_channels=in_ch,
                out_channels=out_ch,
                kernel_size=(3, 3),
                downsample=downsample,
                activation=activation,
                momentum=momentum,
            )

        def _decoder(in_ch, out_ch, upsample):
            # Every decoder stage uses a 3x3 kernel and the shared
            # activation / momentum settings.
            return DecoderBlockRes4B(
                in_channels=in_ch,
                out_channels=out_ch,
                kernel_size=(3, 3),
                upsample=upsample,
                activation=activation,
                momentum=momentum,
            )

        # Encoder. Attribute names and creation order are kept as-is so that
        # state_dict keys and initialisation order do not change.
        self.encoder_block1 = _encoder(
            input_channels * self.subbands_num, 32, (2, 2)
        )
        self.encoder_block2 = _encoder(32, 64, (2, 2))
        self.encoder_block3 = _encoder(64, 128, (2, 2))
        self.encoder_block4 = _encoder(128, 256, (2, 2))
        self.encoder_block5 = _encoder(256, 384, (2, 2))
        self.encoder_block6 = _encoder(384, 384, (1, 2))

        # Bottleneck: four stages with a (1, 1) pooling kernel.
        self.conv_block7a = _encoder(384, 384, (1, 1))
        self.conv_block7b = _encoder(384, 384, (1, 1))
        self.conv_block7c = _encoder(384, 384, (1, 1))
        self.conv_block7d = _encoder(384, 384, (1, 1))

        # Decoder, mirroring the encoder.
        self.decoder_block1 = _decoder(384, 384, (1, 2))
        self.decoder_block2 = _decoder(384, 384, (2, 2))
        self.decoder_block3 = _decoder(384, 256, (2, 2))
        self.decoder_block4 = _decoder(256, 128, (2, 2))
        self.decoder_block5 = _decoder(128, 64, (2, 2))
        self.decoder_block6 = _decoder(64, 32, (2, 2))

        self.after_conv_block1 = _encoder(32, 32, (1, 1))

        # Output head: K maps per (input channel, subband, target source).
        head_channels = (
            input_channels * self.subbands_num * target_sources_num * self.K
        )
        self.after_conv2 = nn.Conv2d(
            in_channels=32,
            out_channels=head_channels,
            kernel_size=(1, 1),
            stride=(1, 1),
            padding=(0, 0),
            bias=True,
        )

        self.init_weights()

    def init_weights(self):
        r"""Initialise the layers owned directly by the model.

        The encoder and decoder stages initialise themselves in their own
        constructors.
        """
        init_bn(self.bn0)
        init_layer(self.after_conv2)

    def feature_maps_to_wav(
        self,
        input_tensor: torch.Tensor,
        sp: torch.Tensor,
        sin_in: torch.Tensor,
        cos_in: torch.Tensor,
        audio_length: int,
    ) -> torch.Tensor:
        r"""Convert network feature maps to waveforms.

        Args:
            input_tensor: (batch_size, feature_maps, time_steps, freq_bins),
                where feature_maps = target_sources_num * input_channels * K
            sp: mixture magnitude, (batch_size, input_channels, time_steps, freq_bins)
            sin_in: sine of the mixture phase, same shape as ``sp``
            cos_in: cosine of the mixture phase, same shape as ``sp``
            audio_length: number of samples of the output waveform

        Outputs:
            waveform: (batch_size, target_sources_num * input_channels, audio_length)
        """
        batch_size, _, time_steps, freq_bins = input_tensor.shape
        sources_num = self.target_sources_num
        channels_num = self.input_channels

        maps = input_tensor.reshape(
            batch_size, sources_num, channels_num, self.K, time_steps, freq_bins
        )
        # maps: (batch_size, target_sources_num, input_channels, K, time_steps, freq_bins)

        # Split the K axis into its four roles.
        mask_mag = torch.sigmoid(maps[:, :, :, 0, :, :])
        raw_mask_real = torch.tanh(maps[:, :, :, 1, :, :])
        raw_mask_imag = torch.tanh(maps[:, :, :, 2, :, :])
        linear_mag = maps[:, :, :, 3, :, :]

        # Only the cos / sin outputs of the (real, imag) pair are used; the
        # magnitude returned by magphase is discarded.
        _, mask_cos, mask_sin = magphase(raw_mask_real, raw_mask_imag)
        # mask_cos, mask_sin: (batch_size, target_sources_num, input_channels, time_steps, freq_bins)

        # Insert a target-source axis so the mixture terms broadcast against
        # the per-source masks.
        mix_cos = cos_in[:, None, :, :, :]
        mix_sin = sin_in[:, None, :, :, :]
        mix_mag = sp[:, None, :, :, :]

        # Y = |Y|cos∠Y + j|Y|sin∠Y
        #   = |Y|cos(∠X + ∠M) + j|Y|sin(∠X + ∠M)
        #   = |Y|(cos∠X cos∠M - sin∠X sin∠M) + j|Y|(sin∠X cos∠M + cos∠X sin∠M)
        out_cos = mix_cos * mask_cos - mix_sin * mask_sin
        out_sin = mix_sin * mask_cos + mix_cos * mask_sin
        # out_cos, out_sin: (batch_size, target_sources_num, input_channels, time_steps, freq_bins)

        # |Y|: masked mixture magnitude plus the linear residual, clipped at
        # zero so the magnitude is never negative.
        out_mag = F.relu_(mix_mag * mask_mag + linear_mag)

        # Real and imaginary parts of Y for the ISTFT.
        out_real = out_mag * out_cos
        out_imag = out_mag * out_sin

        # Fold batch, source and channel into one axis: (n, 1, time_steps, freq_bins).
        istft_shape = (
            batch_size * sources_num * channels_num,
            1,
            time_steps,
            freq_bins,
        )
        flat_wav = self.istft(
            out_real.reshape(istft_shape),
            out_imag.reshape(istft_shape),
            audio_length,
        )
        # flat_wav: (batch_size * target_sources_num * input_channels, audio_length)

        return flat_wav.reshape(
            batch_size, sources_num * channels_num, audio_length
        )

    def forward(self, input_dict):
        r"""Separate the mixture waveform into the target sources.

        Args:
            input_dict: {
                'waveform': (batch_size, input_channels, segment_samples)
            }

        Outputs:
            output_dict: {
                'waveform': (batch_size, target_sources_num * input_channels, segment_samples)
            }
        """
        mixtures = input_dict['waveform']
        # (batch_size, input_channels, segment_samples)

        mag, cos_in, sin_in = self.wav_to_spectrogram_phase(mixtures)
        # mag, cos_in, sin_in: (batch_size, input_channels, time_steps, freq_bins)

        # Batch normalise individual frequency bins: move the frequency axis
        # to dim 1 for BatchNorm2d, then move it back.
        x = self.bn0(mag.transpose(1, 3)).transpose(1, 3)
        # (batch_size, input_channels, time_steps, freq_bins)

        # Pad the time axis up to a multiple of the downsample ratio.
        origin_len = x.shape[2]
        ratio = self.downsample_ratio
        padded_len = int(np.ceil(origin_len / ratio)) * ratio
        x = F.pad(x, pad=(0, 0, 0, padded_len - origin_len))
        # (batch_size, input_channels, padded_time_steps, freq_bins)

        # Drop the last frequency bin so the count is even, e.g., 1025 -> 1024.
        x = x[..., :-1]  # (bs, input_channels, T, F)

        x = self.subband.analysis(x)
        # (bs, input_channels * subbands_num, T, F'), where F' = F // subbands_num

        # UNet encoder. Each stage returns (pooled, unpooled); the unpooled
        # tensor is stacked for the matching decoder stage. Pooled shapes:
        #   block1: (bs, 32, T / 2, F / 2)     block2: (bs, 64, T / 4, F / 4)
        #   block3: (bs, 128, T / 8, F / 8)    block4: (bs, 256, T / 16, F / 16)
        #   block5: (bs, 384, T / 32, F / 32)  block6: (bs, 384, T / 32, F / 64)
        skips = []
        for encoder in (
            self.encoder_block1,
            self.encoder_block2,
            self.encoder_block3,
            self.encoder_block4,
            self.encoder_block5,
            self.encoder_block6,
        ):
            x, skip = encoder(x)
            skips.append(skip)

        # Bottleneck: shape stays (bs, 384, T / 32, F / 64).
        for bottleneck in (
            self.conv_block7a,
            self.conv_block7b,
            self.conv_block7c,
            self.conv_block7d,
        ):
            x, _ = bottleneck(x)

        # UNet decoder: consume the skips deepest-first.
        #   block1: (bs, 384, T / 32, F / 32)  block2: (bs, 384, T / 16, F / 16)
        #   block3: (bs, 256, T / 8, F / 8)    block4: (bs, 128, T / 4, F / 4)
        #   block5: (bs, 64, T / 2, F / 2)     block6: (bs, 32, T, F)
        for decoder in (
            self.decoder_block1,
            self.decoder_block2,
            self.decoder_block3,
            self.decoder_block4,
            self.decoder_block5,
            self.decoder_block6,
        ):
            x = decoder(x, skips.pop())

        x, _ = self.after_conv_block1(x)  # (bs, 32, T, F)

        x = self.after_conv2(x)
        # (batch_size, input_channels * subbands_num * targets_num * K, T, F')

        x = self.subband.synthesis(x)
        # (batch_size, input_channels * targets_num * K, T, F)

        # Recover the original spectrogram shape.
        x = F.pad(x, pad=(0, 1))  # Pad frequency, e.g., 1024 -> 1025.
        x = x[:, :, 0:origin_len, :]  # (bs, feature_maps, time_steps, freq_bins)

        audio_length = mixtures.shape[2]

        separated_audio = self.feature_maps_to_wav(
            x, mag, sin_in, cos_in, audio_length
        )
        # separated_audio: (batch_size, target_sources_num * input_channels, segment_samples)

        return {'waveform': separated_audio}