import sys |
sys.path.append('/aifs4su/data/zheny/bigcodec_final/BigCodec_conv_transformer_vocos') |
import numpy as np |
import torch |
import torch.nn as nn |
from vq.residual_vq import ResidualVQ |
from vq.module import WNConv1d, DecoderBlock, ResLSTM |
from vq.alias_free_torch import * |
from vq import activations |
from typing import Optional |
from vq.module import ConvNeXtBlock, AdaLayerNorm |
from vq.bs_roformer5 import TransformerBlock |
from torchtune.modules import RotaryPositionalEmbeddings |
from vector_quantize_pytorch import ResidualFSQ |
from torch.nn import Module, ModuleList |
class ISTFT(nn.Module): |
""" |
Custom implementation of ISTFT since torch.istft doesn't allow custom padding (other than `center=True`) with |
windowing. This is because the NOLA (Nonzero Overlap Add) check fails at the edges. |
See issue: https://github.com/pytorch/pytorch/issues/62323 |
Specifically, in the context of neural vocoding we are interested in "same" padding analogous to CNNs. |
The NOLA constraint is met as we trim padded samples anyway. |
Args: |
n_fft (int): Size of Fourier transform. |
hop_length (int): The distance between neighboring sliding window frames. |
win_length (int): The size of window frame and STFT filter. |
padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same". |
""" |
def __init__(self, n_fft: int, hop_length: int, win_length: int, padding: str = "same"): |
super().__init__() |
if padding not in ["center", "same"]: |
raise ValueError("Padding must be 'center' or 'same'.") |
self.padding = padding |
self.n_fft = n_fft |
self.hop_length = hop_length |
self.win_length = win_length |
window = torch.hann_window(win_length) |
self.register_buffer("window", window) |
def forward(self, spec: torch.Tensor) -> torch.Tensor: |
""" |
Compute the Inverse Short Time Fourier Transform (ISTFT) of a complex spectrogram. |
Args: |
spec (Tensor): Input complex spectrogram of shape (B, N, T), where B is the batch size, |
N is the number of frequency bins, and T is the number of time frames. |
Returns: |
Tensor: Reconstructed time-domain signal of shape (B, L), where L is the length of the output signal. |
""" |
if self.padding == "center": |
return torch.istft(spec, self.n_fft, self.hop_length, self.win_length, self.window, center=True) |
elif self.padding == "same": |
pad = (self.win_length - self.hop_length) // 2 |
else: |
raise ValueError("Padding must be 'center' or 'same'.") |
assert spec.dim() == 3, "Expected a 3D tensor as input" |
B, N, T = spec.shape |
ifft = torch.fft.irfft(spec, self.n_fft, dim=1, norm="backward") |
ifft = ifft * self.window[None, :, None] |
output_size = (T - 1) * self.hop_length + self.win_length |
y = torch.nn.functional.fold( |
ifft, output_size=(1, output_size), kernel_size=(1, self.win_length), stride=(1, self.hop_length), |
)[:, 0, 0, pad:-pad] |
window_sq = self.window.square().expand(1, T, -1).transpose(1, 2) |
window_envelope = torch.nn.functional.fold( |
window_sq, output_size=(1, output_size), kernel_size=(1, self.win_length), stride=(1, self.hop_length), |
).squeeze()[pad:-pad] |
assert (window_envelope > 1e-11).all() |
y = y / window_envelope |
return y |
class FourierHead(nn.Module): |
"""Base class for inverse fourier modules.""" |
def forward(self, x: torch.Tensor) -> torch.Tensor: |
""" |
Args: |
x (Tensor): Input tensor of shape (B, L, H), where B is the batch size, |
L is the sequence length, and H denotes the model dimension. |
Returns: |
Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal. |
""" |
raise NotImplementedError("Subclasses must implement the forward method.") |
class ISTFTHead(FourierHead): |
""" |
ISTFT Head module for predicting STFT complex coefficients. |
Args: |
dim (int): Hidden dimension of the model. |
n_fft (int): Size of Fourier transform. |
hop_length (int): The distance between neighboring sliding window frames, which should align with |
the resolution of the input features. |
padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same". |
""" |
def __init__(self, dim: int, n_fft: int, hop_length: int, padding: str = "same"): |
super().__init__() |
out_dim = n_fft + 2 |
self.out = torch.nn.Linear(dim, out_dim) |
self.istft = ISTFT(n_fft=n_fft, hop_length=hop_length, win_length=n_fft, padding=padding) |
def forward(self, x: torch.Tensor) -> torch.Tensor: |
""" |
Forward pass of the ISTFTHead module. |
Args: |
x (Tensor): Input tensor of shape (B, L, H), where B is the batch size, |
L is the sequence length, and H denotes the model dimension. |
Returns: |
Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal. |
""" |
x_pred = self.out(x ) |
x_pred = x_pred.transpose(1, 2) |
mag, p = x_pred.chunk(2, dim=1) |
mag = torch.exp(mag) |
mag = torch.clip(mag, max=1e2) |
x = torch.cos(p) |
y = torch.sin(p) |
S = mag * (x + 1j * y) |
audio = self.istft(S) |
return audio.unsqueeze(1),x_pred |
def nonlinearity(x): |
return x * torch.sigmoid(x) |
def Normalize(in_channels, num_groups=32): |
return torch.nn.GroupNorm(num_groups=num_groups, num_channels=in_channels, eps=1e-6, affine=True) |
class ResnetBlock(nn.Module): |
def __init__(self, *, in_channels, out_channels=None, conv_shortcut=False, |
dropout, temb_channels=512): |
super().__init__() |
self.in_channels = in_channels |
out_channels = in_channels if out_channels is None else out_channels |
self.out_channels = out_channels |
self.use_conv_shortcut = conv_shortcut |
self.norm1 = Normalize(in_channels) |
self.conv1 = torch.nn.Conv1d(in_channels, |
out_channels, |
kernel_size=3, |
stride=1, |
padding=1) |
if temb_channels > 0: |
self.temb_proj = torch.nn.Linear(temb_channels, |
out_channels) |
self.norm2 = Normalize(out_channels) |
self.dropout = torch.nn.Dropout(dropout) |
self.conv2 = torch.nn.Conv1d(out_channels, |
out_channels, |
kernel_size=3, |
stride=1, |
padding=1) |
if self.in_channels != self.out_channels: |
if self.use_conv_shortcut: |
self.conv_shortcut = torch.nn.Conv1d(in_channels, |
out_channels, |
kernel_size=3, |
stride=1, |
padding=1) |
else: |
self.nin_shortcut = torch.nn.Conv1d(in_channels, |
out_channels, |
kernel_size=1, |
stride=1, |
padding=0) |
def forward(self, x, temb=None): |
h = x |
h = self.norm1(h) |
h = nonlinearity(h) |
h = self.conv1(h) |
if temb is not None: |
h = h + self.temb_proj(nonlinearity(temb))[:, :, None, None] |
h = self.norm2(h) |
h = nonlinearity(h) |
h = self.dropout(h) |
h = self.conv2(h) |
if self.in_channels != self.out_channels: |
if self.use_conv_shortcut: |
x = self.conv_shortcut(x) |
else: |
x = self.nin_shortcut(x) |
return x + h |
class AttnBlock(nn.Module): |
def __init__(self, in_channels): |
super().__init__() |
self.in_channels = in_channels |
self.norm = Normalize(in_channels) |
self.q = torch.nn.Conv1d(in_channels, |
in_channels, |
kernel_size=1, |
stride=1, |
padding=0) |
self.k = torch.nn.Conv1d(in_channels, |
in_channels, |
kernel_size=1, |
stride=1, |
padding=0) |
self.v = torch.nn.Conv1d(in_channels, |
in_channels, |
kernel_size=1, |
stride=1, |
padding=0) |
self.proj_out = torch.nn.Conv1d(in_channels, |
in_channels, |
kernel_size=1, |
stride=1, |
padding=0) |
def forward(self, x): |
h_ = x |
h_ = self.norm(h_) |
q = self.q(h_) |
k = self.k(h_) |
v = self.v(h_) |
b, c, h = q.shape |
q = q.permute(0, 2, 1) |
w_ = torch.bmm(q, k) |
w_ = w_ * (int(c) ** (-0.5)) |
w_ = torch.nn.functional.softmax(w_, dim=2) |
w_ = w_.permute(0, 2, 1) |
h_ = torch.bmm(v, w_) |
h_ = self.proj_out(h_) |
return x + h_ |
def make_attn(in_channels, attn_type="vanilla"): |
assert attn_type in ["vanilla", "linear", "none"], f'attn_type {attn_type} unknown' |
print(f"making attention of type '{attn_type}' with {in_channels} in_channels") |
if attn_type == "vanilla": |
return AttnBlock(in_channels) |
class Backbone(nn.Module): |
"""Base class for the generator's backbone. It preserves the same temporal resolution across all layers.""" |
def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor: |
""" |
Args: |
x (Tensor): Input tensor of shape (B, C, L), where B is the batch size, |
C denotes output features, and L is the sequence length. |
Returns: |
Tensor: Output of shape (B, L, H), where B is the batch size, L is the sequence length, |
and H denotes the model dimension. |
""" |
raise NotImplementedError("Subclasses must implement the forward method.") |
class VocosBackbone(Backbone): |
""" |
Vocos backbone module built with ConvNeXt blocks. Supports additional conditioning with Adaptive Layer Normalization |
Args: |
input_channels (int): Number of input features channels. |
dim (int): Hidden dimension of the model. |
intermediate_dim (int): Intermediate dimension used in ConvNeXtBlock. |
num_layers (int): Number of ConvNeXtBlock layers. |
layer_scale_init_value (float, optional): Initial value for layer scaling. Defaults to `1 / num_layers`. |
adanorm_num_embeddings (int, optional): Number of embeddings for AdaLayerNorm. |
None means non-conditional model. Defaults to None. |
""" |
def __init__( |
self, hidden_dim=1024,depth=12,heads=16,pos_meb_dim=64): |
super().__init__() |
self.embed = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=7, padding=3) |
self.temb_ch = 0 |
block_in = hidden_dim |
dropout = 0.1 |
prior_net : tp.List[nn.Module] = [ |
ResnetBlock(in_channels=block_in,out_channels=block_in, |
temb_channels=self.temb_ch,dropout=dropout), |
ResnetBlock(in_channels=block_in,out_channels=block_in, |
temb_channels=self.temb_ch,dropout=dropout), |
] |
self.prior_net = nn.Sequential(*prior_net) |
depth = depth |
time_rotary_embed = RotaryPositionalEmbeddings(dim=pos_meb_dim) |
transformer_blocks = [ |
TransformerBlock(dim=hidden_dim, n_heads=heads, rotary_embed=time_rotary_embed) |
for _ in range(depth) |
] |
self.transformers = nn.Sequential(*transformer_blocks) |
self.final_layer_norm = nn.LayerNorm(hidden_dim, eps=1e-6) |
post_net : tp.List[nn.Module] = [ |
ResnetBlock(in_channels=block_in,out_channels=block_in, |
temb_channels=self.temb_ch,dropout=dropout), |
ResnetBlock(in_channels=block_in,out_channels=block_in, |
temb_channels=self.temb_ch,dropout=dropout), |
] |
self.post_net = nn.Sequential(*post_net) |
def forward(self, x: torch.Tensor ) -> torch.Tensor: |
x = x.transpose(1, 2) |
x = self.embed(x) |
x = self.prior_net(x) |
x = x.transpose(1, 2) |
x= self.transformers(x) |
x = x.transpose(1, 2) |
x = self.post_net(x) |
x = x.transpose(1, 2) |
x = self.final_layer_norm(x) |
return x |
def init_weights(m): |
if isinstance(m, nn.Conv1d): |
nn.init.trunc_normal_(m.weight, std=0.02) |
nn.init.constant_(m.bias, 0) |
class CodecDecoderVocos(nn.Module): |
def __init__(self, |
hidden_dim=1024, |
depth=12, |
heads=16, |
pos_meb_dim=64, |
hop_length=320, |
vq_num_quantizers=1, |
vq_dim=2048, |
vq_commit_weight=0.25, |
vq_weight_init=False, |
vq_full_commit_loss=False, |
codebook_size=16384, |
codebook_dim=16, |
): |
super().__init__() |
self.hop_length = hop_length |
self.quantizer = ResidualFSQ( |
dim = vq_dim, |
levels = [4, 4, 4, 4, 4,4,4,4], |
num_quantizers = 1 |
) |
self.backbone = VocosBackbone( hidden_dim=hidden_dim,depth=depth,heads=heads,pos_meb_dim=pos_meb_dim) |
self.head = ISTFTHead(dim=hidden_dim, n_fft=self.hop_length*4, hop_length=self.hop_length, padding="same") |
self.reset_parameters() |
def forward(self, x, vq=True): |
if vq is True: |
x = x.permute(0, 2, 1) |
x, q = self.quantizer(x) |
x = x.permute(0, 2, 1) |
q = q.permute(0, 2, 1) |
return x, q, None |
x = self.backbone(x) |
x,_ = self.head(x) |
return x ,_ |
def vq2emb(self, vq): |
self.quantizer = self.quantizer.eval() |
x = self.quantizer.vq2emb(vq) |
return x |
def get_emb(self): |
self.quantizer = self.quantizer.eval() |
embs = self.quantizer.get_emb() |
return embs |
def inference_vq(self, vq): |
x = vq[None,:,:] |
x = self.model(x) |
return x |
def inference_0(self, x): |
x, q, loss, perp = self.quantizer(x) |
x = self.model(x) |
return x, None |
def inference(self, x): |
x = self.model(x) |
return x, None |
def remove_weight_norm(self): |
"""Remove weight normalization module from all of the layers.""" |
def _remove_weight_norm(m): |
try: |
torch.nn.utils.remove_weight_norm(m) |
except ValueError: |
return |
self.apply(_remove_weight_norm) |
def apply_weight_norm(self): |
"""Apply weight normalization module from all of the layers.""" |
def _apply_weight_norm(m): |
if isinstance(m, nn.Conv1d) or isinstance(m, nn.ConvTranspose1d): |
torch.nn.utils.weight_norm(m) |
self.apply(_apply_weight_norm) |
def reset_parameters(self): |
self.apply(init_weights) |
class CodecDecoderVocos_transpose(nn.Module): |
def __init__(self, |
hidden_dim=1024, |
depth=12, |
heads=16, |
pos_meb_dim=64, |
hop_length=320, |
vq_num_quantizers=1, |
vq_dim=1024, |
vq_commit_weight=0.25, |
vq_weight_init=False, |
vq_full_commit_loss=False, |
codebook_size=16384, |
codebook_dim=16, |
): |
super().__init__() |
self.hop_length = hop_length |
self.quantizer = ResidualVQ( |
num_quantizers=vq_num_quantizers, |
dim=vq_dim, |
codebook_size=codebook_size, |
codebook_dim=codebook_dim, |
threshold_ema_dead_code=2, |
commitment=vq_commit_weight, |
weight_init=vq_weight_init, |
full_commit_loss=vq_full_commit_loss, |
) |
self.backbone = VocosBackbone( hidden_dim=hidden_dim,depth=depth,heads=heads,pos_meb_dim=pos_meb_dim) |
self.inverse_mel_conv = nn.Sequential( |
nn.GELU(), |
nn.ConvTranspose1d( |
in_channels=hidden_dim, |
out_channels=hidden_dim, |
kernel_size=3, |
stride=2, |
padding=1, |
output_padding=1 |
), |
nn.GELU(), |
nn.ConvTranspose1d( |
in_channels=hidden_dim, |
out_channels=hidden_dim, |
kernel_size=3, |
padding=1 |
) |
) |
self.head = ISTFTHead(dim=hidden_dim, n_fft=self.hop_length*4, hop_length=self.hop_length, padding="same") |
self.reset_parameters() |
def forward(self, x, vq=True): |
if vq is True: |
x, q, commit_loss = self.quantizer(x) |
return x, q, commit_loss |
x = self.backbone(x) |
x,_ = self.head(x) |
return x ,_ |
def vq2emb(self, vq): |
self.quantizer = self.quantizer.eval() |
x = self.quantizer.vq2emb(vq) |
return x |
def get_emb(self): |
self.quantizer = self.quantizer.eval() |
embs = self.quantizer.get_emb() |
return embs |
def inference_vq(self, vq): |
x = vq[None,:,:] |
x = self.model(x) |
return x |
def inference_0(self, x): |
x, q, loss, perp = self.quantizer(x) |
x = self.model(x) |
return x, None |
def inference(self, x): |
x = self.model(x) |
return x, None |
def remove_weight_norm(self): |
"""Remove weight normalization module from all of the layers.""" |
def _remove_weight_norm(m): |
try: |
torch.nn.utils.remove_weight_norm(m) |
except ValueError: |
return |
self.apply(_remove_weight_norm) |
def apply_weight_norm(self): |
"""Apply weight normalization module from all of the layers.""" |
def _apply_weight_norm(m): |
if isinstance(m, nn.Conv1d) or isinstance(m, nn.ConvTranspose1d): |
torch.nn.utils.weight_norm(m) |
self.apply(_apply_weight_norm) |
def reset_parameters(self): |
self.apply(init_weights) |
def main(): |
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
print(f"Using device: {device}") |
model = CodecDecoderVocos_transpose().to(device) |
print("Model initialized.") |
batch_size = 2 |
in_channels = 1024 |
sequence_length = 50 |
dummy_input = torch.randn(batch_size, in_channels, sequence_length).to(device) |
print(f"Dummy input shape: {dummy_input.shape}") |
model.eval() |
with torch.no_grad(): |
output_no_vq = model(dummy_input, vq=False) |
print("\nForward pass without VQ:") |
print(f"Output shape: {output_no_vq.shape}") |
c=1 |
if __name__ == "__main__": |
main() |