Spaces:
Sleeping
Sleeping
import pandas as pd | |
import spotipy | |
from spotipy.oauth2 import SpotifyOAuth, SpotifyClientCredentials | |
import yaml | |
import re | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.metrics.pairwise import cosine_similarity | |
from sklearn.preprocessing import MinMaxScaler | |
import pickle | |
import streamlit as st | |
import os | |
import dotenv | |
dotenv.load_dotenv() | |
spotify_client_id = os.getenv("CLIENT_ID") | |
spotify_client_secret = os.getenv("CLIENT_SECRET") | |
def get_track_info(track_uri): | |
auth_manager = SpotifyClientCredentials(client_id=spotify_client_id, client_secret=spotify_client_secret) | |
sp = spotipy.client.Spotify(auth_manager=auth_manager) | |
# Get track information | |
track_info = sp.track(track_uri) | |
# Extract track name and artist name | |
track_name = track_info['name'] | |
artist_name = track_info['artists'][0]['name'] | |
# Return the track name and artist name | |
return track_name, artist_name | |
def get_track_names(playlist_id): | |
track_names = [] | |
auth_manager = SpotifyClientCredentials(client_id=spotify_client_id, client_secret=spotify_client_secret) | |
sp = spotipy.client.Spotify(auth_manager=auth_manager) | |
# Get playlist | |
playlist = sp.playlist(playlist_id) | |
# Extract track names | |
for item in playlist['tracks']['items']: | |
track = item['track'] | |
track_name = track['name'] | |
artists = [artist['name'] for artist in track['artists']] | |
track_names.append({'track_name': track_name, 'artist_name': artists[0]}) | |
return track_names | |
def parse_results(results): | |
# Initialize lists to store results | |
names = [] | |
artists = [] | |
uris = [] | |
# Loop through each track in the results | |
for idx, item in enumerate(results['tracks']['items']): | |
names.append(item['name']) | |
artists.append(item['artists'][0]['name']) | |
uris.append(item['uri']) | |
# Create a DataFrame | |
df = pd.DataFrame({ | |
'Name': names, | |
'Artist': artists, | |
'URI': uris | |
}) | |
return df | |
def search_spotify(query): | |
log = [] | |
try: | |
log.append('spotify local method') | |
auth_manager = SpotifyClientCredentials(client_id=spotify_client_id, client_secret=spotify_client_secret) | |
except: | |
log.append('spotify .streamlit method') | |
try: | |
Client_id=st.secrets["Client_ID"] | |
client_secret=st.secrets["Client_secret"] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
except: | |
log.append('spotify hug method') | |
Client_id=os.environ['Client_ID'] | |
client_secret=os.environ['Client_secret'] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
sp = spotipy.client.Spotify(auth_manager=auth_manager) | |
results = sp.search(q=query, type='track,playlist') | |
return results | |
def playlist_model(url, model, max_gen=3, same_art=5): | |
log = [] | |
Fresult = [] | |
try: | |
log.append('Start logging') | |
uri = url.split('/')[-1].split('?')[0] | |
try: | |
log.append('spotify local method') | |
auth_manager = SpotifyClientCredentials(client_id=spotify_client_id, client_secret=spotify_client_secret) | |
except: | |
log.append('spotify .streamlit method') | |
try: | |
Client_id=st.secrets["Client_ID"] | |
client_secret=st.secrets["Client_secret"] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
except: | |
log.append('spotify hug method') | |
Client_id=os.environ['Client_ID'] | |
client_secret=os.environ['Client_secret'] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
sp = spotipy.client.Spotify(auth_manager=auth_manager) | |
if model == 'Spotify Model': | |
def get_IDs(user, playlist_id): | |
try: | |
log.append('start playlist extraction') | |
track_ids = [] | |
playlist = sp.user_playlist(user, playlist_id) | |
for item in playlist['tracks']['items']: | |
track = item['track'] | |
track_ids.append(track['id']) | |
return track_ids | |
except Exception as e: | |
log.append('Failed to load the playlist') | |
log.append(e) | |
track_ids = get_IDs('Ruby', uri) | |
track_ids_uni = list(set(track_ids)) | |
log.append('Starting Spotify Model') | |
Spotifyresult = pd.DataFrame() | |
for i in range(len(track_ids_uni)-5): | |
if len(Spotifyresult) >= 5: | |
break | |
try: | |
ff = sp.recommendations(seed_tracks=list(track_ids_uni[i:i+5]), limit=5) | |
except Exception as e: | |
log.append(e) | |
continue | |
for z in range(5): | |
result = pd.DataFrame([z+(5*i)+1]) | |
result['uri'] = ff['tracks'][z]['id'] | |
Spotifyresult = pd.concat([Spotifyresult, result], axis=0) | |
Spotifyresult.drop_duplicates(subset=['uri'], inplace=True,keep='first') | |
Fresult = Spotifyresult.uri[:5] | |
log.append('Model run successfully') | |
return Fresult, log | |
lendf=len(pd.read_csv('data/streamlit.csv',usecols=['track_uri'])) | |
dtypes = {'track_uri': 'object', 'artist_uri': 'object', 'album_uri': 'object', 'danceability': 'float16', 'energy': 'float16', 'key': 'float16', | |
'loudness': 'float16', 'mode': 'float16', 'speechiness': 'float16', 'acousticness': 'float16', 'instrumentalness': 'float16', | |
'liveness': 'float16', 'valence': 'float16', 'tempo': 'float16', 'duration_ms': 'float32', 'time_signature': 'float16', | |
'Track_release_date': 'int8', 'Track_pop': 'int8', 'Artist_pop': 'int8', 'Artist_genres': 'object'} | |
col_name= ['track_uri', 'artist_uri', 'album_uri', 'danceability', 'energy', 'key', | |
'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', | |
'liveness', 'valence', 'tempo', 'duration_ms', 'time_signature', | |
'Track_release_date', 'Track_pop', 'Artist_pop', 'Artist_genres'] | |
try: | |
def get_IDs(user, playlist_id): | |
log.append('start playlist extraction') | |
track_ids = [] | |
artist_id = [] | |
playlist = sp.user_playlist(user, playlist_id) | |
for item in playlist['tracks']['items']: | |
track = item['track'] | |
track_ids.append(track['id']) | |
artist = item['track']['artists'] | |
artist_id.append(artist[0]['id']) | |
return track_ids, artist_id | |
except Exception as e: | |
log.append('Failed to load the playlist') | |
log.append(e) | |
track_ids, artist_id = get_IDs('Ruby', uri) | |
log.append("Number of Track : {}".format(len(track_ids))) | |
artist_id_uni = list(set(artist_id)) | |
track_ids_uni = list(set(track_ids)) | |
log.append("Number of unique Artists : {}".format(len(artist_id_uni))) | |
log.append("Number of unique Tracks : {}".format(len(track_ids_uni))) | |
def extract(track_ids_uni, artist_id_uni): | |
err = [] | |
err.append('Start audio features extraction') | |
audio_features = pd.DataFrame() | |
for i in range(0, len(track_ids_uni), 25): | |
try: | |
track_feature = sp.audio_features(track_ids_uni[i:i+25]) | |
track_df = pd.DataFrame(track_feature) | |
audio_features = pd.concat([audio_features, track_df], axis=0) | |
except Exception as e: | |
err.append(e) | |
continue | |
err.append('Start track features extraction') | |
track_ = pd.DataFrame() | |
for i in range(0, len(track_ids_uni), 25): | |
try: | |
track_features = sp.tracks(track_ids_uni[i:i+25]) | |
for x in range(25): | |
track_pop = pd.DataFrame([track_ids_uni[i+x]], columns=['Track_uri']) | |
track_pop['Track_release_date'] = track_features['tracks'][x]['album']['release_date'] | |
track_pop['Track_pop'] = track_features['tracks'][x]["popularity"] | |
track_pop['Artist_uri'] = track_features['tracks'][x]['artists'][0]['id'] | |
track_pop['Album_uri'] = track_features['tracks'][x]['album']['id'] | |
track_ = pd.concat([track_, track_pop], axis=0) | |
except Exception as e: | |
err.append(e) | |
continue | |
err.append('Start artist features extraction') | |
artist_ = pd.DataFrame() | |
for i in range(0, len(artist_id_uni), 25): | |
try: | |
artist_features = sp.artists(artist_id_uni[i:i+25]) | |
for x in range(25): | |
artist_df = pd.DataFrame([artist_id_uni[i+x]], columns=['Artist_uri']) | |
artist_pop = artist_features['artists'][x]["popularity"] | |
artist_genres = artist_features['artists'][x]["genres"] | |
artist_df["Artist_pop"] = artist_pop | |
if artist_genres: | |
artist_df["genres"] = " ".join([re.sub(' ', '_', i) for i in artist_genres]) | |
else: | |
artist_df["genres"] = "unknown" | |
artist_ = pd.concat([artist_, artist_df], axis=0) | |
except Exception as e: | |
err.append(e) | |
continue | |
try: | |
test = pd.DataFrame( | |
track_, columns=['Track_uri', 'Artist_uri', 'Album_uri']) | |
test.rename(columns={'Track_uri': 'track_uri', | |
'Artist_uri': 'artist_uri', 'Album_uri': 'album_uri'}, inplace=True) | |
audio_features.drop( | |
columns=['type', 'uri', 'track_href', 'analysis_url'], axis=1, inplace=True) | |
test = pd.merge(test, audio_features, | |
left_on="track_uri", right_on="id", how='outer') | |
test = pd.merge(test, track_, left_on="track_uri", | |
right_on="Track_uri", how='outer') | |
test = pd.merge(test, artist_, left_on="artist_uri", | |
right_on="Artist_uri", how='outer') | |
test.rename(columns={'genres': 'Artist_genres'}, inplace=True) | |
test.drop(columns=['Track_uri', 'Artist_uri_x', | |
'Artist_uri_y', 'Album_uri', 'id'], axis=1, inplace=True) | |
test.dropna(axis=0, inplace=True) | |
test['Track_pop'] = test['Track_pop'].apply(lambda x: int(x/5)) | |
test['Artist_pop'] = test['Artist_pop'].apply(lambda x: int(x/5)) | |
test['Track_release_date'] = test['Track_release_date'].apply(lambda x: x.split('-')[0]) | |
test['Track_release_date'] = test['Track_release_date'].astype('int16') | |
test['Track_release_date'] = test['Track_release_date'].apply(lambda x: int(x/5)) | |
test[['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']] = test[[ | |
'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']].astype('float16') | |
test[['duration_ms']] = test[['duration_ms']].astype('float32') | |
test[['Track_release_date', 'Track_pop', 'Artist_pop']] = test[[ | |
'Track_release_date', 'Track_pop', 'Artist_pop']].astype('int8') | |
except Exception as e: | |
err.append(e) | |
err.append('Finish extraction') | |
return test, err | |
test, err = extract(track_ids_uni, artist_id_uni) | |
for i in err: | |
log.append(i) | |
del err | |
grow = test.copy() | |
test['Artist_genres'] = test['Artist_genres'].apply(lambda x: x.split(" ")) | |
tfidf = TfidfVectorizer(max_features=max_gen) | |
tfidf_matrix = tfidf.fit_transform(test['Artist_genres'].apply(lambda x: " ".join(x))) | |
genre_df = pd.DataFrame(tfidf_matrix.toarray()) | |
genre_df.columns = ['genre' + "|" +i for i in tfidf.get_feature_names_out()] | |
genre_df = genre_df.astype('float16') | |
test.drop(columns=['Artist_genres'], axis=1, inplace=True) | |
test = pd.concat([test.reset_index(drop=True),genre_df.reset_index(drop=True)], axis=1) | |
Fresult = pd.DataFrame() | |
x = 1 | |
for i in range(int(lendf/2), lendf+1, int(lendf/2)): | |
try: | |
df = pd.read_csv('data/streamlit.csv',names= col_name,dtype=dtypes,skiprows=x,nrows=i) | |
log.append('reading data frame chunks from {} to {}'.format(x,i)) | |
except Exception as e: | |
log.append('Failed to load grow') | |
log.append(e) | |
grow = grow[~grow['track_uri'].isin(df['track_uri'].values)] | |
df = df[~df['track_uri'].isin(test['track_uri'].values)] | |
df['Artist_genres'] = df['Artist_genres'].apply(lambda x: x.split(" ")) | |
tfidf_matrix = tfidf.transform(df['Artist_genres'].apply(lambda x: " ".join(x))) | |
genre_df = pd.DataFrame(tfidf_matrix.toarray()) | |
genre_df.columns = ['genre' + "|" +i for i in tfidf.get_feature_names_out()] | |
genre_df = genre_df.astype('float16') | |
df.drop(columns=['Artist_genres'], axis=1, inplace=True) | |
df = pd.concat([df.reset_index(drop=True), | |
genre_df.reset_index(drop=True)], axis=1) | |
del genre_df | |
try: | |
df.drop(columns=['genre|unknown'], axis=1, inplace=True) | |
test.drop(columns=['genre|unknown'], axis=1, inplace=True) | |
except: | |
log.append('genre|unknown not found') | |
log.append('Scaling the data .....') | |
if x == 1: | |
sc = pickle.load(open('data/sc.sav','rb')) | |
df.iloc[:, 3:19] = sc.transform(df.iloc[:, 3:19]) | |
test.iloc[:, 3:19] = sc.transform(test.iloc[:, 3:19]) | |
log.append("Creating playlist vector") | |
playvec = pd.DataFrame(test.sum(axis=0)).T | |
else: | |
df.iloc[:, 3:19] = sc.transform(df.iloc[:, 3:19]) | |
x = i | |
if model == 'Model 1': | |
df['sim']=cosine_similarity(df.drop(['track_uri', 'artist_uri', 'album_uri'], axis = 1),playvec.drop(['track_uri', 'artist_uri', 'album_uri'], axis = 1)) | |
df['sim2']=cosine_similarity(df.iloc[:,16:-1],playvec.iloc[:,16:]) | |
df['sim3']=cosine_similarity(df.iloc[:,19:-2],playvec.iloc[:,19:]) | |
df = df.sort_values(['sim3','sim2','sim'],ascending = False,kind='stable').groupby('artist_uri').head(same_art).head(5) | |
Fresult = pd.concat([Fresult, df], axis=0) | |
Fresult = Fresult.sort_values(['sim3', 'sim2', 'sim'],ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).head(5) | |
elif model == 'Model 2': | |
df['sim'] = cosine_similarity(df.iloc[:, 3:16], playvec.iloc[:, 3:16]) | |
df['sim2'] = cosine_similarity(df.loc[:, df.columns.str.startswith('T') | df.columns.str.startswith('A')], playvec.loc[:, playvec.columns.str.startswith('T') | playvec.columns.str.startswith('A')]) | |
df['sim3'] = cosine_similarity(df.loc[:, df.columns.str.startswith('genre')], playvec.loc[:, playvec.columns.str.startswith('genre')]) | |
df['sim4'] = (df['sim']+df['sim2']+df['sim3'])/3 | |
df = df.sort_values(['sim4'], ascending=False,kind='stable').groupby('artist_uri').head(same_art).head(5) | |
Fresult = pd.concat([Fresult, df], axis=0) | |
Fresult = Fresult.sort_values(['sim4'], ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).head(5) | |
del test | |
try: | |
del df | |
log.append('Getting Result') | |
except: | |
log.append('Getting Result') | |
if model == 'Model 1': | |
Fresult = Fresult.sort_values(['sim3', 'sim2', 'sim'],ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).track_uri.head(5) | |
elif model == 'Model 2': | |
Fresult = Fresult.sort_values(['sim4'], ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).track_uri.head(5) | |
log.append('{} New Tracks Found'.format(len(grow))) | |
if(len(grow)>=1): | |
try: | |
new=pd.read_csv('data/new_tracks.csv',dtype=dtypes) | |
new=pd.concat([new, grow], axis=0) | |
new=new[new.Track_pop >0] | |
new.drop_duplicates(subset=['track_uri'], inplace=True,keep='last') | |
new.to_csv('data/new_tracks.csv',index=False) | |
except: | |
grow.to_csv('data/new_tracks.csv', index=False) | |
log.append('Model run successfully') | |
except Exception as e: | |
log.append("Model Failed") | |
log.append(e) | |
return Fresult, log | |
def top_tracks(url,region): | |
log = [] | |
Fresult = [] | |
uri = url.split('/')[-1].split('?')[0] | |
try: | |
log.append('spotify local method') | |
auth_manager = SpotifyClientCredentials(client_id=spotify_client_id, client_secret=spotify_client_secret) | |
except: | |
log.append('spotify .streamlit method') | |
try: | |
Client_id=st.secrets["Client_ID"] | |
client_secret=st.secrets["Client_secret"] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
except: | |
log.append('spotify hug method') | |
Client_id=os.environ['Client_ID'] | |
client_secret=os.environ['Client_secret'] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
sp = spotipy.client.Spotify(auth_manager=auth_manager) | |
try: | |
log.append('Starting Spotify Model') | |
top=sp.artist_top_tracks(uri,country=region) | |
for i in range(5) : | |
Fresult.append(top['tracks'][i]['id']) | |
log.append('Model run successfully') | |
except Exception as e: | |
log.append("Model Failed") | |
log.append(e) | |
return Fresult,log | |
def song_model(url, model, max_gen=3, same_art=5): | |
log = [] | |
Fresult = [] | |
try: | |
log.append('Start logging') | |
uri = url.split('/')[-1].split('?')[0] | |
try: | |
log.append('spotify local method') | |
auth_manager = SpotifyClientCredentials(client_id=spotify_client_id, client_secret=spotify_client_secret) | |
except: | |
log.append('spotify .streamlit method') | |
try: | |
Client_id=st.secrets["Client_ID"] | |
client_secret=st.secrets["Client_secret"] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
except: | |
log.append('spotify hug method') | |
Client_id=os.environ['Client_ID'] | |
client_secret=os.environ['Client_secret'] | |
auth_manager = SpotifyClientCredentials(client_id=Client_id, client_secret=client_secret) | |
sp = spotipy.client.Spotify(auth_manager=auth_manager) | |
if model == 'Spotify Model': | |
log.append('Starting Spotify Model') | |
aa=sp.recommendations(seed_tracks=[uri], limit=25) | |
for i in range(25): | |
Fresult.append(aa['tracks'][i]['id']) | |
log.append('Model run successfully') | |
return Fresult, log | |
lendf=len(pd.read_csv('data/streamlit.csv',usecols=['track_uri'])) | |
dtypes = {'track_uri': 'object', 'artist_uri': 'object', 'album_uri': 'object', 'danceability': 'float16', 'energy': 'float16', 'key': 'float16', | |
'loudness': 'float16', 'mode': 'float16', 'speechiness': 'float16', 'acousticness': 'float16', 'instrumentalness': 'float16', | |
'liveness': 'float16', 'valence': 'float16', 'tempo': 'float16', 'duration_ms': 'float32', 'time_signature': 'float16', | |
'Track_release_date': 'int8', 'Track_pop': 'int8', 'Artist_pop': 'int8', 'Artist_genres': 'object'} | |
col_name= ['track_uri', 'artist_uri', 'album_uri', 'danceability', 'energy', 'key', | |
'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', | |
'liveness', 'valence', 'tempo', 'duration_ms', 'time_signature', | |
'Track_release_date', 'Track_pop', 'Artist_pop', 'Artist_genres'] | |
log.append('Start audio features extraction') | |
audio_features = pd.DataFrame(sp.audio_features([uri])) | |
log.append('Start track features extraction') | |
track_ = pd.DataFrame() | |
track_features = sp.tracks([uri]) | |
track_pop = pd.DataFrame([uri], columns=['Track_uri']) | |
track_pop['Track_release_date'] = track_features['tracks'][0]['album']['release_date'] | |
track_pop['Track_pop'] = track_features['tracks'][0]["popularity"] | |
track_pop['Artist_uri'] = track_features['tracks'][0]['artists'][0]['id'] | |
track_pop['Album_uri'] = track_features['tracks'][0]['album']['id'] | |
track_ = pd.concat([track_, track_pop], axis=0) | |
log.append('Start artist features extraction') | |
artist_id_uni=list(track_['Artist_uri']) | |
artist_ = pd.DataFrame() | |
artist_features = sp.artists(artist_id_uni) | |
artist_df = pd.DataFrame(artist_id_uni, columns=['Artist_uri']) | |
artist_pop = artist_features['artists'][0]["popularity"] | |
artist_genres = artist_features['artists'][0]["genres"] | |
artist_df["Artist_pop"] = artist_pop | |
if artist_genres: | |
artist_df["genres"] = " ".join([re.sub(' ', '_', i) for i in artist_genres]) | |
else: | |
artist_df["genres"] = "unknown" | |
artist_ = pd.concat([artist_, artist_df], axis=0) | |
try: | |
test = pd.DataFrame(track_, columns=['Track_uri', 'Artist_uri', 'Album_uri']) | |
test.rename(columns={'Track_uri': 'track_uri','Artist_uri': 'artist_uri', 'Album_uri': 'album_uri'}, inplace=True) | |
audio_features.drop(columns=['type', 'uri', 'track_href', 'analysis_url'], axis=1, inplace=True) | |
test = pd.merge(test, audio_features,left_on="track_uri", right_on="id", how='outer') | |
test = pd.merge(test, track_, left_on="track_uri",right_on="Track_uri", how='outer') | |
test = pd.merge(test, artist_, left_on="artist_uri",right_on="Artist_uri", how='outer') | |
test.rename(columns={'genres': 'Artist_genres'}, inplace=True) | |
test.drop(columns=['Track_uri', 'Artist_uri_x','Artist_uri_y', 'Album_uri', 'id'], axis=1, inplace=True) | |
test.dropna(axis=0, inplace=True) | |
test['Track_pop'] = test['Track_pop'].apply(lambda x: int(x/5)) | |
test['Artist_pop'] = test['Artist_pop'].apply(lambda x: int(x/5)) | |
test['Track_release_date'] = test['Track_release_date'].apply(lambda x: x.split('-')[0]) | |
test['Track_release_date'] = test['Track_release_date'].astype('int16') | |
test['Track_release_date'] = test['Track_release_date'].apply(lambda x: int(x/5)) | |
test[['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']] = test[['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature']].astype('float16') | |
test[['duration_ms']] = test[['duration_ms']].astype('float32') | |
test[['Track_release_date', 'Track_pop', 'Artist_pop']] = test[['Track_release_date', 'Track_pop', 'Artist_pop']].astype('int8') | |
except Exception as e: | |
log.append(e) | |
log.append('Finish extraction') | |
grow = test.copy() | |
test['Artist_genres'] = test['Artist_genres'].apply(lambda x: x.split(" ")) | |
tfidf = TfidfVectorizer(max_features=max_gen) | |
tfidf_matrix = tfidf.fit_transform(test['Artist_genres'].apply(lambda x: " ".join(x))) | |
genre_df = pd.DataFrame(tfidf_matrix.toarray()) | |
genre_df.columns = ['genre' + "|" +i for i in tfidf.get_feature_names_out()] | |
genre_df = genre_df.astype('float16') | |
test.drop(columns=['Artist_genres'], axis=1, inplace=True) | |
test = pd.concat([test.reset_index(drop=True),genre_df.reset_index(drop=True)], axis=1) | |
Fresult = pd.DataFrame() | |
x = 1 | |
for i in range(int(lendf/2), lendf+1, int(lendf/2)): | |
try: | |
df = pd.read_csv('data/streamlit.csv',names= col_name,dtype=dtypes,skiprows=x,nrows=i) | |
log.append('reading data frame chunks from {} to {}'.format(x,i)) | |
except Exception as e: | |
log.append('Failed to load grow') | |
log.append(e) | |
grow = grow[~grow['track_uri'].isin(df['track_uri'].values)] | |
df = df[~df['track_uri'].isin(test['track_uri'].values)] | |
df['Artist_genres'] = df['Artist_genres'].apply(lambda x: x.split(" ")) | |
tfidf_matrix = tfidf.transform(df['Artist_genres'].apply(lambda x: " ".join(x))) | |
genre_df = pd.DataFrame(tfidf_matrix.toarray()) | |
genre_df.columns = ['genre' + "|" +i for i in tfidf.get_feature_names_out()] | |
genre_df = genre_df.astype('float16') | |
df.drop(columns=['Artist_genres'], axis=1, inplace=True) | |
df = pd.concat([df.reset_index(drop=True), | |
genre_df.reset_index(drop=True)], axis=1) | |
del genre_df | |
try: | |
df.drop(columns=['genre|unknown'], axis=1, inplace=True) | |
test.drop(columns=['genre|unknown'], axis=1, inplace=True) | |
except: | |
log.append('genre|unknown not found') | |
log.append('Scaling the data .....') | |
if x == 1: | |
sc = pickle.load(open('data/sc.sav','rb')) | |
df.iloc[:, 3:19] = sc.transform(df.iloc[:, 3:19]) | |
test.iloc[:, 3:19] = sc.transform(test.iloc[:, 3:19]) | |
log.append("Creating playlist vector") | |
playvec = pd.DataFrame(test.sum(axis=0)).T | |
else: | |
df.iloc[:, 3:19] = sc.transform(df.iloc[:, 3:19]) | |
x = i | |
if model == 'Model 1': | |
df['sim']=cosine_similarity(df.drop(['track_uri', 'artist_uri', 'album_uri'], axis = 1),playvec.drop(['track_uri', 'artist_uri', 'album_uri'], axis = 1)) | |
df['sim2']=cosine_similarity(df.iloc[:,16:-1],playvec.iloc[:,16:]) | |
df['sim3']=cosine_similarity(df.iloc[:,19:-2],playvec.iloc[:,19:]) | |
df = df.sort_values(['sim3','sim2','sim'],ascending = False,kind='stable').groupby('artist_uri').head(same_art).head(5) | |
Fresult = pd.concat([Fresult, df], axis=0) | |
Fresult = Fresult.sort_values(['sim3', 'sim2', 'sim'],ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).head(5) | |
elif model == 'Model 2': | |
df['sim'] = cosine_similarity(df.iloc[:, 3:16], playvec.iloc[:, 3:16]) | |
df['sim2'] = cosine_similarity(df.loc[:, df.columns.str.startswith('T') | df.columns.str.startswith('A')], playvec.loc[:, playvec.columns.str.startswith('T') | playvec.columns.str.startswith('A')]) | |
df['sim3'] = cosine_similarity(df.loc[:, df.columns.str.startswith('genre')], playvec.loc[:, playvec.columns.str.startswith('genre')]) | |
df['sim4'] = (df['sim']+df['sim2']+df['sim3'])/3 | |
df = df.sort_values(['sim4'], ascending=False,kind='stable').groupby('artist_uri').head(same_art).head(5) | |
Fresult = pd.concat([Fresult, df], axis=0) | |
Fresult = Fresult.sort_values(['sim4'], ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).head(5) | |
del test | |
try: | |
del df | |
log.append('Getting Result') | |
except: | |
log.append('Getting Result') | |
if model == 'Model 1': | |
Fresult = Fresult.sort_values(['sim3', 'sim2', 'sim'],ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).track_uri.head(5) | |
elif model == 'Model 2': | |
Fresult = Fresult.sort_values(['sim4'], ascending=False,kind='stable') | |
Fresult.drop_duplicates(subset=['track_uri'], inplace=True,keep='first') | |
Fresult = Fresult.groupby('artist_uri').head(same_art).track_uri.head(5) | |
log.append('{} New Tracks Found'.format(len(grow))) | |
if(len(grow)>=1): | |
try: | |
new=pd.read_csv('data/new_tracks.csv',dtype=dtypes) | |
new=pd.concat([new, grow], axis=0) | |
new=new[new.Track_pop >0] | |
new.drop_duplicates(subset=['track_uri'], inplace=True,keep='last') | |
new.to_csv('data/new_tracks.csv',index=False) | |
except: | |
grow.to_csv('data/new_tracks.csv', index=False) | |
log.append('Model run successfully') | |
except Exception as e: | |
log.append("Model Failed") | |
log.append(e) | |
return Fresult, log | |
def update_dataset(): | |
col_name= ['track_uri', 'artist_uri', 'album_uri', 'danceability', 'energy', 'key', | |
'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', | |
'liveness', 'valence', 'tempo', 'duration_ms', 'time_signature', | |
'Track_release_date', 'Track_pop', 'Artist_pop', 'Artist_genres'] | |
dtypes = {'track_uri': 'object', 'artist_uri': 'object', 'album_uri': 'object', 'danceability': 'float16', 'energy': 'float16', 'key': 'float16', | |
'loudness': 'float16', 'mode': 'float16', 'speechiness': 'float16', 'acousticness': 'float16', 'instrumentalness': 'float16', | |
'liveness': 'float16', 'valence': 'float16', 'tempo': 'float16', 'duration_ms': 'float32', 'time_signature': 'float16', | |
'Track_release_date': 'int8', 'Track_pop': 'int8', 'Artist_pop': 'int8', 'Artist_genres': 'object'} | |
df = pd.read_csv('data/streamlit.csv',dtype=dtypes) | |
grow = pd.read_csv('data/new_tracks.csv',dtype=dtypes) | |
cur = len(df) | |
df=pd.concat([df,grow],axis=0) | |
grow=pd.DataFrame(columns=col_name) | |
grow.to_csv('data/new_tracks.csv',index=False) | |
df=df[df.Track_pop >0] | |
df.drop_duplicates(subset=['track_uri'],inplace=True,keep='last') | |
df.dropna(axis=0,inplace=True) | |
df.to_csv('data/streamlit.csv',index=False) | |
return (len(df)-cur) | |