Spaces:
Sleeping
Sleeping
File size: 8,838 Bytes
41b9d24 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
"""
TODO: train a linear probe
usage:
python gtzan_embeddings.py --args.load conf/interface.yml --Interface.device cuda --path_to_gtzan /path/to/gtzan/genres_original --output_dir /path/to/output
"""
from pathlib import Path
from typing import List
import audiotools as at
from audiotools import AudioSignal
import argbind
import torch
import numpy as np
import zipfile
import json
from vampnet.interface import Interface
import tqdm
# bind the Interface to argbind
Interface = argbind.bind(Interface)
DEBUG = False
def smart_plotly_export(fig, save_path):
img_format = save_path.split('.')[-1]
if img_format == 'html':
fig.write_html(save_path)
elif img_format == 'bytes':
return fig.to_image(format='png')
#TODO: come back and make this prettier
elif img_format == 'numpy':
import io
from PIL import Image
def plotly_fig2array(fig):
#convert Plotly fig to an array
fig_bytes = fig.to_image(format="png", width=1200, height=700)
buf = io.BytesIO(fig_bytes)
img = Image.open(buf)
return np.asarray(img)
return plotly_fig2array(fig)
elif img_format == 'jpeg' or 'png' or 'webp':
fig.write_image(save_path)
else:
raise ValueError("invalid image format")
def dim_reduce(emb, labels, save_path, n_components=3, method='tsne', title=''):
"""
dimensionality reduction for visualization!
saves an html plotly figure to save_path
parameters:
emb (np.ndarray): the samples to be reduces with shape (samples, features)
labels (list): list of labels for embedding
save_path (str): path where u wanna save ur figure
method (str): umap, tsne, or pca
title (str): title for ur figure
returns:
proj (np.ndarray): projection vector with shape (samples, dimensions)
"""
import pandas as pd
import plotly.express as px
if method == 'umap':
from umap import UMAP
reducer = umap.UMAP(n_components=n_components)
elif method == 'tsne':
from sklearn.manifold import TSNE
reducer = TSNE(n_components=n_components)
elif method == 'pca':
from sklearn.decomposition import PCA
reducer = PCA(n_components=n_components)
else:
raise ValueError
proj = reducer.fit_transform(emb)
if n_components == 2:
df = pd.DataFrame(dict(
x=proj[:, 0],
y=proj[:, 1],
instrument=labels
))
fig = px.scatter(df, x='x', y='y', color='instrument',
title=title+f"_{method}")
elif n_components == 3:
df = pd.DataFrame(dict(
x=proj[:, 0],
y=proj[:, 1],
z=proj[:, 2],
instrument=labels
))
fig = px.scatter_3d(df, x='x', y='y', z='z',
color='instrument',
title=title)
else:
raise ValueError("cant plot more than 3 components")
fig.update_traces(marker=dict(size=6,
line=dict(width=1,
color='DarkSlateGrey')),
selector=dict(mode='markers'))
return smart_plotly_export(fig, save_path)
# per JukeMIR, we want the emebddings from the middle layer?
def vampnet_embed(sig: AudioSignal, interface: Interface, layer=10):
with torch.inference_mode():
# preprocess the signal
sig = interface.preprocess(sig)
# get the coarse vampnet model
vampnet = interface.coarse
# get the tokens
z = interface.encode(sig)[:, :vampnet.n_codebooks, :]
z_latents = vampnet.embedding.from_codes(z, interface.codec)
# do a forward pass through the model, get the embeddings
_z, embeddings = vampnet(z_latents, return_activations=True)
# print(f"got embeddings with shape {embeddings.shape}")
# [layer, batch, time, n_dims]
# [20, 1, 600ish, 768]
# squeeze batch dim (1 bc layer should be dim 0)
assert embeddings.shape[1] == 1, f"expected batch dim to be 1, got {embeddings.shape[0]}"
embeddings = embeddings.squeeze(1)
num_layers = embeddings.shape[0]
assert layer < num_layers, f"layer {layer} is out of bounds for model with {num_layers} layers"
# do meanpooling over the time dimension
embeddings = embeddings.mean(dim=-2)
# [20, 768]
# return the embeddings
return embeddings
from dataclasses import dataclass, fields
@dataclass
class Embedding:
genre: str
filename: str
embedding: np.ndarray
def save(self, path):
"""Save the Embedding object to a given path as a zip file."""
with zipfile.ZipFile(path, 'w') as archive:
# Save numpy array
with archive.open('embedding.npy', 'w') as f:
np.save(f, self.embedding)
# Save non-numpy data as json
non_numpy_data = {f.name: getattr(self, f.name) for f in fields(self) if f.name != 'embedding'}
with archive.open('data.json', 'w') as f:
f.write(json.dumps(non_numpy_data).encode('utf-8'))
@classmethod
def load(cls, path):
"""Load the Embedding object from a given zip path."""
with zipfile.ZipFile(path, 'r') as archive:
# Load numpy array
with archive.open('embedding.npy') as f:
embedding = np.load(f)
# Load non-numpy data from json
with archive.open('data.json') as f:
data = json.loads(f.read().decode('utf-8'))
return cls(embedding=embedding, **data)
@argbind.bind(without_prefix=True)
def main(
path_to_gtzan: str = None,
cache_dir: str = "./.gtzan_emb_cache",
output_dir: str = "./gtzan_vampnet_embeddings",
layers: List[int] = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
):
path_to_gtzan = Path(path_to_gtzan)
assert path_to_gtzan.exists(), f"{path_to_gtzan} does not exist"
cache_dir = Path(cache_dir)
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True, parents=True)
# load our interface
# argbind will automatically load the default config,
interface = Interface()
# gtzan should have a folder for each genre, so let's get the list of genres
genres = [Path(x).name for x in path_to_gtzan.iterdir() if x.is_dir()]
print(f"Found {len(genres)} genres")
print(f"genres: {genres}")
# collect audio files, genres, and embeddings
data = []
for genre in genres:
audio_files = list(at.util.find_audio(path_to_gtzan / genre))
print(f"Found {len(audio_files)} audio files for genre {genre}")
for audio_file in tqdm.tqdm(audio_files, desc=f"embedding genre {genre}"):
# check if we have a cached embedding for this file
cached_path = (cache_dir / f"{genre}_{audio_file.stem}.emb")
if cached_path.exists():
# if so, load it
if DEBUG:
print(f"loading cached embedding for {cached_path.stem}")
embedding = Embedding.load(cached_path)
else:
try:
sig = AudioSignal(audio_file)
except Exception as e:
print(f"failed to load {audio_file.name} with error {e}")
print(f"skipping {audio_file.name}")
continue
# gets the embedding
emb = vampnet_embed(sig, interface).cpu().numpy()
# create an embedding we can save/load
embedding = Embedding(
genre=genre,
filename=audio_file.name,
embedding=emb
)
# cache the embeddings
cached_path.parent.mkdir(exist_ok=True, parents=True)
embedding.save(cached_path)
data.append(embedding)
# now, let's do a dim reduction on the embeddings
# and visualize them.
# collect a list of embeddings and labels
embeddings = [d.embedding for d in data]
labels = [d.genre for d in data]
# convert the embeddings to a numpy array
embeddings = np.stack(embeddings)
# do dimensionality reduction for each layer we're given
for layer in tqdm.tqdm(layers, desc="dim reduction"):
dim_reduce(
embeddings[:, layer, :], labels,
save_path=str(output_dir / f'vampnet-gtzan-layer={layer}.html'),
n_components=2, method='tsne',
title=f'vampnet-gtzan-layer={layer}'
)
if __name__ == "__main__":
args = argbind.parse_args()
with argbind.scope(args):
main() |