from flask import Flask, render_template, request, jsonify, current_app
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics.pairwise import cosine_similarity
import os
import logging

# --- Logging Configuration ---
# Ensure logging is configured before any loggers are potentially used by imported modules
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
logger = logging.getLogger(__name__)

app = Flask(__name__)

# --- Global Variables ---
DF = None
ALL_TOPPINGS = []
FEATURE_DF = None
SCALER = None  # Will be initialized in preprocess_data
NUMERICAL_COLS = ['Price', 'Slices', 'Rating', 'Spice_Level', 'Preparation_Time', 'Calories']
CATEGORICAL_FEATURES = [
    'Serving_Size', 'Popular_Group', 'Dietary_Category', 'Sauce_Type',
    'Cheese_Amount', 'Restaurant_Chain', 'Seasonal_Availability', 'Bread_Type'
]
CRUST_TYPE_COL = None
DEFAULT_IMAGE_URL = 'https://images.dominos.co.in/new_margherita_2502.jpg'
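
# Columns expected in pizza.csv, as referenced throughout this module (a
# reader's summary; the code below degrades gracefully when any are missing):
# Pizza_Name, Toppings (';'-separated), Price_Rs, Slices, Rating, Rating_Count,
# Preparation_Time_min, Calories_per_Slice, Spice_Level, Description,
# Allergens, Image_Url, Crust_Type (or Cr_Type), plus the categorical columns
# listed in CATEGORICAL_FEATURES above.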

def preprocess_data(df_path='pizza.csv'):
    global DF, ALL_TOPPINGS, FEATURE_DF, SCALER, CATEGORICAL_FEATURES, CRUST_TYPE_COL

    logger.info(f"Attempting to preprocess data from relative path: {df_path}")
    # Construct an absolute path for the CSV file.
    # This is crucial for environments like Docker where the working directory might differ.
    base_dir = os.path.dirname(os.path.abspath(__file__))  # Directory of the current script (app.py)
    absolute_df_path = os.path.join(base_dir, df_path)
    logger.info(f"Absolute path for CSV: {absolute_df_path}")

    if not os.path.exists(absolute_df_path):
        logger.error(f"Dataset file '{absolute_df_path}' not found.")
        raise FileNotFoundError(
            f"Dataset file '{absolute_df_path}' not found. Ensure it's in the same directory as app.py.")

    DF = pd.read_csv(absolute_df_path)
    logger.info(f"Successfully loaded '{absolute_df_path}'. Original DataFrame shape: {DF.shape}")
    logger.info(f"Original DataFrame columns: {DF.columns.tolist()}")

    # Determine the crust type column
    potential_crust_cols = ['Crust_Type', 'Cr_Type']
    valid_crust_cols = [col for col in potential_crust_cols if col in DF.columns]
    if valid_crust_cols:
        valid_crust_cols.sort(key=lambda col: DF[col].isnull().sum())  # Prefer the column with fewer NaNs
        CRUST_TYPE_COL = valid_crust_cols[0]
        logger.info(f"Using '{CRUST_TYPE_COL}' for crust type.")
        if CRUST_TYPE_COL not in CATEGORICAL_FEATURES:
            CATEGORICAL_FEATURES.append(CRUST_TYPE_COL)
        # Remove other potential crust columns if they were in CATEGORICAL_FEATURES
        for col in potential_crust_cols:
            if col != CRUST_TYPE_COL and col in CATEGORICAL_FEATURES:
                CATEGORICAL_FEATURES.remove(col)
    else:
        logger.warning("Crust type column (Crust_Type or Cr_Type) not found. Crust type will not be used.")
        CRUST_TYPE_COL = None

    # Fill NaNs in text-based categorical columns and other text fields
    text_cols_to_fill = list(set(CATEGORICAL_FEATURES +
                                 ['Toppings', 'Description', 'Allergens', 'Image_Url', 'Pizza_Name']))
    for col in text_cols_to_fill:
        if col and col in DF.columns:  # Ensure col is not None (e.g. if CRUST_TYPE_COL is None)
            DF[col] = DF[col].fillna('')
    logger.info("Filled NaNs in text-based categorical columns with empty strings.")

    # Fill NaNs in numerical columns from the CSV
    numerical_cols_in_df = ['Price_Rs', 'Slices', 'Rating', 'Rating_Count',
                            'Preparation_Time_min', 'Calories_per_Slice']
    for col in numerical_cols_in_df:
        if col in DF.columns:
            if pd.api.types.is_numeric_dtype(DF[col]):
                median_val = DF[col].median()
                DF[col] = DF[col].fillna(median_val)
                logger.info(f"Filled NaNs in numerical column '{col}' with its median ({median_val}).")
            else:
                # Attempt to convert to numeric, then fill with the median (or 0)
                numeric_series = pd.to_numeric(DF[col], errors='coerce')
                median_val = 0
                if not numeric_series.isnull().all():
                    median_val = numeric_series.median()
                DF[col] = numeric_series.fillna(median_val)
                logger.warning(f"Column '{col}' was not purely numeric. Converted to numeric, "
                               f"filled NaNs with median/0 ({median_val}).")
        else:
            logger.warning(f"Expected numerical column '{col}' not found in DataFrame. "
                           f"It will be missing from features if not handled.")

    if 'Rating_Count' in DF.columns:
        DF['Rating_Count'] = DF['Rating_Count'].fillna(0).astype(int)

    # Process toppings
    if 'Toppings' in DF.columns:
        DF['Toppings_list_internal'] = DF['Toppings'].astype(str).str.split(r';\s*')  # Raw string: regex split
        DF['Toppings_list_internal'] = DF['Toppings_list_internal'].apply(
            lambda x: [t.strip() for t in x if isinstance(t, str) and t.strip()])  # Drop empty strings after split
        current_all_toppings = set()
        for toppings_list in DF['Toppings_list_internal'].dropna():
            current_all_toppings.update(t for t in toppings_list if t)  # Ensure t is not empty
        ALL_TOPPINGS = sorted(current_all_toppings)
        logger.info(f"Found {len(ALL_TOPPINGS)} unique toppings. "
                    f"Example: {ALL_TOPPINGS[:5] if ALL_TOPPINGS else 'None'}")
    else:
        logger.warning("'Toppings' column not found. Topping features will be empty.")
        DF['Toppings_list_internal'] = pd.Series([[] for _ in range(len(DF))], index=DF.index)  # Empty list per row
        ALL_TOPPINGS = []

    # --- Feature Engineering ---
    feature_data = {}
    num_feature_map = {
        'Price': 'Price_Rs',
        'Slices': 'Slices',
        'Rating': 'Rating',
        'Preparation_Time': 'Preparation_Time_min',
        'Calories': 'Calories_per_Slice'
    }
    for feature_col, df_col in num_feature_map.items():
        if df_col in DF.columns:
            feature_data[feature_col] = DF[df_col].copy()
        else:
            logger.warning(f"Numerical source column '{df_col}' for feature '{feature_col}' not found. "
                           f"Filling with zeros.")
            feature_data[feature_col] = pd.Series([0.0] * len(DF), index=DF.index)  # Ensure float for consistency

    # Spice level feature (numerical)
    if 'Spice_Level' in DF.columns:
        DF['Spice_Level'] = DF['Spice_Level'].fillna('Mild')  # Default for NaNs
        spice_map = {'Mild': 1, 'Medium': 2, 'Hot': 3}
        feature_data['Spice_Level'] = DF['Spice_Level'].map(spice_map).fillna(1.0)  # Ensure float
    else:
        logger.warning("'Spice_Level' column not found. Filling 'Spice_Level' feature with default (1.0).")
        feature_data['Spice_Level'] = pd.Series([1.0] * len(DF), index=DF.index)  # Default if column is missing

    # One-hot encode categorical features
    for feature_cat_col in CATEGORICAL_FEATURES:
        if feature_cat_col and feature_cat_col in DF.columns:  # Column name is not None and exists
            # Ensure the column is treated as string to avoid issues with mixed types in unique()
            DF[feature_cat_col] = DF[feature_cat_col].astype(str)
            for value in DF[feature_cat_col].unique():
                if pd.notnull(value) and value.strip() != '':  # Only non-null, non-empty string values
                    feature_data[f"{feature_cat_col}_{value}"] = (DF[feature_cat_col] == value).astype(int)
        elif feature_cat_col:  # Log a warning only if feature_cat_col was defined
            logger.warning(f"Categorical source column '{feature_cat_col}' for one-hot encoding "
                           f"not found in DataFrame.")

    # Topping features (one-hot encoded)
    for topping in ALL_TOPPINGS:
        if topping:  # Ensure the topping string is not empty
            feature_data[f"Topping_{topping}"] = DF['Toppings_list_internal'].apply(
                lambda x: 1 if topping in x else 0
            )

    FEATURE_DF = pd.DataFrame(feature_data)
    logger.info(f"FEATURE_DF created. Shape: {FEATURE_DF.shape}. "
                f"Columns: {FEATURE_DF.columns.tolist()[:10]}...")  # Log the first 10 cols

    # Ensure all NUMERICAL_COLS exist in FEATURE_DF and fill NaNs
    for col in NUMERICAL_COLS:
        if col not in FEATURE_DF.columns:
            logger.warning(f"Numerical column '{col}' is missing from FEATURE_DF after construction. "
                           f"Adding as zeros.")
            FEATURE_DF[col] = 0.0  # Ensure float
        if FEATURE_DF[col].isnull().any():
            mean_val = FEATURE_DF[col].mean()
            fill_val = mean_val if pd.notna(mean_val) else 0.0
            logger.info(f"Filling NaNs in numerical feature column '{col}' with {fill_val}.")
            FEATURE_DF[col] = FEATURE_DF[col].fillna(fill_val)

    # Scale numerical features
    SCALER = MinMaxScaler()  # Initialize the scaler
    if not FEATURE_DF.empty and all(col in FEATURE_DF.columns for col in NUMERICAL_COLS):
        try:
            FEATURE_DF[NUMERICAL_COLS] = SCALER.fit_transform(FEATURE_DF[NUMERICAL_COLS])
            logger.info(f"Numerical columns ({NUMERICAL_COLS}) scaled. FEATURE_DF shape: {FEATURE_DF.shape}")
        except Exception as e:
            logger.error(f"Error during scaling of numerical columns: {e}. FEATURE_DF might be problematic.")
            # Fallback: keep numerical columns unscaled if scaling fails, or handle as needed
    elif FEATURE_DF.empty:
        logger.error("FEATURE_DF is empty before scaling. Scaling skipped. This will likely cause issues.")
    else:
        missing_cols = [col for col in NUMERICAL_COLS if col not in FEATURE_DF.columns]
        logger.error(f"Not all numerical columns ({NUMERICAL_COLS}) found in FEATURE_DF for scaling. "
                     f"Missing: {missing_cols}. Scaling skipped.")

    logger.info(f"Preprocessing done. DF is None: {DF is None}, "
                f"FEATURE_DF is None: {FEATURE_DF is None}, SCALER is None: {SCALER is None}")
    if FEATURE_DF is not None:
        logger.info(f"Final FEATURE_DF shape: {FEATURE_DF.shape}")
    if DF is not None:
        logger.info(f"Final DF shape: {DF.shape}")
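
# A minimal, self-contained sketch (hypothetical sample data; not called
# anywhere) of the encoding-and-similarity pipeline that preprocess_data()
# and get_recommendations() implement: split ';'-separated toppings into
# multi-hot columns, min-max scale the numeric column, then rank rows by
# cosine similarity against a preference vector.
def _pipeline_example():
    sample = pd.DataFrame({
        'Toppings': ['Onion; Cheese', 'Cheese', 'Onion; Paneer'],
        'Price_Rs': [199.0, 299.0, 249.0],
    })
    lists = sample['Toppings'].str.split(r';\s*').apply(
        lambda row: [t.strip() for t in row if t.strip()])
    vocab = sorted({t for row in lists for t in row})
    features = pd.DataFrame({f"Topping_{t}": lists.apply(lambda row, t=t: int(t in row))
                             for t in vocab})
    features['Price'] = MinMaxScaler().fit_transform(sample[['Price_Rs']]).ravel()
    user = pd.Series(0.0, index=features.columns)
    user['Topping_Onion'] = 1.0  # The user wants onion; price preference left neutral
    sims = cosine_similarity(user.values.reshape(1, -1), features.values)[0]
    return sample.index[sims.argsort()[::-1]].tolist()  # Rows 0 and 2 (with onion) outrank row 1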

@app.route('/')
def index_route():
    global DF, ALL_TOPPINGS, CATEGORICAL_FEATURES, CRUST_TYPE_COL, FEATURE_DF, DEFAULT_IMAGE_URL

    # Critical check at the beginning of the route
    if DF is None:
        current_app.logger.error("DF is None when trying to serve '/'. Data preprocessing might have failed or not run.")
        return "Error: Pizza data (DF) not loaded. Please check server logs.", 500
    if FEATURE_DF is None:  # Also check FEATURE_DF, as it is derived from DF
        current_app.logger.error("FEATURE_DF is None when trying to serve '/'. Data preprocessing might have failed.")
        return "Error: Pizza feature data (FEATURE_DF) not loaded. Please check server logs.", 500

    filter_options = {}
    # Include 'Spice_Level' in the filter options if it exists in DF
    cols_for_filters_set = set(cat_col for cat_col in CATEGORICAL_FEATURES
                               if cat_col and cat_col in DF.columns)  # Drop None or non-existent columns
    if 'Spice_Level' in DF.columns:
        cols_for_filters_set.add('Spice_Level')
    # CRUST_TYPE_COL is already in CATEGORICAL_FEATURES if it was found

    for col_name in list(cols_for_filters_set):
        # The key name for JS should be consistent (lowercase, no underscores)
        key_name = col_name.lower().replace('_', '')
        # No special handling for spicelevel or crusttype here; the line above covers them.
        unique_values = sorted([v for v in DF[col_name].astype(str).dropna().unique() if v.strip() != ''])
        if unique_values:  # Only add if there are actual values
            filter_options[key_name] = unique_values

    # Prepare default recommendations (e.g., top-rated). Make sure the 'Rating' column exists.
    if 'Rating' in DF.columns:
        default_recommendations_df = DF.sort_values('Rating', ascending=False).copy()
    else:
        logger.warning("'Rating' column not found in DF. Cannot sort for default recommendations. Using unsorted DF.")
        default_recommendations_df = DF.copy()  # Fall back to the unsorted DF

    default_recs_list = []
    frontend_keys = [
        'id', 'name', 'toppings', 'price', 'slices', 'serving_size', 'rating',
        'rating_count', 'description', 'popular_group', 'dietary_category',
        'spice_level', 'sauce_type', 'cheese_amount', 'calories', 'allergens',
        'prep_time', 'restaurant', 'seasonal', 'bread_type', 'image_url', 'crust_type'
    ]
    df_to_frontend_map = {
        'id': None, 'name': 'Pizza_Name', 'toppings': 'Toppings', 'price': 'Price_Rs',
        'slices': 'Slices', 'serving_size': 'Serving_Size', 'rating': 'Rating',
        'rating_count': 'Rating_Count', 'description': 'Description',
        'popular_group': 'Popular_Group', 'dietary_category': 'Dietary_Category',
        'spice_level': 'Spice_Level', 'sauce_type': 'Sauce_Type',
        'cheese_amount': 'Cheese_Amount', 'calories': 'Calories_per_Slice',
        'allergens': 'Allergens', 'prep_time': 'Preparation_Time_min',
        'restaurant': 'Restaurant_Chain', 'seasonal': 'Seasonal_Availability',
        'bread_type': 'Bread_Type', 'image_url': 'Image_Url',
        'crust_type': CRUST_TYPE_COL  # Uses the determined CRUST_TYPE_COL
    }

    for original_idx, pizza_row in default_recommendations_df.iterrows():
        rec_item = {}
        for key in frontend_keys:
            df_col = df_to_frontend_map.get(key)
            if key == 'id':
                rec_item[key] = int(original_idx)  # A pizza's ID is its original index in DF
            elif df_col and df_col in pizza_row:  # df_col can be None for 'id' or if CRUST_TYPE_COL is None
                value = pizza_row[df_col]
                # Type conversions for JSON serializability
                if isinstance(value, np.integer):
                    value = int(value)
                elif isinstance(value, np.floating):
                    value = float(value)
                elif isinstance(value, np.ndarray):
                    value = value.tolist()
                rec_item[key] = "" if pd.isna(value) else value
            elif key == 'crust_type' and not CRUST_TYPE_COL:  # If CRUST_TYPE_COL was not found
                rec_item[key] = "N/A"
            else:
                rec_item[key] = ""  # Default for missing fields

        rec_item['rating_count'] = int(rec_item.get('rating_count', 0) or 0)  # Ensure int
        rec_item['image_url'] = rec_item.get('image_url') if rec_item.get('image_url') else DEFAULT_IMAGE_URL
        # Final pass to convert any remaining numpy generic types
        for k_final, v_final in rec_item.items():
            if isinstance(v_final, np.generic):
                rec_item[k_final] = v_final.item()
        default_recs_list.append(rec_item)

    current_app.logger.info(f"Serving {len(default_recs_list)} pizzas for initial display.")
    current_app.logger.info(f"Filter options for template: {filter_options}")
    current_app.logger.info(f"ALL_TOPPINGS for template: {ALL_TOPPINGS[:5] if ALL_TOPPINGS else 'None'}")

    return render_template('index.html',
                           toppings=ALL_TOPPINGS,
                           filter_options=filter_options,
                           default_recommendations=default_recs_list,
                           default_image_url=DEFAULT_IMAGE_URL)
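
# Shape of the `preferences` dict consumed below (a sketch; the keys mirror
# the parsing in the /recommend route, the values are purely illustrative):
# {
#     'toppings': ['Onion', 'Paneer'],   # any-of topping match
#     'price_range': [100.0, 500.0],     # [min, max] in Rs
#     'slices': 4,                       # minimum slices
#     'rating': 4.0,                     # minimum rating
#     'prep_time': 20,                   # maximum preparation minutes
#     'spicelevel': ['Medium'],          # multi-select categorical (empty list = "Any")
# }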
def get_recommendations(preferences):
    global DF, FEATURE_DF, SCALER, CRUST_TYPE_COL, DEFAULT_IMAGE_URL

    if DF is None or FEATURE_DF is None or SCALER is None:
        current_app.logger.error("Data not fully initialized (DF, FEATURE_DF, or SCALER is None) for get_recommendations.")
        return []

    current_indices = DF.index.to_list()
    current_app.logger.info(f"Starting with {len(current_indices)} pizzas before filtering. Preferences: {preferences}")

    # --- Hard Filters ---
    # 1. Toppings
    if 'toppings' in preferences and preferences['toppings'] and 'Toppings_list_internal' in DF.columns:
        selected_toppings = set(preferences['toppings'])
        if selected_toppings:  # Guard against an empty list, which would select nothing
            topping_mask = DF.loc[current_indices, 'Toppings_list_internal'].apply(
                lambda x_toppings: isinstance(x_toppings, list) and any(t in selected_toppings for t in x_toppings)
            )
            current_indices = DF.loc[current_indices][topping_mask].index.to_list()
            current_app.logger.info(f"After toppings filter: {len(current_indices)} pizzas remaining")
            if not current_indices:
                return []

    # 2. Max price
    if 'price_range' in preferences and preferences['price_range'] and 'Price_Rs' in DF.columns:
        try:
            min_price = float(preferences['price_range'][0])
            max_price = float(preferences['price_range'][1])
            price_mask = (DF.loc[current_indices, 'Price_Rs'] >= min_price) & \
                         (DF.loc[current_indices, 'Price_Rs'] <= max_price)
            current_indices = DF.loc[current_indices][price_mask].index.to_list()
            current_app.logger.info(f"After price filter ({min_price}-{max_price}): {len(current_indices)} pizzas")
            if not current_indices:
                return []
        except (TypeError, ValueError, IndexError) as e:
            current_app.logger.warning(f"Invalid price_range preference: {preferences['price_range']}. Error: {e}")

    # 3. Number of slices (minimum)
    if 'slices' in preferences and preferences['slices'] is not None and 'Slices' in DF.columns:
        try:
            min_slices = int(preferences['slices'])
            slices_mask = DF.loc[current_indices, 'Slices'] >= min_slices
            current_indices = DF.loc[current_indices][slices_mask].index.to_list()
            current_app.logger.info(f"After slices filter (>= {min_slices}): {len(current_indices)} pizzas")
            if not current_indices:
                return []
        except ValueError:
            current_app.logger.warning(f"Invalid value for slices: {preferences['slices']}")

    # 4. Minimum rating
    if 'rating' in preferences and preferences['rating'] is not None and 'Rating' in DF.columns:
        try:
            min_rating = float(preferences['rating'])
            rating_mask = DF.loc[current_indices, 'Rating'] >= min_rating
            current_indices = DF.loc[current_indices][rating_mask].index.to_list()
            current_app.logger.info(f"After rating filter (>= {min_rating}): {len(current_indices)} pizzas")
            if not current_indices:
                return []
        except ValueError:
            current_app.logger.warning(f"Invalid value for rating: {preferences['rating']}")

    # 5. Max preparation time
    if 'prep_time' in preferences and preferences['prep_time'] is not None and 'Preparation_Time_min' in DF.columns:
        try:
            max_prep_time = int(str(preferences['prep_time']).lower().replace("min", "").strip())
            prep_mask = DF.loc[current_indices, 'Preparation_Time_min'] <= max_prep_time
            current_indices = DF.loc[current_indices][prep_mask].index.to_list()
            current_app.logger.info(f"After prep time filter (<= {max_prep_time}): {len(current_indices)} pizzas")
            if not current_indices:
                return []
        except ValueError:
            current_app.logger.warning(f"Could not parse prep_time value: {preferences['prep_time']}")
    # 6. Categorical filters (multi-select OR logic)
    # JS keys: servingsize, populargroup, dietarycategory, spicelevel, saucetype, etc.
    categorical_pref_map = {
        "servingsize": "Serving_Size",
        "populargroup": "Popular_Group",
        "dietarycategory": "Dietary_Category",
        "spicelevel": "Spice_Level",
        "saucetype": "Sauce_Type",
        "cheeseamount": "Cheese_Amount",
        "restaurantchain": "Restaurant_Chain",
        "seasonalavailability": "Seasonal_Availability",
        "breadtype": "Bread_Type",
        "crusttype": CRUST_TYPE_COL
    }
    for pref_key, df_col_name in categorical_pref_map.items():
        if df_col_name and pref_key in preferences and preferences[pref_key]:  # Ensure df_col_name is not None
            pref_value_list = preferences[pref_key]  # Expected to be a list from JS
            if isinstance(pref_value_list, list) and pref_value_list:  # If the list is not empty
                if df_col_name in DF.columns:
                    cat_mask = DF.loc[current_indices, df_col_name].isin(pref_value_list)
                    current_indices = DF.loc[current_indices][cat_mask].index.to_list()
                    current_app.logger.info(f"After {pref_key} filter (isin {pref_value_list}): {len(current_indices)} pizzas")
                    if not current_indices:
                        return []
                else:
                    current_app.logger.warning(f"Column '{df_col_name}' for preference '{pref_key}' not found in DF. Filter skipped.")
        # If pref_value_list is empty, it means "Any" for this category, so no filtering.

    if not current_indices:
        current_app.logger.info("No pizzas match all hard filter criteria.")
        return []
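    # Note on the design: the hard filters above prune to candidates that
    # satisfy every constraint; the similarity scoring below only re-ranks
    # those survivors against the user's preference vector.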
    # --- Similarity Scoring Part ---
    # Restrict FEATURE_DF to the pizzas remaining after the hard filters
    valid_indices_for_feature_df = FEATURE_DF.index.intersection(current_indices)
    if valid_indices_for_feature_df.empty:
        current_app.logger.info("No valid indices remain for FEATURE_DF after hard filters.")
        return []

    filtered_feature_df = FEATURE_DF.loc[valid_indices_for_feature_df]
    if filtered_feature_df.empty:  # Should not happen if valid_indices_for_feature_df is not empty
        current_app.logger.warning("Filtered FEATURE_DF is empty. This is unexpected.")
        return []

    # Create the user preference vector (aligned with FEATURE_DF columns)
    user_vector = pd.Series(0.0, index=FEATURE_DF.columns)  # Initialize with 0.0 for float consistency

    # 1. Toppings in the user vector
    if 'toppings' in preferences and preferences['toppings']:
        for topping in preferences['toppings']:
            col_name = f"Topping_{topping}"
            if col_name in user_vector.index:
                user_vector[col_name] = 1.0

    # 2. Categorical preferences (one-hot) in the user vector
    # Same map as categorical_pref_map, but df_col_name serves as the one-hot column prefix
    for pref_key, df_col_prefix in categorical_pref_map.items():
        if df_col_prefix and pref_key in preferences and preferences[pref_key]:  # df_col_prefix can be None for CRUST_TYPE_COL
            selected_values = preferences[pref_key]  # This is a list
            for val_item in selected_values:
                # Construct the one-hot encoded column name (e.g., "Spice_Level_Mild")
                one_hot_col_name = f"{df_col_prefix}_{val_item}"
                if one_hot_col_name in user_vector.index:
                    user_vector[one_hot_col_name] = 1.0

    # 3. Numerical preferences in the user vector
    raw_user_num_prefs_dict = {}
    spice_map_for_num_pref = {'Mild': 1.0, 'Medium': 2.0, 'Hot': 3.0}  # Use floats

    if 'price_range' in preferences and preferences['price_range']:
        try:
            # Use the average of min/max price as the preference
            raw_user_num_prefs_dict['Price'] = (float(preferences['price_range'][0]) +
                                                float(preferences['price_range'][1])) / 2
        except (TypeError, ValueError, IndexError):
            pass  # Ignore if parsing fails
    if 'slices' in preferences and preferences['slices'] is not None:
        try:
            raw_user_num_prefs_dict['Slices'] = float(preferences['slices'])
        except (TypeError, ValueError):
            pass
    if 'rating' in preferences and preferences['rating'] is not None:
        try:
            raw_user_num_prefs_dict['Rating'] = float(preferences['rating'])
        except (TypeError, ValueError):
            pass
    if 'prep_time' in preferences and preferences['prep_time'] is not None:
        try:
            raw_user_num_prefs_dict['Preparation_Time'] = float(str(preferences['prep_time']).lower().replace("min", "").strip())
        except (TypeError, ValueError):
            pass

    # Numerical Spice_Level: only if *one* spice level is selected, use its mapped value.
    # Otherwise, rely on the one-hot encoded spice level features.
    if 'spicelevel' in preferences and isinstance(preferences['spicelevel'], list) and len(preferences['spicelevel']) == 1:
        selected_spice = preferences['spicelevel'][0]
        if selected_spice in spice_map_for_num_pref:
            raw_user_num_prefs_dict['Spice_Level'] = spice_map_for_num_pref[selected_spice]

    # Scale these raw numerical preferences using the fitted SCALER.
    # Build a temporary one-row DataFrame for scaling, ensuring all NUMERICAL_COLS are present.
    temp_scaling_df = pd.DataFrame(columns=NUMERICAL_COLS, index=[0])
    for col in NUMERICAL_COLS:
        # Default to a neutral value if the user didn't specify this preference.
        # SCALER.data_min_ / SCALER.data_max_ (or SCALER.mean_) could be used if available.
        default_val = 0.0
        if hasattr(SCALER, 'data_min_') and col in FEATURE_DF.columns:  # Scaler is fit and the column exists
            col_idx_in_scaler = -1
            try:
                col_idx_in_scaler = NUMERICAL_COLS.index(col)
            except ValueError:
                pass
            if col_idx_in_scaler != -1 and col_idx_in_scaler < len(SCALER.data_min_):
                default_val = SCALER.data_min_[col_idx_in_scaler]  # The original min, not the scaled min (0)
            else:  # Fallback if col is not among the columns the SCALER was fitted on
                logger.warning(f"Column {col} not found in SCALER's fitted columns during user vector creation. Defaulting to 0.")
        temp_scaling_df.loc[0, col] = raw_user_num_prefs_dict.get(col, default_val)

    if hasattr(SCALER, 'n_features_in_'):  # Check whether the scaler has been fit
        scaled_user_num_values = SCALER.transform(temp_scaling_df[NUMERICAL_COLS])[0]
        for i, col_name in enumerate(NUMERICAL_COLS):
            if col_name in raw_user_num_prefs_dict:  # Only update user_vector if the user specified this preference
                user_vector[col_name] = scaled_user_num_values[i]
    else:
        logger.warning("SCALER is not fit. Cannot scale the user's numerical preferences. Using raw values (0-1 range assumed).")
        for col_name in NUMERICAL_COLS:
            if col_name in raw_user_num_prefs_dict:
                # Rough normalization fallback when the scaler is not fit; assumes values fall
                # in a reasonable range. This is approximate and needs domain knowledge.
                user_vector[col_name] = raw_user_num_prefs_dict[col_name] / 100.0
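    # Cosine similarity compares orientation, not magnitude:
    #   cos(u, v) = (u . v) / (||u|| * ||v||)
    # so a pizza scores high when the features the user asked for are set,
    # regardless of how many preferences the user filled in overall.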
    # Calculate cosine similarities
    feature_matrix_filtered = filtered_feature_df.values
    user_array = user_vector.values.reshape(1, -1)

    # Ensure shapes match in case FEATURE_DF columns changed dynamically (should not happen with the current setup)
    if user_array.shape[1] != feature_matrix_filtered.shape[1]:
        current_app.logger.error(
            f"Shape mismatch! User vector: {user_array.shape}, Feature matrix: {feature_matrix_filtered.shape}. "
            f"User cols: {user_vector.index.tolist()[:5]}, Feature cols: {filtered_feature_df.columns.tolist()[:5]}"
        )
        # Attempt to align columns as a robust measure, though this indicates a deeper issue if it occurs.
        common_cols = filtered_feature_df.columns.intersection(user_vector.index)
        aligned_user_vector = pd.Series(0.0, index=filtered_feature_df.columns)
        aligned_user_vector[common_cols] = user_vector[common_cols]
        user_array = aligned_user_vector.values.reshape(1, -1)
        if user_array.shape[1] != feature_matrix_filtered.shape[1]:
            current_app.logger.critical("Persistent shape mismatch even after alignment. Cannot compute similarity.")
            return []

    similarities = cosine_similarity(user_array, feature_matrix_filtered)[0]

    # Get positions sorted by similarity (descending) within filtered_feature_df
    sorted_indices_in_filtered_df = similarities.argsort()[::-1]
    # Map these sorted positions back to original DF indices
    final_recommendation_indices = valid_indices_for_feature_df[sorted_indices_in_filtered_df]
    # Prepare the list of recommendations.
    # frontend_keys and df_to_frontend_map are also defined in index_route; for safety,
    # they are redefined here (or could be passed as arguments when refactoring).
    recommendations_list = []
    frontend_keys_rec = [
        'id', 'name', 'toppings', 'price', 'slices', 'serving_size', 'rating',
        'rating_count', 'description', 'popular_group', 'dietary_category',
        'spice_level', 'sauce_type', 'cheese_amount', 'calories', 'allergens',
        'prep_time', 'restaurant', 'seasonal', 'bread_type', 'image_url', 'crust_type'
    ]
    df_to_frontend_map_rec = {
        'id': None, 'name': 'Pizza_Name', 'toppings': 'Toppings', 'price': 'Price_Rs',
        'slices': 'Slices', 'serving_size': 'Serving_Size', 'rating': 'Rating',
        'rating_count': 'Rating_Count', 'description': 'Description',
        'popular_group': 'Popular_Group', 'dietary_category': 'Dietary_Category',
        'spice_level': 'Spice_Level', 'sauce_type': 'Sauce_Type',
        'cheese_amount': 'Cheese_Amount', 'calories': 'Calories_per_Slice',
        'allergens': 'Allergens', 'prep_time': 'Preparation_Time_min',
        'restaurant': 'Restaurant_Chain', 'seasonal': 'Seasonal_Availability',
        'bread_type': 'Bread_Type', 'image_url': 'Image_Url',
        'crust_type': CRUST_TYPE_COL
    }

    for original_idx in final_recommendation_indices:
        pizza_series = DF.loc[original_idx]  # original_idx is an index label, so use .loc (not .iloc)
        rec_item = {}
        for key in frontend_keys_rec:
            df_col = df_to_frontend_map_rec.get(key)
            if key == 'id':
                rec_item[key] = int(original_idx)
            elif df_col and df_col in pizza_series:
                value = pizza_series[df_col]
                if isinstance(value, np.integer):
                    value = int(value)
                elif isinstance(value, np.floating):
                    value = float(value)
                elif isinstance(value, np.ndarray):
                    value = value.tolist()
                rec_item[key] = "" if pd.isna(value) else value
            elif key == 'crust_type' and not CRUST_TYPE_COL:
                rec_item[key] = "N/A"
            else:
                rec_item[key] = ""

        rec_item['rating_count'] = int(rec_item.get('rating_count', 0) or 0)
        rec_item['image_url'] = rec_item.get('image_url') if rec_item.get('image_url') else DEFAULT_IMAGE_URL
        for k_final, v_final in rec_item.items():  # Final numpy type check
            if isinstance(v_final, np.generic):
                rec_item[k_final] = v_final.item()
        recommendations_list.append(rec_item)

    current_app.logger.info(f"Final recommendations count: {len(recommendations_list)}")
    return recommendations_list

@app.route('/recommend', methods=['POST'])
def recommend():
    try:
        data = request.json
        preferences = {}  # Store processed preferences
        current_app.logger.info(f"Received recommendation request with data: {data}")

        # Numerical/range preferences from JS.
        # Keys in `data` should match JS: 'slices', 'rating', 'prep_time', 'price_range'.
        simple_numerical_prefs_js = ['slices', 'rating', 'prep_time']
        for key_js in simple_numerical_prefs_js:
            if key_js in data and data[key_js] is not None:
                try:
                    if key_js == 'rating':
                        preferences[key_js] = float(data[key_js])
                    else:
                        preferences[key_js] = int(data[key_js])  # slices, prep_time
                except ValueError:
                    current_app.logger.warning(f"Could not parse numerical preference '{key_js}': {data[key_js]}")

        if 'price_range' in data and data['price_range']:
            try:
                preferences['price_range'] = [float(p) for p in data['price_range']]
            except (ValueError, TypeError):
                current_app.logger.warning(f"Could not parse price_range: {data['price_range']}")

        # Multi-select categorical preferences from JS.
        # Keys in `data` should match JS: 'toppings', 'servingsize', 'dietarycategory', etc.
        multi_select_prefs_js = [
            'toppings', 'servingsize', 'populargroup', 'dietarycategory', 'spicelevel',
            'saucetype', 'cheeseamount', 'restaurantchain', 'seasonalavailability',
            'breadtype', 'crusttype'
        ]
        for key_js in multi_select_prefs_js:
            if key_js in data and isinstance(data[key_js], list):
                preferences[key_js] = data[key_js]  # Expecting a list (can be empty for "Any")
            elif key_js in data:  # If not a list, log a warning
                current_app.logger.warning(f"Preference for '{key_js}' was not a list: {data[key_js]}. Treating as empty (Any).")
                preferences[key_js] = []  # Default to an empty list

        current_app.logger.info(f"Processed preferences for filtering: {preferences}")
        recommendations = get_recommendations(preferences)
        current_app.logger.info(f"Returning {len(recommendations)} recommendations after filtering and scoring.")
        return jsonify(recommendations)

    except Exception as e:
        current_app.logger.error(f"Error in /recommend endpoint: {e}", exc_info=True)
        return jsonify({"error": "Failed to get recommendations due to a server issue.",
                        "details": str(e)}), 500
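
# A minimal smoke-test sketch using Flask's test client (hypothetical payload;
# not wired into any route or test runner -- call it manually once the module
# has loaded and preprocessing has run).
def _example_recommend_request():
    with app.test_client() as client:
        resp = client.post('/recommend', json={
            'toppings': ['Onion'],
            'price_range': [100, 600],
            'rating': 4.0,
            'spicelevel': ['Medium'],
        })
        return resp.get_json()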

# --- Main Application Execution ---
# Call preprocess_data() at the module level. This ensures it runs once when the
# application (or each Gunicorn worker) starts.
try:
    logger.info("----- Starting data preprocessing at module load... -----")
    preprocess_data()  # Use the default 'pizza.csv'
    logger.info("----- Data preprocessing completed successfully at module load. -----")
    if DF is None:
        logger.critical("CRITICAL AT STARTUP: Global DF is None after preprocess_data(). App will likely fail.")
    if FEATURE_DF is None:
        logger.critical("CRITICAL AT STARTUP: Global FEATURE_DF is None after preprocess_data(). App will likely fail.")
    if SCALER is None:  # SCALER should be initialized even if fitting fails
        logger.critical("CRITICAL AT STARTUP: Global SCALER is None after preprocess_data(). App will likely fail.")
except FileNotFoundError as e:
    logger.critical(f"CRITICAL ERROR AT MODULE LOAD (FileNotFoundError): {e}. "
                    f"Ensure 'pizza.csv' is in the /app directory (or the same dir as app.py).")
    # In a production Gunicorn setup, the app might still try to start, leading to errors in routes.
    # For Hugging Face, it's better to log and let it attempt to run, as exiting might obscure logs.
except Exception as e:
    logger.critical(f"Unexpected critical startup error during preprocessing at module load: {e}", exc_info=True)


if __name__ == '__main__':
    # This block is primarily for local development using `python app.py`.
    # preprocess_data() has already been called above when the module was imported.
    logger.info("----- Running Flask app directly (e.g., python app.py) -----")
    # Sanity check for a local run, though the globals should be set by the module-level call.
    if DF is None or FEATURE_DF is None or SCALER is None:
        logger.warning("One or more global data variables (DF, FEATURE_DF, SCALER) are None before "
                       "local app.run(). This is unexpected if module-level preprocessing ran.")
        # Optionally, re-run preprocessing here if something went wrong with the module-level load:
        # logger.info("Attempting to re-run preprocess_data() for local development.")
        # preprocess_data()

    # use_reloader=False is generally better when global state is initialized at module level;
    # if True, the reloader may re-initialize the globals on each reload, which can be slow.
    app.run(debug=True, host='0.0.0.0', port=7860, use_reloader=False)
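
# Example local invocation once the server is up on port 7860 (illustrative
# payload; the endpoint and keys match the /recommend route above):
#   curl -X POST http://localhost:7860/recommend \
#        -H 'Content-Type: application/json' \
#        -d '{"toppings": ["Onion"], "price_range": [100, 600]}'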