|
from flask import Flask, render_template, request, jsonify, current_app |
|
import pandas as pd |
|
import numpy as np |
|
from sklearn.preprocessing import MinMaxScaler |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import os |
|
import logging |
|
|
|
|
|
|
|
# Logging: timestamped messages with file/line context for easier debugging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')

logger = logging.getLogger(__name__)

app = Flask(__name__)

# ---- Module-level state, populated once by preprocess_data() at import time ----

# Raw pizza catalogue loaded from pizza.csv (pandas DataFrame); None until loaded.
DF = None

# Sorted list of every unique topping string found in the dataset.
ALL_TOPPINGS = []

# Numeric/one-hot feature matrix aligned to DF's index; used for cosine similarity.
FEATURE_DF = None

# MinMaxScaler fitted on the numerical feature columns; reused to scale user prefs.
SCALER = None

# Names of the scaled numerical columns inside FEATURE_DF (not the raw CSV names).
NUMERICAL_COLS = ['Price', 'Slices', 'Rating', 'Spice_Level', 'Preparation_Time', 'Calories']

# Categorical CSV columns that get one-hot encoded into FEATURE_DF.
# NOTE: preprocess_data() may append the detected crust-type column in place.
CATEGORICAL_FEATURES = [
    'Serving_Size', 'Popular_Group', 'Dietary_Category',
    'Sauce_Type', 'Cheese_Amount', 'Restaurant_Chain',
    'Seasonal_Availability', 'Bread_Type'
]

# Crust-type column actually present in the CSV ('Crust_Type' or 'Cr_Type'),
# chosen by preprocess_data(); stays None if neither column exists.
CRUST_TYPE_COL = None

