# TastyPiano / src/cocktails/pipeline/cocktail2affect.py
# Author: Cédric Colas -- initial commit e775f6d (19.4 kB)
import pandas as pd
import numpy as np
import os
from src.cocktails.utilities.cocktail_utilities import get_bunch_of_rep_keys
from src.cocktails.utilities.other_scrubbing_utilities import print_recipe
from src.cocktails.config import COCKTAILS_CSV_DATA
from src.music.config import CHECKPOINTS_PATH, EXPERIMENT_PATH
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import NearestNeighbors
import pickle
import random
# --- Output and checkpoint locations ---------------------------------------
experiment_path = EXPERIMENT_PATH + '/cocktails/representation_analysis/affective_mapping/'
min_max_path = CHECKPOINTS_PATH + "/cocktail_representation/minmax/"
cluster_model_path = CHECKPOINTS_PATH + "/music2cocktails/affects2affect_cluster/cluster_model.pickle"

# --- Layout of the affective space -----------------------------------------
affective_space_dimensions = ((-1, 1), (-1, 1), (-1, 1))  # valence, arousal, dominance
n_splits = (3, 3, 2)  # number of bins per dimension
# dimensions_weights = [1, 1, 0.5]
dimensions_weights = [1, 1, 1]
total_n_clusters = np.prod(n_splits)  # total number of bins

# Bin edges per dimension, used by the 'handcoded' clustering. The last edge
# is nudged up by 1e-6 so that a coordinate sitting exactly on the upper bound
# still falls inside the last bin.
affective_boundaries = []
for (low, high), n_bins in zip(affective_space_dimensions, n_splits):
    edges = np.arange(low, high + 1e-6, (high - low) / n_bins)
    edges[-1] += 1e-6
    affective_boundaries.append(edges)

# --- Representation keys ----------------------------------------------------
all_keys = get_bunch_of_rep_keys()['custom']
original_affective_keys = get_bunch_of_rep_keys()['affective']
# Keep only the second space-separated token of each key as its short name.
affective_keys = [key.split(' ')[1] for key in original_affective_keys]

# --- Cluster colours (seeded so plots keep the same colours across runs) ----
random.seed(0)
cluster_colors = ['#%06X' % random.randint(0, 0xFFFFFF) for _ in range(total_n_clusters)]

# --- Clustering method -------------------------------------------------------
# Only 'k_means', 'gmm' and 'handcoded' are handled by get_clusters().
clustering_method = 'k_means'  # 'k_means', 'handcoded', 'agglo', 'spectral'
if clustering_method != 'handcoded':
    # Learned clusterings use a fixed number of clusters instead of the bin count.
    total_n_clusters = 10

# --- Stored normalisation bounds of the raw affect projections --------------
# NOTE(review): these files are read at import time, but they are written by
# cocktail2affect(..., save=True); importing this module therefore requires
# that they already exist.
min_arousal = np.loadtxt(min_max_path + 'min_arousal.txt')
max_arousal = np.loadtxt(min_max_path + 'max_arousal.txt')
min_val = np.loadtxt(min_max_path + 'min_valence.txt')
max_val = np.loadtxt(min_max_path + 'max_valence.txt')
min_dom = np.loadtxt(min_max_path + 'min_dominance.txt')
max_dom = np.loadtxt(min_max_path + 'max_dominance.txt')
def get_cocktail_reps(path, save=False):
    """Load the affective cocktail representations from a CSV file and normalise them.

    Args:
        path: path of the CSV file; it must contain one column per entry of
            ``original_affective_keys``.
        save: forwarded to ``normalize_cocktail_reps_affective``; when true the
            normalised representations are also written to ``experiment_path``.

    Returns:
        Array with one row per cocktail and one column per affective key.
    """
    table = pd.read_csv(path)
    columns = [table[key] for key in original_affective_keys]
    raw_reps = np.array(columns).transpose()
    _, n_dims = raw_reps.shape
    # print(f'{n_data} data points of {dim_rep} dimensions: {affective_keys}')
    normalized = normalize_cocktail_reps_affective(raw_reps, save=save)
    if save:
        out_file = experiment_path + f'cocktail_reps_for_affective_mapping_-1_1_norm_sigmoid_rescaling_{n_dims}_keys.txt'
        np.savetxt(out_file, normalized)
    return normalized
