# neucodec-decoder-ft-de / NeucodecDecoder.py
import torch
import torch.nn as nn
from typing import List
from einops import rearrange
from torchtune.modules import RotaryPositionalEmbeddings
from vector_quantize_pytorch import ResidualFSQ
from huggingface_hub import PyTorchModelHubMixin, hf_hub_download
# The following implementations were taken from the NeuCodec repository and slightly changed.
# Sources:
#   https://github.com/neuphonic/neucodec/blob/main/neucodec/model.py
#   https://github.com/neuphonic/neucodec/blob/main/neucodec/codec_decoder_vocos.py
#   https://github.com/neuphonic/neucodec/blob/main/neucodec/bs_roformer5.py
class RMSNorm(torch.nn.Module):
    r"""Root-mean-square layer norm, after https://github.com/meta-llama/llama/blob/main/llama/model.py"""
    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))
    def forward(self, x):
        # x * rsqrt(mean(x^2) + eps), followed by a learned per-channel gain.
        norm_x = torch.mean(x**2, dim=-1, keepdim=True)
        output = x * torch.rsqrt(norm_x + self.eps) * self.weight
        return output
class MLP(nn.Module):
def __init__(self, dim: int) -> None:
super().__init__()
self.fc1 = nn.Linear(dim, 4 * dim, bias=False)
self.silu = nn.SiLU()
self.fc2 = nn.Linear(4 * dim, dim, bias=False)
def forward(self, x):
x = self.fc1(x)
x = self.silu(x)
x = self.fc2(x)
return x
class Attention(nn.Module):
def __init__(
self, dim: int, n_heads: int, rotary_embed: RotaryPositionalEmbeddings
):
super().__init__()
assert dim % n_heads == 0
self.n_heads = n_heads
self.dim = dim
self.rotary_embed = rotary_embed
        self.flash = hasattr(torch.nn.functional, "scaled_dot_product_attention")
        assert self.flash, "Requires torch>=2.0 with scaled_dot_product_attention."
self.c_attn = nn.Linear(dim, 3 * dim, bias=False)
self.c_proj = nn.Linear(dim, dim, bias=False)
def forward(self, x):
r"""
Args:
x: (b, t, h*d)
Constants:
b: batch_size
t: time steps
r: 3
h: heads_num
d: heads_dim
"""
B, T, C = x.size()
q, k, v = rearrange(
self.c_attn(x), "b t (r h d) -> r b h t d", r=3, h=self.n_heads
)
        # q, k, v: (b, h, t, d)
        # Note: torchtune's RotaryPositionalEmbeddings documents an input layout of
        # (b, seq, heads, head_dim); the upstream NeuCodec code applies it to
        # (b, h, t, d) tensors, which is kept unchanged here to match the
        # pretrained weights.
        q = self.rotary_embed(q)
        k = self.rotary_embed(k)
if self.flash:
y = torch.nn.functional.scaled_dot_product_attention(
q, k, v, attn_mask=None, dropout_p=0, is_causal=False
)
y = rearrange(y, "b h t d -> b t (h d)")
y = self.c_proj(y)
# shape: (b, t, h*d)
return y
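
# Usage sketch for Attention (a minimal shape check; the sizes are illustrative,
# not taken from a checkpoint; head_dim = dim // n_heads = 64 here, matching the
# rotary embedding dimension):
#   rope = RotaryPositionalEmbeddings(dim=64)
#   attn = Attention(dim=1024, n_heads=16, rotary_embed=rope)
#   attn(torch.randn(2, 100, 1024)).shape  # -> torch.Size([2, 100, 1024])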
class TransformerBlock(nn.Module):
def __init__(
self, dim: int, n_heads: int, rotary_embed: RotaryPositionalEmbeddings
):
super().__init__()
self.dim = dim
self.n_heads = n_heads
self.att_norm = RMSNorm(dim)
self.ffn_norm = RMSNorm(dim)
self.att = Attention(dim=dim, n_heads=n_heads, rotary_embed=rotary_embed)
self.mlp = MLP(dim=dim)
def forward(
self,
x: torch.Tensor,
):
x = x + self.att(self.att_norm(x))
x = x + self.mlp(self.ffn_norm(x))
return x
class ISTFT(nn.Module):
"""
Custom implementation of ISTFT since torch.istft doesn't allow custom padding (other than `center=True`) with
windowing. This is because the NOLA (Nonzero Overlap Add) check fails at the edges.
See issue: https://github.com/pytorch/pytorch/issues/62323
Specifically, in the context of neural vocoding we are interested in "same" padding analogous to CNNs.
The NOLA constraint is met as we trim padded samples anyway.
Args:
n_fft (int): Size of Fourier transform.
hop_length (int): The distance between neighboring sliding window frames.
win_length (int): The size of window frame and STFT filter.
padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
"""
def __init__(
self, n_fft: int, hop_length: int, win_length: int, padding: str = "same"
):
super().__init__()
if padding not in ["center", "same"]:
raise ValueError("Padding must be 'center' or 'same'.")
self.padding = padding
self.n_fft = n_fft
self.hop_length = hop_length
self.win_length = win_length
window = torch.hann_window(win_length)
self.register_buffer("window", window, persistent=False) # changed persistent to False for safetensors compatibility
def forward(self, spec: torch.Tensor) -> torch.Tensor:
"""
Compute the Inverse Short Time Fourier Transform (ISTFT) of a complex spectrogram.
Args:
spec (Tensor): Input complex spectrogram of shape (B, N, T), where B is the batch size,
N is the number of frequency bins, and T is the number of time frames.
Returns:
Tensor: Reconstructed time-domain signal of shape (B, L), where L is the length of the output signal.
"""
if self.padding == "center":
# Fallback to pytorch native implementation
return torch.istft(
spec,
self.n_fft,
self.hop_length,
self.win_length,
self.window,
center=True,
)
elif self.padding == "same":
pad = (self.win_length - self.hop_length) // 2
else:
raise ValueError("Padding must be 'center' or 'same'.")
assert spec.dim() == 3, "Expected a 3D tensor as input"
B, N, T = spec.shape
# Inverse FFT
ifft = torch.fft.irfft(spec, self.n_fft, dim=1, norm="backward")
ifft = ifft * self.window[None, :, None]
# Overlap and Add
output_size = (T - 1) * self.hop_length + self.win_length
y = torch.nn.functional.fold(
ifft,
output_size=(1, output_size),
kernel_size=(1, self.win_length),
stride=(1, self.hop_length),
)[:, 0, 0, pad:-pad]
# Window envelope
window_sq = self.window.square().expand(1, T, -1).transpose(1, 2)
window_envelope = torch.nn.functional.fold(
window_sq,
output_size=(1, output_size),
kernel_size=(1, self.win_length),
stride=(1, self.hop_length),
).squeeze()[pad:-pad]
# Normalize
assert (window_envelope > 1e-11).all()
y = y / window_envelope
return y
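
# Usage sketch for ISTFT (illustrative sizes; "same" padding trims
# (win_length - hop_length) // 2 samples from each edge of the overlap-add
# result, so the output length is frames * hop_length):
#   istft = ISTFT(n_fft=1280, hop_length=320, win_length=1280, padding="same")
#   spec = torch.randn(1, 641, 100) + 1j * torch.randn(1, 641, 100)  # (B, n_fft//2 + 1, T)
#   istft(spec).shape  # -> torch.Size([1, 32000])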
class FourierHead(nn.Module):
"""Base class for inverse fourier modules."""
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Args:
x (Tensor): Input tensor of shape (B, L, H), where B is the batch size,
L is the sequence length, and H denotes the model dimension.
Returns:
Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal.
"""
raise NotImplementedError("Subclasses must implement the forward method.")
class ISTFTHead(FourierHead):
"""
ISTFT Head module for predicting STFT complex coefficients.
Args:
dim (int): Hidden dimension of the model.
n_fft (int): Size of Fourier transform.
hop_length (int): The distance between neighboring sliding window frames, which should align with
the resolution of the input features.
padding (str, optional): Type of padding. Options are "center" or "same". Defaults to "same".
"""
def __init__(self, dim: int, n_fft: int, hop_length: int, padding: str = "same"):
super().__init__()
out_dim = n_fft + 2
self.out = torch.nn.Linear(dim, out_dim)
self.istft = ISTFT(
n_fft=n_fft, hop_length=hop_length, win_length=n_fft, padding=padding
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of the ISTFTHead module.
Args:
x (Tensor): Input tensor of shape (B, L, H), where B is the batch size,
L is the sequence length, and H denotes the model dimension.
Returns:
Tensor: Reconstructed time-domain audio signal of shape (B, T), where T is the length of the output signal.
"""
        x_pred = self.out(x)
        x_pred = x_pred.transpose(1, 2)
        mag, p = x_pred.chunk(2, dim=1)
        mag = torch.exp(mag)
        mag = torch.clip(
            mag, max=1e2
        )  # safeguard to prevent excessively large magnitudes
        # Phase wrapping happens implicitly here: cos/sin give the real and
        # imaginary parts directly.
        x = torch.cos(p)
        y = torch.sin(p)
        # Recomputing the phase via atan2 and exp(1j * phase) would add nothing
        # and only cost time, so the complex spectrogram is built directly:
        S = mag * (x + 1j * y)
        audio = self.istft(S)
        return audio.unsqueeze(1), x_pred
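
# Usage sketch for ISTFTHead (illustrative sizes matching CodecDecoderVocos
# below: dim=1024, n_fft = 4 * hop_length = 1280):
#   head = ISTFTHead(dim=1024, n_fft=1280, hop_length=320)
#   audio, spec_pred = head(torch.randn(1, 100, 1024))
#   audio.shape      # -> (1, 1, 32000): 100 frames * hop 320
#   spec_pred.shape  # -> (1, 1282, 100): stacked log-magnitude and phase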
def nonlinearity(x):
    # Swish / SiLU: x * sigmoid(x)
    return x * torch.sigmoid(x)
def Normalize(in_channels, num_groups=32):
return torch.nn.GroupNorm(
num_groups=num_groups, num_channels=in_channels, eps=1e-6, affine=True
)
class ResnetBlock(nn.Module):
def __init__(
self,
*,
in_channels,
out_channels=None,
conv_shortcut=False,
dropout,
temb_channels=512,
):
super().__init__()
self.in_channels = in_channels
out_channels = in_channels if out_channels is None else out_channels
self.out_channels = out_channels
self.use_conv_shortcut = conv_shortcut
self.norm1 = Normalize(in_channels)
self.conv1 = torch.nn.Conv1d(
in_channels, out_channels, kernel_size=3, stride=1, padding=1
)
if temb_channels > 0:
self.temb_proj = torch.nn.Linear(temb_channels, out_channels)
self.norm2 = Normalize(out_channels)
self.dropout = torch.nn.Dropout(dropout)
self.conv2 = torch.nn.Conv1d(
out_channels, out_channels, kernel_size=3, stride=1, padding=1
)
if self.in_channels != self.out_channels:
if self.use_conv_shortcut:
self.conv_shortcut = torch.nn.Conv1d(
in_channels, out_channels, kernel_size=3, stride=1, padding=1
)
else:
self.nin_shortcut = torch.nn.Conv1d(
in_channels, out_channels, kernel_size=1, stride=1, padding=0
)
def forward(self, x, temb=None):
h = x
h = self.norm1(h)
h = nonlinearity(h)
h = self.conv1(h)
        if temb is not None:
            # Broadcasting fixed for Conv1d activations (the upstream 2-D code used
            # [:, :, None, None]); this path is inactive here since temb_channels=0.
            h = h + self.temb_proj(nonlinearity(temb))[:, :, None]
h = self.norm2(h)
h = nonlinearity(h)
h = self.dropout(h)
h = self.conv2(h)
if self.in_channels != self.out_channels:
if self.use_conv_shortcut:
x = self.conv_shortcut(x)
else:
x = self.nin_shortcut(x)
return x + h
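
# ResnetBlock above is a 1-D pre-activation residual block
# (GroupNorm -> SiLU -> Conv1d, twice) with a kernel-size-3 or pointwise
# shortcut convolution when the channel count changes. In this decoder it is
# always built with temb_channels=0, so the time-embedding branch is inactive.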
class Backbone(nn.Module):
"""Base class for the generator's backbone. It preserves the same temporal resolution across all layers."""
def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor:
"""
Args:
            x (Tensor): Input tensor of shape (B, C, L), where B is the batch size,
                C denotes the number of input features, and L is the sequence length.
Returns:
Tensor: Output of shape (B, L, H), where B is the batch size, L is the sequence length,
and H denotes the model dimension.
"""
raise NotImplementedError("Subclasses must implement the forward method.")
class VocosBackbone(Backbone):
    """
    Vocos-style backbone: two ResnetBlocks, a stack of Transformer blocks with
    rotary positional embeddings, and two more ResnetBlocks, all at the same
    temporal resolution.
    Args:
        hidden_dim (int): Hidden dimension of the model.
        depth (int): Number of TransformerBlock layers.
        heads (int): Number of attention heads.
        pos_meb_dim (int): Dimension of the rotary positional embeddings.
    """
def __init__(self, hidden_dim=1024, depth=12, heads=16, pos_meb_dim=64):
super().__init__()
self.embed = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=7, padding=3)
self.temb_ch = 0
block_in = hidden_dim
dropout = 0.1
prior_net: List[nn.Module] = [
ResnetBlock(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
dropout=dropout,
),
ResnetBlock(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
dropout=dropout,
),
]
self.prior_net = nn.Sequential(*prior_net)
        time_rotary_embed = RotaryPositionalEmbeddings(dim=pos_meb_dim)
transformer_blocks = [
TransformerBlock(
dim=hidden_dim, n_heads=heads, rotary_embed=time_rotary_embed
)
for _ in range(depth)
]
self.transformers = nn.Sequential(*transformer_blocks)
self.final_layer_norm = nn.LayerNorm(hidden_dim, eps=1e-6)
post_net: List[nn.Module] = [
ResnetBlock(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
dropout=dropout,
),
ResnetBlock(
in_channels=block_in,
out_channels=block_in,
temb_channels=self.temb_ch,
dropout=dropout,
),
]
self.post_net = nn.Sequential(*post_net)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x.transpose(1, 2)
x = self.embed(x)
x = self.prior_net(x)
x = x.transpose(1, 2)
x = self.transformers(x)
x = x.transpose(1, 2)
x = self.post_net(x)
x = x.transpose(1, 2)
x = self.final_layer_norm(x)
return x
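
# Shape flow through VocosBackbone.forward (with the default hidden_dim=1024):
#   (B, L, 1024) -> transpose -> (B, 1024, L) -> embed + prior_net (Conv1d/ResnetBlock)
#   -> transpose -> (B, L, 1024) -> `depth` TransformerBlocks
#   -> transpose -> (B, 1024, L) -> post_net -> transpose -> (B, L, 1024) -> LayerNorm
# The temporal resolution L is preserved throughout, as the Backbone contract requires.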
def init_weights(m):
    if isinstance(m, nn.Conv1d):
        nn.init.trunc_normal_(m.weight, std=0.02)
        if m.bias is not None:  # guard against bias-free convolutions
            nn.init.constant_(m.bias, 0)
class CodecDecoderVocos(nn.Module):
def __init__(
self,
hidden_dim=1024,
depth=12,
heads=16,
pos_meb_dim=64,
hop_length=320,
vq_num_quantizers=1,
        vq_dim=2048,
vq_commit_weight=0.25,
vq_weight_init=False,
vq_full_commit_loss=False,
codebook_size=16384,
codebook_dim=16,
):
super().__init__()
self.hop_length = hop_length
self.quantizer = ResidualFSQ(
dim=vq_dim, levels=[4, 4, 4, 4, 4, 4, 4, 4], num_quantizers=1
)
self.backbone = VocosBackbone(
hidden_dim=hidden_dim, depth=depth, heads=heads, pos_meb_dim=pos_meb_dim
)
self.head = ISTFTHead(
dim=hidden_dim,
n_fft=self.hop_length * 4,
hop_length=self.hop_length,
padding="same",
)
self.reset_parameters()
    def forward(self, x, vq=True):
        if vq is True:
            # Quantize: (B, C, T) -> (B, T, C) for the FSQ, then back.
            x = x.permute(0, 2, 1)
            x, q = self.quantizer(x)
            x = x.permute(0, 2, 1)
            q = q.permute(0, 2, 1)
            return x, q, None
        x = self.backbone(x)
        audio, x_pred = self.head(x)
        return audio, x_pred
    # Note: the following helpers were carried over from the upstream NeuCodec
    # repository and are unused by NeuCodecDecoder.decode_code. vq2emb/get_emb
    # call quantizer methods that vector_quantize_pytorch's ResidualFSQ does not
    # necessarily provide, and the inference* methods reference a `self.model`
    # attribute that is not defined in this class.
    def vq2emb(self, vq):
        self.quantizer = self.quantizer.eval()
        x = self.quantizer.vq2emb(vq)
        return x
    def get_emb(self):
        self.quantizer = self.quantizer.eval()
        embs = self.quantizer.get_emb()
        return embs
    def inference_vq(self, vq):
        x = vq[None, :, :]
        x = self.model(x)
        return x
    def inference_0(self, x):
        x, q, loss, perp = self.quantizer(x)
        x = self.model(x)
        return x, None
    def inference(self, x):
        x = self.model(x)
        return x, None
    def remove_weight_norm(self):
        """Remove weight normalization from all of the layers."""
def _remove_weight_norm(m):
try:
torch.nn.utils.remove_weight_norm(m)
except ValueError: # this module didn't have weight norm
return
self.apply(_remove_weight_norm)
    def apply_weight_norm(self):
        """Apply weight normalization to all of the layers."""
def _apply_weight_norm(m):
if isinstance(m, nn.Conv1d) or isinstance(m, nn.ConvTranspose1d):
torch.nn.utils.weight_norm(m)
self.apply(_apply_weight_norm)
def reset_parameters(self):
self.apply(init_weights)
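
# Usage sketch for CodecDecoderVocos (illustrative sizes; levels [4]*8 give
# 4**8 = 65536 possible codes per frame). The quantizer runs at vq_dim=2048
# while the backbone expects hidden_dim=1024; NeuCodecDecoder below bridges the
# two with its fc_post_a projection:
#   dec = CodecDecoderVocos()
#   emb = torch.randn(1, 2048, 50)           # (B, vq_dim, frames), 1 s at 50 Hz
#   quantized, codes, _ = dec(emb, vq=True)  # (1, 2048, 50), (1, 1, 50)
#   audio, spec = dec(torch.randn(1, 50, 1024), vq=False)  # audio: (1, 1, 16000)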
class NeuCodecDecoder(
nn.Module,
PyTorchModelHubMixin
):
def __init__(self, sample_rate: int, hop_length: int):
super().__init__()
self.sample_rate = sample_rate
self.hop_length = hop_length
self.generator = CodecDecoderVocos(hop_length=hop_length)
self.fc_post_a = nn.Linear(2048, 1024)
@property
def device(self):
return next(self.parameters()).device
    def decode_code(self, fsq_codes: torch.Tensor) -> torch.Tensor:
        """
        Args:
            fsq_codes: torch.Tensor [B, 1, F], 50 Hz FSQ codes
        Returns:
            recon: torch.Tensor [B, 1, T], reconstructed 24 kHz audio
        """
        # Look up quantizer embeddings: [B, 1, F] codes -> [B, F, 2048].
        fsq_post_emb = self.generator.quantizer.get_output_from_indices(fsq_codes.transpose(1, 2))
        # Project to the backbone dimension: [B, F, 2048] -> [B, F, 1024].
        # (The upstream code round-tripped through [B, C, F] with transposes that
        # cancel out; they are dropped here without changing the result.)
        fsq_post_emb = self.fc_post_a(fsq_post_emb)
        recon = self.generator(fsq_post_emb, vq=False)[0]
        return recon
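
if __name__ == "__main__":
    # Minimal smoke test: decode one second of random FSQ codes with a freshly
    # initialised (untrained) decoder. The sample_rate/hop_length values are an
    # assumption consistent with the 50 Hz code rate and 24 kHz output noted in
    # decode_code's docstring (24000 / 50 = 480).
    decoder = NeuCodecDecoder(sample_rate=24000, hop_length=480)
    decoder.eval()
    fsq_codes = torch.randint(0, 4**8, (1, 1, 50))  # (B, 1, F) flat FSQ indices
    with torch.no_grad():
        recon = decoder.decode_code(fsq_codes)
    print(recon.shape)  # -> torch.Size([1, 1, 24000])
    # To load the fine-tuned weights instead, PyTorchModelHubMixin provides
    # from_pretrained; the repo id below is an assumption based on this file's origin:
    # decoder = NeuCodecDecoder.from_pretrained("LenDigLearn/neucodec-decoder-ft-de")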