# Fallback image shown when a pizza row has no Image_Url value.
DEFAULT_IMAGE_URL = 'https://images.dominos.co.in/new_margherita_2502.jpg'
|
|
|
|
|
def preprocess_data(df_path='pizza.csv'):
    """Load pizza.csv, clean it, and build the global feature matrix.

    Populates the module-level globals DF, ALL_TOPPINGS, FEATURE_DF, SCALER
    and CRUST_TYPE_COL (and may mutate CATEGORICAL_FEATURES in place).

    Args:
        df_path: CSV filename, resolved relative to this source file's directory.

    Raises:
        FileNotFoundError: if the resolved CSV path does not exist.
    """
    global DF, ALL_TOPPINGS, FEATURE_DF, SCALER, CATEGORICAL_FEATURES, CRUST_TYPE_COL
    logger.info(f"Attempting to preprocess data from relative path: {df_path}")

    # Resolve the CSV next to this source file so the working directory doesn't matter.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    absolute_df_path = os.path.join(base_dir, df_path)
    logger.info(f"Absolute path for CSV: {absolute_df_path}")

    if not os.path.exists(absolute_df_path):
        logger.error(f"Dataset file '{absolute_df_path}' not found.")
        raise FileNotFoundError(f"Dataset file '{absolute_df_path}' not found. Ensure it's in the same directory as app.py.")

    DF = pd.read_csv(absolute_df_path)
    logger.info(f"Successfully loaded '{absolute_df_path}'. Original DataFrame shape: {DF.shape}")
    logger.info(f"Original DataFrame columns: {DF.columns.tolist()}")

    # Pick whichever crust column exists; prefer the one with fewer NaNs.
    potential_crust_cols = ['Crust_Type', 'Cr_Type']
    valid_crust_cols = [col for col in potential_crust_cols if col in DF.columns]
    if valid_crust_cols:
        valid_crust_cols.sort(key=lambda col: DF[col].isnull().sum())
        CRUST_TYPE_COL = valid_crust_cols[0]
        logger.info(f"Using '{CRUST_TYPE_COL}' for crust type.")
        if CRUST_TYPE_COL not in CATEGORICAL_FEATURES:
            CATEGORICAL_FEATURES.append(CRUST_TYPE_COL)

        # Drop any alternate crust column so only one gets one-hot encoded.
        for col in potential_crust_cols:
            if col != CRUST_TYPE_COL and col in CATEGORICAL_FEATURES:
                CATEGORICAL_FEATURES.remove(col)
    else:
        logger.warning("Crust type column (Crust_Type or Cr_Type) not found. Crust type will not be used.")
        CRUST_TYPE_COL = None

    # Text columns: NaN -> '' so string operations below never see floats.
    text_cols_to_fill = list(set(CATEGORICAL_FEATURES + ['Toppings', 'Description', 'Allergens', 'Image_Url', 'Pizza_Name']))
    for col in text_cols_to_fill:
        if col and col in DF.columns:
            DF[col] = DF[col].fillna('')
    logger.info("Filled NaNs in text-based categorical columns with empty strings.")

    # Numerical columns: impute NaNs with the column median (or 0 if the column
    # has no numeric values at all after coercion).
    numerical_cols_in_df = ['Price_Rs', 'Slices', 'Rating', 'Rating_Count', 'Preparation_Time_min', 'Calories_per_Slice']
    for col in numerical_cols_in_df:
        if col in DF.columns:
            if pd.api.types.is_numeric_dtype(DF[col]):
                median_val = DF[col].median()
                DF[col] = DF[col].fillna(median_val)
                logger.info(f"Filled NaNs in numerical column '{col}' with its median ({median_val}).")
            else:
                # Mixed-type column: coerce to numeric first (bad values become NaN).
                numeric_series = pd.to_numeric(DF[col], errors='coerce')
                median_val = 0
                if not numeric_series.isnull().all():
                    median_val = numeric_series.median()
                DF[col] = numeric_series.fillna(median_val)
                logger.warning(f"Column '{col}' was not purely numeric. Converted to numeric, filled NaNs with median/0 ({median_val}).")
        else:
            logger.warning(f"Expected numerical column '{col}' not found in DataFrame. It will be missing from features if not handled.")

    # Rating_Count is displayed as an integer, so force the dtype.
    if 'Rating_Count' in DF.columns:
        DF['Rating_Count'] = DF['Rating_Count'].fillna(0).astype(int)

    # Toppings: split the semicolon-separated string into a clean list per row
    # and collect the global vocabulary of unique topping names.
    if 'Toppings' in DF.columns:
        DF['Toppings_list_internal'] = DF['Toppings'].astype(str).str.split(r';\s*')
        DF['Toppings_list_internal'] = DF['Toppings_list_internal'].apply(
            lambda x: [t.strip() for t in x if isinstance(t, str) and t.strip()])
        current_all_toppings = set()
        for toppings_list in DF['Toppings_list_internal'].dropna():
            current_all_toppings.update(t for t in toppings_list if t)
        ALL_TOPPINGS = sorted(list(current_all_toppings))
        logger.info(f"Found {len(ALL_TOPPINGS)} unique toppings. Example: {ALL_TOPPINGS[:5] if ALL_TOPPINGS else 'None'}")
    else:
        logger.warning("'Toppings' column not found. Topping features will be empty.")
        # NOTE(review): this fresh Series gets a default RangeIndex; assumes DF
        # also has one (true for a plain read_csv) -- confirm if that changes.
        DF['Toppings_list_internal'] = pd.Series([[] for _ in range(len(DF))])
        ALL_TOPPINGS = []

    # ---- Build the feature matrix, column by column, in feature_data ----
    feature_data = {}
    # FEATURE_DF column name -> source CSV column name.
    num_feature_map = {
        'Price': 'Price_Rs', 'Slices': 'Slices', 'Rating': 'Rating',
        'Preparation_Time': 'Preparation_Time_min', 'Calories': 'Calories_per_Slice'
    }
    for feature_col, df_col in num_feature_map.items():
        if df_col in DF.columns:
            feature_data[feature_col] = DF[df_col].copy()
        else:
            logger.warning(f"Numerical source column '{df_col}' for feature '{feature_col}' not found. Filling with zeros.")
            feature_data[feature_col] = pd.Series([0.0] * len(DF))

    # Spice level: ordinal-encode Mild/Medium/Hot as 1/2/3 (unknown values -> 1.0).
    if 'Spice_Level' in DF.columns:
        DF['Spice_Level'] = DF['Spice_Level'].fillna('Mild')
        spice_map = {'Mild': 1, 'Medium': 2, 'Hot': 3}
        feature_data['Spice_Level'] = DF['Spice_Level'].map(spice_map).fillna(1.0)
    else:
        logger.warning("'Spice_Level' column not found. Filling 'Spice_Level' feature with default (1.0).")
        feature_data['Spice_Level'] = pd.Series([1.0] * len(DF))

    # One-hot encode each categorical column into "<col>_<value>" indicator columns.
    for feature_cat_col in CATEGORICAL_FEATURES:
        if feature_cat_col and feature_cat_col in DF.columns:
            # Stringify so the .unique() values compare consistently below.
            DF[feature_cat_col] = DF[feature_cat_col].astype(str)
            for value in DF[feature_cat_col].unique():
                if pd.notnull(value) and value.strip() != '':
                    feature_data[f"{feature_cat_col}_{value}"] = (DF[feature_cat_col] == value).astype(int)
        elif feature_cat_col:
            logger.warning(f"Categorical source column '{feature_cat_col}' for one-hot encoding not found in DataFrame.")

    # Binary indicator per topping: "Topping_<name>".
    for topping in ALL_TOPPINGS:
        if topping:
            feature_data[f"Topping_{topping}"] = DF['Toppings_list_internal'].apply(
                lambda x: 1 if topping in x else 0
            )

    FEATURE_DF = pd.DataFrame(feature_data)
    logger.info(f"FEATURE_DF created. Shape: {FEATURE_DF.shape}. Columns: {FEATURE_DF.columns.tolist()[:10]}...")

    # Guarantee every expected numerical column exists and is NaN-free before scaling.
    for col in NUMERICAL_COLS:
        if col not in FEATURE_DF.columns:
            logger.warning(f"Numerical column '{col}' is missing from FEATURE_DF after construction. Adding as zeros.")
            FEATURE_DF[col] = 0.0
        if FEATURE_DF[col].isnull().any():
            mean_val = FEATURE_DF[col].mean()
            fill_val = mean_val if pd.notna(mean_val) else 0.0
            logger.info(f"Filling NaNs in numerical feature column '{col}' with {fill_val}.")
            FEATURE_DF[col] = FEATURE_DF[col].fillna(fill_val)

    # Scale numerical features to [0, 1]; the fitted SCALER is reused later to
    # project the user's numerical preferences onto the same range.
    SCALER = MinMaxScaler()
    if not FEATURE_DF.empty and all(col in FEATURE_DF.columns for col in NUMERICAL_COLS):
        try:
            FEATURE_DF[NUMERICAL_COLS] = SCALER.fit_transform(FEATURE_DF[NUMERICAL_COLS])
            logger.info(f"Numerical columns ({NUMERICAL_COLS}) scaled. FEATURE_DF shape: {FEATURE_DF.shape}")
        except Exception as e:
            logger.error(f"Error during scaling of numerical columns: {e}. FEATURE_DF might be problematic.")

    elif FEATURE_DF.empty:
        logger.error("FEATURE_DF is empty before scaling. Scaling skipped. This will likely cause issues.")
    else:
        missing_cols = [col for col in NUMERICAL_COLS if col not in FEATURE_DF.columns]
        logger.error(f"Not all numerical columns ({NUMERICAL_COLS}) found in FEATURE_DF for scaling. Missing: {missing_cols}. Scaling skipped.")

    logger.info(f"Preprocessing done. DF is None: {DF is None}, FEATURE_DF is None: {FEATURE_DF is None}, SCALER is None: {SCALER is None}")
    if FEATURE_DF is not None:
        logger.info(f"Final FEATURE_DF shape: {FEATURE_DF.shape}")
    if DF is not None:
        logger.info(f"Final DF shape: {DF.shape}")
