Dionyssos's picture
index out of bounds 2048 @ dequantize()
27d24be
import math
import typing as tp
from dataclasses import dataclass, field
import typing as tp
import torch
from torch import nn
from einops import rearrange
import torch.nn.functional as F
@dataclass
class QuantizedResult:
    """Container for the values produced by one quantization pass.

    The first three fields are required; ``penalty`` and ``metrics`` are
    optional and default to ``None`` and a fresh empty dict respectively.
    """

    # Quantized representation.
    x: torch.Tensor
    # Code indices associated with ``x``.
    codes: torch.Tensor
    # Bandwidth in kb/s used, per batch item.
    bandwidth: torch.Tensor
    # Optional penalty term; ``None`` when none was computed.
    penalty: tp.Optional[torch.Tensor] = None
    # Free-form metrics; ``default_factory`` gives every instance its own dict.
    metrics: dict = field(default_factory=dict)
class EuclideanCodebook(nn.Module):
    """Table of embedding vectors addressed by integer code index.

    The table and its bookkeeping tensors are registered as buffers
    (``inited``, ``cluster_size``, ``embed``, ``embed_avg``), so they are part
    of the module state without being trainable parameters.

    Args:
        dim: Size of each codebook vector.
        codebook_size: Number of vectors held by the codebook.
        kmeans_init: When true, the table starts as zeros and ``inited`` is
            cleared so that ``init_embed_`` may fill it later; otherwise the
            table is created by ``uniform_init`` and ``inited`` is set.
        kmeans_iters: Iteration count handed to ``kmeans`` by ``init_embed_``.
        decay: Stored on the instance; not read by any method of this class.
        epsilon: Stored on the instance; not read by any method of this class.
    """

    def __init__(
        self,
        dim,
        codebook_size,
        kmeans_init=False,
        kmeans_iters=10,
        decay=0.8,
        epsilon=1e-5,
    ):
        super().__init__()
        self.decay = decay
        self.codebook_size = codebook_size
        self.kmeans_iters = kmeans_iters
        self.epsilon = epsilon

        # ``uniform_init`` is only looked up on the branch that needs it.
        if kmeans_init:
            initial_table = torch.zeros(codebook_size, dim)
        else:
            initial_table = uniform_init(codebook_size, dim)

        # Registration order is kept stable: it defines the state-dict layout.
        self.register_buffer("inited", torch.Tensor([not kmeans_init]))
        self.register_buffer("cluster_size", torch.zeros(codebook_size))
        self.register_buffer("embed", initial_table)
        self.register_buffer("embed_avg", initial_table.clone())

    @torch.jit.ignore
    def init_embed_(self, data):
        """Fill the codebook buffers from ``data`` via ``kmeans``, at most once.

        Returns immediately when the ``inited`` flag is already set. Otherwise
        the result of ``kmeans`` is copied in place into ``embed``,
        ``embed_avg`` and ``cluster_size``, and ``inited`` is set.
        """
        if self.inited:
            return

        new_embed, new_cluster_size = kmeans(
            data, self.codebook_size, self.kmeans_iters
        )
        self.embed.data.copy_(new_embed)
        self.embed_avg.data.copy_(new_embed.clone())
        self.cluster_size.data.copy_(new_cluster_size)
        self.inited.data.copy_(torch.Tensor([True]))
        # NOTE: buffers are not broadcast to other workers here; the
        # flashy.distrib.broadcast_tensors(self.buffers()) call is disabled.

    def postprocess_emb(self, embed_ind, shape):
        """View ``embed_ind`` with every dimension of ``shape`` but the last."""
        leading_dims = shape[:-1]
        return embed_ind.view(*leading_dims)

    def dequantize(self, embed_ind):
        """Look up the codebook rows selected by ``embed_ind``.

        Every index must address an existing row of ``self.embed``; a value
        equal to or above the number of rows (for instance a special token id
        of ``codebook_size``) is out of range for ``F.embedding``.
        """
        return F.embedding(embed_ind, self.embed)

    def decode(self, embed_ind):
        """Return the codebook vectors for ``embed_ind`` (see ``dequantize``)."""
        return self.dequantize(embed_ind)
