Spaces:
Runtime error
Runtime error
import numpy as np | |
import json | |
import os | |
# Info names accepted by get_track_info_from_playlist_uri (checked against
# its `which_info` argument).
valid_track_infos = {
    'uri',
    'name',
    'artist_name',
    'popularity',
    'artist_genres',
    'album',
    'artist_popularity',
    'audio_features',
    'audio_analysis',
}
def get_all_tracks_from_playlist_uri(sp, playlist_uri):
    """Return every item of a playlist, fetched page by page.

    Args:
        sp: client exposing ``playlist_tracks(uri, offset=..., limit=...)``
            whose result has an ``"items"`` list (presumably a spotipy
            client -- confirm against the caller).
        playlist_uri: identifier passed through to ``sp.playlist_tracks``.

    Returns:
        A list with the concatenated ``"items"`` of all pages.
    """
    page_size = 100
    collected = []
    start = 0
    while True:
        page = sp.playlist_tracks(playlist_uri, offset=start, limit=page_size)["items"]
        collected += page
        # A page shorter than the page size means there is nothing left.
        if len(page) < page_size:
            return collected
        start += page_size
def update_data_with_audio_features(sp, uris, data):
    """Attach audio features to the entries of ``data`` listed in ``uris``.

    Args:
        sp: client exposing ``audio_features(uris)``; the result is read by
            position, in the same order as ``uris``.
        uris: at most 100 keys of ``data`` (one batch).
        data: mapping ``uri -> item`` where each item has a ``'track'`` dict.
            It is modified in place.

    Returns:
        ``(data, [])``. The empty list lets callers reset their pending
        batch in the same assignment.

    Raises:
        AssertionError: if more than 100 uris are passed.
    """
    assert len(uris) <= 100
    # Nothing pending: skip the remote call rather than issuing a request
    # that carries no ids at all.
    if not uris:
        return data, []
    tracks_audio_features = sp.audio_features(uris)
    for position, uri in enumerate(uris):
        data[uri]['track']['audio_features'] = tracks_audio_features[position]
    return data, []
def check_all_track_has_audio_features(data):
    """Assert that every entry's ``'track'`` dict has an ``'audio_features'`` key.

    Args:
        data: mapping ``uri -> item`` where each item has a ``'track'`` dict.

    Raises:
        AssertionError: on the first entry lacking the key.
    """
    for entry in data.values():
        track_fields = entry['track'].keys()
        assert 'audio_features' in track_fields
def get_all_tracks_from_playlists(sp, playlist_uris, verbose=False):
    """Gather the de-duplicated tracks of several playlists.

    Each new track gets the genres of its first artist stored under
    ``item['track']['genres']`` and, through
    ``update_data_with_audio_features``, an ``'audio_features'`` entry.
    The result is also dumped to ``./cache_track_features_tmp.json``.

    Args:
        sp: client exposing ``artist(uri)`` plus whatever the helper
            functions of this module call on it.
        playlist_uris: iterable of playlist identifiers.
        verbose: print progress information when true.

    Returns:
        dict mapping the last ``:``-separated segment of each track uri to
        the playlist item it was first seen in.
    """
    if verbose:
        print(f'Extracting all tracks from {len(playlist_uris)} playlists.')

    # load data
    cache_path = './cache_track_features_tmp.json'
    # NOTE: the existence check is disabled, so the temporary cache is reset
    # on every call and `data` always starts out empty.
    if True:  # not os.path.exists(cache_path):
        with open(cache_path, 'w') as f:
            json.dump(dict(), f)
    with open(cache_path, 'r') as f:
        data = json.load(f)
    for k in list(data):
        if k not in playlist_uris:
            data.pop(k)
        else:
            print(k)
    if verbose:
        print(f'\t{len(data.keys())} tracks loaded from cache')

    # for each playlist, extract all tracks, remove doubles
    if verbose:
        print(f'\tScanning tracks for each playlist')
    new_additions = 0
    added_uris = []
    for i_playlist, playlist_uri in enumerate(playlist_uris):
        new_tracks = get_all_tracks_from_playlist_uri(sp, playlist_uri)
        for new_track in new_tracks:
            track = new_track.get('track')
            if not track:
                # NOTE(review): playlist items can apparently come back with
                # a null track (e.g. unavailable entries); skip them instead
                # of crashing the whole scan -- confirm against the API docs.
                continue
            uri = track['uri'].split(':')[-1]
            # remove doubles (dict membership, no per-track set rebuild)
            if uri in data:
                continue
            track['genres'] = sp.artist(track['artists'][0]['uri'])['genres']
            data[uri] = new_track
            added_uris.append(uri)
            new_additions += 1
            # when 100 new added uris, compute their audio features
            if len(added_uris) == 100:
                data, added_uris = update_data_with_audio_features(sp, added_uris, data)
            # periodic checkpoint: flush the pending batch and save to disk
            if (new_additions + 1) % 1000 == 0:
                data, added_uris = update_data_with_audio_features(sp, added_uris, data)
                check_all_track_has_audio_features(data)
                with open(cache_path, 'w') as f:
                    json.dump(data, f)
        if verbose:
            print(f"\t\t{i_playlist + 1} playlists scanned ({new_additions} new tracks, total: {len(data.keys())} tracks)")
    if verbose:
        print('\tDone.')

    # Flush the last partial batch; when nothing is pending there is no
    # reason to ask the client for the features of zero tracks.
    if added_uris:
        data, _ = update_data_with_audio_features(sp, added_uris, data)
    check_all_track_has_audio_features(data)
    with open(cache_path, 'w') as f:
        json.dump(data, f)
    return data