|
|
|
|
|
@app.route('/')
def index_route():
    """Render the landing page: filter options plus all pizzas, best-rated first."""
    global DF, ALL_TOPPINGS, CATEGORICAL_FEATURES, CRUST_TYPE_COL, FEATURE_DF, DEFAULT_IMAGE_URL

    # Fail fast with a readable error if startup preprocessing did not run.
    if DF is None:
        current_app.logger.error("DF is None when trying to serve '/'. Data preprocessing might have failed or not run.")
        return "Error: Pizza data (DF) not loaded. Please check server logs.", 500
    if FEATURE_DF is None:
        current_app.logger.error("FEATURE_DF is None when trying to serve '/'. Data preprocessing might have failed.")
        return "Error: Pizza feature data (FEATURE_DF) not loaded. Please check server logs.", 500

    # Build the dropdown/filter options from the distinct values of each
    # categorical column actually present in the data.
    filter_options = {}

    cols_for_filters_set = set(cat_col for cat_col in CATEGORICAL_FEATURES if cat_col and cat_col in DF.columns)
    if 'Spice_Level' in DF.columns:
        cols_for_filters_set.add('Spice_Level')

    for col_name in list(cols_for_filters_set):
        # Template/JS keys are lowercase without underscores (e.g. 'saucetype').
        key_name = col_name.lower().replace('_', '')

        unique_values = sorted([v for v in DF[col_name].astype(str).dropna().unique() if v.strip() != ''])
        if unique_values:
            filter_options[key_name] = unique_values

    # Default listing: every pizza, sorted by rating (unsorted if Rating is absent).
    if 'Rating' in DF.columns:
        default_recommendations_df = DF.sort_values('Rating', ascending=False).copy()
    else:
        logger.warning("'Rating' column not found in DF. Cannot sort for default recommendations. Using unsorted DF.")
        default_recommendations_df = DF.copy()

    default_recs_list = []
    # Keys the template expects for each pizza card.
    frontend_keys = [
        'id', 'name', 'toppings', 'price', 'slices', 'serving_size', 'rating', 'rating_count',
        'description', 'popular_group', 'dietary_category', 'spice_level', 'sauce_type',
        'cheese_amount', 'calories', 'allergens', 'prep_time', 'restaurant', 'seasonal',
        'bread_type', 'image_url', 'crust_type'
    ]
    # Frontend key -> DF column name ('id' is synthesized from the row index).
    df_to_frontend_map = {
        'id': None, 'name': 'Pizza_Name', 'toppings': 'Toppings', 'price': 'Price_Rs', 'slices': 'Slices',
        'serving_size': 'Serving_Size', 'rating': 'Rating', 'rating_count': 'Rating_Count',
        'description': 'Description', 'popular_group': 'Popular_Group',
        'dietary_category': 'Dietary_Category', 'spice_level': 'Spice_Level',
        'sauce_type': 'Sauce_Type', 'cheese_amount': 'Cheese_Amount',
        'calories': 'Calories_per_Slice', 'allergens': 'Allergens',
        'prep_time': 'Preparation_Time_min', 'restaurant': 'Restaurant_Chain',
        'seasonal': 'Seasonal_Availability', 'bread_type': 'Bread_Type',
        'image_url': 'Image_Url', 'crust_type': CRUST_TYPE_COL
    }

    for original_idx, pizza_row in default_recommendations_df.iterrows():
        rec_item = {}
        for key in frontend_keys:
            df_col = df_to_frontend_map.get(key)
            if key == 'id':
                # The DF index doubles as a stable pizza id for the frontend.
                rec_item[key] = int(original_idx)
            elif df_col and df_col in pizza_row:
                value = pizza_row[df_col]
                # Convert numpy scalars/arrays to plain Python for JSON/Jinja.
                if isinstance(value, np.integer): value = int(value)
                elif isinstance(value, np.floating): value = float(value)
                elif isinstance(value, np.ndarray): value = value.tolist()
                rec_item[key] = "" if pd.isna(value) else value
            elif key == 'crust_type' and not CRUST_TYPE_COL :
                rec_item[key] = "N/A"
            else:
                rec_item[key] = ""

        # Normalize fields the template relies on (int count; non-empty image URL).
        rec_item['rating_count'] = int(rec_item.get('rating_count', 0) or 0)
        rec_item['image_url'] = rec_item.get('image_url') if rec_item.get('image_url') else DEFAULT_IMAGE_URL

        # Final safety net: unwrap any remaining numpy scalar types.
        for k_final, v_final in rec_item.items():
            if isinstance(v_final, np.generic): rec_item[k_final] = v_final.item()
        default_recs_list.append(rec_item)

    current_app.logger.info(f"Serving {len(default_recs_list)} pizzas for initial display.")
    current_app.logger.info(f"Filter options for template: {filter_options}")
    current_app.logger.info(f"ALL_TOPPINGS for template: {ALL_TOPPINGS[:5] if ALL_TOPPINGS else 'None'}")

    return render_template('index.html',
                           toppings=ALL_TOPPINGS,
                           filter_options=filter_options,
                           default_recommendations=default_recs_list,
                           default_image_url=DEFAULT_IMAGE_URL)