def sigmoid(x, shift, beta):
    """Shifted, scaled logistic function rescaled to the open interval (-1, 1).

    Args:
        x: scalar or array of inputs.
        shift: offset added to ``x`` before squashing.
        beta: slope of the logistic curve.

    Returns:
        ``2 * logistic((x + shift) * beta) - 1``, same shape as ``x``.
    """
    logistic = 1 / (1 + np.exp(-(x + shift) * beta))
    return (logistic - 0.5) * 2
def normalize_cocktail_reps_affective(cocktail_reps, save=False):
    """Rescale affective representations to [-1, 1] and squash each column.

    Args:
        cocktail_reps: 2-D array with (at least) eight columns, one per
            affective key.
        save: when true, the per-column min/max are computed from
            ``cocktail_reps`` and written under ``min_max_path``; otherwise the
            previously stored min/max are loaded from there.

    Returns:
        A new array of the same shape; the input array is not modified.
    """
    lower_file = min_max_path + 'min_cocktail_reps_affective.txt'
    upper_file = min_max_path + 'max_cocktail_reps_affective.txt'
    if not save:
        lower = np.loadtxt(lower_file)
        upper = np.loadtxt(upper_file)
    else:
        lower = cocktail_reps.min(axis=0)
        upper = cocktail_reps.max(axis=0)
        np.savetxt(lower_file, lower)
        np.savetxt(upper_file, upper)

    # Linear min-max rescaling to [-1, 1].
    scaled = ((cocktail_reps - lower) / (upper - lower) - 0.5) * 2

    # One (shift, beta) pair per column, in column order. The column order is
    # presumably booze, sweet, sour, fizzy, complex, bitter, spicy, colorful,
    # as asserted in cocktail2affect().
    squash_params = (
        (0.05, 4),
        (0.3, 5),
        (0.15, 3),
        (0.9, 20),
        (0, 4),
        (0.2, 3),
        (0.5, 5),
        (0.2, 6),
    )
    for col, (shift, beta) in enumerate(squash_params):
        scaled[:, col] = sigmoid(scaled[:, col], shift=shift, beta=beta)
    return scaled
def plot(cocktail_reps):
    """Save pairwise scatter plots and per-dimension histograms of the representations.

    Scatter plots go to ``experiment_path + 'scatters/'`` and histograms to
    ``experiment_path + 'hists/'``; file names are built from ``affective_keys``.

    Args:
        cocktail_reps: 2-D array, one column per affective key.
    """
    n_dims = cocktail_reps.shape[1]
    for first in range(n_dims):
        x_key = affective_keys[first]
        # One scatter plot per unordered pair of dimensions.
        for second in range(first + 1, n_dims):
            y_key = affective_keys[second]
            plt.figure()
            plt.scatter(cocktail_reps[:, first], cocktail_reps[:, second], s=150, alpha=0.5)
            plt.xlabel(x_key)
            plt.ylabel(y_key)
            plt.savefig(experiment_path + f'scatters/{x_key}_vs_{y_key}.png', dpi=300)
            plt.close('all')
        # One histogram per dimension.
        plt.figure()
        plt.hist(cocktail_reps[:, first])
        plt.xlabel(x_key)
        plt.savefig(experiment_path + f'hists/{x_key}.png', dpi=300)
        plt.close('all')