class VectorQuantization(nn.Module):
    """One ``EuclideanCodebook`` wrapped with optional linear projections.

    Args:
        dim: Feature size seen by callers of this module.
        codebook_size: Number of entries in the codebook.
        codebook_dim: Feature size inside the codebook; falls back to ``dim``
            when ``None``. When it differs from ``dim``, ``project_in`` and
            ``project_out`` are ``nn.Linear`` layers, otherwise ``nn.Identity``.
        decay: Forwarded to the codebook.
        epsilon: Forwarded to the codebook.
        kmeans_init: Forwarded to the codebook.
        kmeans_iters: Forwarded to the codebook.
        channels_last: When false, decoded output is rearranged from
            ``b n d`` to ``b d n``; when true it is returned as is.
    """

    def __init__(
        self,
        dim,
        codebook_size,
        codebook_dim=None,
        decay=0.8,
        epsilon=1e-5,
        kmeans_init=False,
        kmeans_iters=10,
        channels_last=False,
    ):
        super().__init__()
        inner_dim = dim if codebook_dim is None else codebook_dim

        # Projections are only real layers when the two sizes disagree.
        if inner_dim != dim:
            self.project_in = nn.Linear(dim, inner_dim)
            self.project_out = nn.Linear(inner_dim, dim)
        else:
            self.project_in = nn.Identity()
            self.project_out = nn.Identity()

        self._codebook = EuclideanCodebook(
            dim=inner_dim,
            codebook_size=codebook_size,
            kmeans_init=kmeans_init,
            kmeans_iters=kmeans_iters,
            decay=decay,
            epsilon=epsilon,
        )
        self.codebook_size = codebook_size
        self.channels_last = channels_last

    @property
    def codebook(self):
        """The ``embed`` buffer of the wrapped codebook."""
        return self._codebook.embed

    @property
    def inited(self):
        """The ``inited`` flag buffer of the wrapped codebook."""
        return self._codebook.inited

    def _postprocess(self, quantize):
        """Move the feature axis before the sequence axis unless channels-last."""
        if self.channels_last:
            return quantize
        return rearrange(quantize, "b n d -> b d n")

    def decode(self, embed_ind):
        """Turn code indices into vectors: lookup, ``project_out``, re-layout."""
        looked_up = self._codebook.decode(embed_ind)
        return self._postprocess(self.project_out(looked_up))
class ResidualVectorQuantization(nn.Module):
    """Residual vector quantization implementation.

    Follows Algorithm 1. in https://arxiv.org/pdf/2107.03312.pdf

    Holds ``num_quantizers`` independent ``VectorQuantization`` layers, each
    built from the same remaining keyword arguments.
    """

    def __init__(self, *, num_quantizers, **kwargs):
        super().__init__()
        stages = [VectorQuantization(**kwargs) for _ in range(num_quantizers)]
        self.layers = nn.ModuleList(stages)

    def decode(self, q_indices: torch.Tensor) -> torch.Tensor:
        """Sum the decoded output of every layer.

        ``q_indices`` is iterated along its first dimension; slice ``i`` is
        decoded by ``self.layers[i]``. With no slices at all, the result is
        the scalar zero tensor the accumulation starts from.
        """
        # A scalar start value broadcasts against whatever the layers return.
        total = torch.tensor(0.0, device=q_indices.device)
        for level, level_indices in enumerate(q_indices):
            total = total + self.layers[level].decode(level_indices)
        return total