|
|
|
|
|
def get_recommendations(preferences):
    """Filter the catalogue by hard constraints, then rank by cosine similarity.

    Two stages:
      1. Hard filters: toppings, price range, slices, rating, prep time and the
         categorical multi-selects successively narrow the candidate index set.
      2. Soft ranking: a user preference vector is built in FEATURE_DF's feature
         space (numerical prefs scaled with the fitted SCALER) and surviving
         pizzas are sorted by cosine similarity to it.

    Args:
        preferences: dict of parsed user preferences from the /recommend route.

    Returns:
        List of frontend-ready recommendation dicts (possibly empty).
    """
    global DF, FEATURE_DF, SCALER, CRUST_TYPE_COL, DEFAULT_IMAGE_URL

    if DF is None or FEATURE_DF is None or SCALER is None:
        current_app.logger.error("Data not fully initialized (DF, FEATURE_DF, or SCALER is None) for get_recommendations.")
        return []

    # current_indices holds DF index *labels* of the surviving candidates.
    current_indices = DF.index.to_list()
    current_app.logger.info(f"Starting with {len(current_indices)} pizzas before filtering. Preferences: {preferences}")

    # --- Hard filter: at least one selected topping must be present ---
    if 'toppings' in preferences and preferences['toppings'] and 'Toppings_list_internal' in DF.columns:
        selected_toppings = set(preferences['toppings'])
        if selected_toppings:
            topping_mask = DF.loc[current_indices, 'Toppings_list_internal'].apply(
                lambda x_toppings: isinstance(x_toppings, list) and any(t in selected_toppings for t in x_toppings)
            )
            current_indices = DF.loc[current_indices][topping_mask].index.to_list()
            current_app.logger.info(f"After toppings filter: {len(current_indices)} pizzas remaining")
            if not current_indices: return []

    # --- Hard filter: price within [min, max] ---
    if 'price_range' in preferences and preferences['price_range'] and 'Price_Rs' in DF.columns:
        try:
            min_price = float(preferences['price_range'][0])
            max_price = float(preferences['price_range'][1])
            price_mask = (DF.loc[current_indices, 'Price_Rs'] >= min_price) & \
                         (DF.loc[current_indices, 'Price_Rs'] <= max_price)
            current_indices = DF.loc[current_indices][price_mask].index.to_list()
            current_app.logger.info(f"After price filter ({min_price}-{max_price}): {len(current_indices)} pizzas")
            if not current_indices: return []
        except (TypeError, ValueError, IndexError) as e:
            current_app.logger.warning(f"Invalid price_range preference: {preferences['price_range']}. Error: {e}")

    # --- Hard filter: minimum slice count ---
    if 'slices' in preferences and preferences['slices'] is not None and 'Slices' in DF.columns:
        try:
            min_slices = int(preferences['slices'])
            slices_mask = DF.loc[current_indices, 'Slices'] >= min_slices
            current_indices = DF.loc[current_indices][slices_mask].index.to_list()
            current_app.logger.info(f"After slices filter (>= {min_slices}): {len(current_indices)} pizzas")
            if not current_indices: return []
        except ValueError:
            current_app.logger.warning(f"Invalid value for slices: {preferences['slices']}")

    # --- Hard filter: minimum rating ---
    if 'rating' in preferences and preferences['rating'] is not None and 'Rating' in DF.columns:
        try:
            min_rating = float(preferences['rating'])
            rating_mask = DF.loc[current_indices, 'Rating'] >= min_rating
            current_indices = DF.loc[current_indices][rating_mask].index.to_list()
            current_app.logger.info(f"After rating filter (>= {min_rating}): {len(current_indices)} pizzas")
            if not current_indices: return []
        except ValueError:
            current_app.logger.warning(f"Invalid value for rating: {preferences['rating']}")

    # --- Hard filter: maximum preparation time (accepts values like "30 min") ---
    if 'prep_time' in preferences and preferences['prep_time'] is not None and 'Preparation_Time_min' in DF.columns:
        try:
            max_prep_time = int(str(preferences['prep_time']).lower().replace("min", "").strip())
            prep_mask = DF.loc[current_indices, 'Preparation_Time_min'] <= max_prep_time
            current_indices = DF.loc[current_indices][prep_mask].index.to_list()
            current_app.logger.info(f"After prep time filter (<= {max_prep_time}): {len(current_indices)} pizzas")
            if not current_indices: return []
        except ValueError:
            current_app.logger.warning(f"Could not parse prep_time value: {preferences['prep_time']}")

    # --- Hard filters: categorical multi-selects (row value must be one of the chosen) ---
    # Preference key (as sent by the frontend) -> DF column name.
    categorical_pref_map = {
        "servingsize": "Serving_Size", "populargroup": "Popular_Group",
        "dietarycategory": "Dietary_Category", "spicelevel": "Spice_Level",
        "saucetype": "Sauce_Type", "cheeseamount": "Cheese_Amount",
        "restaurantchain": "Restaurant_Chain", "seasonalavailability": "Seasonal_Availability",
        "breadtype": "Bread_Type", "crusttype": CRUST_TYPE_COL
    }
    for pref_key, df_col_name in categorical_pref_map.items():
        if df_col_name and pref_key in preferences and preferences[pref_key]:
            pref_value_list = preferences[pref_key]
            if isinstance(pref_value_list, list) and pref_value_list:
                if df_col_name in DF.columns:
                    cat_mask = DF.loc[current_indices, df_col_name].isin(pref_value_list)
                    current_indices = DF.loc[current_indices][cat_mask].index.to_list()
                    current_app.logger.info(f"After {pref_key} filter (isin {pref_value_list}): {len(current_indices)} pizzas")
                    if not current_indices: return []
                else:
                    current_app.logger.warning(f"Column '{df_col_name}' for preference '{pref_key}' not found in DF. Filter skipped.")

    if not current_indices:
        current_app.logger.info("No pizzas match all hard filter criteria.")
        return []

    # Keep only candidates that also exist in the feature matrix.
    valid_indices_for_feature_df = FEATURE_DF.index.intersection(current_indices)
    if valid_indices_for_feature_df.empty:
        current_app.logger.info("No valid indices remain for FEATURE_DF after hard filters.")
        return []

    filtered_feature_df = FEATURE_DF.loc[valid_indices_for_feature_df]
    if filtered_feature_df.empty:
        current_app.logger.warning("Filtered FEATURE_DF is empty. This is unexpected.")
        return []

    # --- Build the user preference vector in FEATURE_DF's column space ---
    user_vector = pd.Series(0.0, index=FEATURE_DF.columns)

    # Topping one-hots.
    if 'toppings' in preferences and preferences['toppings']:
        for topping in preferences['toppings']:
            col_name = f"Topping_{topping}"
            if col_name in user_vector.index:
                user_vector[col_name] = 1.0

    # Categorical one-hots.
    for pref_key, df_col_prefix in categorical_pref_map.items():
        if df_col_prefix and pref_key in preferences and preferences[pref_key]:
            selected_values = preferences[pref_key]
            for val_item in selected_values:
                one_hot_col_name = f"{df_col_prefix}_{val_item}"
                if one_hot_col_name in user_vector.index:
                    user_vector[one_hot_col_name] = 1.0

    # Raw (unscaled) numerical preferences; scaled below with the fitted SCALER.
    # Parse failures are deliberately best-effort: an unparsable preference
    # simply does not contribute to the similarity score. The bare `except:`
    # clauses here previously also swallowed SystemExit/KeyboardInterrupt;
    # they are narrowed to the parse errors that can actually occur.
    raw_user_num_prefs_dict = {}
    spice_map_for_num_pref = {'Mild': 1.0, 'Medium': 2.0, 'Hot': 3.0}

    if 'price_range' in preferences and preferences['price_range']:
        try:
            # Target price = midpoint of the requested range.
            raw_user_num_prefs_dict['Price'] = (float(preferences['price_range'][0]) + float(preferences['price_range'][1])) / 2
        except (TypeError, ValueError, IndexError):
            pass
    if 'slices' in preferences and preferences['slices'] is not None:
        try:
            raw_user_num_prefs_dict['Slices'] = float(preferences['slices'])
        except (TypeError, ValueError):
            pass
    if 'rating' in preferences and preferences['rating'] is not None:
        try:
            raw_user_num_prefs_dict['Rating'] = float(preferences['rating'])
        except (TypeError, ValueError):
            pass
    if 'prep_time' in preferences and preferences['prep_time'] is not None:
        try:
            raw_user_num_prefs_dict['Preparation_Time'] = float(str(preferences['prep_time']).lower().replace("min", "").strip())
        except (TypeError, ValueError):
            pass

    # Spice level contributes numerically only when exactly one level is chosen.
    if 'spicelevel' in preferences and isinstance(preferences['spicelevel'], list) and len(preferences['spicelevel']) == 1:
        selected_spice = preferences['spicelevel'][0]
        if selected_spice in spice_map_for_num_pref:
            raw_user_num_prefs_dict['Spice_Level'] = spice_map_for_num_pref[selected_spice]

    # Assemble a one-row frame for SCALER.transform(); columns without an
    # explicit preference default to the scaler's per-column minimum so they
    # scale to 0 and don't bias the similarity.
    temp_scaling_df = pd.DataFrame(columns=NUMERICAL_COLS, index=[0])
    for col in NUMERICAL_COLS:
        default_val = 0.0
        if hasattr(SCALER, 'data_min_') and col in FEATURE_DF.columns:
            col_idx_in_scaler = -1
            try:
                col_idx_in_scaler = NUMERICAL_COLS.index(col)
            except ValueError:
                pass

            if col_idx_in_scaler != -1 and col_idx_in_scaler < len(SCALER.data_min_):
                default_val = SCALER.data_min_[col_idx_in_scaler]
            else:
                logger.warning(f"Column {col} not found in SCALER's fitted columns during user vector creation. Defaulting to 0.")

        temp_scaling_df.loc[0, col] = raw_user_num_prefs_dict.get(col, default_val)

    if hasattr(SCALER, 'n_features_in_'):
        scaled_user_num_values = SCALER.transform(temp_scaling_df[NUMERICAL_COLS])[0]
        for i, col_name in enumerate(NUMERICAL_COLS):
            if col_name in raw_user_num_prefs_dict:
                user_vector[col_name] = scaled_user_num_values[i]
    else:
        logger.warning("SCALER is not fit. Cannot scale user's numerical preferences. Using raw values (0-1 range assumed).")
        for col_name in NUMERICAL_COLS:
            if col_name in raw_user_num_prefs_dict:
                # Crude fallback normalization when no fitted scaler is available.
                user_vector[col_name] = raw_user_num_prefs_dict[col_name] / 100.0

    # --- Cosine similarity ranking ---
    feature_matrix_filtered = filtered_feature_df.values
    user_array = user_vector.values.reshape(1, -1)

    if user_array.shape[1] != feature_matrix_filtered.shape[1]:
        current_app.logger.error(
            f"Shape mismatch! User vector: {user_array.shape}, Feature matrix: {feature_matrix_filtered.shape}. "
            f"User cols: {user_vector.index.tolist()[:5]}, Feature cols: {filtered_feature_df.columns.tolist()[:5]}"
        )

        # Re-align the user vector onto the filtered matrix's columns.
        common_cols = filtered_feature_df.columns.intersection(user_vector.index)
        aligned_user_vector = pd.Series(0.0, index=filtered_feature_df.columns)
        aligned_user_vector[common_cols] = user_vector[common_cols]
        user_array = aligned_user_vector.values.reshape(1, -1)

        if user_array.shape[1] != feature_matrix_filtered.shape[1]:
            current_app.logger.critical(f"Persistent shape mismatch even after alignment. Cannot compute similarity.")
            return []

    similarities = cosine_similarity(user_array, feature_matrix_filtered)[0]

    # Positions within the filtered matrix, most-similar first.
    sorted_indices_in_filtered_df = similarities.argsort()[::-1]

    # Map ranked positions back to DF index labels.
    final_recommendation_indices = valid_indices_for_feature_df[sorted_indices_in_filtered_df]

    recommendations_list = []

    # Keys the frontend expects for each recommendation card.
    frontend_keys_rec = [
        'id', 'name', 'toppings', 'price', 'slices', 'serving_size', 'rating', 'rating_count',
        'description', 'popular_group', 'dietary_category', 'spice_level', 'sauce_type',
        'cheese_amount', 'calories', 'allergens', 'prep_time', 'restaurant', 'seasonal',
        'bread_type', 'image_url', 'crust_type'
    ]
    # Frontend key -> DF column name ('id' is synthesized from the row index).
    df_to_frontend_map_rec = {
        'id': None, 'name': 'Pizza_Name', 'toppings': 'Toppings', 'price': 'Price_Rs', 'slices': 'Slices',
        'serving_size': 'Serving_Size', 'rating': 'Rating', 'rating_count': 'Rating_Count',
        'description': 'Description', 'popular_group': 'Popular_Group',
        'dietary_category': 'Dietary_Category', 'spice_level': 'Spice_Level',
        'sauce_type': 'Sauce_Type', 'cheese_amount': 'Cheese_Amount',
        'calories': 'Calories_per_Slice', 'allergens': 'Allergens',
        'prep_time': 'Preparation_Time_min', 'restaurant': 'Restaurant_Chain',
        'seasonal': 'Seasonal_Availability', 'bread_type': 'Bread_Type',
        'image_url': 'Image_Url', 'crust_type': CRUST_TYPE_COL
    }

    for original_idx in final_recommendation_indices:
        # BUGFIX: original_idx is an index *label* (it came from
        # FEATURE_DF.index.intersection), so look it up with .loc. The previous
        # .iloc (positional) only worked while DF kept a default RangeIndex.
        pizza_series = DF.loc[original_idx]
        rec_item = {}
        for key in frontend_keys_rec:
            df_col = df_to_frontend_map_rec.get(key)
            if key == 'id':
                rec_item[key] = int(original_idx)
            elif df_col and df_col in pizza_series:
                value = pizza_series[df_col]
                # Convert numpy scalars/arrays to plain Python for JSON serialization.
                if isinstance(value, np.integer): value = int(value)
                elif isinstance(value, np.floating): value = float(value)
                elif isinstance(value, np.ndarray): value = value.tolist()
                rec_item[key] = "" if pd.isna(value) else value
            elif key == 'crust_type' and not CRUST_TYPE_COL:
                rec_item[key] = "N/A"
            else:
                rec_item[key] = ""

        # Normalize fields the frontend relies on (int count; non-empty image URL).
        rec_item['rating_count'] = int(rec_item.get('rating_count', 0) or 0)
        rec_item['image_url'] = rec_item.get('image_url') if rec_item.get('image_url') else DEFAULT_IMAGE_URL
        # Final safety net: unwrap any remaining numpy scalar types.
        for k_final, v_final in rec_item.items():
            if isinstance(v_final, np.generic): rec_item[k_final] = v_final.item()
        recommendations_list.append(rec_item)

    current_app.logger.info(f"Final recommendations count: {len(recommendations_list)}")
    return recommendations_list
