# data_prep.py - COMPLETE ARCHETYPE EDITION
# Integrates: 20-rating threshold, Recency Bias Penalty, and Specialist Centroid Logic.
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import json
import pickle
import os
import re
from tqdm import tqdm
import time
import litellm
from config import HF_TOKEN, LLM_MODEL_NAME
# --- CUSTOM JSON ENCODER ---
class NumpyEncoder(json.JSONEncoder):
"""Custom encoder for numpy data types to avoid JSON serializable errors."""
def default(self, obj):
if isinstance(obj, np.integer): return int(obj)
if isinstance(obj, np.floating): return float(obj)
if isinstance(obj, np.ndarray): return obj.tolist()
return super(NumpyEncoder, self).default(obj)
# --- CONSTANTS & CONFIG ---
MOVIELENS_DIR = 'data/ml-latest-small'
PROCESSED_DIR = 'data/processed'
MIN_RATINGS_THRESHOLD = 20 # Popularity filter for items
MIN_USER_RATINGS = 20 # Minimum activity for specialist candidates
DISCOVERY_ERA_YEAR = 1980 # Threshold for Recency Bias logic
# Specialist Persona Targets (Used to find the top 5 specialists per category)
PERSONA_TARGETS = {
"Action Junkie": ["Action", "Adventure", "Sci-Fi"],
"Romantic Dreamer": ["Romance"],
"Cinephile Critic": ["Drama", "Crime", "Mystery", "Thriller"]
}
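# NOTE: Genre matching downstream uses substring search against MovieLens' pipe-delimited
# genres field (e.g. 'Action|Adventure|Sci-Fi'), so each target genre above must match that format.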
os.makedirs(PROCESSED_DIR, exist_ok=True)
# --- LLM HELPER ---
def call_llm_for_hook(prompt):
"""Calls the LLM via LiteLLM to generate snappy movie hooks."""
try:
print(f"DEBUG: Calling LLM for hook with model: {LLM_MODEL_NAME}")
start_ts = time.time()
response = litellm.completion(
model=LLM_MODEL_NAME,
messages=[{"role": "user", "content": prompt}],
api_key=HF_TOKEN,
max_tokens=40,
temperature=0.8
)
print(f"DEBUG: Full LLM Response Object: {response}")
print(f"DEBUG: LLM call completed in {time.time() - start_ts:.4f}s")
return response.choices[0].message.content.strip().replace('"', '')
    except Exception as e:
        print(f"DEBUG: LLM call failed ({e}); using fallback hook.")
        return "A standout pick for your collection."
def extract_year(title):
"""Regex to pull (YYYY) from movie titles."""
match = re.search(r'\((\d{4})\)', title)
return int(match.group(1)) if match else 0
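# Example: extract_year("Toy Story (1995)") -> 1995; titles without a year (e.g. "Hamlet") return 0.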
# --- CORE PREPARATION ---
def prepare_data():
print("Step 1: Loading raw MovieLens data...")
ratings = pd.read_csv(f'{MOVIELENS_DIR}/ratings.csv')
movies = pd.read_csv(f'{MOVIELENS_DIR}/movies.csv')
# 2 Process Metadata
print("Step 2: Processing metadata and applying thresholds...")
movies['year'] = movies['title'].apply(extract_year) # Extract (YYYY) from titles for recency bias
# Filter movies by popularity: count ratings per movie, then keep only those with ≥20 ratings
popular_ids = ratings.groupby('movieId').size()
popular_ids = popular_ids[popular_ids >= MIN_RATINGS_THRESHOLD].index
filtered_movies = movies[movies['movieId'].isin(popular_ids)].copy()
movie_meta = {}
for _, row in filtered_movies.iterrows():
movie_meta[str(row['movieId'])] = {
'movie_title': row['title'],
'genres': row['genres'].split('|'),
            'year': int(row['year'])  # cast numpy int64 -> Python int so the plain json.dump below can serialize it
}
# === RATING NORMALIZATION (User Bias Correction) ===
# Problem: Different users have different rating scales (generous vs. strict)
# Solution: Subtract each user's average rating from their individual ratings
# This creates deviation scores that are comparable across users
# Formula: normalized_rating = rating - user_avg_rating
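    # Illustrative example (hypothetical numbers): a generous user averaging 4.2 who rates a film 5.0
    # contributes +0.8, while a strict user averaging 2.5 who rates the same film 3.5 contributes +1.0,
    # so the strict user's rating actually signals stronger enthusiasm.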
print(" - Normalizing ratings to remove user bias...")
user_avg_ratings = ratings.groupby('userId')['rating'].mean().to_dict()
    ratings['rating_normalized'] = ratings['rating'] - ratings['userId'].map(user_avg_ratings)
# Use normalized ratings for all collaborative filtering steps
ratings_normalized = ratings[['userId', 'movieId', 'rating_normalized']].copy()
ratings_normalized.columns = ['userId', 'movieId', 'rating']
print(" - Normalization complete. All subsequent steps use deviation-adjusted ratings.\n")
# 3 Identify Specialist Persona Archetypes (The Centroid Logic)
print("Step 3: Identifying Specialist Centroids (Top 5 users per persona)...")
step_start = time.time()
persona_archetypes = {}
for persona, target_genres in PERSONA_TARGETS.items():
# Find movies in target genres
genre_movies = movies[movies['genres'].str.contains('|'.join(target_genres))]['movieId']
        # Identify active users (at least MIN_USER_RATINGS = 20 total ratings)
active_users = ratings_normalized.groupby('userId').size()
active_users = active_users[active_users >= MIN_USER_RATINGS].index
# Calculate Specialization Score for each active user using NORMALIZED ratings
# Formula: Specialization(u) = (Genre Density) × (Genre Passion)
# Genre Density = (Ratings in Target Genre / Total Ratings by User)
# Genre Passion = AvgRating(u, Target Genre) - based on NORMALIZED ratings
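        # Illustrative example (hypothetical numbers): a user with 120 ratings, 80 of them in the target
        # genres (density = 0.67), whose mean normalized rating on those titles is +0.45, scores
        # 0.67 * 0.45 ~= 0.30. Note that a negative genre deviation yields a negative score.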
specialization_scores = {}
for user_id in active_users:
user_ratings = ratings_normalized[ratings_normalized['userId'] == user_id]
total_user_ratings = len(user_ratings)
# Get genre ratings for this user
genre_ratings = user_ratings[user_ratings['movieId'].isin(genre_movies)]
genre_rating_count = len(genre_ratings)
# Skip if user hasn't rated any genre movies
if genre_rating_count == 0:
continue
            # Calculate specialization components (using normalized ratings)
            genre_density = genre_rating_count / total_user_ratings  # Share of the user's ratings that fall in the target genres
            genre_passion = genre_ratings['rating'].mean()  # Average rating DEVIATION within the target genres
# Specialization score (combined metric)
specialization_scores[user_id] = genre_density * genre_passion
# Select top 5 specialists by specialization score
top_5_specialists = sorted(specialization_scores.items(), key=lambda x: x[1], reverse=True)[:5]
top_5_specialists = [user_id for user_id, score in top_5_specialists]
print(f" - Found specialists for {persona}: {top_5_specialists}")
# Aggregate their NORMALIZED ratings (The Centroid)
centroid_ratings = ratings_normalized[ratings_normalized['userId'].isin(top_5_specialists)]
# Create centroid vector: average NORMALIZED rating per movie from the 5 specialists
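        # e.g. if two of the five specialists rated a movie +1.0 and +0.5 (normalized) and the other
        # three never rated it, the centroid entry for that movie is the mean of the observed values, +0.75.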
aggregated_history = centroid_ratings.groupby('movieId')['rating'].mean().to_dict()
persona_archetypes[persona] = {
"specialist_ids": top_5_specialists,
"target_genres": target_genres,
"consolidated_history": aggregated_history
}
print(f">>> Step 3 (Specialist Centroids) complete in {time.time() - step_start:.2f}s")
# Step 4: Pre-compute Item-Item Similarities for app.py
print("Step 4: Pre-computing Item-Item Similarities (O(1) Lookups)...")
step_start = time.time()
pivot = ratings_normalized.pivot(index='userId', columns='movieId', values='rating').fillna(0)
item_sim_matrix = cosine_similarity(pivot.T)
# Define m_ids early and create O(1) lookup mapping
m_ids = pivot.columns.tolist()
m_id_to_index = {m_id: idx for idx, m_id in enumerate(m_ids)}
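    # e.g. m_id_to_index[m_ids[0]] == 0; avoids repeated O(n) m_ids.index() calls inside the K-selection loops below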
# Genre coverage analysis with O(1) lookup (fixed performance issue)
# === STEP 4A: Genre Coverage Analysis for Adaptive K Selection ===
# Problem: How many similar items (K) should we pre-compute for genre filtering?
# - Too low (K=20): Genre filter may not find enough candidates for all genres
# - Too high (K=100): Wastes storage and compute for marginal benefit
# Solution: Empirically test different K values and pick the smallest K that ensures
# every genre has sufficient similar items available (≥1.0 average per movie).
# This ensures genre-filtered recommendations in app.py won't be starved for choices.
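    # Illustrative reading of the metric: if, for K=30, an average of 1.8 of each movie's 30 nearest
    # neighbours are Action titles, then genre_coverage_analysis['Action'][30] == 1.8.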
print("Analyzing genre coverage for different K values...")
genre_coverage_analysis = {}
K_CANDIDATES = [20, 30, 50, 100]
optimal_k = 50 # Default fallback
for genre in ["Action", "Drama", "Romance", "Comedy", "Sci-Fi", "Thriller"]:
genre_movie_ids = set(movies[movies['genres'].str.contains(genre)]['movieId'])
genre_coverage_analysis[genre] = {}
# For each K candidate, measure coverage with O(1) lookup
for k in K_CANDIDATES:
coverage_count = 0
for m_id in m_ids:
m_idx = m_id_to_index[m_id] # O(1) lookup instead of m_ids.index()
sim_scores = item_sim_matrix[m_idx]
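                # argsort is ascending, so the last index is (normally) the movie itself (similarity 1.0);
                # the slice keeps the k next-highest-similarity neighbours.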
top_k_indices = np.argsort(sim_scores)[-(k+1):-1]
genre_match = sum(1 for idx in top_k_indices if m_ids[idx] in genre_movie_ids)
coverage_count += genre_match
avg_coverage = coverage_count / len(m_ids)
genre_coverage_analysis[genre][k] = avg_coverage
print("Avg similar items per movie in each genre:")
for genre, coverage_dict in genre_coverage_analysis.items():
print(f" {genre}: {coverage_dict}")
# Adaptively select K: find smallest K that gives >=1.0 avg items per genre
TARGET_MIN_COVERAGE = 1.0 # At least 1 similar item per genre on average
for k in sorted(K_CANDIDATES):
min_coverage = min(genre_coverage_analysis[g][k] for g in genre_coverage_analysis.keys())
if min_coverage >= TARGET_MIN_COVERAGE:
optimal_k = k
print(f"\n✅ Optimal K selected: {optimal_k} (min genre coverage: {min_coverage:.2f})")
break
print(f"Using K={optimal_k} for top similar items\n")
# Pre-compute similar items with adaptive K
top_sim_dict = {}
for i, m_id in enumerate(tqdm(m_ids, desc="Similarities")):
if str(m_id) not in movie_meta: continue
sim_scores = item_sim_matrix[i]
# Get top K similar (excluding self)
top_indices = np.argsort(sim_scores)[-(optimal_k+1):-1]
top_sim_dict[str(m_id)] = {str(m_ids[idx]): float(sim_scores[idx]) for idx in top_indices}
print(f">>> Step 4 (Item-Item Similarities with K={optimal_k}) complete in {time.time() - step_start:.2f}s\n")
# Save components
with open(f'{PROCESSED_DIR}/movie_metadata.json', 'w') as f:
json.dump(movie_meta, f, indent=4)
with open(f'{PROCESSED_DIR}/persona_archetypes.json', 'w') as f:
json.dump(persona_archetypes, f, cls=NumpyEncoder, indent=4)
with open(f'{PROCESSED_DIR}/user_avg_ratings.json', 'w') as f:
json.dump(user_avg_ratings, f, indent=4)
with open(f'{PROCESSED_DIR}/top_similar_items.pkl', 'wb') as f:
pickle.dump(top_sim_dict, f)
return persona_archetypes, movie_meta, pivot
# --- RECOMMENDATION ENGINE ---
def compute_home_recommendations(persona_archetypes, movie_meta, pivot):
print("Step 5: Computing Layer 4 Home Recs with Recency Bias...")
home_recs = {}
for persona, data in persona_archetypes.items():
history = data['consolidated_history']
target_genres = data['target_genres']
print(f" - Building centroid and finding neighborhood for {persona}...")
# Construct Centroid Vector
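        # `m_id in centroid_vec` tests Series index membership; movies the specialists never rated
        # stay at 0.0, matching the fillna(0) convention used for the pivot matrix.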
centroid_vec = pd.Series(0.0, index=pivot.columns, dtype=float) # Init with float to avoid warning
for m_id, rating in history.items():
if m_id in centroid_vec: centroid_vec[m_id] = rating
# Neighborhood Search (Find users similar to the centroid)
user_sims = cosine_similarity([centroid_vec], pivot)[0]
neighbor_idx = np.argsort(user_sims)[-50:]
neighbor_ratings = pivot.iloc[neighbor_idx]
        # Weighted Candidate Scores (Layer 3: Collaborative Prediction)
        # Formula: R_{p,m} = Σ(Similarity × Rating) / Σ(Similarity)
        # Dividing by the summed neighbour similarity turns the weighted sum into a weighted average
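        # Illustrative example (hypothetical numbers): neighbours with similarities 0.9 and 0.6 rating a
        # movie +1.0 and +0.5 (normalized) give (0.9*1.0 + 0.6*0.5) / (0.9 + 0.6) = 1.2 / 1.5 = 0.8.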
numerator = neighbor_ratings.multiply(user_sims[neighbor_idx], axis=0).sum()
denominator = user_sims[neighbor_idx].sum()
candidates = numerator / denominator if denominator > 0 else numerator
print(f" - Applying Layer 4 re-ranking for {persona}...")
final_list = []
for m_id, raw_score in candidates.items():
m_id_str = str(m_id)
if m_id_str not in movie_meta: continue
if m_id in history: continue # Don't recommend what they've seen
meta = movie_meta[m_id_str]
# Layer 4a: Genre Affinity Boost
genre_match = any(g in target_genres for g in meta['genres'])
genre_multiplier = 2.5 if genre_match else 0.4
# Layer 4b: Recency Bias Logic (Claude's Penalty vs Bonus)
# Movies post-1980 get a bonus, older movies get a penalty proportional to age
if meta['year'] < DISCOVERY_ERA_YEAR:
                # Penalty scales from ~0.99 (1979) down to the 0.5 floor (reached at 1920 and earlier)
age_factor = max(0.5, 1.0 - (DISCOVERY_ERA_YEAR - meta['year']) / 120)
else:
                # Bonus scales from 1.0 (1980) up to the 1.3 cap (reached at 2010 and later)
age_factor = min(1.3, 1.0 + (meta['year'] - DISCOVERY_ERA_YEAR) / 100)
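            # Illustrative example (hypothetical numbers): raw_score 0.8 for a 1995 Action title ->
            # 0.8 * 2.5 (genre match) * 1.15 (recency) = 2.3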
final_score = raw_score * genre_multiplier * age_factor
final_list.append({
                'movie_id': int(m_id),  # cast numpy int64 -> Python int so json.dump can serialize home_recs
'movie_title': meta['movie_title'],
'genres': meta['genres'],
'score': final_score
})
# Top 6 for Home Screen
home_recs[persona] = sorted(final_list, key=lambda x: x['score'], reverse=True)[:6]
top_movies_str = ", ".join([f"'{r['movie_title']}'" for r in home_recs[persona][:2]])
print(f" - Top recs for {persona}: {top_movies_str}...")
with open(f'{PROCESSED_DIR}/home_recommendations.json', 'w') as f:
json.dump(home_recs, f, indent=4)
return home_recs
def generate_hooks(home_recs):
print("Step 6: Generating LLM Hooks for Discovery Feed...")
cached_hooks = {}
for persona, recs in home_recs.items():
cached_hooks[persona] = {}
for r in recs:
title = r['movie_title']
prompt = f"Generate a 5-10 word snappy, atmospheric hook for the movie: {title}."
hook = call_llm_for_hook(prompt)
cached_hooks[persona][str(r['movie_id'])] = hook
time.sleep(0.5) # Increased sleep to avoid HF Backend Error 40001
with open(f'{PROCESSED_DIR}/cached_hooks.json', 'w') as f:
json.dump(cached_hooks, f, indent=4)
if __name__ == "__main__":
total_start = time.time()
# --- Phase 1: Data Loading, Cleaning, and Initial Computations ---
step_start = time.time()
archetypes, meta, p_matrix = prepare_data()
print(f">>> Phase 1 (Data Prep) complete in {time.time() - step_start:.2f}s\n")
# --- Phase 2: Compute Home Recommendations using Centroid Logic ---
step_start = time.time()
recs = compute_home_recommendations(archetypes, meta, p_matrix)
print(f">>> Phase 2 (Home Recs) complete in {time.time() - step_start:.2f}s\n")
# --- Phase 3: Generate LLM Hooks for the UI ---
step_start = time.time()
generate_hooks(recs)
print(f">>> Phase 3 (LLM Hooks) complete in {time.time() - step_start:.2f}s\n")
print(f"✅ SUCCESS: Full data pipeline complete in {time.time() - total_start:.2f} seconds.")