Spaces:
Runtime error
Runtime error
| from pathlib import Path | |
| import random | |
| from typing import List | |
| import tempfile | |
| import subprocess | |
| import argbind | |
| from tqdm import tqdm | |
| import torch | |
| from vampnet.interface import Interface | |
| from vampnet import mask as pmask | |
| import audiotools as at | |
| Interface: Interface = argbind.bind(Interface) | |
| def calculate_bitrate( | |
| interface, num_codebooks, | |
| downsample_factor | |
| ): | |
| bit_width = 10 | |
| sr = interface.codec.sample_rate | |
| hop = interface.codec.hop_size | |
| rate = (sr / hop) * ((bit_width * num_codebooks) / downsample_factor) | |
| return rate | |
| def baseline(sig, interface): | |
| return interface.preprocess(sig) | |
| def reconstructed(sig, interface): | |
| return interface.to_signal( | |
| interface.encode(sig) | |
| ) | |
| def coarse2fine(sig, interface): | |
| z = interface.encode(sig) | |
| z = z[:, :interface.c2f.n_conditioning_codebooks, :] | |
| z = interface.coarse_to_fine(z) | |
| return interface.to_signal(z) | |
| class CoarseCond: | |
| def __init__(self, num_conditioning_codebooks, downsample_factor): | |
| self.num_conditioning_codebooks = num_conditioning_codebooks | |
| self.downsample_factor = downsample_factor | |
| def __call__(self, sig, interface): | |
| z = interface.encode(sig) | |
| mask = pmask.full_mask(z) | |
| mask = pmask.codebook_unmask(mask, self.num_conditioning_codebooks) | |
| mask = pmask.periodic_mask(mask, self.downsample_factor) | |
| zv = interface.coarse_vamp(z, mask) | |
| zv = interface.coarse_to_fine(zv) | |
| return interface.to_signal(zv) | |
| def opus(sig, interface, bitrate=128): | |
| sig = interface.preprocess(sig) | |
| with tempfile.NamedTemporaryFile(suffix=".wav") as f: | |
| sig.write(f.name) | |
| opus_name = Path(f.name).with_suffix(".opus") | |
| # convert to opus | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", f.name, | |
| "-c:a", "libopus", | |
| "-b:a", f"{bitrate}", | |
| opus_name | |
| ] | |
| subprocess.run(cmd, check=True) | |
| # convert back to wav | |
| output_name = Path(f"{f.name}-opus").with_suffix(".wav") | |
| cmd = [ | |
| "ffmpeg", "-y", "-i", opus_name, | |
| output_name | |
| ] | |
| subprocess.run(cmd, check=True) | |
| sig = at.AudioSignal( | |
| output_name, | |
| sample_rate=sig.sample_rate | |
| ) | |
| return sig | |
| def mask_ratio_1_step(ratio=1.0): | |
| def wrapper(sig, interface): | |
| z = interface.encode(sig) | |
| mask = pmask.linear_random(z, ratio) | |
| zv = interface.coarse_vamp( | |
| z, | |
| mask, | |
| sampling_steps=1, | |
| ) | |
| return interface.to_signal(zv) | |
| return wrapper | |
| def num_sampling_steps(num_steps=1): | |
| def wrapper(sig, interface: Interface): | |
| z = interface.encode(sig) | |
| mask = pmask.periodic_mask(z, 16) | |
| zv = interface.coarse_vamp( | |
| z, | |
| mask, | |
| sampling_steps=num_steps, | |
| ) | |
| zv = interface.coarse_to_fine(zv) | |
| return interface.to_signal(zv) | |
| return wrapper | |
| def beat_mask(ctx_time): | |
| def wrapper(sig, interface): | |
| beat_mask = interface.make_beat_mask( | |
| sig, | |
| before_beat_s=ctx_time/2, | |
| after_beat_s=ctx_time/2, | |
| invert=True | |
| ) | |
| z = interface.encode(sig) | |
| zv = interface.coarse_vamp( | |
| z, beat_mask | |
| ) | |
| zv = interface.coarse_to_fine(zv) | |
| return interface.to_signal(zv) | |
| return wrapper | |
| def inpaint(ctx_time): | |
| def wrapper(sig, interface: Interface): | |
| z = interface.encode(sig) | |
| mask = pmask.inpaint(z, interface.s2t(ctx_time), interface.s2t(ctx_time)) | |
| zv = interface.coarse_vamp(z, mask) | |
| zv = interface.coarse_to_fine(zv) | |
| return interface.to_signal(zv) | |
| return wrapper | |
| def token_noise(noise_amt): | |
| def wrapper(sig, interface: Interface): | |
| z = interface.encode(sig) | |
| mask = pmask.random(z, noise_amt) | |
| z = torch.where( | |
| mask, | |
| torch.randint_like(z, 0, interface.coarse.vocab_size), | |
| z | |
| ) | |
| return interface.to_signal(z) | |
| return wrapper | |
| EXP_REGISTRY = {} | |
| EXP_REGISTRY["gen-compression"] = { | |
| "baseline": baseline, | |
| "reconstructed": reconstructed, | |
| "coarse2fine": coarse2fine, | |
| **{ | |
| f"{n}_codebooks_downsampled_{x}x": CoarseCond(num_conditioning_codebooks=n, downsample_factor=x) | |
| for (n, x) in ( | |
| (1, 1), # 1 codebook, no downsampling | |
| (4, 4), # 4 codebooks, downsampled 4x | |
| (4, 16), # 4 codebooks, downsampled 16x | |
| (4, 32), # 4 codebooks, downsampled 16x | |
| ) | |
| }, | |
| **{ | |
| f"token_noise_{x}": mask_ratio_1_step(ratio=x) | |
| for x in [0.25, 0.5, 0.75] | |
| }, | |
| } | |
| EXP_REGISTRY["sampling-steps"] = { | |
| # "codec": reconstructed, | |
| **{f"steps_{n}": num_sampling_steps(n) for n in [1, 4, 12, 36, 64, 72]}, | |
| } | |
| EXP_REGISTRY["musical-sampling"] = { | |
| **{f"beat_mask_{t}": beat_mask(t) for t in [0.075]}, | |
| **{f"inpaint_{t}": inpaint(t) for t in [0.5, 1.0,]}, # multiply these by 2 (they go left and right) | |
| } | |
| def main( | |
| sources=[ | |
| "/media/CHONK/hugo/spotdl/val", | |
| ], | |
| output_dir: str = "./samples", | |
| max_excerpts: int = 2000, | |
| exp_type: str = "gen-compression", | |
| seed: int = 0, | |
| ext: str = [".mp3"], | |
| ): | |
| at.util.seed(seed) | |
| interface = Interface() | |
| output_dir = Path(output_dir) | |
| output_dir.mkdir(exist_ok=True, parents=True) | |
| from audiotools.data.datasets import AudioLoader, AudioDataset | |
| loader = AudioLoader(sources=sources, shuffle_state=seed, ext=ext) | |
| dataset = AudioDataset(loader, | |
| sample_rate=interface.codec.sample_rate, | |
| duration=interface.coarse.chunk_size_s, | |
| n_examples=max_excerpts, | |
| without_replacement=True, | |
| ) | |
| if exp_type in EXP_REGISTRY: | |
| SAMPLE_CONDS = EXP_REGISTRY[exp_type] | |
| else: | |
| raise ValueError(f"Unknown exp_type {exp_type}") | |
| indices = list(range(max_excerpts)) | |
| random.shuffle(indices) | |
| for i in tqdm(indices): | |
| # if all our files are already there, skip | |
| done = [] | |
| for name in SAMPLE_CONDS: | |
| o_dir = Path(output_dir) / name | |
| done.append((o_dir / f"{i}.wav").exists()) | |
| if all(done): | |
| continue | |
| sig = dataset[i]["signal"] | |
| results = { | |
| name: cond(sig, interface).cpu() | |
| for name, cond in SAMPLE_CONDS.items() | |
| } | |
| for name, sig in results.items(): | |
| o_dir = Path(output_dir) / name | |
| o_dir.mkdir(exist_ok=True, parents=True) | |
| sig.write(o_dir / f"{i}.wav") | |
| if __name__ == "__main__": | |
| args = argbind.parse_args() | |
| with argbind.scope(args): | |
| main() | |