def get_all_tracks_from_user(sp, user_id='bkayf', verbose=False):
    """Gather the de-duplicated tracks of all playlists of a user.

    For the user ``'bkayf'`` the result is loaded from, and saved back to,
    ``../data/bkayf/cache_track_features.json``; other users start from an
    empty dict and nothing is written to disk.

    Every newly added track receives an ``'audio_features'`` entry through
    ``update_data_with_audio_features``. The trailing partial batch is now
    flushed for every user (it used to be flushed only for ``'bkayf'``,
    which left the last <100 tracks of other users without features).

    Args:
        sp: client exposing ``user_playlists(user_id, offset=..., limit=...)``
            plus whatever the helper functions of this module call on it.
        user_id: identifier passed to ``sp.user_playlists``.
        verbose: print progress information when true.

    Returns:
        dict mapping the last ``:``-separated segment of each track uri to
        the playlist item it was first seen in.
    """
    if verbose:
        print(f'Extracting all tracks from user {user_id}.')

    # load data (only this user has an on-disk cache)
    use_cache = user_id == 'bkayf'
    if use_cache:
        cache_path = '../data/bkayf/cache_track_features.json'
        if not os.path.exists(cache_path):
            with open(cache_path, 'w') as f:
                json.dump(dict(), f)
        with open(cache_path, 'r') as f:
            data = json.load(f)
    else:
        data = dict()
    if verbose:
        print(f'\t{len(data.keys())} tracks loaded from cache')

    # first get all playlists, 50 per request
    offset = 0
    playlists = []
    if verbose:
        print(f'\tScanning playlists.')
    while True:
        new_playlists = sp.user_playlists(user_id, offset=offset, limit=50)['items']
        playlists += new_playlists
        if len(new_playlists) < 50:
            if verbose:
                print(f'\t\tfrom {offset} to {offset + len(new_playlists)} (complete).')
            break
        if verbose:
            print(f'\t\tfrom {offset} to {offset + len(new_playlists)},')
        offset += 50

    # for each playlist, extract all tracks, remove doubles
    if verbose:
        print(f'\tScanning tracks for each playlist')
    new_additions = 0
    added_uris = []
    for i_playlist, playlist in enumerate(playlists):
        if verbose and (i_playlist + 1) % 5 == 0:
            print(f"\t\t{i_playlist + 1} playlists scanned ({new_additions} new tracks, total: {len(data.keys())} tracks)")
        playlist_uri = playlist['uri'].split(':')[-1]
        new_tracks = get_all_tracks_from_playlist_uri(sp, playlist_uri)
        for new_track in new_tracks:
            track = new_track.get('track')
            if not track:
                # NOTE(review): playlist items can apparently come back with
                # a null track (e.g. unavailable entries); skip them instead
                # of crashing the whole scan -- confirm against the API docs.
                continue
            uri = track['uri'].split(':')[-1]
            # remove doubles (dict membership, no per-track set rebuild)
            if uri in data:
                continue
            data[uri] = new_track
            added_uris.append(uri)
            new_additions += 1
            # when 100 new added uris, compute their audio features
            if len(added_uris) == 100:
                data, added_uris = update_data_with_audio_features(sp, added_uris, data)
            # periodic checkpoint for the cached user
            if (new_additions + 1) % 1000 == 0 and use_cache:
                data, added_uris = update_data_with_audio_features(sp, added_uris, data)
                check_all_track_has_audio_features(data)
                with open(cache_path, 'w') as f:
                    json.dump(data, f)
    if verbose:
        print('\tDone.')

    # Flush the last partial batch for every user, but only when something
    # is actually pending.
    if added_uris:
        data, _ = update_data_with_audio_features(sp, added_uris, data)
    if use_cache:
        check_all_track_has_audio_features(data)
        with open(cache_path, 'w') as f:
            json.dump(data, f)
    return data