def get_clusters(affective_coordinates, save=False):
    """Cluster points of the affective space with the configured ``clustering_method``.

    For 'k_means' and 'gmm' a new model is fitted on ``affective_coordinates``
    (scaled by ``dimensions_weights``). For 'handcoded' the space is cut into
    the regular grid described by ``affective_boundaries`` / ``n_splits`` and
    nothing is fitted.

    Args:
        affective_coordinates: array of shape (n_points, 3).
        save: when true (learned methods only), pickle the fitted model and its
            metadata to ``cluster_model_path``.

    Returns:
        Tuple ``(cluster_ids, cluster_centers, find_cluster)`` where
        ``find_cluster`` maps one point or a batch of points to cluster ids.
        ``cluster_centers`` is an empty list for 'gmm'.

    Raises:
        NotImplementedError: if ``clustering_method`` is not one of
            'k_means', 'gmm', 'handcoded'.
    """
    if clustering_method in ('k_means', 'gmm'):
        if clustering_method == 'k_means':
            model = KMeans(n_clusters=total_n_clusters)
        else:
            model = GaussianMixture(n_components=total_n_clusters, covariance_type="full")
        # Captured once so prediction always uses the weights the model was fitted with.
        weights = np.array(dimensions_weights)
        model.fit(affective_coordinates * weights)

        def find_cluster(aff_coord):
            if aff_coord.ndim == 1:
                aff_coord = aff_coord.reshape(1, -1)
            return model.predict(aff_coord * weights)

        cluster_centers = model.cluster_centers_ if clustering_method == 'k_means' else []
        if save:
            to_save = dict(cluster_model=model,
                           cluster_centers=cluster_centers,
                           # len(cluster_centers) would be 0 for 'gmm'; store the real count.
                           nb_clusters=total_n_clusters,
                           dimensions_weights=dimensions_weights)
            with open(cluster_model_path, 'wb') as f:
                pickle.dump(to_save, f)
    elif clustering_method == 'handcoded':
        def find_cluster(aff_coord):
            if aff_coord.ndim == 1:
                aff_coord = aff_coord.reshape(1, -1)
            # Bin index along each axis = index of the last edge that is <= the coordinate.
            cluster_coordinates = np.array(
                [[np.argwhere(affective_boundaries[j] <= aff_coord[i, j]).flatten()[-1] for j in range(3)]
                 for i in range(aff_coord.shape[0])],
                dtype=int,
            ).reshape(-1, 3)  # keeps a (0, 3) shape for an empty batch
            # Row-major flattening of the 3-D bin index.
            cluster_ids = (cluster_coordinates[:, 0] * np.prod(n_splits[1:])
                           + cluster_coordinates[:, 1] * n_splits[-1]
                           + cluster_coordinates[:, 2])
            return cluster_ids

        # Bin centers along each axis; the `1 / n_bins` offset is half a bin
        # width for the configured (-1, 1) ranges.
        axis_centers = [
            np.arange(low + 1 / n_bins, high, (high - low) / n_bins)[:n_bins]
            for (low, high), n_bins in zip(affective_space_dimensions, n_splits)
        ]
        # Same ordering as the ids returned by find_cluster (last axis varies fastest).
        cluster_centers = np.array([[x, y, z]
                                    for x in axis_centers[0]
                                    for y in axis_centers[1]
                                    for z in axis_centers[2]])
    else:
        raise NotImplementedError(f'Unsupported clustering method: {clustering_method!r}')
    cluster_ids = find_cluster(affective_coordinates)
    return cluster_ids, cluster_centers, find_cluster
