"""Offline data-preparation pipeline for the persona-based movie recommender.

Loads the MovieLens dataset, mean-centers ratings, selects specialist users to
build persona centroids, pre-computes item-item similarities, computes
per-persona home recommendations, and caches LLM-generated hooks for the
discovery feed.
"""
import json
import os
import pickle
import re
import time

import litellm
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

from config import HF_TOKEN, LLM_MODEL_NAME


class NumpyEncoder(json.JSONEncoder):
    """Custom encoder for numpy data types to avoid JSON serialization errors."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


# Paths and thresholds
MOVIELENS_DIR = 'data/ml-latest-small'
PROCESSED_DIR = 'data/processed'
MIN_RATINGS_THRESHOLD = 20   # minimum ratings a movie needs to be kept
MIN_USER_RATINGS = 20        # minimum ratings a user needs to count as "active"
DISCOVERY_ERA_YEAR = 1980    # pivot year for the age factor used in re-ranking

# Each persona is seeded by the users who rate these target genres most heavily.
PERSONA_TARGETS = {
    "Action Junkie": ["Action", "Adventure", "Sci-Fi"],
    "Romantic Dreamer": ["Romance"],
    "Cinephile Critic": ["Drama", "Crime", "Mystery", "Thriller"]
}

os.makedirs(PROCESSED_DIR, exist_ok=True)


def call_llm_for_hook(prompt):
    """Calls the LLM via LiteLLM to generate snappy movie hooks."""
    try:
        print(f"DEBUG: Calling LLM for hook with model: {LLM_MODEL_NAME}")
        start_ts = time.time()
        response = litellm.completion(
            model=LLM_MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            api_key=HF_TOKEN,
            max_tokens=40,
            temperature=0.8
        )
        print(f"DEBUG: Full LLM Response Object: {response}")
        print(f"DEBUG: LLM call completed in {time.time() - start_ts:.4f}s")
        return response.choices[0].message.content.strip().replace('"', '')
    except Exception as e:
        # Fall back to a generic hook so the pipeline never dies on an LLM error.
        print(f"WARNING: LLM hook generation failed ({e}); using fallback hook.")
        return "A standout pick for your collection."


def extract_year(title):
    """Regex to pull (YYYY) from movie titles."""
    match = re.search(r'\((\d{4})\)', title)
    return int(match.group(1)) if match else 0


def prepare_data():
    """Load MovieLens, build movie metadata, persona archetypes, and similarity lookups."""
    print("Step 1: Loading raw MovieLens data...")
    ratings = pd.read_csv(f'{MOVIELENS_DIR}/ratings.csv')
    movies = pd.read_csv(f'{MOVIELENS_DIR}/movies.csv')

    print("Step 2: Processing metadata and applying thresholds...")
    movies['year'] = movies['title'].apply(extract_year)

    # Keep only movies with enough ratings to produce stable similarities.
    popular_ids = ratings.groupby('movieId').size()
    popular_ids = popular_ids[popular_ids >= MIN_RATINGS_THRESHOLD].index

    filtered_movies = movies[movies['movieId'].isin(popular_ids)].copy()
    movie_meta = {}
    for _, row in filtered_movies.iterrows():
        movie_meta[str(row['movieId'])] = {
            'movie_title': row['title'],
            'genres': row['genres'].split('|'),
            'year': int(row['year'])  # cast from numpy int64 so json.dump can serialize it
        }
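
    # Mean-centering: subtracting each user's average rating turns raw 0.5-5.0
    # stars into per-user deviations, so a habitual 5-star rater and a habitual
    # 3-star rater contribute comparably to the similarities computed below.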
    print(" - Normalizing ratings to remove user bias...")
    user_avg_ratings = ratings.groupby('userId')['rating'].mean().to_dict()
    # Cast keys/values to native Python types so the JSON dump later cannot trip
    # over numpy scalars.
    user_avg_ratings = {int(u): float(r) for u, r in user_avg_ratings.items()}
    # Vectorized equivalent of subtracting each user's mean from their ratings.
    ratings['rating_normalized'] = ratings['rating'] - ratings['userId'].map(user_avg_ratings)

    ratings_normalized = ratings[['userId', 'movieId', 'rating_normalized']].copy()
    ratings_normalized.columns = ['userId', 'movieId', 'rating']
    print(" - Normalization complete. All subsequent steps use deviation-adjusted ratings.\n")
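
    # Specialist selection: for each persona, a user's score is
    #   genre_density * genre_passion
    # where genre_density is the share of the user's ratings that fall in the
    # persona's target genres and genre_passion is their mean normalized rating
    # for those movies. The five highest-scoring users seed the persona centroid.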
    print("Step 3: Identifying Specialist Centroids (Top 5 users per persona)...")
    step_start = time.time()
    persona_archetypes = {}

    # Active users are persona-independent, so compute them once.
    active_users = ratings_normalized.groupby('userId').size()
    active_users = active_users[active_users >= MIN_USER_RATINGS].index

    for persona, target_genres in PERSONA_TARGETS.items():
        genre_movies = movies[movies['genres'].str.contains('|'.join(target_genres))]['movieId']

        specialization_scores = {}
        for user_id in active_users:
            user_ratings = ratings_normalized[ratings_normalized['userId'] == user_id]
            total_user_ratings = len(user_ratings)

            genre_ratings = user_ratings[user_ratings['movieId'].isin(genre_movies)]
            genre_rating_count = len(genre_ratings)

            if genre_rating_count == 0:
                continue

            genre_density = genre_rating_count / total_user_ratings
            genre_passion = genre_ratings['rating'].mean()

            specialization_scores[user_id] = genre_density * genre_passion

        top_5_specialists = sorted(specialization_scores.items(), key=lambda x: x[1], reverse=True)[:5]
        top_5_specialists = [user_id for user_id, score in top_5_specialists]
        print(f" - Found specialists for {persona}: {top_5_specialists}")

        # Consolidate the specialists' normalized ratings into one pseudo-user
        # history (mean rating per movie), cast to native types for JSON output.
        centroid_ratings = ratings_normalized[ratings_normalized['userId'].isin(top_5_specialists)]
        aggregated_history = {
            int(m_id): float(r)
            for m_id, r in centroid_ratings.groupby('movieId')['rating'].mean().items()
        }

        persona_archetypes[persona] = {
            "specialist_ids": top_5_specialists,
            "target_genres": target_genres,
            "consolidated_history": aggregated_history
        }

    print(f">>> Step 3 (Specialist Centroids) complete in {time.time() - step_start:.2f}s")

    print("Step 4: Pre-computing Item-Item Similarities (O(1) Lookups)...")
    step_start = time.time()
    pivot = ratings_normalized.pivot(index='userId', columns='movieId', values='rating').fillna(0)
    item_sim_matrix = cosine_similarity(pivot.T)

    m_ids = pivot.columns.tolist()
    m_id_to_index = {m_id: idx for idx, m_id in enumerate(m_ids)}
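
    # K selection heuristic: for each candidate neighborhood size, measure how
    # many of a movie's top-K most similar items belong to each probe genre,
    # averaged over all movies. The smallest K whose worst genre average reaches
    # TARGET_MIN_COVERAGE is used; otherwise the default of 50 stands.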
    print("Analyzing genre coverage for different K values...")
    genre_coverage_analysis = {}
    K_CANDIDATES = [20, 30, 50, 100]
    optimal_k = 50

    for genre in ["Action", "Drama", "Romance", "Comedy", "Sci-Fi", "Thriller"]:
        genre_movie_ids = set(movies[movies['genres'].str.contains(genre)]['movieId'])
        genre_coverage_analysis[genre] = {}

        for k in K_CANDIDATES:
            coverage_count = 0
            for m_id in m_ids:
                m_idx = m_id_to_index[m_id]
                sim_scores = item_sim_matrix[m_idx]
                # Top k neighbours by similarity, excluding the item itself (last position).
                top_k_indices = np.argsort(sim_scores)[-(k + 1):-1]
                genre_match = sum(1 for idx in top_k_indices if m_ids[idx] in genre_movie_ids)
                coverage_count += genre_match

            avg_coverage = coverage_count / len(m_ids)
            genre_coverage_analysis[genre][k] = avg_coverage

    print("Avg similar items per movie in each genre:")
    for genre, coverage_dict in genre_coverage_analysis.items():
        print(f" {genre}: {coverage_dict}")

    TARGET_MIN_COVERAGE = 1.0
    for k in sorted(K_CANDIDATES):
        min_coverage = min(genre_coverage_analysis[g][k] for g in genre_coverage_analysis.keys())
        if min_coverage >= TARGET_MIN_COVERAGE:
            optimal_k = k
            print(f"\n✅ Optimal K selected: {optimal_k} (min genre coverage: {min_coverage:.2f})")
            break

    print(f"Using K={optimal_k} for top similar items\n")
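
    # Persist only the top-K similar items per movie (keyed by string IDs) so the
    # serving layer can do constant-time neighbour lookups instead of scanning the
    # full similarity matrix.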
    top_sim_dict = {}
    for i, m_id in enumerate(tqdm(m_ids, desc="Similarities")):
        if str(m_id) not in movie_meta:
            continue
        sim_scores = item_sim_matrix[i]

        top_indices = np.argsort(sim_scores)[-(optimal_k + 1):-1]
        top_sim_dict[str(m_id)] = {str(m_ids[idx]): float(sim_scores[idx]) for idx in top_indices}

    print(f">>> Step 4 (Item-Item Similarities with K={optimal_k}) complete in {time.time() - step_start:.2f}s\n")
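
    # Write the pipeline artifacts: movie metadata, persona archetypes, per-user
    # rating means, and the pickled top-K similarity lookups.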
    with open(f'{PROCESSED_DIR}/movie_metadata.json', 'w') as f:
        json.dump(movie_meta, f, indent=4)
    with open(f'{PROCESSED_DIR}/persona_archetypes.json', 'w') as f:
        json.dump(persona_archetypes, f, cls=NumpyEncoder, indent=4)
    with open(f'{PROCESSED_DIR}/user_avg_ratings.json', 'w') as f:
        json.dump(user_avg_ratings, f, indent=4)
    with open(f'{PROCESSED_DIR}/top_similar_items.pkl', 'wb') as f:
        pickle.dump(top_sim_dict, f)

    return persona_archetypes, movie_meta, pivot


def compute_home_recommendations(persona_archetypes, movie_meta, pivot):
    """Score and re-rank candidate movies for each persona using its specialist centroid."""
    print("Step 5: Computing Layer 4 Home Recs with Recency Bias...")
    home_recs = {}

    for persona, data in persona_archetypes.items():
        history = data['consolidated_history']
        target_genres = data['target_genres']

        print(f" - Building centroid and finding neighborhood for {persona}...")
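
        # Embed the persona's consolidated history as a pseudo-user vector in the
        # user-item rating space, take its 50 nearest users by cosine similarity,
        # and score every movie as the similarity-weighted mean of those
        # neighbours' normalized ratings.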
        centroid_vec = pd.Series(0.0, index=pivot.columns, dtype=float)
        for m_id, rating in history.items():
            if m_id in centroid_vec:
                centroid_vec[m_id] = rating

        user_sims = cosine_similarity(centroid_vec.values.reshape(1, -1), pivot.values)[0]
        neighbor_idx = np.argsort(user_sims)[-50:]
        neighbor_ratings = pivot.iloc[neighbor_idx]

        numerator = neighbor_ratings.multiply(user_sims[neighbor_idx], axis=0).sum()
        denominator = user_sims[neighbor_idx].sum()
        candidates = numerator / denominator if denominator > 0 else numerator
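
        # "Layer 4" re-ranking: boost on-persona genres (2.5x) and damp off-persona
        # ones (0.4x), then scale by an age factor that favours titles released
        # after DISCOVERY_ERA_YEAR and gently discounts older ones.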
        print(f" - Applying Layer 4 re-ranking for {persona}...")
        final_list = []
        for m_id, raw_score in candidates.items():
            m_id_str = str(m_id)
            if m_id_str not in movie_meta:
                continue
            if m_id in history:
                continue

            meta = movie_meta[m_id_str]

            genre_match = any(g in target_genres for g in meta['genres'])
            genre_multiplier = 2.5 if genre_match else 0.4

            if meta['year'] < DISCOVERY_ERA_YEAR:
                age_factor = max(0.5, 1.0 - (DISCOVERY_ERA_YEAR - meta['year']) / 120)
            else:
                age_factor = min(1.3, 1.0 + (meta['year'] - DISCOVERY_ERA_YEAR) / 100)

            final_score = raw_score * genre_multiplier * age_factor

            final_list.append({
                'movie_id': int(m_id),        # native int so json.dump can serialize it
                'movie_title': meta['movie_title'],
                'genres': meta['genres'],
                'score': float(final_score)   # native float for the same reason
            })

        home_recs[persona] = sorted(final_list, key=lambda x: x['score'], reverse=True)[:6]

        top_movies_str = ", ".join([f"'{r['movie_title']}'" for r in home_recs[persona][:2]])
        print(f" - Top recs for {persona}: {top_movies_str}...")

    with open(f'{PROCESSED_DIR}/home_recommendations.json', 'w') as f:
        json.dump(home_recs, f, indent=4)
    return home_recs
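

# One-time hook generation: results are cached to JSON so they can be reused
# without repeat LLM calls.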
def generate_hooks(home_recs):
    """Generate and cache a short LLM hook for every recommended movie."""
    print("Step 6: Generating LLM Hooks for Discovery Feed...")
    cached_hooks = {}
    for persona, recs in home_recs.items():
        cached_hooks[persona] = {}
        for r in recs:
            title = r['movie_title']
            prompt = f"Generate a 5-10 word snappy, atmospheric hook for the movie: {title}."
            hook = call_llm_for_hook(prompt)
            cached_hooks[persona][str(r['movie_id'])] = hook
            time.sleep(0.5)  # brief pause between LLM calls

    with open(f'{PROCESSED_DIR}/cached_hooks.json', 'w') as f:
        json.dump(cached_hooks, f, indent=4)
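

# Run the three pipeline phases end to end, timing each.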
if __name__ == "__main__":
    total_start = time.time()

    step_start = time.time()
    archetypes, meta, p_matrix = prepare_data()
    print(f">>> Phase 1 (Data Prep) complete in {time.time() - step_start:.2f}s\n")

    step_start = time.time()
    recs = compute_home_recommendations(archetypes, meta, p_matrix)
    print(f">>> Phase 2 (Home Recs) complete in {time.time() - step_start:.2f}s\n")

    step_start = time.time()
    generate_hooks(recs)
    print(f">>> Phase 3 (LLM Hooks) complete in {time.time() - step_start:.2f}s\n")

    print(f"✅ SUCCESS: Full data pipeline complete in {time.time() - total_start:.2f} seconds.")