import os
import re
import string

import joblib
import numpy as np
import pandas as pd

import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.tokenize import word_tokenize

from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, MultiLabelBinarizer
from xgboost import XGBClassifier

# NLTK resources used for tokenization, POS tagging and lemmatization.
nltk.download('punkt')
nltk.download('punkt_tab')                       # tokenizer resource name on newer NLTK releases
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('averaged_perceptron_tagger_eng')  # tagger resource name on newer NLTK releases


class CollaborativeRecommender:

    def __init__(self, svd_matrix, item_to_index, index_to_item):
        """
        svd_matrix: 2D numpy array (items x latent features)
        item_to_index: dict mapping app_id to row index in svd_matrix
        index_to_item: dict mapping row index to app_id
        """
        self.svd_matrix: np.ndarray = svd_matrix
        self.item_to_index = item_to_index
        self.index_to_item = index_to_item

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file."""
        return joblib.load(path)

    def _get_item_vector(self, app_id):
        idx = self.item_to_index.get(app_id)
        if idx is None:
            raise ValueError(f"app_id {app_id} not found in the model.")
        return self.svd_matrix[idx]

    def _cosine_similarity(self, vec, matrix):
        """Cosine similarity between a single vector and every row of a matrix."""
        vec_norm = np.linalg.norm(vec)
        matrix_norms = np.linalg.norm(matrix, axis=1)
        similarity = (matrix @ vec) / (matrix_norms * vec_norm + 1e-10)
        return similarity

    def get_similarities(self, app_ids, top_n=None):
        """
        Input: app_ids - single app_id or list of app_ids
        Output: DataFrame with columns ['app_id', 'collaborative_similarity'],
                sorted by similarity descending
        """
        if isinstance(app_ids, (str, int)):
            app_ids = [app_ids]
        elif not isinstance(app_ids, (list, tuple, np.ndarray)):
            raise TypeError("app_ids must be a string/int or a list of such")

        valid_vectors = []
        missing_ids = []
        for app_id in app_ids:
            try:
                vec = self._get_item_vector(app_id)
                valid_vectors.append(vec)
            except ValueError:
                missing_ids.append(app_id)

        if len(valid_vectors) == 0:
            raise ValueError("None of the input app_ids were found in the model.")

        # Represent the whole query set by the mean of its latent vectors.
        aggregated_vec = np.mean(valid_vectors, axis=0)

        similarities = self._cosine_similarity(aggregated_vec, self.svd_matrix)

        result_df = pd.DataFrame({
            'app_id': [self.index_to_item[i] for i in range(len(similarities))],
            'collaborative_similarity': similarities
        })

        # Drop the query items themselves and rank the rest.
        result_df = result_df[~result_df['app_id'].isin(app_ids)]
        result_df = result_df.sort_values('collaborative_similarity', ascending=False).reset_index(drop=True)

        if missing_ids:
            print(f"Warning: These app_ids were not found in the model and were ignored: {missing_ids}")

        if top_n:
            return result_df.head(top_n)
        else:
            return result_df


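# --- Example (illustrative sketch, not part of the trained pipeline) ----------
# How the inputs expected by CollaborativeRecommender could be built. The
# `interactions` matrix (items x users) and `app_ids` list below are hypothetical
# placeholders; only the shapes and mappings follow the docstring above.
#
#   svd = TruncatedSVD(n_components=64, random_state=42)
#   svd_matrix = svd.fit_transform(interactions)             # items x latent features
#   item_to_index = {app_id: i for i, app_id in enumerate(app_ids)}
#   index_to_item = {i: app_id for app_id, i in item_to_index.items()}
#   recommender = CollaborativeRecommender(svd_matrix, item_to_index, index_to_item)
# -------------------------------------------------------------------------------

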
class GameContentRecommender:

    def __init__(self, model, genre_encoder, category_encoder, price_range_encoder, scaler, app_id_encoder):
        self.model: KNeighborsClassifier = model
        self.genre_encoder: MultiLabelBinarizer = genre_encoder
        self.category_encoder: MultiLabelBinarizer = category_encoder
        self.price_range_encoder: LabelEncoder = price_range_encoder
        self.scaler: MinMaxScaler = scaler
        self.app_id_encoder: LabelEncoder = app_id_encoder

    def save(self, path: str):
        """Save the entire model as a single file using joblib."""
        joblib.dump(self, path)

    @staticmethod
    def load(path: str):
        """Load the entire model from a joblib file."""
        return joblib.load(path)

    def predict(self, price_range, year_release, average_playtime, game_score, dlc_count, genres, categories, top_n=None):
        # Multi-hot dictionaries over the genre/category vocabularies seen during training.
        genre_dict = {g: 0 for g in self.genre_encoder.classes_}
        categories_dict = {c: 0 for c in self.category_encoder.classes_}

        for genre in genres:
            if genre != 'Unknown' and genre in genre_dict:
                genre_dict[genre] = 1

        for category in categories:
            if category != 'Unknown' and category in categories_dict:
                categories_dict[category] = 1

        price_range = self.price_range_encoder.transform(np.array(price_range).reshape(-1, 1))
        scaled_features = self.scaler.transform(
            np.array([[year_release, average_playtime, game_score, dlc_count]])
        )[0]

        # Assemble the query vector in the same column order used at training time.
        user_vector = list(scaled_features) + list(price_range) + list(genre_dict.values()) + list(categories_dict.values())
        user_df = pd.DataFrame([user_vector])

        distances, indices = self.model.kneighbors(user_df)
        distances = distances.flatten()
        indices = indices.flatten()

        # Convert distances to a similarity-like score in (0, 1].
        similarity = 1 / (1 + distances)

        app_ids = self.app_id_encoder.inverse_transform(indices)

        prediction = pd.DataFrame({
            'app_id': app_ids,
            'content_probability': similarity
        })

        if top_n:
            prediction = prediction.head(top_n)

        return prediction


class TextBasedRecommendation:

    def __init__(self, classifier, vectorizer, app_id_encoder, history):
        self.classifier: XGBClassifier = classifier
        self.vectorizer: TfidfVectorizer = vectorizer
        self.app_id_encoder: LabelEncoder = app_id_encoder
        self.history = history

    def save(self, path_prefix: str):
        # Save the XGBoost booster in its native format.
        self.classifier.save_model(f"{path_prefix}_xgb.json")

        # Temporarily detach the classifier so joblib does not pickle it as well.
        classifier_backup = self.classifier
        self.classifier = None
        joblib.dump(self, f"{path_prefix}_preprocessor.joblib")
        self.classifier = classifier_backup

    @staticmethod
    def load(path_prefix: str):
        obj = joblib.load(f"{path_prefix}_preprocessor.joblib")
        xgb = XGBClassifier()
        xgb.load_model(f"{path_prefix}_xgb.json")
        obj.classifier = xgb
        return obj

    def preprocess(self, text: str):
        stopword = set(stopwords.words('english'))
        lemmatizer = WordNetLemmatizer()

        def convert_postag(postag: str):
            # Map Penn Treebank tags to the WordNet POS codes the lemmatizer expects.
            if postag.startswith('V'):
                return 'v'
            elif postag.startswith('R'):
                return 'r'
            elif postag.startswith('J'):
                return 'a'
            return 'n'

        def clean_space(text: str):
            if not isinstance(text, str):
                return ''
            cleaned = re.sub(r'\s+', ' ', text.replace('\n', ' ')).strip()
            return cleaned

        def tokenize(text: str):
            text = text.lower()
            text = clean_space(text)
            token = word_tokenize(text)
            token = [word for word in token
                     if word not in string.punctuation and word not in stopword and word.isalpha()]
            return token

        def lemmatizing(token: list):
            postag = pos_tag(token)
            lemmatized = [lemmatizer.lemmatize(word, convert_postag(tag)) for word, tag in postag]
            return lemmatized

        token = tokenize(text)
        token = lemmatizing(token)
        return " ".join(token)

    def get_accuracy(self, X_test, y_test):
        y_pred = self.classifier.predict(self.vectorizer.transform(X_test))
        y_test = self.app_id_encoder.transform(y_test)
        print(classification_report(y_test, y_pred))

    def predict(self, text, top_n=None):
        cleaned_text = self.preprocess(text)
        vectorized_text = self.vectorizer.transform([cleaned_text])
        proba = self.classifier.predict_proba(vectorized_text)[0]

        # Rank classes (encoded app_ids) by predicted probability, highest first.
        class_indices = np.argsort(proba)[::-1]
        if top_n is not None:
            class_indices = class_indices[:top_n]
        class_labels = self.app_id_encoder.inverse_transform(class_indices)
        class_probs = proba[class_indices]

        return pd.DataFrame({
            'app_id': class_labels,
            'text_probability': class_probs
        })


class GameRecommendationEnsemble:

    def __init__(self, game_content_recommender, collaborative_recommender, text_based_recommender):
        self.game_content_recommender: GameContentRecommender = game_content_recommender
        self.collaborative_recommender: CollaborativeRecommender = collaborative_recommender
        self.text_based_recommender: TextBasedRecommendation = text_based_recommender

    def save(self, dir_path: str):
        os.makedirs(dir_path, exist_ok=True)
        self.game_content_recommender.save(os.path.join(dir_path, "game_content_recommender.joblib"))
        self.collaborative_recommender.save(os.path.join(dir_path, "collaborative_recommender.joblib"))
        self.text_based_recommender.save(os.path.join(dir_path, "text_based_recommender"))

    @staticmethod
    def load(dir_path: str):
        game_content_recommender = GameContentRecommender.load(os.path.join(dir_path, "game_content_recommender.joblib"))
        collaborative_recommender = CollaborativeRecommender.load(os.path.join(dir_path, "collaborative_recommender.joblib"))
        text_based_recommender = TextBasedRecommendation.load(os.path.join(dir_path, "text_based_recommender"))

        return GameRecommendationEnsemble(
            game_content_recommender,
            collaborative_recommender,
            text_based_recommender
        )

    def scale_proba(self, series):
        """Min-max scale a score series to [0, 1] so the three models are comparable."""
        if len(series) <= 1:
            return pd.Series([1.0] * len(series), index=series.index)
        scaler = MinMaxScaler()
        scaled = scaler.fit_transform(series.values.reshape(-1, 1)).flatten()
        return pd.Series(scaled, index=series.index)

    def predict(self, description=None, app_ids=None, price_range=None, year_release=None,
                average_playtime=None, game_score=None, dlc_count=None,
                genres=None, categories=None, top_n=None,
                weight_text=1.0, weight_collab=1.0, weight_content=1.0):

        merge_dfs = []

        # Text-based recommendations from a free-text description.
        if description is not None:
            text_proba = self.text_based_recommender.predict(description)
            text_proba['app_id'] = text_proba['app_id'].astype(str)
            text_proba['text_probability'] = self.scale_proba(text_proba['text_probability'])
            merge_dfs.append(text_proba)
        else:
            weight_text = 0

        # Collaborative recommendations from previously played/liked app_ids.
        if app_ids is not None:
            similar_app = self.collaborative_recommender.get_similarities(app_ids)
            similar_app['app_id'] = similar_app['app_id'].astype(str)
            similar_app['collaborative_similarity'] = self.scale_proba(similar_app['collaborative_similarity'])
            merge_dfs.append(similar_app)
        else:
            weight_collab = 0

        # Content-based recommendations require the full set of game attributes.
        if None in (price_range, year_release, average_playtime, game_score, dlc_count, genres, categories):
            weight_content = 0
        else:
            similar_content = self.game_content_recommender.predict(
                price_range, year_release, average_playtime, game_score, dlc_count, genres, categories
            )
            similar_content['app_id'] = similar_content['app_id'].astype(str)
            similar_content['content_probability'] = self.scale_proba(similar_content['content_probability'])
            merge_dfs.append(similar_content)

        if not merge_dfs:
            return None

        from functools import reduce
        merged = reduce(lambda left, right: pd.merge(left, right, on='app_id', how='outer'), merge_dfs)

        # An app_id missing from one model simply contributes 0 for that score.
        merged = merged.fillna(0)

        def compute_aggregated_score(df, w_text, w_collab, w_content):
            total_weight = w_text + w_collab + w_content
            if total_weight == 0:
                raise ValueError("All weights are zero. At least one weight must be positive.")

            # Normalise the weights so the final score stays on a comparable scale.
            w_text /= total_weight
            w_collab /= total_weight
            w_content /= total_weight

            df['final_score'] = (
                df.get('text_probability', 0) * w_text +
                df.get('collaborative_similarity', 0) * w_collab +
                df.get('content_probability', 0) * w_content
            )
            return df.sort_values(by='final_score', ascending=False).reset_index(drop=True)

        final_df = compute_aggregated_score(merged, weight_text, weight_collab, weight_content)

        if top_n:
            return final_df.head(top_n)
        else:
            return final_df
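

# --- Example usage (illustrative sketch) ---------------------------------------
# The directory path and all query values below are hypothetical placeholders;
# point them at wherever GameRecommendationEnsemble.save() wrote its artifacts,
# and use labels actually seen by the fitted encoders.
if __name__ == "__main__":
    ensemble = GameRecommendationEnsemble.load("models/ensemble")  # hypothetical path

    recommendations = ensemble.predict(
        description="open world survival crafting game with co-op",
        app_ids=["570", "730"],          # hypothetical previously played app_ids
        price_range="10-20",             # must be a label known to the price encoder
        year_release=2020,
        average_playtime=1200,
        game_score=85,
        dlc_count=2,
        genres=["Action", "Adventure"],
        categories=["Multi-player"],
        top_n=10,
        weight_text=1.0, weight_collab=1.0, weight_content=0.5,
    )
    print(recommendations)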