def cocktail2affect(cocktail_reps, save=False):
    """Map normalised cocktail representations to (valence, arousal, dominance).

    Each affect is a weighted sum of the eight affective features, min-max
    rescaled to [-1, 1] and then squashed with ``sigmoid``.

    Args:
        cocktail_reps: array of shape (n_cocktails, 8) or (8,); columns must be
            ordered booze, sweet, sour, fizzy, complex, bitter, spicy, colorful.
        save: when true, the min/max of each raw projection are computed from
            this batch and written under ``min_max_path``; otherwise the
            module-level bounds loaded at import time are used.

    Returns:
        Tuple ``(affective_coordinates, all_weights)``: an (n_cocktails, 3)
        array and the list of the three weight vectors (valence, arousal,
        dominance).
    """
    if cocktail_reps.ndim == 1:
        cocktail_reps = cocktail_reps.reshape(1, -1)
    assert affective_keys == ['booze', 'sweet', 'sour', 'fizzy', 'complex', 'bitter', 'spicy', 'colorful']

    def project(weights, min_file, max_file, stored_bounds, shift, beta):
        # Weighted sum -> rescale to [-1, 1] -> sigmoid squashing, as a column vector.
        raw = (cocktail_reps * weights).sum(axis=1)
        if save:
            low = raw.min()
            high = raw.max()
            np.savetxt(min_max_path + min_file, np.array([low]))
            np.savetxt(min_max_path + max_file, np.array([high]))
        else:
            low, high = stored_bounds
        scaled = 2 * ((raw - low) / (high - low) - 0.5)  # normalize to -1, 1
        return sigmoid(scaled, shift=shift, beta=beta).reshape(-1, 1)

    # valence: + sweet - bitter - booze + colorful
    valence_weights = np.array([-1, 1, 0, 0, 0, -1, 0, 1])
    valence = project(valence_weights, 'min_valence.txt', 'max_valence.txt',
                      (min_val, max_val), shift=0.1, beta=3.5)

    # arousal: + booze + sour + fizzy + complex + spicy
    # previous weights = np.array([0, -1, 1, 1, 1, 1, 1, 0])
    arousal_weights = np.array([0.7, 0, 1.5, 1.5, 0.6, 0, 0.6, 0])
    arousal = project(arousal_weights, 'min_arousal.txt', 'max_arousal.txt',
                      (min_arousal, max_arousal), shift=0.3, beta=4)

    # dominance: + booze + fizzy - complex - bitter - sweet
    dominance_weights = np.array([1.5, -0.8, 0, 0.7, -1, -1.5, 0, 0])
    dominance = project(dominance_weights, 'min_dominance.txt', 'max_dominance.txt',
                        (min_dom, max_dom), shift=-0.05, beta=5)

    all_weights = [valence_weights.copy(), arousal_weights.copy(), dominance_weights.copy()]
    affective_coordinates = np.concatenate([valence, arousal, dominance], axis=1)
    # if save:
    #     assert (affective_coordinates.min(axis=0) == np.array([ac[0] for ac in affective_space_dimensions])).all()
    #     assert (affective_coordinates.max(axis=0) == np.array([ac[1] for ac in affective_space_dimensions])).all()
    return affective_coordinates, all_weights
def save_reps(path, affective_cluster_ids):
    """Write the 'custom'-key representations, globally and per affective cluster.

    All files go to ``experiment_path + 'clustered_representations/'``: the
    per-column min and max, the representations rescaled to [-1, 1], the
    cluster ids, and one file of rescaled representations per cluster.

    Args:
        path: CSV file holding one column per 'custom' representation key.
        affective_cluster_ids: cluster id of each row of the CSV file.
    """
    table = pd.read_csv(path)
    custom_keys = get_bunch_of_rep_keys()['custom']
    reps = np.array([table[key] for key in custom_keys]).transpose()

    out_dir = experiment_path + 'clustered_representations/'
    dim = reps.shape[1]
    col_min = reps.min(axis=0)
    col_max = reps.max(axis=0)
    np.savetxt(out_dir + f'min_cocktail_reps_custom_keys_dim{dim}.txt', col_min)
    np.savetxt(out_dir + f'max_cocktail_reps_custom_keys_dim{dim}.txt', col_max)

    reps = ((reps - col_min) / (col_max - col_min) - 0.5) * 2  # normalize in -1, 1
    np.savetxt(out_dir + f'all_cocktail_reps_norm-1_1_custom_keys_dim{dim}.txt', reps)
    np.savetxt(out_dir + 'affective_cluster_ids.txt', affective_cluster_ids)

    for cluster_id in sorted(set(affective_cluster_ids)):
        members = np.argwhere(affective_cluster_ids == cluster_id).flatten()
        np.savetxt(out_dir + f'rep_cluster{cluster_id}_norm-1_1_custom_keys_dim{dim}.txt', reps[members, :])
