# Partially from https://github.com/Mael-zys/T2M-GPT from typing import List, Optional, Union import torch import torch.nn as nn from torch import Tensor, nn from torch.distributions.distribution import Distribution from .tools.resnet import Resnet1D from .tools.quantize_cnn import QuantizeEMAReset, Quantizer, QuantizeEMA, QuantizeReset from collections import OrderedDict class VQVae(nn.Module): def __init__(self, nfeats: int, quantizer: str = "ema_reset", code_num=512, code_dim=512, output_emb_width=512, down_t=3, stride_t=2, width=512, depth=3, dilation_growth_rate=3, norm=None, activation: str = "relu", **kwargs) -> None: super().__init__() self.code_dim = code_dim self.encoder = Encoder(nfeats, output_emb_width, down_t, stride_t, width, depth, dilation_growth_rate, activation=activation, norm=norm) self.decoder = Decoder(nfeats, output_emb_width, down_t, stride_t, width, depth, dilation_growth_rate, activation=activation, norm=norm) if quantizer == "ema_reset": self.quantizer = QuantizeEMAReset(code_num, code_dim, mu=0.99) elif quantizer == "orig": self.quantizer = Quantizer(code_num, code_dim, beta=1.0) elif quantizer == "ema": self.quantizer = QuantizeEMA(code_num, code_dim, mu=0.99) elif quantizer == "reset": self.quantizer = QuantizeReset(code_num, code_dim) def preprocess(self, x): # (bs, T, Jx3) -> (bs, Jx3, T) x = x.permute(0, 2, 1) return x def postprocess(self, x): # (bs, Jx3, T) -> (bs, T, Jx3) x = x.permute(0, 2, 1) return x def forward(self, features: Tensor): # Preprocess x_in = self.preprocess(features) # Encode x_encoder = self.encoder(x_in) # quantization x_quantized, loss, perplexity = self.quantizer(x_encoder) # decoder x_decoder = self.decoder(x_quantized) x_out = self.postprocess(x_decoder) return x_out, loss, perplexity def encode( self, features: Tensor, ) -> Union[Tensor, Distribution]: N, T, _ = features.shape x_in = self.preprocess(features) x_encoder = self.encoder(x_in) x_encoder = self.postprocess(x_encoder) x_encoder = x_encoder.contiguous().view(-1, x_encoder.shape[-1]) # (NT, C) code_idx = self.quantizer.quantize(x_encoder) code_idx = code_idx.view(N, -1) # latent, dist return code_idx, None def decode(self, z: Tensor): x_d = self.quantizer.dequantize(z) x_d = x_d.view(1, -1, self.code_dim).permute(0, 2, 1).contiguous() # decoder x_decoder = self.decoder(x_d) x_out = self.postprocess(x_decoder) return x_out class Encoder(nn.Module): def __init__(self, input_emb_width=3, output_emb_width=512, down_t=3, stride_t=2, width=512, depth=3, dilation_growth_rate=3, activation='relu', norm=None): super().__init__() blocks = [] filter_t, pad_t = stride_t * 2, stride_t // 2 blocks.append(nn.Conv1d(input_emb_width, width, 3, 1, 1)) blocks.append(nn.ReLU()) for i in range(down_t): input_dim = width block = nn.Sequential( nn.Conv1d(input_dim, width, filter_t, stride_t, pad_t), Resnet1D(width, depth, dilation_growth_rate, activation=activation, norm=norm), ) blocks.append(block) blocks.append(nn.Conv1d(width, output_emb_width, 3, 1, 1)) self.model = nn.Sequential(*blocks) def forward(self, x): return self.model(x) class Decoder(nn.Module): def __init__(self, input_emb_width=3, output_emb_width=512, down_t=3, stride_t=2, width=512, depth=3, dilation_growth_rate=3, activation='relu', norm=None): super().__init__() blocks = [] filter_t, pad_t = stride_t * 2, stride_t // 2 blocks.append(nn.Conv1d(output_emb_width, width, 3, 1, 1)) blocks.append(nn.ReLU()) for i in range(down_t): out_dim = width block = nn.Sequential( Resnet1D(width, depth, dilation_growth_rate, reverse_dilation=True, activation=activation, norm=norm), nn.Upsample(scale_factor=2, mode='nearest'), nn.Conv1d(width, out_dim, 3, 1, 1)) blocks.append(block) blocks.append(nn.Conv1d(width, width, 3, 1, 1)) blocks.append(nn.ReLU()) blocks.append(nn.Conv1d(width, input_emb_width, 3, 1, 1)) self.model = nn.Sequential(*blocks) def forward(self, x): return self.model(x)