Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
import plotly.express as px | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import lightgbm as lgb | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.metrics.pairwise import cosine_similarity | |
# Page configuration | |
st.set_page_config(page_title="Customer Insights App", page_icon=":bar_chart:") | |
# Load CSV files at the top, only once | |
df = pd.read_csv("df_clean.csv") | |
nombres_proveedores = pd.read_csv("nombres_proveedores.csv", sep=';') | |
euros_proveedor = pd.read_csv("euros_proveedor.csv", sep=',') | |
ventas_clientes = pd.read_csv("ventas_clientes.csv", sep=',') | |
customer_clusters = pd.read_csv('predicts/customer_clusters.csv') # Load the customer clusters here | |
# Ensure customer codes are strings | |
df['CLIENTE'] = df['CLIENTE'].astype(str) | |
nombres_proveedores['codigo'] = nombres_proveedores['codigo'].astype(str) | |
euros_proveedor['CLIENTE'] = euros_proveedor['CLIENTE'].astype(str) | |
customer_clusters['cliente_id'] = customer_clusters['cliente_id'].astype(str) # Ensure customer IDs are strings | |
fieles_df = pd.read_csv("clientes_relevantes.csv") | |
cestas = pd.read_csv("cestas.csv") | |
productos = pd.read_csv("productos.csv") | |
# Convert all columns except 'CLIENTE' to float in euros_proveedor | |
for col in euros_proveedor.columns: | |
if col != 'CLIENTE': | |
euros_proveedor[col] = pd.to_numeric(euros_proveedor[col], errors='coerce') | |
# Check for NaN values after conversion | |
if euros_proveedor.isna().any().any(): | |
st.warning("Some values in euros_proveedor couldn't be converted to numbers. Please review the input data.") | |
# Ignore the last two columns of df | |
df = df.iloc[:, :-2] | |
# Function to get supplier name | |
def get_supplier_name(code): | |
code = str(code) # Ensure code is a string | |
name = nombres_proveedores[nombres_proveedores['codigo'] == code]['nombre'].values | |
return name[0] if len(name) > 0 else code | |
# Function to create radar chart with square root transformation | |
def radar_chart(categories, values, amounts, title): | |
N = len(categories) | |
angles = [n / float(N) * 2 * np.pi for n in range(N)] | |
angles += angles[:1] | |
fig, ax = plt.subplots(figsize=(12, 12), subplot_kw=dict(projection='polar')) | |
# Apply square root transformation | |
sqrt_values = np.sqrt(values) | |
sqrt_amounts = np.sqrt(amounts) | |
max_sqrt_value = max(sqrt_values) | |
normalized_values = [v / max_sqrt_value for v in sqrt_values] | |
# Adjust scaling for spend values | |
max_sqrt_amount = max(sqrt_amounts) | |
scaling_factor = 0.7 # Adjust this value to control how much the spend values are scaled up | |
normalized_amounts = [min((a / max_sqrt_amount) * scaling_factor, 1.0) for a in sqrt_amounts] | |
normalized_values += normalized_values[:1] | |
ax.plot(angles, normalized_values, 'o-', linewidth=2, color='#FF69B4', label='% Units (sqrt)') | |
ax.fill(angles, normalized_values, alpha=0.25, color='#FF69B4') | |
normalized_amounts += normalized_amounts[:1] | |
ax.plot(angles, normalized_amounts, 'o-', linewidth=2, color='#4B0082', label='% Spend (sqrt)') | |
ax.fill(angles, normalized_amounts, alpha=0.25, color='#4B0082') | |
ax.set_xticks(angles[:-1]) | |
ax.set_xticklabels(categories, size=8, wrap=True) | |
ax.set_ylim(0, 1) | |
circles = np.linspace(0, 1, 5) | |
for circle in circles: | |
ax.plot(angles, [circle]*len(angles), '--', color='gray', alpha=0.3, linewidth=0.5) | |
ax.set_yticklabels([]) | |
ax.spines['polar'].set_visible(False) | |
plt.title(title, size=16, y=1.1) | |
plt.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1)) | |
return fig | |
# Main page design | |
st.title("Welcome to Customer Insights App") | |
st.markdown(""" | |
This app helps businesses analyze customer behaviors and provide personalized recommendations based on purchase history. | |
Use the tools below to dive deeper into your customer data. | |
""") | |
# Navigation menu | |
page = st.selectbox("Select the tool you want to use", ["", "Customer Analysis", "Articles Recommendations"]) | |
# Home Page | |
if page == "": | |
st.markdown("## Welcome to the Customer Insights App") | |
st.write("Use the dropdown menu to navigate between the different sections.") | |
# Customer Analysis Page | |
elif page == "Customer Analysis": | |
st.title("Customer Analysis") | |
st.markdown("Use the tools below to explore your customer data.") | |
partial_code = st.text_input("Enter part of Customer Code (or leave empty to see all)") | |
if partial_code: | |
filtered_customers = df[df['CLIENTE'].str.contains(partial_code)] | |
else: | |
filtered_customers = df | |
customer_list = filtered_customers['CLIENTE'].unique() | |
customer_code = st.selectbox("Select Customer Code", customer_list) | |
if st.button("Calcular"): | |
if customer_code: | |
customer_data = df[df["CLIENTE"] == str(customer_code)] | |
customer_euros = euros_proveedor[euros_proveedor["CLIENTE"] == str(customer_code)] | |
# Check if customer data exists | |
if not customer_data.empty and not customer_euros.empty: | |
st.write(f"### Analysis for Customer {customer_code}") | |
# **Find Customer's Cluster** | |
customer_match = customer_clusters[customer_clusters['cliente_id'] == customer_code] | |
if not customer_match.empty: | |
cluster = customer_match['cluster_id'].values[0] | |
st.write(f"Customer {customer_code} belongs to cluster {cluster}") | |
else: | |
st.error(f"Customer {customer_code} not found in customer_clusters.") | |
st.stop() # Stop further execution if no cluster is found | |
# **Step 2: Load the Corresponding Model** | |
model_path = f'models/modelo_cluster_{cluster}.txt' | |
gbm = lgb.Booster(model_file=model_path) | |
st.write(f"Loaded model for cluster {cluster}") | |
# **Step 3: Load X_predict for that cluster and extract customer-specific data** | |
X_predict_cluster = pd.read_csv(f'predicts/X_predict_cluster_{cluster}.csv') | |
X_cliente = X_predict_cluster[X_predict_cluster['cliente_id'] == customer_code] | |
if not X_cliente.empty: | |
# **Step 4: Make Prediction for the selected customer** | |
y_pred = gbm.predict(X_cliente.drop(columns=['cliente_id']), num_iteration=gbm.best_iteration) | |
st.write(f"Predicted sales for Customer {customer_code}: {y_pred[0]:.2f}") | |
# **Step 5: Merge with actual data from df_agg_2024** | |
df_agg_2024 = pd.read_csv('predicts/df_agg_2024.csv') | |
actual_sales = df_agg_2024[(df_agg_2024['cliente_id'] == customer_code) & (df_agg_2024['marca_id_encoded'].isin(X_cliente['marca_id_encoded']))] | |
if not actual_sales.empty: | |
merged_data = pd.merge( | |
pd.DataFrame({'cliente_id': [customer_code], 'ventas_predichas': y_pred}), | |
actual_sales[['cliente_id', 'marca_id_encoded', 'precio_total']], | |
on='cliente_id', | |
how='left' | |
) | |
merged_data.rename(columns={'precio_total': 'ventas_reales'}, inplace=True) | |
# Calculate metrics (MAE, MAPE, RMSE, SMAPE) | |
mae = mean_absolute_error(merged_data['ventas_reales'], merged_data['ventas_predichas']) | |
mape = np.mean(np.abs((merged_data['ventas_reales'] - merged_data['ventas_predichas']) / merged_data['ventas_reales'])) * 100 | |
rmse = np.sqrt(mean_squared_error(merged_data['ventas_reales'], merged_data['ventas_predichas'])) | |
smape_value = smape(merged_data['ventas_reales'], merged_data['ventas_predichas']) | |
st.write(f"MAE: {mae:.2f}") | |
st.write(f"MAPE: {mape:.2f}%") | |
st.write(f"RMSE: {rmse:.2f}") | |
st.write(f"SMAPE: {smape_value:.2f}%") | |
# **Step 6: Analysis of results (show insights if the customer is performing well or not)** | |
if mae < threshold_good: | |
st.success(f"Customer {customer_code} is performing well based on the predictions.") | |
else: | |
st.warning(f"Customer {customer_code} is not performing well based on the predictions.") | |
else: | |
st.warning(f"No actual sales data found for customer {customer_code} in df_agg_2024.") | |
# **Show the radar chart** | |
all_manufacturers = customer_data.iloc[:, 1:].T # Exclude CLIENTE column | |
all_manufacturers.index = all_manufacturers.index.astype(str) | |
sales_data = customer_euros.iloc[:, 1:].T # Exclude CLIENTE column | |
sales_data.index = sales_data.index.astype(str) | |
sales_data_filtered = sales_data.drop(index='CLIENTE', errors='ignore') | |
sales_data_filtered = sales_data_filtered.apply(pd.to_numeric, errors='coerce') | |
top_units = all_manufacturers.sort_values(by=all_manufacturers.columns[0], ascending=False).head(10) | |
top_sales = sales_data_filtered.sort_values(by=sales_data_filtered.columns[0], ascending=False).head(10) | |
combined_top = pd.concat([top_units, top_sales]).index.unique()[:20] | |
combined_top = [m for m in combined_top if m in all_manufacturers.index and m in sales_data_filtered.index] | |
combined_data = pd.DataFrame({ | |
'units': all_manufacturers.loc[combined_top, all_manufacturers.columns[0]], | |
'sales': sales_data_filtered.loc[combined_top, sales_data_filtered.columns[0]] | |
}).fillna(0) | |
combined_data_sorted = combined_data.sort_values(by=['units', 'sales'], ascending=False) | |
non_zero_manufacturers = combined_data_sorted[combined_data_sorted['units'] > 0] | |
if len(non_zero_manufacturers) < 3: | |
zero_manufacturers = combined_data_sorted[combined_data_sorted['units'] == 0].head(3 - len(non_zero_manufacturers)) | |
manufacturers_to_show = pd.concat([non_zero_manufacturers, zero_manufacturers]) | |
else: | |
manufacturers_to_show = non_zero_manufacturers | |
values = manufacturers_to_show['units'].tolist() | |
amounts = manufacturers_to_show['sales'].tolist() | |
manufacturers = [get_supplier_name(m) for m in manufacturers_to_show.index] | |
st.write(f"### Results for top {len(manufacturers)} manufacturers:") | |
for manufacturer, value, amount in zip(manufacturers, values, amounts): | |
st.write(f"{manufacturer} = {value:.2f}% of units, €{amount:.2f} total sales") | |
if manufacturers: | |
fig = radar_chart(manufacturers, values, amounts, f'Radar Chart for Top {len(manufacturers)} Manufacturers of Customer {customer_code}') | |
st.pyplot(fig) | |
else: | |
st.warning("No data available to create the radar chart.") | |
# **Show sales over the years graph** | |
sales_columns = ['VENTA_2021', 'VENTA_2022', 'VENTA_2023'] | |
if all(col in ventas_clientes.columns for col in sales_columns): | |
years = ['2021', '2022', '2023'] | |
customer_sales = ventas_clientes[ventas_clientes['codigo_cliente'] == customer_code][sales_columns].values[0] | |
fig_sales = px.line(x=years, y=customer_sales, markers=True, title=f'Sales Over the Years for Customer {customer_code}') | |
fig_sales.update_layout(xaxis_title="Year", yaxis_title="Sales") | |
st.plotly_chart(fig_sales) | |
else: | |
st.warning("Sales data for 2021-2023 not available.") | |
else: | |
st.warning(f"No prediction data found for customer {customer_code}.") | |
else: | |
st.warning(f"No data found for customer {customer_code}. Please check the code.") | |
else: | |
st.warning("Please select a customer.") | |
# Customer Recommendations Page | |
elif page == "Articles Recommendations": | |
st.title("Articles Recommendations") | |
st.markdown(""" | |
Get tailored recommendations for your customers based on their basket. | |
""") | |
# Campo input para cliente | |
partial_code = st.text_input("Enter part of Customer Code for Recommendations (or leave empty to see all)") | |
if partial_code: | |
filtered_customers = df[df['CLIENTE'].str.contains(partial_code)] | |
else: | |
filtered_customers = df | |
customer_list = filtered_customers['CLIENTE'].unique() | |
customer_code = st.selectbox("Select Customer Code for Recommendations", [""] + list(customer_list)) | |
# Definición de la función recomienda | |
def recomienda(new_basket): | |
# Calcular la matriz TF-IDF | |
tfidf = TfidfVectorizer() | |
tfidf_matrix = tfidf.fit_transform(cestas['Cestas']) | |
# Convertir la nueva cesta en formato TF-IDF | |
new_basket_str = ' '.join(new_basket) | |
new_basket_tfidf = tfidf.transform([new_basket_str]) | |
# Comparar la nueva cesta con las anteriores | |
similarities = cosine_similarity(new_basket_tfidf, tfidf_matrix) | |
# Obtener los índices de las cestas más similares | |
similar_indices = similarities.argsort()[0][-3:] # Las 3 más similares | |
# Crear un diccionario para contar las recomendaciones | |
recommendations_count = {} | |
total_similarity = 0 | |
# Recomendar productos de cestas similares | |
for idx in similar_indices: | |
sim_score = similarities[0][idx] | |
total_similarity += sim_score | |
products = cestas.iloc[idx]['Cestas'].split() | |
for product in products: | |
if product.strip() not in new_basket: # Evitar recomendar lo que ya está en la cesta | |
if product.strip() in recommendations_count: | |
recommendations_count[product.strip()] += sim_score | |
else: | |
recommendations_count[product.strip()] = sim_score | |
# Calcular la probabilidad relativa de cada producto recomendado | |
recommendations_with_prob = [] | |
if total_similarity > 0: # Verificar que total_similarity no sea cero | |
recommendations_with_prob = [(product, score / total_similarity) for product, score in recommendations_count.items()] | |
else: | |
print("No se encontraron similitudes suficientes para calcular probabilidades.") | |
recommendations_with_prob.sort(key=lambda x: x[1], reverse=True) # Ordenar por puntuación | |
# Crear un nuevo DataFrame para almacenar las recomendaciones con descripciones y probabilidades | |
recommendations_df = pd.DataFrame(columns=['ARTICULO', 'DESCRIPCION', 'PROBABILIDAD']) | |
# Agregar las recomendaciones al DataFrame usando pd.concat | |
for product, prob in recommendations_with_prob: | |
# Buscar la descripción en el DataFrame de productos | |
description = productos.loc[productos['ARTICULO'] == product, 'DESCRIPCION'] | |
if not description.empty: | |
# Crear un nuevo DataFrame temporal para la recomendación | |
temp_df = pd.DataFrame({ | |
'ARTICULO': [product], | |
'DESCRIPCION': [description.values[0]], # Obtener el primer valor encontrado | |
'PROBABILIDAD': [prob] | |
}) | |
# Concatenar el DataFrame temporal al DataFrame de recomendaciones | |
recommendations_df = pd.concat([recommendations_df, temp_df], ignore_index=True) | |
return recommendations_df | |
# Comprobar si el cliente está en el CSV de fieles | |
is_fiel = customer_code in fieles_df['Cliente'].astype(str).values | |
if customer_code: | |
if is_fiel: | |
st.write(f"### Customer {customer_code} is a loyal customer.") | |
option = st.selectbox("Select Recommendation Type", ["Select an option", "By Purchase History", "By Current Basket"]) | |
if option == "By Purchase History": | |
st.warning("Option not available... aún") | |
elif option == "By Current Basket": | |
st.write("Select the items and assign quantities for the basket:") | |
# Mostrar lista de artículos disponibles | |
available_articles = productos['ARTICULO'].unique() | |
selected_articles = st.multiselect("Select Articles", available_articles) | |
# Crear inputs para ingresar las cantidades de cada artículo seleccionado | |
quantities = {} | |
for article in selected_articles: | |
quantities[article] = st.number_input(f"Quantity for {article}", min_value=0, step=1) | |
if st.button("Calcular"): # Añadimos el botón "Calcular" | |
# Crear una lista de artículos basada en la selección | |
new_basket = [f"{article} x{quantities[article]}" for article in selected_articles if quantities[article] > 0] | |
if new_basket: | |
# Procesar la lista para recomendar | |
recommendations_df = recomienda(new_basket) | |
if not recommendations_df.empty: | |
st.write("### Recommendations based on the current basket:") | |
st.dataframe(recommendations_df) | |
else: | |
st.warning("No recommendations found for the provided basket.") | |
else: | |
st.warning("Please select at least one article and set its quantity.") | |
else: | |
st.write(f"### Customer {customer_code} is not a loyal customer.") | |
st.write("Select items and assign quantities for the basket:") | |
# Mostrar lista de artículos disponibles | |
available_articles = productos['ARTICULO'].unique() | |
selected_articles = st.multiselect("Select Articles", available_articles) | |
# Crear inputs para ingresar las cantidades de cada artículo seleccionado | |
quantities = {} | |
for article in selected_articles: | |
quantities[article] = st.number_input(f"Quantity for {article}", min_value=0, step=1) | |
if st.button("Calcular"): # Añadimos el botón "Calcular" | |
# Crear una lista de artículos basada en la selección | |
new_basket = [f"{article} x{quantities[article]}" for article in selected_articles if quantities[article] > 0] | |
if new_basket: | |
# Procesar la lista para recomendar | |
recommendations_df = recomienda(new_basket) | |
if not recommendations_df.empty: | |
st.write("### Recommendations based on the current basket:") | |
st.dataframe(recommendations_df) | |
else: | |
st.warning("No recommendations found for the provided basket.") | |
else: | |
st.warning("Please select at least one article and set its quantity.") | |