""" | |
Tokenizer or wrapper around existing models. | |
Also defines the main interface that a model must follow to be usable as an audio tokenizer. | |
""" | |
from abc import ABC, abstractmethod | |
import logging | |
import typing as tp | |
import torch | |
from torch import nn | |
logger = logging.getLogger() | |


class AudioTokenizer(ABC, nn.Module):
    """Base API for all compression models that aim at being used as audio tokenizers
    with a language model.
    """
    @abstractmethod
    def forward(self, x: torch.Tensor):
        ...

    @abstractmethod
    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        """See `EncodecModel.encode`."""
        ...

    @abstractmethod
    def decode(self, codes: torch.Tensor, scale: tp.Optional[torch.Tensor] = None):
        """See `EncodecModel.decode`."""
        ...

    @abstractmethod
    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        ...

    @property
    @abstractmethod
    def channels(self) -> int:
        ...

    @property
    @abstractmethod
    def frame_rate(self) -> float:
        ...

    @property
    @abstractmethod
    def sample_rate(self) -> int:
        ...

    @property
    @abstractmethod
    def cardinality(self) -> int:
        ...

    @property
    @abstractmethod
    def num_codebooks(self) -> int:
        ...

    @property
    @abstractmethod
    def total_codebooks(self) -> int:
        ...

    @abstractmethod
    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        ...

    @staticmethod
    def get_pretrained(
        name: str,
        vae_config: str,
        vae_model: str,
        device: tp.Union[torch.device, str] = 'cpu',
        mode: str = 'extract'
    ) -> 'AudioTokenizer':
        """Instantiate an AudioTokenizer from a given pretrained model.

        Args:
            name (str): Name of the pretrained model, formatted as
                '<model_family>_<model_type>'; see the dispatch below.
            vae_config (str): Path to the VAE config, used by the Flow1dVAE families.
            vae_model (str): Path to the VAE checkpoint, used by the Flow1dVAE families.
            device (torch.device or str): Device on which the model is loaded.
            mode (str): 'extract' or 'inference', used by the stereo decoder families.
        """
        model: AudioTokenizer
        model_family, _, model_type = name.partition('_')
        logger.info("Getting pretrained compression model from semantic model %s", model_type)
        if model_family == 'Flow1dVAESeparate':
            model = Flow1dVAESeparate(model_type, vae_config, vae_model)
        elif model_family == 'FlowVocalAndMusicDecoderStereo':
            model = FlowVocalAndMusicDecoderStereo(model_type, mode=mode)
        elif model_family == 'FlowVocalAndMusicDecoderStereoLayer7':
            model = FlowVocalAndMusicDecoderStereoLayer7(model_type, mode=mode)
        elif model_family == 'FlowVocalAndMusicDecoderStereoLayer11':
            model = FlowVocalAndMusicDecoderStereoLayer11(model_type, mode=mode)
        elif model_family == 'FlowVocalAndMusicDecoderStereoASRTuneLayer7':
            model = FlowVocalAndMusicDecoderStereoASRTuneLayer7(model_type, mode=mode)
        elif model_family == 'FlowVocalAndMusicDecoderStereoASRTuneLayer7Code2':
            model = FlowVocalAndMusicDecoderStereoASRTuneLayer7Code2(model_type, mode=mode)
        elif model_family == 'FlowVocalAndMusicDecoderStereoASRTuneLayer7Code1':
            model = FlowVocalAndMusicDecoderStereoASRTuneLayer7Code1(model_type, mode=mode)
        elif model_family == 'Flow1dVAE2rvq':
            model = Flow1dVAE2rvq(model_type)
        elif model_family == 'Flow1dVAE1rvq':
            model = Flow1dVAE1rvq(model_type, vae_config, vae_model)
        elif model_family == 'Flow1dVAE4rvq':
            model = Flow1dVAE4rvq(model_type)
        else:
            raise NotImplementedError(
                "{} is not implemented in models/audio_tokenizer.py".format(name))
        return model.to(device).eval()
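

# Usage sketch for `get_pretrained` (a minimal example, not part of the API;
# the checkpoint and VAE paths below are placeholders and must exist locally):
#
#     tokenizer = AudioTokenizer.get_pretrained(
#         'Flow1dVAE1rvq_model_2_fixed.safetensors',
#         vae_config='path/to/vae_config.json',
#         vae_model='path/to/vae_model.safetensors',
#         device='cuda',
#     )
#     wav = torch.randn(1, 2, 48000 * 10).cuda()   # [B, C, T], 10 s of stereo audio
#     codes, _ = tokenizer.encode(wav)             # -> [B, N, T] discrete codes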


class FlowVocalAndMusicDecoderStereo(AudioTokenizer):
    def __init__(
        self,
        model_type: str,
        sample_rate=48000,
        mode='extract',
    ):
        super().__init__()
        from codeclm.tokenizer.FlowVocalAndMusicDecoderStereoV014.generate_stereo import Tango
        model_path = model_type
        self.mode = mode
        self.samplerate = sample_rate
        if mode == 'extract':
            # 'extract' only needs the code-extraction path, not the main decoding model.
            self.model = Tango(model_path=model_path, layer_num=3, load_main_model=False, device='cuda')
        elif mode == 'inference':
            self.model = Tango(model_path=model_path, layer_num=3, load_main_model=True, device='cuda')
        else:
            raise ValueError("Unknown mode: {!r}; expected 'extract' or 'inference'".format(mode))
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, duration=40.96, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n
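

# Round-trip sketch in 'inference' mode (assumed checkpoint path; 'ckpt.pth' is
# a placeholder, and decode runs a 50-step diffusion sampler on GPU):
#
#     tok = FlowVocalAndMusicDecoderStereo('ckpt.pth', mode='inference')
#     codes, _ = tok.encode(wav)   # wav: [B, C, T] or [B, T] at 48 kHz -> [B, N, T] at 25 Hz
#     recon = tok.decode(codes)    # diffusion decoding back to stereo audio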


class FlowVocalAndMusicDecoderStereoLayer7(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "pytorch_model_2.bin",
        sample_rate=48000,
        mode='extract',
    ):
        super().__init__()
        from codeclm.tokenizer.FlowVocalAndMusicDecoderStereoV014.generate_stereo_layer7 import Tango
        model_path = model_type
        self.mode = mode
        self.samplerate = sample_rate
        if mode == 'extract':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=False, device='cuda')
        elif mode == 'inference':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=True, device='cuda')
        else:
            raise ValueError("Unknown mode: {!r}; expected 'extract' or 'inference'".format(mode))
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, duration=40.96, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n


class FlowVocalAndMusicDecoderStereoASRTuneLayer7(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "model_layer7_1x4.safetensors",
        sample_rate=48000,
        mode='extract',
    ):
        super().__init__()
        from codeclm.tokenizer.FlowVocalAndMusicDecoderStereoV014.generate_stereo_7_1x4 import Tango
        model_path = model_type
        self.mode = mode
        self.samplerate = sample_rate
        if mode == 'extract':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=False, device='cuda')
        elif mode == 'inference':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=True, device='cuda')
        else:
            raise ValueError("Unknown mode: {!r}; expected 'extract' or 'inference'".format(mode))
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, duration=40.96, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n


class FlowVocalAndMusicDecoderStereoASRTuneLayer7Code2(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "model_layer7_1x2.safetensors",
        sample_rate=48000,
        mode='extract',
    ):
        super().__init__()
        from codeclm.tokenizer.FlowVocalAndMusicDecoderStereoV014.generate_stereo_7_1x2 import Tango
        model_path = model_type
        self.mode = mode
        self.samplerate = sample_rate
        if mode == 'extract':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=False, device='cuda')
        elif mode == 'inference':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=True, device='cuda')
        else:
            raise ValueError("Unknown mode: {!r}; expected 'extract' or 'inference'".format(mode))
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, duration=40.96, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n


class FlowVocalAndMusicDecoderStereoASRTuneLayer7Code1(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "model_layer7_1x1.safetensors",
        sample_rate=48000,
        mode='extract',
    ):
        super().__init__()
        from codeclm.tokenizer.FlowVocalAndMusicDecoderStereoV014.generate_stereo_7_1x1 import Tango
        model_path = model_type
        self.mode = mode
        self.samplerate = sample_rate
        if mode == 'extract':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=False, device='cuda')
        elif mode == 'inference':
            self.model = Tango(model_path=model_path, layer_num=7, load_main_model=True, device='cuda')
        else:
            raise ValueError("Unknown mode: {!r}; expected 'extract' or 'inference'".format(mode))
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, duration=40.96, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n


class Flow1dVAE2rvq(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "model_2.safetensors",
    ):
        super().__init__()
        from codeclm.tokenizer.Flow1dVAE.generate_2rvq import Tango
        model_path = model_type
        self.model = Tango(model_path=model_path, rvq_num=2, device='cuda')
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1
        self.samplerate = 48000  # assumed output rate, matching the 48 kHz stereo tokenizers

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n
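

# Latent decoding sketch (assumes a loaded `Flow1dVAE2rvq` instance `tok`;
# `decode_latent` is a quantizer-table lookup, with no diffusion sampling):
#
#     codes, _ = tok.encode(wav)          # [B, N, T] integer codes
#     latents = tok.decode_latent(codes)  # continuous RVQ latents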


class Flow1dVAE1rvq(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "model_2_fixed.safetensors",
        vae_config: str = "",
        vae_model: str = "",
    ):
        super().__init__()
        from codeclm.tokenizer.Flow1dVAE.generate_1rvq import Tango
        model_path = model_type
        self.model = Tango(model_path=model_path, vae_config=vae_config, vae_model=vae_model, device='cuda')
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1
        self.samplerate = 48000  # assumed output rate, matching the 48 kHz stereo tokenizers

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n


class Flow1dVAE4rvq(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "model_2.safetensors",
    ):
        super().__init__()
        from codeclm.tokenizer.Flow1dVAE.generate_4rvq import Tango
        model_path = model_type
        self.model = Tango(model_path=model_path, rvq_num=4, device='cuda')
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1
        self.samplerate = 48000  # assumed output rate, matching the 48 kHz stereo tokenizers

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n


class Flow1dVAESeparate(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "model_2.safetensors",
        vae_config: str = "",
        vae_model: str = "",
    ):
        super().__init__()
        from codeclm.tokenizer.Flow1dVAE.generate_septoken import Tango
        model_path = model_type
        self.model = Tango(model_path=model_path, vae_config=vae_config, vae_model=vae_model, device='cuda')
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1
        self.samplerate = 48000  # assumed output rate, matching the 48 kHz stereo tokenizers

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x_vocal: torch.Tensor, x_bgm: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        # Note: deviates from the base API by taking vocal and background stems separately.
        if x_vocal.ndim == 2:
            x_vocal = x_vocal.unsqueeze(1)
        if x_bgm.ndim == 2:
            x_bgm = x_bgm.unsqueeze(1)
        codes_vocal, codes_bgm = self.model.sound2code(x_vocal, x_bgm)
        return codes_vocal, codes_bgm

    def decode(self, codes: torch.Tensor, prompt_vocal=None, prompt_bgm=None):
        wav = self.model.code2sound(codes, prompt_vocal=prompt_vocal, prompt_bgm=prompt_bgm, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n
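

# Separate-stem sketch (assumed paths; this class's `encode` takes 48 kHz vocal
# and background tensors of equal length rather than a single mix):
#
#     tok = Flow1dVAESeparate('model_2.safetensors', 'vae_config.json', 'vae_model.safetensors')
#     codes_vocal, codes_bgm = tok.encode(x_vocal, x_bgm)  # each [B, N, T]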


class FlowVocalAndMusicDecoderStereoLayer11(AudioTokenizer):
    def __init__(
        self,
        model_type: str = "layer11_ckpt.pth",
        sample_rate=48000,
        mode='extract',
    ):
        super().__init__()
        from codeclm.tokenizer.FlowVocalAndMusicDecoderStereoV014.generate_stereo_11 import Tango
        model_path = model_type
        self.mode = mode
        self.samplerate = sample_rate
        if mode == 'extract':
            self.model = Tango(model_path=model_path, layer_num=11, load_main_model=False, device='cuda')
        elif mode == 'inference':
            self.model = Tango(model_path=model_path, layer_num=11, load_main_model=True, device='cuda')
        else:
            raise ValueError("Unknown mode: {!r}; expected 'extract' or 'inference'".format(mode))
        print("Successfully loaded checkpoint from:", model_path)
        self.n_quantizers = 1

    def forward(self, x: torch.Tensor):
        # We don't support training with this.
        raise NotImplementedError("Forward and training with DAC not supported.")

    def encode(self, x: torch.Tensor) -> tp.Tuple[torch.Tensor, tp.Optional[torch.Tensor]]:
        if x.ndim == 2:
            x = x.unsqueeze(1)
        codes = self.model.sound2code(x)  # [B, T] -> [B, N, T]
        return codes, None

    def decode(self, codes: torch.Tensor, prompt=None, scale: tp.Optional[torch.Tensor] = None, ncodes=9):
        wav = self.model.code2sound(codes, prompt=prompt, duration=40.96, guidance_scale=1.5,
                                    num_steps=50, disable_progress=False)  # [B, N, T] -> [B, T]
        return wav[None]

    def decode_latent(self, codes: torch.Tensor):
        """Decode from the discrete codes to continuous latent space."""
        return self.model.quantizer.from_codes(codes.transpose(1, 2))[0]

    @property
    def channels(self) -> int:
        return 2

    @property
    def frame_rate(self) -> float:
        return 25

    @property
    def sample_rate(self) -> int:
        return self.samplerate

    @property
    def cardinality(self) -> int:
        return 10000

    @property
    def num_codebooks(self) -> int:
        return self.n_quantizers

    @property
    def total_codebooks(self) -> int:
        return 1

    def set_num_codebooks(self, n: int):
        """Set the active number of codebooks used by the quantizer."""
        assert 1 <= n <= self.total_codebooks
        self.n_quantizers = n