def get_uri_from_link(link):
    """Return the last ``/``-separated segment of ``link``, ignoring anything after the first ``?``."""
    without_query, _, _ = link.partition("?")
    _, _, last_segment = without_query.rpartition("/")
    return last_segment
def get_track_info_from_playlist_uri(sp, playlist_uri, which_info=('uri',), verbose=False):
    """Collect the requested pieces of information for every track of a playlist.

    Args:
        sp: client exposing ``artist``, ``audio_features`` and
            ``audio_analysis`` plus whatever
            ``get_all_tracks_from_playlist_uri`` calls on it.
        playlist_uri: playlist identifier.
        which_info: iterable of names taken from ``valid_track_infos``.
        verbose: print the playlist size and per-track progress when true.

    Returns:
        dict mapping each requested name to a list with one value per track:
        ``'uri'``, ``'name'``, ``'popularity'`` are read from the track;
        ``'album'`` is the album *name* (that branch used to be unreachable,
        so the raw album object was returned instead);
        ``'artist_*'`` values come from the track's first artist;
        ``'audio_features'`` / ``'audio_analysis'`` are the client's result
        for each individual track uri.

    Raises:
        AssertionError: if ``which_info`` contains an unknown name.
    """
    output = dict()
    assert len(set(which_info) - valid_track_infos) == 0, f"Error which_info. Valid infos are: {valid_track_infos}"
    tracks = get_all_tracks_from_playlist_uri(sp, playlist_uri)
    if verbose:
        print(f'Playlist with {len(tracks)} tracks.')

    # prepare artist info if needed
    artist_fields = ('artist_genres', 'artist_popularity', 'artist_name')
    if any(field in which_info for field in artist_fields):
        # The same artist often appears on many tracks: look each one up once.
        artist_by_uri = {}
        artist_infos = []
        for x in tracks:
            artist_uri = x["track"]["artists"][0]["uri"]
            if artist_uri not in artist_by_uri:
                artist_by_uri[artist_uri] = sp.artist(artist_uri)
            artist_infos.append(artist_by_uri[artist_uri])

    for info in which_info:
        if info in ('uri', 'name', 'popularity'):
            output[info] = []
            for i_t, x in enumerate(tracks):
                if verbose:
                    print(i_t)
                output[info].append(x["track"][info])
        elif info in artist_fields:
            # 'artist_genres' -> 'genres', 'artist_name' -> 'name', ...
            artist_key = info.split('_')[1]
            output[info] = [artist_info[artist_key] for artist_info in artist_infos]
        elif info == 'album':
            output[info] = [x["track"][info]["name"] for x in tracks]
        elif info == 'audio_features':
            output[info] = []
            for i_t, x in enumerate(tracks):
                if verbose:
                    print(i_t)
                output[info].append(sp.audio_features(x["track"]["uri"]))
        elif info == 'audio_analysis':
            output[info] = [sp.audio_analysis(x["track"]["uri"]) for x in tracks]
        else:
            raise NotImplementedError
    return output
def compute_progress_and_eta(times, iter, total, n_av=3000):
    """Format a progress percentage and a remaining-time estimate.

    Args:
        times: sequence of per-iteration durations; only the last ``n_av``
            entries are averaged (presumably seconds, given the 3600/60
            split below).
        iter: index of the current iteration.
        total: total number of iterations.
        n_av: size of the averaging window.

    Returns:
        A string of the form ``"Progress: P%, ETA: hHmMsS."``.
    """
    mean_step = np.mean(times[-n_av:])
    percent = int(((iter + 1) / total) * 100)
    # Estimated time left: average step duration times remaining iterations.
    remaining = mean_step * (total - iter)
    hours = int(remaining // 3600)
    minutes = int((remaining - (hours * 3600)) // 60)
    seconds = int((remaining - (hours * 3600) - minutes * 60))
    return f"Progress: {percent}%, ETA: {hours}H{minutes}M{seconds}S."