|
|
|
|
|
@app.route('/recommend', methods=['POST'])
def recommend():
    """POST endpoint: parse the JSON preference payload and return ranked pizzas.

    Returns a JSON list of recommendation dicts on success, or a JSON error
    object with HTTP 500 if anything fails server-side.
    """
    try:
        data = request.json
        preferences = {}
        current_app.logger.info(f"Received recommendation request with data: {data}")

        # Scalar numeric preferences: 'rating' parses as float, the rest as int.
        for pref_name, caster in (('slices', int), ('rating', float), ('prep_time', int)):
            if pref_name in data and data[pref_name] is not None:
                try:
                    preferences[pref_name] = caster(data[pref_name])
                except ValueError:
                    current_app.logger.warning(f"Could not parse numerical preference '{pref_name}': {data[pref_name]}")

        # Price range arrives as a two-element [min, max] list of numbers/strings.
        if data.get('price_range'):
            try:
                preferences['price_range'] = [float(bound) for bound in data['price_range']]
            except (ValueError, TypeError):
                current_app.logger.warning(f"Could not parse price_range: {data['price_range']}")

        # Multi-select preferences: each must be a list; anything else means "Any".
        multi_select_keys = (
            'toppings', 'servingsize', 'populargroup', 'dietarycategory',
            'spicelevel', 'saucetype', 'cheeseamount', 'restaurantchain',
            'seasonalavailability', 'breadtype', 'crusttype',
        )
        for pref_name in multi_select_keys:
            if pref_name not in data:
                continue
            chosen = data[pref_name]
            if isinstance(chosen, list):
                preferences[pref_name] = chosen
            else:
                current_app.logger.warning(f"Preference for '{pref_name}' was not a list: {chosen}. Treating as empty (Any).")
                preferences[pref_name] = []

        current_app.logger.info(f"Processed preferences for filtering: {preferences}")
        recommendations = get_recommendations(preferences)
        current_app.logger.info(f"Returning {len(recommendations)} recommendations after filtering and scoring.")
        return jsonify(recommendations)

    except Exception as e:
        current_app.logger.error(f"Error in /recommend endpoint: {e}", exc_info=True)
        return jsonify({"error": "Failed to get recommendations due to a server issue.", "details": str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
# Run preprocessing once at module load so the data is ready before the first
# request (covers both `python app.py` and WSGI servers importing the module).
try:
    logger.info("----- Starting data preprocessing at module load... -----")
    preprocess_data()
    logger.info("----- Data preprocessing completed successfully at module load. -----")
    # Sanity checks: all three should be populated if preprocess_data() succeeded.
    if DF is None:
        logger.critical("CRITICAL AT STARTUP: Global DF is None after preprocess_data(). App will likely fail.")
    if FEATURE_DF is None:
        logger.critical("CRITICAL AT STARTUP: Global FEATURE_DF is None after preprocess_data(). App will likely fail.")
    if SCALER is None:
        logger.critical("CRITICAL AT STARTUP: Global SCALER is None after preprocess_data(). App will likely fail.")

except FileNotFoundError as e:
    # Missing CSV: log and keep the module importable; routes return 500 until fixed.
    logger.critical(f"CRITICAL ERROR AT MODULE LOAD (FileNotFoundError): {e}. Ensure 'pizza.csv' is in the /app directory (or same dir as app.py).")

except Exception as e:
    logger.critical(f"Unexpected critical startup error during preprocessing at module load: {e}", exc_info=True)
|
|
|
if __name__ == '__main__':
    # Direct invocation (`python app.py`); data was already loaded at module level.
    logger.info("----- Running Flask app directly (e.g., python app.py) -----")

    if DF is None or FEATURE_DF is None or SCALER is None:
        logger.warning("One or more global data variables (DF, FEATURE_DF, SCALER) are None before local app.run(). This is unexpected if module-level preprocessing ran.")

    # NOTE(review): use_reloader=False presumably avoids the dev reloader
    # re-importing the module (and re-running preprocessing) -- confirm intent.
    app.run(debug=True, host='0.0.0.0', port=7860, use_reloader=False)
|
|
|
|