def study_affects(affective_coordinates, affective_cluster_ids):
    """Save diagnostic plots of the affective space under ``experiment_path``.

    Produces the histogram of cluster ids, a 3-D scatter of all points, the
    three pairwise 2-D scatters and one histogram per affect. Points are
    coloured by cluster with ``cluster_colors``.

    Args:
        affective_coordinates: array of shape (n_points, 3) with columns
            valence, arousal, dominance.
        affective_cluster_ids: cluster id of each point.
    """
    unique_ids = sorted(set(affective_cluster_ids))

    def scatter_clusters(axes, dims, **scatter_kwargs):
        # One scatter call per cluster so that every cluster gets its own colour.
        for cid in unique_ids:
            members = np.argwhere(affective_cluster_ids == cid).flatten()
            axes.scatter(*(affective_coordinates[members, d] for d in dims),
                         c=cluster_colors[cid], s=150, **scatter_kwargs)

    # Cluster size distribution.
    distrib_fig = plt.figure()
    plt.hist(affective_cluster_ids, bins=total_n_clusters)
    plt.xlabel('Affective cluster ids')
    plt.xticks(np.arange(total_n_clusters))
    plt.savefig(experiment_path + 'affective_cluster_distrib.png')
    plt.close(distrib_fig)

    # 3-D view of the whole affective space.
    space_fig = plt.figure()
    space_ax = space_fig.add_subplot(projection='3d')
    space_ax.set_xlim([-1, 1])
    space_ax.set_ylim([-1, 1])
    space_ax.set_zlim([-1, 1])
    scatter_clusters(space_ax, (0, 1, 2))
    space_ax.set_xlabel('Valence')
    space_ax.set_ylabel('Arousal')
    space_ax.set_zlabel('Dominance')
    plt.savefig(experiment_path + 'scatters_affect/affective_mapping.png')
    plt.close(space_fig)

    affects = ['Valence', 'Arousal', 'Dominance']
    for first, first_name in enumerate(affects):
        # Pairwise 2-D projections.
        for second in range(first + 1, 3):
            second_name = affects[second]
            pair_fig = plt.figure()
            pair_ax = pair_fig.add_subplot()
            scatter_clusters(pair_ax, (first, second), alpha=0.5)
            pair_ax.set_xlabel(first_name)
            pair_ax.set_ylabel(second_name)
            plt.savefig(experiment_path + f'scatters_affect/scatter_{first_name}_vs_{second_name}.png')
            plt.close(pair_fig)
        # Marginal distribution of this affect.
        hist_fig = plt.figure()
        plt.hist(affective_coordinates[:, first])
        plt.xlabel(first_name)
        plt.savefig(experiment_path + f'hists_affect/hist_{first_name}.png')
        plt.close(hist_fig)
    plt.close('all')
