Implement MLOps API
Browse files- recommender.py +142 -0
recommender.py
ADDED
@@ -0,0 +1,142 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import numpy as np
|
2 |
+
import pandas as pd
|
3 |
+
from sklearn.utils.extmath import randomized_svd
|
4 |
+
import mlflow
|
5 |
+
|
6 |
+
|
7 |
+
def recommend_top_k(preds_df, ratings_df, movie, userId, k=10):
|
8 |
+
user_row = userId - 1
|
9 |
+
sorted_user_predictions = preds_df.iloc[user_row].sort_values(ascending=False)
|
10 |
+
user_data = ratings_df[ratings_df.userId == (userId)]
|
11 |
+
user_rated = user_data.merge(movie, how='left', left_on='movieId', right_on='movieId'). \
|
12 |
+
sort_values(['rating'], ascending=False)
|
13 |
+
user_preds = movie.merge(pd.DataFrame(sorted_user_predictions).reset_index(), how='left',
|
14 |
+
on='movieId').rename(columns={user_row: 'prediction'}). \
|
15 |
+
sort_values('prediction', ascending=False). \
|
16 |
+
iloc[:k, :]
|
17 |
+
return user_rated, user_preds
|
18 |
+
|
19 |
+
|
20 |
+
def precision_at_k(df, k=10, y_test: str = 'rating', y_pred='prediction'):
|
21 |
+
dfK = df.head(k)
|
22 |
+
sum_df = dfK[y_pred].sum()
|
23 |
+
true_pred = dfK[dfK[y_pred] & dfK[y_test]].shape[0]
|
24 |
+
if sum_df > 0:
|
25 |
+
return true_pred / sum_df
|
26 |
+
else:
|
27 |
+
return None
|
28 |
+
|
29 |
+
|
30 |
+
def recall_at_k(df, k=10, y_test='rating', y_pred='prediction'):
|
31 |
+
dfK = df.head(k)
|
32 |
+
sum_df = df[y_test].sum()
|
33 |
+
true_pred = dfK[dfK[y_pred] & dfK[y_test]].shape[0]
|
34 |
+
if sum_df > 0:
|
35 |
+
return true_pred / sum_df
|
36 |
+
else:
|
37 |
+
return None
|
38 |
+
|
39 |
+
|
40 |
+
class CollaborativeModel(mlflow.pyfunc.PythonModel):
|
41 |
+
|
42 |
+
def fit(self, num_components=15, threshold=2):
|
43 |
+
"""### explore datasets"""
|
44 |
+
|
45 |
+
rating = pd.read_csv('IMDB/ratings_small.csv')
|
46 |
+
movie = pd.read_csv('IMDB/movies_metadata.csv')
|
47 |
+
movie = movie.rename(columns={'id': 'movieId'})
|
48 |
+
|
49 |
+
"""### data preprocessing
|
50 |
+
|
51 |
+
There are three rows entered by mistake, so we remove that row.
|
52 |
+
"""
|
53 |
+
|
54 |
+
movie = movie[
|
55 |
+
(movie['movieId'] != '1997-08-20') & (movie['movieId'] != '2012-09-29') & (
|
56 |
+
movie['movieId'] != '2014-01-01')]
|
57 |
+
|
58 |
+
def find_names(x):
|
59 |
+
if x == '':
|
60 |
+
return ''
|
61 |
+
genre_arr = eval(str(x))
|
62 |
+
return ','.join(i['name'] for i in eval(str(x)))
|
63 |
+
|
64 |
+
movie['genres'] = movie['genres'].fillna('')
|
65 |
+
|
66 |
+
movie['genres'] = movie['genres'].apply(find_names)
|
67 |
+
|
68 |
+
movie.movieId = movie.movieId.astype("uint64")
|
69 |
+
|
70 |
+
self.movie = movie
|
71 |
+
|
72 |
+
"""only keep rating for movies with metadata in movie dataset"""
|
73 |
+
|
74 |
+
new_rating = pd.merge(rating, movie, how='inner', on=["movieId"])
|
75 |
+
|
76 |
+
new_rating = new_rating[["userId", "movieId", "rating"]]
|
77 |
+
|
78 |
+
self.new_rating = new_rating
|
79 |
+
|
80 |
+
"""### matrix factorization"""
|
81 |
+
|
82 |
+
inter_mat_df = rating.pivot(index='userId', columns='movieId', values='rating').fillna(0)
|
83 |
+
inter_mat = inter_mat_df.to_numpy()
|
84 |
+
ratings_mean = np.mean(inter_mat, axis=1)
|
85 |
+
inter_mat_normal = inter_mat - ratings_mean.reshape(-1, 1)
|
86 |
+
|
87 |
+
"""We use singular value decomposition for matrix factorization"""
|
88 |
+
|
89 |
+
svd_U, svd_sigma, svd_V = randomized_svd(inter_mat_normal,
|
90 |
+
n_components=num_components,
|
91 |
+
n_iter=5,
|
92 |
+
random_state=47)
|
93 |
+
|
94 |
+
"""This function gives the diagonal form"""
|
95 |
+
|
96 |
+
svd_sigma = np.diag(svd_sigma)
|
97 |
+
|
98 |
+
"""Making predictions"""
|
99 |
+
|
100 |
+
rating_weights = np.dot(np.dot(svd_U, svd_sigma), svd_V) + ratings_mean.reshape(-1, 1)
|
101 |
+
self.weights_df = pd.DataFrame(rating_weights, columns=inter_mat_df.columns)
|
102 |
+
|
103 |
+
def predict(self, context, model_input):
|
104 |
+
return self.my_custom_function(model_input)
|
105 |
+
|
106 |
+
def my_custom_function(self, model_input):
|
107 |
+
# do something with the model input
|
108 |
+
self.fit(15, 2)
|
109 |
+
print(model_input)
|
110 |
+
self.user_rated, self.user_preds = recommend_top_k(self.weights_df, self.new_rating, self.movie,
|
111 |
+
int(model_input), 100)
|
112 |
+
return self.user_preds
|
113 |
+
|
114 |
+
def eval_metrics(self):
|
115 |
+
df_res = self.user_preds[["movieId", "prediction"]]. \
|
116 |
+
merge(self.user_rated[["movieId", "rating"]], how='outer', on='movieId')
|
117 |
+
|
118 |
+
df_res.sort_values(by='prediction', ascending=False, inplace=True)
|
119 |
+
|
120 |
+
df_res['prediction'] = df_res['prediction'] >= threshold
|
121 |
+
df_res['rating'] = df_res['rating'] >= threshold
|
122 |
+
prec_at_k = precision_at_k(df_res, 100, y_test='rating', y_pred='prediction')
|
123 |
+
rec_at_k = recall_at_k(df_res, 100, y_test='rating', y_pred='prediction')
|
124 |
+
|
125 |
+
print("precision@k: ", prec_at_k)
|
126 |
+
print("recall@k: ", rec_at_k)
|
127 |
+
return prec_at_k, rec_at_k
|
128 |
+
|
129 |
+
|
130 |
+
c_model = CollaborativeModel()
|
131 |
+
with mlflow.start_run():
|
132 |
+
model_info = mlflow.pyfunc.log_model(artifact_path="model", python_model=c_model)
|
133 |
+
num_components = 15
|
134 |
+
threshold = 2
|
135 |
+
c_model.fit(num_components, threshold)
|
136 |
+
c_model.predict(context=pd.DataFrame([]), model_input=220)
|
137 |
+
prec_at_k, rec_at_k = c_model.eval_metrics()
|
138 |
+
mlflow.log_param("num_components", num_components)
|
139 |
+
mlflow.log_param("threshold", threshold)
|
140 |
+
mlflow.log_metric("precision_at_k", prec_at_k)
|
141 |
+
mlflow.log_metric("recall_at_k", rec_at_k)
|
142 |
+
print("Model saved in run %s" % mlflow.active_run().info.run_uuid)
|