# ------------------------------------- END core_vq.py
class ResidualVectorQuantizer(nn.Module):
    """Residual Vector Quantizer.

    Thin wrapper that owns a ``ResidualVectorQuantization`` stack (``self.vq``)
    and converts between the ``[B, K, T]`` code layout used by callers and the
    ``[K, B, T]`` layout used by the stack.

    Args:
        dimension (int): Dimension of the codebooks.
        n_q (int): Number of residual vector quantizers used.
        q_dropout (bool): Random quantizer drop out at train time.
        bins (int): Codebook size.
        decay (float): Decay for exponential moving average over the codebooks.
        kmeans_init (bool): Whether to use kmeans to initialize the codebooks.
        kmeans_iters (int): Number of iterations used for kmeans initialization.
        threshold_ema_dead_code (int): Threshold for dead code expiration. Replace any codes
            that have an exponential moving average cluster size less than the specified
            threshold with randomly selected vector from the current batch.
        orthogonal_reg_weight (float): Orthogonal regularization weights.
        orthogonal_reg_active_codes_only (bool): Apply orthogonal regularization only on
            active codes.
        orthogonal_reg_max_codes (optional int): Maximum number of codes to consider
            for orthogonal regularization.

    Note:
        ``threshold_ema_dead_code`` and the ``orthogonal_reg_*`` values are
        stored on the instance but are not passed on to ``self.vq`` here.
    """

    def __init__(
        self,
        dimension: int = 256,
        n_q: int = 8,
        q_dropout: bool = False,
        bins: int = 1024,
        decay: float = 0.99,
        kmeans_init: bool = True,
        kmeans_iters: int = 10,
        threshold_ema_dead_code: int = 2,
        orthogonal_reg_weight: float = 0.0,
        orthogonal_reg_active_codes_only: bool = False,
        orthogonal_reg_max_codes: tp.Optional[int] = None,
    ):
        super().__init__()
        # ``max_n_q`` stays fixed; ``n_q`` may be lowered via set_num_codebooks.
        self.max_n_q = n_q
        self.n_q = n_q
        self.q_dropout = q_dropout
        self.dimension = dimension
        self.bins = bins
        self.decay = decay
        self.kmeans_init = kmeans_init
        self.kmeans_iters = kmeans_iters
        self.threshold_ema_dead_code = threshold_ema_dead_code
        self.orthogonal_reg_weight = orthogonal_reg_weight
        self.orthogonal_reg_active_codes_only = orthogonal_reg_active_codes_only
        self.orthogonal_reg_max_codes = orthogonal_reg_max_codes
        # Constructing the module is silent: no debug output goes to stdout.
        self.vq = ResidualVectorQuantization(
            dim=self.dimension,
            codebook_size=self.bins,
            num_quantizers=self.n_q,
            decay=self.decay,
            kmeans_init=self.kmeans_init,
            kmeans_iters=self.kmeans_iters,
            channels_last=False,
        )

    def forward(self, x: torch.Tensor, frame_rate: int):
        """Quantize ``x`` and report the bandwidth used.

        When training with ``q_dropout`` enabled, the number of quantizers is
        drawn uniformly from ``1..n_q``; otherwise all ``n_q`` are used.

        Returns:
            A ``QuantizedResult`` with the quantized tensor, the codes, the
            bandwidth in kb/s, and the mean commitment loss as ``penalty``.

        NOTE(review): this calls ``self.vq(x, n_q=...)`` and unpacks three
        return values; confirm that the wrapped module provides a matching
        ``forward``.
        """
        n_q = self.n_q
        if self.training and self.q_dropout:
            n_q = int(torch.randint(1, self.n_q + 1, (1,)).item())
        # Each codebook contributes log2(bins) bits per frame.
        bw_per_q = math.log2(self.bins) * frame_rate / 1000
        quantized, codes, commit_loss = self.vq(x, n_q=n_q)
        codes = codes.transpose(0, 1)
        # codes is [B, K, T], with T frames, K nb of codebooks.
        bw = torch.tensor(n_q * bw_per_q).to(x)
        return QuantizedResult(quantized, codes, bw, penalty=torch.mean(commit_loss))

    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Return the code indices for ``x`` using the active ``n_q`` codebooks.

        The result of ``self.vq.encode`` has its first two dimensions swapped
        before being returned.

        NOTE(review): relies on ``self.vq.encode(x, n_q=...)``; confirm that
        the wrapped module provides it.
        """
        codes = self.vq.encode(x, n_q=self.n_q)
        # codes is [B, K, T], with T frames, K nb of codebooks.
        return codes.transpose(0, 1)

    def decode(self, codes: torch.Tensor) -> torch.Tensor:
        """Decode the given codes to the quantized representation."""
        # codes is [B, K, T], with T frames, K nb of codebooks, vq.decode expects [K, B, T].
        return self.vq.decode(codes.transpose(0, 1))

    @property
    def total_codebooks(self):
        """Number of codebooks the module was constructed with."""
        return self.max_n_q

    @property
    def num_codebooks(self):
        """Number of codebooks currently in use."""
        return self.n_q

    def set_num_codebooks(self, n: int):
        """Set the number of active codebooks; ``n`` must be in ``1..max_n_q``."""
        assert 0 < n <= self.max_n_q
        self.n_q = n