def sample_clusters(path, cocktail_reps, all_weights, affective_cluster_ids, affective_cluster_centers, affective_coordinates, n_samples=4):
    """Print and save a few example cocktails for every affective cluster.

    For each cluster, a header (size and center) followed by up to
    ``n_samples`` randomly drawn cocktails (recipe, url, representation and
    affect explanation) is printed and written to
    ``experiment_path + 'clusters/cluster_<id>'``.

    Args:
        path: CSV file with the 'names', 'urls', 'ingredients_str' columns and
            one column per entry of ``original_affective_keys``.
        cocktail_reps: normalised affective representations, row-aligned with
            the CSV file.
        all_weights: weight vectors returned by ``cocktail2affect``.
        affective_cluster_ids: cluster id of each cocktail.
        affective_cluster_centers: cluster centers, indexable by cluster id.
        affective_coordinates: (n_cocktails, 3) affective coordinates.
        n_samples: maximum number of cocktails sampled per cluster.
    """
    cocktail_data = pd.read_csv(path)
    these_cocktail_reps = normalize_cocktail_reps_affective(
        np.array([cocktail_data[k] for k in original_affective_keys]).transpose())
    names = cocktail_data['names']
    urls = cocktail_data['urls']
    ingr_str = cocktail_data['ingredients_str']

    for cluster_id in sorted(set(affective_cluster_ids)):
        indexes = np.argwhere(affective_cluster_ids == cluster_id).flatten()
        print('\n\n\n---------\n----------\n-----------\n')
        center = affective_cluster_centers[cluster_id]
        cluster_str = (f'Affective cluster #{cluster_id}'
                       f'\n\tSize: {len(indexes)}'
                       f'\n\tCenter: '
                       f'\n\t\tVal: {center[0]:.2f}, '
                       f'\n\t\tArousal: {center[1]:.2f}, '
                       f'\n\t\tDominance: {center[2]:.2f}')
        print(cluster_str)

        sampled_idx = np.random.choice(indexes, size=min(len(indexes), n_samples), replace=False)
        parts = []
        for i in sampled_idx:
            # Sanity check that `cocktail_reps` is row-aligned with the CSV file.
            # Absolute differences are summed so opposite-sign errors cannot cancel out.
            assert np.abs(cocktail_reps[i] - these_cocktail_reps[i]).sum() < 1e-9
            parts.append('\n\n-------------')
            parts.append(print_recipe(ingr_str[i], name=names[i], to_print=False))
            parts.append(f'\nUrl: {urls[i]}')
            parts.append('\n\nRepresentation: '
                         + ', '.join([f'{af}: {cr:.2f}' for af, cr in zip(affective_keys, cocktail_reps[i])])
                         + '\n')
            parts.append('\n' + generate_explanation(cocktail_reps[i], all_weights, affective_coordinates[i]))
        cocktail_str = ''.join(parts)
        print(cocktail_str)

        cluster_str += '\n' + cocktail_str
        # Written next to the other experiment outputs instead of a
        # machine-specific absolute path.
        with open(experiment_path + f'clusters/cluster_{cluster_id}', 'w') as f:
            f.write(cluster_str)
def explanation_per_dimension(i, cocktail_rep, all_weights, aff_coord):
    """Explain which features drive one affective coordinate of a cocktail.

    Every feature with a non-zero contribution ``|weight * value|`` gets one
    line giving its share of the total contribution (truncated percentage),
    whether it pushes the affect up or down, and whether the feature value is
    high (> 0) or low. Lines are sorted by decreasing share.

    Args:
        i: affect index (0 valence, 1 arousal, 2 dominance).
        cocktail_rep: 1-D normalised representation of one cocktail.
        all_weights: list of weight vectors, one per affect.
        aff_coord: affective coordinates of the cocktail.

    Returns:
        The explanation string; only the header line when no feature
        contributes.
    """
    names = ['valence', 'arousal', 'dominance']
    weights = all_weights[i]
    explanation_str = f'\n{names[i].capitalize()} explanation ({aff_coord[i]:.2f}):'

    # Float conversion so the normalisation below also works for integer inputs.
    strengths = np.abs(np.asarray(weights, dtype=float) * cocktail_rep)
    total = strengths.sum()
    if total == 0:
        # Nothing contributes: avoid a 0/0 division producing NaN percentages.
        return explanation_str
    strengths = strengths / total

    for ind in np.flip(np.argsort(strengths)):
        if strengths[ind] == 0:
            continue
        # Weight and value of the same sign -> positive contribution to the affect.
        pushes_up = np.sign(weights[ind]) == np.sign(cocktail_rep[ind])
        direction = 'higher' if pushes_up else 'lower'
        keyword = 'high' if cocktail_rep[ind] > 0 else 'low'
        explanation_str += f'\n\t{int(strengths[ind]*100)}%: {direction} {names[i]} because {keyword} {affective_keys[ind]}'
    return explanation_str
def generate_explanation(cocktail_rep, all_weights, aff_coord):
    """Concatenate the valence, arousal and dominance explanations of one cocktail.

    Args:
        cocktail_rep: 1-D normalised representation of the cocktail.
        all_weights: list of the three affect weight vectors.
        aff_coord: affective coordinates of the cocktail.

    Returns:
        The three per-dimension explanations joined in affect order.
    """
    return ''.join(
        explanation_per_dimension(dim, cocktail_rep, all_weights, aff_coord)
        for dim in range(3)
    )
