import numpy as np
import pandas as pd
from tqdm import tqdm
import pickle
import os
from collections import defaultdict
import random
import warnings
import logging
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_community.vectorstores import FAISS

warnings.filterwarnings("ignore")
random.seed(5153)
logging.basicConfig(level=logging.DEBUG)

class Model:
    def __init__(self):
        self.cache_path = "Data/cache.pkl"
        self.is_loaded = False
        self.dataset = None
        self.predictions = None
        self.user_details = None
        self.temp_store = None
        self.pipeline = None
        self.chosen_books_per_user = None
        self.all_books = pd.read_csv("Data/books.csv")
        logging.info("Initialized model")

    def run_predictions_on_full_test(self):
        if self.is_loaded:
            logging.info("Model is already loaded")
            return
        if self.does_cache_exist():
            logging.info("Retrieving cached full-test predictions")
            self.retrieve_cache()
            logging.info("Completed full-test")
            return
        logging.info("Generating full-test predictions")
        reviews_df = pd.read_csv("Data/final_dataset/reviews_test.csv")
        good_reviews = reviews_df[reviews_df['rating'] > 3]
        good_user_books_dict = good_reviews.groupby('user_id')['book_id'].unique().apply(list).to_dict()
        # to keep compute manageable, evaluate on only 20 randomly sampled users
        num_random_users = 20
        randomly_sampled_users = random.sample(list(good_user_books_dict.keys()), num_random_users)
        sampled_good_user_books_dict = {user_id: good_user_books_dict[user_id] for user_id in randomly_sampled_users}
        # to further reduce compute, keep at most 150 randomly chosen (good) books per user,
        # prepared in the form of user_id -> list[book_id]
        num_rand_books_per_user = 150
        chosen_books_per_user = {
            user_id: random.sample(books, min(len(books), num_rand_books_per_user))
            for user_id, books in sampled_good_user_books_dict.items()
        }
        # save this for reference
        self.chosen_books_per_user = chosen_books_per_user
        # run predictions on all of the above users
        self.prepare_predictions(chosen_books_per_user)
        logging.info("Caching full-test predictions")
        self.cache_results()
        logging.info("Completed full-test")

    def run_prediction_on_adhoc_user(self, chosen_book_ids):
        self.prepare_predictions(
            {'current_user': chosen_book_ids}
        )
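
    # Illustrative call (the book IDs below are hypothetical placeholders; any IDs
    # present in Data/books.csv work):
    #   model.run_prediction_on_adhoc_user([948, 2767052, 5907])
    # Results land in self.predictions under the synthetic user ID 'current_user'.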

    def prepare_predictions(self, target_users_and_books):
        """
        Given a dictionary mapping user_id to list[book_id], where each list holds the
        books favored by that user, compute recommended books for every user in the
        dictionary and store the results on the instance (self.dataset and
        self.predictions), including the individual model scores.
        :param target_users_and_books: Dictionary of user ID to favored books (as book IDs)
        """
        target_user_list = list(target_users_and_books.keys())
        file_dict = {}
        for filename in ['reviews_test', 'users_test', 'reviews_sub']:
            file_dict[filename] = pd.read_csv(f'Data/final_dataset/{filename}.csv')
        file_dict['users'] = file_dict['users_test']
        file_dict['reviews'] = file_dict['reviews_test']
        file_dict['good_reviews'] = file_dict['reviews'][file_dict['reviews']['rating'] > 3]
        file_dict['books'] = pd.read_csv('Data/books.csv')
        #################################################################################
        # GENRE MODEL; DESCRIPTION MODEL; REVIEWS MODEL; BOOK STATS CLUSTER MODEL
        #################################################################################
        clusterbooks = pd.DataFrame(
            np.load('Data/Recommended Storage/cluster_books.npy', allow_pickle=True),
            columns=['target_book', 'recco_book_id', 'similarity_score']).astype(float)  # wasn't saved as float
        genrebooks = pd.DataFrame(
            np.load('Data/Recommended Storage/genres_books.npy', allow_pickle=True),
            columns=['target_book', 'recco_book_id', 'similarity_score'])
        descbooks = pd.DataFrame(
            np.load('Data/Recommended Storage/description_books.npy', allow_pickle=True),
            columns=['target_book', 'recco_book_id', 'similarity_score'])
        revbooks = pd.DataFrame(
            np.load('Data/Recommended Storage/reviews_books_new.npy', allow_pickle=True),
            columns=['target_book', 'recco_book_id', 'similarity_score'])

        def optimized_converter(simbooks, user_id_list, name, prog_bar_description):
            # accumulate per-user frames and concatenate once at the end; concatenating
            # onto an empty DataFrame inside the loop is slow and deprecated in recent pandas
            per_user_frames = []
            for curr_user_id in tqdm(user_id_list, desc=prog_bar_description):
                curr_user_books = pd.Series(target_users_and_books[curr_user_id])
                relevant_simbooks = simbooks[simbooks['target_book'].isin(curr_user_books)]
                summed_scores = relevant_simbooks.groupby('recco_book_id')['similarity_score'].sum().reset_index()
                summed_scores['user_id'] = curr_user_id
                if not curr_user_books.empty:
                    # drop books the user has already favored
                    summed_scores = summed_scores[~summed_scores['recco_book_id'].isin(curr_user_books)]
                    # TODO: Think about how to adjust this for small numbers of books
                    summed_scores['similarity_score'] /= len(curr_user_books)
                top_30 = summed_scores.nlargest(30, 'similarity_score')
                per_user_frames.append(top_30)
            user_ratings_list = pd.concat(per_user_frames, ignore_index=True)
            return user_ratings_list.rename(columns={'recco_book_id': 'book_id', 'similarity_score': name})
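
        # Worked example of the aggregation above (illustrative numbers): if a user
        # favors books A and B, and candidate book X is similar to A with score 0.8
        # and to B with score 0.6, X's raw score is 1.4; dividing by the 2 favored
        # books gives 0.7, so users with many favored books aren't inflated.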

        genre_users = optimized_converter(genrebooks, target_user_list, 'gen_score', "Generating recs (genre)")
        cluster_users = optimized_converter(clusterbooks, target_user_list, 'clus_score',
                                            "Generating recs (book stats cluster)")
        description_users = optimized_converter(descbooks, target_user_list, 'desc_score',
                                                "Generating recs (description)")
        reviews_users = optimized_converter(revbooks, target_user_list, 'rev_score', "Generating recs (reviews)")

        #################################################################################
        # USER SIMILARITY CLUSTERING MODEL
        #################################################################################
        def jaccard_similarity_pandas(target_user, reviews_sub, n):
            target_user_books = target_users_and_books[target_user]
            relevant_reviews = reviews_sub[reviews_sub['book_id'].isin(target_user_books)]
            # |A ∩ B|: how many of the target user's books each other user has reviewed
            intersections = relevant_reviews.groupby('user_id').size()
            user_book_counts = reviews_sub.groupby('user_id')['book_id'].nunique()
            # |A ∪ B| = |A| + |B| - |A ∩ B|
            unions = len(target_user_books) + user_book_counts - intersections
            jaccard_index = intersections / unions
            top_n_users = jaccard_index.nlargest(n)
            return top_n_users.reset_index().values.tolist()
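
        # Jaccard refresher (illustrative numbers): if the target user liked {1, 2, 3}
        # and another user reviewed {2, 3, 4, 5}, the intersection is 2 and the union
        # is 3 + 4 - 2 = 5, giving a similarity of 2/5 = 0.4. Users who reviewed none
        # of the target's books end up NaN and are dropped by nlargest.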

        def recommend_books(target_user_id, reviews_sub, num_books):
            top_n_similar_users = jaccard_similarity_pandas(target_user_id, reviews_sub, n=20)
            target_user_books = target_users_and_books[target_user_id]
            similar_users_reviews = reviews_sub[reviews_sub['user_id'].isin([user[0] for user in top_n_similar_users])]
            recommended_books = defaultdict(float)
            for curr_user_id, similarity_score in top_n_similar_users:
                user_reviews = similar_users_reviews[similar_users_reviews['user_id'] == curr_user_id]
                for _, row in user_reviews.iterrows():
                    if row['book_id'] not in target_user_books:
                        recommended_books[row['book_id']] += similarity_score
            # return top recommended books sorted by score
            sorted_recommended_books = sorted(recommended_books.items(), key=lambda x: x[1], reverse=True)
            return [(target_user_id, book_id, book_score) for book_id, book_score in
                    sorted_recommended_books[:num_books]]
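
        # This is similarity-weighted voting: a candidate book's score is the sum of
        # the Jaccard similarities of the neighbours who reviewed it, e.g. neighbours
        # with similarities 0.4 and 0.25 both reviewing book 42 score it 0.65
        # (illustrative numbers).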

        all_recommendations = []
        for each_user_id in tqdm(target_user_list, desc="Generating recs (users)"):
            recommendations = recommend_books(each_user_id, file_dict['reviews_sub'], 30)
            all_recommendations.extend(recommendations)
        user_users = pd.DataFrame(all_recommendations, columns=['user_id', 'book_id', 'user_score'])

        #################################################################################
        # TITLE SIMILARITY MODEL
        #################################################################################
        store = FAISS.load_local(
            "Data/faiss_store",
            HuggingFaceBgeEmbeddings(  # class default BGE model; must match the model the index was built with
                model_kwargs={"device": "cpu"},
                encode_kwargs={"normalize_embeddings": True}
            ),
            allow_dangerous_deserialization=True
        )
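
        # Note: similarity_search_with_score returns a raw distance (lower = more
        # similar), which is why the loop below converts it with 1 - score.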
        title_output = []
        for user_id, user_book_ids in tqdm(target_users_and_books.items(), desc="Generating recs (title)"):
            user_books = file_dict['books'][file_dict['books']['book_id'].isin(user_book_ids)]
            titles = '\n'.join(user_books['title_without_series'])  # using titles without series for queries
            results = store.similarity_search_with_score(titles, k=80)
            for result, score in results:
                # check membership against the ID list, not the DataFrame
                # (`x in df` only tests column labels, so it would never exclude anything)
                if result.metadata.get('book_id') not in user_book_ids:
                    title_output.append([user_id, result.metadata.get('book_id'), 1 - score])
        title_users = pd.DataFrame(title_output, columns=['user_id', 'book_id', 'tit_score'])

        #################################################################################
        # COMBINING MODEL OUTPUTS
        #################################################################################
        self.temp_store = {
            'cluster': cluster_users,
            'genre': genre_users,
            'desc': description_users,
            'reviews': reviews_users,
            'users': user_users,
            'title': title_users,
        }
        combined_df = pd.merge(cluster_users, genre_users, on=['user_id', 'book_id'], how='outer')
        combined_df = pd.merge(combined_df, description_users, on=['user_id', 'book_id'], how='outer')
        combined_df = pd.merge(combined_df, reviews_users, on=['user_id', 'book_id'], how='outer')
        combined_df = pd.merge(combined_df, user_users, on=['user_id', 'book_id'], how='outer')
        combined_df = pd.merge(combined_df, title_users, on=['user_id', 'book_id'], how='outer')
        combined_df.fillna(0, inplace=True)
        combined_df['book_id'] = combined_df['book_id'].astype(int)
        combined_df['tit_score'] = combined_df['tit_score'].astype(float)
        # keep only rows with an explicit 1-5 rating
        reviews_df = file_dict['reviews'][file_dict['reviews']['rating'].isin([1, 2, 3, 4, 5])]
        reviews_filtered = reviews_df[['user_id', 'book_id', 'rating']]
        combined_df = combined_df.merge(reviews_filtered, on=['user_id', 'book_id'], how='left')
        combined_df.rename(columns={'rating': 'target'}, inplace=True)
        combined_df['binary'] = np.where(combined_df['target'] >= 4, 1, 0)
        # remove books which no individual model recommended at all
        combined_df = combined_df[
            (combined_df[['clus_score', 'gen_score', 'desc_score', 'rev_score', 'user_score', 'tit_score']] != 0).any(
                axis=1)]
with open("Data/final_model.pkl", 'rb') as file: | |
self.pipeline = pickle.load(file) | |
X_test = combined_df.drop(columns=['user_id', 'book_id', 'target', 'binary']) | |
predictions_df = combined_df[ | |
['user_id', 'book_id', 'clus_score', 'gen_score', 'desc_score', 'rev_score', 'user_score', | |
'tit_score', 'target', 'binary']].copy() | |
predictions_df['final_score'] = self.pipeline.predict_proba(X_test).T[1] | |
predictions_df['would_recommend'] = predictions_df['final_score'] >= 0.45 # peak f2 score at this threshold | |
predictions_df = predictions_df.sort_values(['user_id', 'final_score'], ascending=[True, False]) | |
self.dataset = combined_df | |
self.predictions = predictions_df | |
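
    # Scoring sketch: the pickled pipeline is assumed to be a scaler followed by a
    # logistic regression over the six per-model scores, so conceptually
    #   final_score = sigmoid(intercept + sum_i(coef_i * scaled_score_i))
    # get_user_predictions below relies on exactly this two-step structure.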

    def prepare_user_details(self):
        users_list = self.dataset['user_id'].unique()
        users_df = pd.read_csv("Data/final_dataset/users_test.csv")
        books_df = pd.read_csv("Data/final_dataset/books_test.csv")
        # filter to keep only relevant users
        users_df = users_df[users_df['user_id'].isin(users_list)]
        # merge to get book and review data
        full_df = users_df.merge(books_df, on="user_id")
        user_details = pd.DataFrame()
        # top five rated books per user (nlargest sorts internally, so no pre-sort is needed)
        top_books_per_user = full_df.groupby("user_id").apply(
            lambda x: x.nlargest(n=5, columns='rating')['title_without_series'].tolist())
        user_details['top_books'] = top_books_per_user
        self.user_details = user_details

    def get_user_predictions(self, chosen_user):
        logging.info(f"Generating predictions for user: {chosen_user}")
        user_predictions = self.predictions[self.predictions['user_id'] == chosen_user]
        user_predictions = user_predictions.dropna(subset=['target'])
        if len(user_predictions) == 0:
            logging.info("No predictions hit! Exiting early")
            return None
        # transform model scores using the pipeline (scaler + logistic regression coefficients):
        # apply the scaler, then the linear layer of the logistic regression
        model_score_cols = [c for c in user_predictions.columns if c.endswith('_score') and c != 'final_score']
        scaled_model_scores = self.pipeline['scaler'].transform(user_predictions[model_score_cols])
        multed_model_scores = scaled_model_scores * self.pipeline['classifier'].coef_[0]
        final_model_scores = pd.DataFrame(multed_model_scores, columns=model_score_cols)
        final_model_scores['intercept'] = self.pipeline['classifier'].intercept_[0]
        columns = ['book_id', 'target', 'final_score', 'would_recommend']
        predictions_and_score = pd.concat(
            [user_predictions[columns].reset_index(drop=True), final_model_scores],
            axis=1
        )
        return predictions_and_score.merge(self.all_books[['book_id', 'title_without_series']], on='book_id')
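
    # Sanity check (up to floating-point error): each returned row should satisfy
    #   sigmoid(row['intercept'] + sum of the per-model score columns) == row['final_score']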

    def cache_results(self):
        with open(self.cache_path, 'wb') as f:
            to_pickle = dict()
            to_pickle['dataset'] = self.dataset
            to_pickle['predictions'] = self.predictions
            to_pickle['temp_store'] = self.temp_store
            to_pickle['pipeline'] = self.pipeline
            # keyed by attribute name so retrieve_cache can restore it via setattr
            to_pickle['chosen_books_per_user'] = self.chosen_books_per_user
            # user_details is cheap to rebuild, so it is deliberately not cached
            pickle.dump(to_pickle, f)
        self.is_loaded = True

    def does_cache_exist(self):
        return os.path.exists(self.cache_path)

    def retrieve_cache(self):
        with open(self.cache_path, 'rb') as f:
            unpickled = pickle.load(f)
        for key, val in unpickled.items():
            # setattr is the safe, idiomatic replacement for exec-based assignment
            setattr(self, key, val)
        self.is_loaded = True
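

# Minimal usage sketch (assumes the Data/ directory and its artifacts are present):
if __name__ == "__main__":
    model = Model()
    model.run_predictions_on_full_test()
    sample_user = model.predictions['user_id'].iloc[0]
    print(model.get_user_predictions(sample_user))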