|
import pandas as pd |
|
from surprise import Dataset, Reader, SVD |
|
from surprise.model_selection import train_test_split |
|
from datetime import datetime, timedelta |
|
import pickle |
|
import random |
|
from sqlalchemy.orm import Session |
|
from sqlalchemy import func |
|
from typing import List |
|
from orders.models import Order, Meal, RecommendationModel |
|
from users.models import User |
|
|
|
class MealRecommender: |
|
def __init__(self, db: Session): |
|
self.db = db |
|
self.retrain_interval = timedelta(days=1) |
|
self.algo = self.load_or_train_model() |
|
|
|
def fetch_data(self): |
|
|
|
batch_size = 1000 |
|
offset = 0 |
|
data = [] |
|
while True: |
|
batch = self.db.query(Order.user_id, Order.meal_id, Order.quantity).offset(offset).limit(batch_size).all() |
|
if not batch: |
|
break |
|
data.extend(batch) |
|
offset += batch_size |
|
return pd.DataFrame(data, columns=['user_id', 'meal_id', 'quantity']) |
|
|
|
def train_model(self): |
|
data = self.fetch_data() |
|
if data.empty: |
|
return None |
|
|
|
reader = Reader(rating_scale=(1, 5)) |
|
dataset = Dataset.load_from_df(data[['user_id', 'meal_id', 'quantity']], reader) |
|
|
|
trainset = dataset.build_full_trainset() |
|
algo = SVD() |
|
algo.fit(trainset) |
|
|
|
|
|
model_binary = pickle.dumps(algo) |
|
model_record = RecommendationModel(model=model_binary, created_at=datetime.now()) |
|
self.db.add(model_record) |
|
self.db.commit() |
|
|
|
return algo |
|
|
|
def load_or_train_model(self): |
|
latest_model = self.db.query(RecommendationModel).order_by(RecommendationModel.created_at.desc()).first() |
|
|
|
if latest_model and datetime.now() - latest_model.created_at <= self.retrain_interval: |
|
return pickle.loads(latest_model.model) |
|
else: |
|
return self.train_model() |
|
|
|
def get_recommendations(self, user: User): |
|
if self.algo is None: |
|
return self.get_random_recommendations() |
|
|
|
all_meals = self.db.query(Meal).all() |
|
meal_ids = [meal.id for meal in all_meals] |
|
|
|
predictions = [self.algo.predict(str(user.id), str(meal_id)) for meal_id in meal_ids] |
|
sorted_predictions = sorted(predictions, key=lambda x: x.est, reverse=True) |
|
top_recommendations = self.db.query(Meal).filter(Meal.id.in_([int(pred.iid) for pred in sorted_predictions[:20]])).all() |
|
|
|
top_recommendations = self.adjust_for_preferences(user, top_recommendations) |
|
|
|
return top_recommendations[:5] |
|
|
|
def adjust_for_preferences(self, user: User, recommendations: List[Meal]) -> List[Meal]: |
|
preferences = user.preferences if user.preferences else [] |
|
preference_scores = {meal.id: 0 for meal in recommendations} |
|
|
|
for meal in recommendations: |
|
for preferred in preferences: |
|
if preferred.lower() in meal.name.lower() or preferred.lower() in meal.description.lower(): |
|
preference_scores[meal.id] += 1 |
|
|
|
sorted_recommendations = sorted(recommendations, key=lambda meal: preference_scores[meal.id], reverse=True) |
|
|
|
return sorted_recommendations |
|
|
|
def get_random_recommendations(self): |
|
all_meals = self.db.query(Meal).all() |
|
return random.sample(all_meals, min(5, len(all_meals))) |
|
|