def cocktails2affect_clusters(cocktail_rep):
    """Return the affective cluster ids of one or several cocktail representations.

    NOTE(review): this goes through ``get_clusters``, which for the 'k_means'
    and 'gmm' methods fits a NEW model on the coordinates it is given. The ids
    returned here are therefore those of a clustering fitted on this input
    only, not of the model built by ``setup_affective_space`` -- confirm this
    is intended.

    Args:
        cocktail_rep: 1-D representation of one cocktail, or a 2-D batch.

    Returns:
        Array of cluster ids, one per input cocktail.
    """
    batch = cocktail_rep.reshape(1, -1) if cocktail_rep.ndim == 1 else cocktail_rep
    coordinates = cocktail2affect(batch)[0]
    return get_clusters(coordinates)[0]
def setup_affective_space(path, save=False):
    """Build the affective space of the cocktail dataset and bundle everything needed to query it.

    Args:
        path: CSV file of the cocktail dataset ('names', 'ingredients_str',
            'urls' and the affective key columns).
        save: forwarded to ``get_clusters`` (whether to persist the cluster model).

    Returns:
        Dict holding the dataset coordinates, cluster ids and centers, the
        affect weights, the normalised representations, dataset metadata, a
        fitted 1-nearest-neighbour model and helper functions.
    """
    dataset = pd.read_csv(path)
    reps = get_cocktail_reps(path)
    coordinates, weights = cocktail2affect(reps)
    cluster_ids, cluster_centers, find_cluster = get_clusters(coordinates, save=save)

    # Nearest-neighbour index over the dataset's affective coordinates.
    nn_model = NearestNeighbors(n_neighbors=1)
    nn_model.fit(coordinates)

    def cocktail2affect_cluster(cocktail_rep):
        # Uses the clustering fitted above, unlike cocktails2affect_clusters.
        coords, _ = cocktail2affect(cocktail_rep)
        return find_cluster(coords)

    return {
        'affective_coordinates': coordinates,  # coordinates of cocktail in affective space
        'affective_cluster_ids': cluster_ids,  # cluster id of cocktails
        'affective_cluster_centers': cluster_centers,  # cluster centers in affective space
        'affective_weights': weights,  # weights to compute valence, arousal, dominance from cocktail representations
        'original_affective_keys': original_affective_keys,
        'cocktail_reps': reps,  # cocktail representations from the dataset (normalized)
        'find_cluster': find_cluster,  # function to retrieve a cluster from affective coordinates
        'nn_model': nn_model,  # to predict the nearest neighbor affective space,
        'names': dataset['names'],  # names of cocktails in the dataset
        'urls': dataset['urls'],  # urls from the dataset
        'recipes': dataset['ingredients_str'],  # recipes of the dataset
        'cocktail2affect': cocktail2affect,  # function to compute affects from cocktail representations
        'cocktails2affect_clusters': cocktails2affect_clusters,
        'cocktail2affect_cluster': cocktail2affect_cluster,
    }
if __name__ == '__main__':
    # Regenerate the normalisation bounds, the clustering and the analysis
    # outputs from the cocktail dataset.
    dataset_csv = COCKTAILS_CSV_DATA
    reps = get_cocktail_reps(dataset_csv, save=True)
    # plot(reps)
    affective_coordinates, all_weights = cocktail2affect(reps, save=True)
    # This clustering is only used for the analysis files below; the model
    # that gets persisted is the one fitted inside setup_affective_space().
    affective_cluster_ids, affective_cluster_centers, _ = get_clusters(affective_coordinates)
    save_reps(dataset_csv, affective_cluster_ids)
    study_affects(affective_coordinates, affective_cluster_ids)
    sample_clusters(dataset_csv, reps, all_weights, affective_cluster_ids,
                    affective_cluster_centers, affective_coordinates)
    setup_affective_space(dataset_csv, save=True)