Breakfast-Poll / app.py
MarcosRodrigo's picture
Update app.py
ab0ec7c verified
raw
history blame
21 kB
import streamlit as st
import pandas as pd
from datetime import datetime
import os
import matplotlib.pyplot as plt
import seaborn as sns
from huggingface_hub import HfApi, upload_file, list_repo_files, hf_hub_download
# --- Hugging Face persistence settings ---------------------------------------
# Repository (accessed with repo_type="space") that stores the poll data
REPO_ID = "MarcosRodrigo/Breakfast-Poll"
# Shared CSV holding the selections of the poll currently in progress
TEMP_FILE = "current_selections.csv"
# Repository folder that receives one snapshot file per archived poll
HISTORY_DIR = "history"
# Hub client plus the access token read from the Streamlit secrets
# (the token needs write access to the repository)
api = HfApi()
hf_token = st.secrets["HF_TOKEN"]
# Seed every session-state entry the app relies on; entries that already exist
# (i.e. on reruns of the same session) are left untouched.
_session_defaults = {
    "users": [],
    "current_selections": [],
    "step": 1,
    "history": [],
}
for _state_key, _initial_value in _session_defaults.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# Read the selections of the poll in progress from the shared file
def load_current_selections():
    """Return the current selections as a DataFrame.

    Reads ``TEMP_FILE`` when it exists; otherwise returns an empty frame with
    the ``Name``, ``Drinks`` and ``Food`` columns.
    """
    if not os.path.exists(TEMP_FILE):
        return pd.DataFrame(columns=["Name", "Drinks", "Food"])
    return pd.read_csv(TEMP_FILE)
# Save current user selections to the shared CSV file without overwriting previous data
def save_current_selection_to_file(current_selections):
    """Append ``current_selections`` to ``TEMP_FILE``, dropping exact duplicates.

    Args:
        current_selections: DataFrame with ``Name``, ``Drinks`` and ``Food``
            columns. ``Drinks``/``Food`` cells may be lists (joined with
            ``", "`` before writing) or already-joined strings.

    The caller's DataFrame is left untouched: the list-to-string conversion is
    applied to a copy. A missing ``Drinks`` or ``Food`` column is added empty so
    the CSV always carries the three columns the other views read.
    """
    new_rows = current_selections.copy()
    for column in ("Drinks", "Food"):
        if column not in new_rows.columns:
            new_rows[column] = None
            continue
        new_rows[column] = new_rows[column].apply(
            lambda cell: ", ".join(cell) if isinstance(cell, list) else cell
        )

    if os.path.exists(TEMP_FILE):
        existing_selections = pd.read_csv(TEMP_FILE)
        combined_selections = pd.concat(
            [existing_selections, new_rows], ignore_index=True
        ).drop_duplicates()
    else:
        combined_selections = new_rows
    combined_selections.to_csv(TEMP_FILE, index=False)
# Push the shared selections file to the Hugging Face repository
def upload_temp_file_to_repo():
    """Upload ``TEMP_FILE`` to the repository under the same path.

    Does nothing when the local file does not exist.
    """
    if not os.path.exists(TEMP_FILE):
        return
    upload_file(
        repo_id=REPO_ID,
        repo_type="space",
        path_in_repo=TEMP_FILE,
        path_or_fileobj=TEMP_FILE,
        token=hf_token,
    )
# Remove a file (e.g. `current_selections.csv`) from the repository
def delete_file_from_repo(filename):
    """Delete ``filename`` from the repository through the Hub client.

    Args:
        filename: Path of the file inside the repository.
    """
    delete_kwargs = {
        "repo_id": REPO_ID,
        "repo_type": "space",
        "path_in_repo": filename,
        "token": hf_token,
    }
    api.delete_file(**delete_kwargs)
# Fetch the shared selections file from the repository into the working directory
def download_temp_file_from_repo():
    """Download ``TEMP_FILE`` from the repository into the current directory.

    Best effort: if the download raises for any reason, an empty CSV with the
    ``Name``, ``Drinks`` and ``Food`` headers is written to ``TEMP_FILE``
    instead.
    """
    try:
        hf_hub_download(
            repo_id=REPO_ID,
            repo_type="space",
            filename=TEMP_FILE,
            local_dir=".",
            token=hf_token,
        )
    except Exception:
        placeholder = pd.DataFrame(columns=["Name", "Drinks", "Food"])
        placeholder.to_csv(TEMP_FILE, index=False)
# Load history from the repository
def load_history():
    """Download every archived poll and return them oldest first.

    Returns:
        A list of ``{"Date": <file name without ".txt">, "Summary": DataFrame}``
        dicts, one per ``.txt`` file found under ``HISTORY_DIR`` in the
        repository.

    The file names are ``%Y-%m-%d_%H-%M-%S`` timestamps (see
    ``save_summary_to_history``), so sorting the paths gives chronological
    order. The views display ``reversed(history)`` as "newest first", which is
    only true if this order is guaranteed here rather than left to the order
    in which the Hub happens to list the files.
    """
    history = []
    files_in_repo = list_repo_files(REPO_ID, token=hf_token, repo_type="space")
    prefix = f"{HISTORY_DIR}/"
    suffix = ".txt"
    history_files = sorted(
        path for path in files_in_repo
        if path.startswith(prefix) and path.endswith(suffix)
    )
    for repo_path in history_files:
        local_filepath = hf_hub_download(
            repo_id=REPO_ID, filename=repo_path, token=hf_token, repo_type="space"
        )
        summary_df = pd.read_csv(local_filepath)
        # "history/2024-01-31_09-15-00.txt" -> "2024-01-31_09-15-00"
        date = repo_path.split("/")[-1].split(suffix)[0]
        history.append({"Date": date, "Summary": summary_df})
    return history
# Save the current summary to a text file in the history directory
def save_summary_to_history():
    """Archive the current selections as a timestamped file in the repository.

    Copies ``TEMP_FILE`` to ``HISTORY_DIR/<timestamp>.txt`` (CSV content) and
    uploads that file to the repository under the same path.

    Returns:
        The ``%Y-%m-%d_%H-%M-%S`` timestamp used for the file name. It is
        returned even when ``TEMP_FILE`` does not exist, in which case nothing
        is written or uploaded.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    history_filename = f"{HISTORY_DIR}/{timestamp}.txt"
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(HISTORY_DIR, exist_ok=True)
    if os.path.exists(TEMP_FILE):
        summary_df = pd.read_csv(TEMP_FILE)
        summary_df.to_csv(history_filename, index=False)
        # Upload only when the snapshot was actually written; uploading a
        # path that was never created would fail.
        upload_file(
            path_or_fileobj=history_filename,
            path_in_repo=history_filename,
            repo_id=REPO_ID,
            token=hf_token,
            repo_type="space",
        )
    return timestamp
# Load the shared selections file and the persisted history once per session.
# A dedicated flag is used because the "history" key is always present by the
# time this runs (the session-state initialization creates it as an empty
# list), so testing for "history" here would never trigger the load.
if "data_loaded" not in st.session_state:
    download_temp_file_from_repo()
    st.session_state.history = load_history()
    # The session's own `current_selections` list is deliberately not seeded
    # from the shared file: the Poll view writes that list back to the file,
    # so copying other users' rows into it would re-save them into later polls.
    # Views that need everyone's rows read the file via load_current_selections().
    st.session_state.data_loaded = True
# Sidebar for navigating through different views
menu = st.sidebar.selectbox("Select View", ["Poll", "Current", "History", "Graph"])
# Clear this session's user list and pending selections
def reset_selections():
    """Reset the ``users`` and ``current_selections`` session entries to empty lists."""
    for state_key in ("users", "current_selections"):
        st.session_state[state_key] = []
# Poll view with three consecutive steps: name, drinks, food
if menu == "Poll":
    st.title("Breakfast Poll Application")
    # Step 1: User's Name
    st.header("Step 1: Enter your name")
    name = st.text_input("Name:").strip()  # a whitespace-only name counts as empty
    if st.button("Next", key="step1_next") and name:
        st.session_state.users.append(name)
        st.session_state.step = 2  # Set the next step to be visible
    # Show Step 2 only if Step 1 is completed
    if st.session_state.step >= 2:
        st.header("Step 2: Select your drink(s)")
        drinks_options = [
            "Café con leche", "Colacao", "Descafeinado con leche", "Cortado",
            "Aguasusia", "Aguasusia susia", "Café descafeinado con leche desnatada",
            "Italiano", "Café con soja", "Té", "Manzanilla", "Nada"
        ]
        selected_drinks = st.multiselect("Choose your drinks:", drinks_options)
        if st.button("Next", key="step2_next") and selected_drinks:
            st.session_state.current_selections.append(
                {"Name": st.session_state.users[-1], "Drinks": selected_drinks}
            )
            st.session_state.step = 3  # Set the next step to be visible
    # Show Step 3 only if Step 2 is completed
    if st.session_state.step >= 3:
        st.header("Step 3: Select your food(s)")
        food_options = [
            "Barrita aceite", "Barrita tomate", "Palmera chocolate",
            "Palmera chocolate blanco", "Yogurt", "Tortilla", "Nada"
        ]
        selected_food = st.multiselect("Choose your food:", food_options)
        if st.button("Save Selections", key="save_selections") and selected_food:
            completed = st.session_state.current_selections[-1]
            completed["Food"] = selected_food
            # Persist only the record that was just completed. The session list
            # can also hold entries without "Food" (step 2 confirmed more than
            # once) and rows from a poll that has since been archived; writing
            # the whole list would push those into the shared file.
            df = pd.DataFrame([completed])
            save_current_selection_to_file(df)
            upload_temp_file_to_repo()
            st.success(f"Selections saved for {st.session_state.users[-1]}!")
            st.session_state.step = 1  # Reset to step 1 for the next user
# History view to check past summaries
elif menu == "History":
st.title("Breakfast Poll History")
# Reload history if it's not already loaded
if not st.session_state.history:
st.session_state.history = load_history()
if st.session_state.history:
# Display history in reverse chronological order
for record in reversed(st.session_state.history):
st.subheader(f"Date: {record['Date']}")
st.table(record["Summary"])
else:
st.write("No history records found.")
# # "Current" view to display the current summary of all users' selections and submit to history
# elif menu == "Current":
# st.title("Current Selections of All Users")
# if st.button("Reload Selections"):
# download_temp_file_from_repo()
# current_df = load_current_selections()
# st.table(current_df)
# if st.button("Submit Summary to History"):
# timestamp = save_summary_to_history()
# st.success(f"Summary saved to history at {timestamp}")
# st.session_state.history = load_history()
# # Clear local and remote current selections
# if os.path.exists(TEMP_FILE):
# os.remove(TEMP_FILE)
# delete_file_from_repo(TEMP_FILE) # Delete the file from the remote repo
# # Create an empty CSV to replace the deleted one
# pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False)
# upload_temp_file_to_repo()
# "Current" view to display the current summary of all users' selections and generate ticket
elif menu == "Current":
st.title("Current Selections of All Users")
if st.button("Reload Selections"):
download_temp_file_from_repo()
# Load the current selections from the session state or from the file
current_df = load_current_selections()
st.table(current_df)
# Define item prices
item_prices = {
"Cafés": {
"Café con leche": 1.20,
"Descafeinado con leche": 1.20,
"Cortado": 1.20,
"Aguasusia": 1.20,
"Aguasusia susia": 1.20,
"Descafeinado con leche desnatada": 1.20,
"Italiano": 1.20,
"Café con soja": 1.20,
"Café sin lactosa": 1.20,
},
"Infusiones": {
"Té": 0.90,
"Manzanilla": 0.90,
},
"Colacaos": {
"Colacao": 1.50,
},
"Palmeras": {
"Palmera chocolate": 1.50,
"Palmera chocolate blanco": 1.50,
},
"Barrita aceite": 0.65,
"Barrita tomate": 1.30,
"Napolitana": 0.65,
"Croissant": 0.65,
"Yogurt": 1.00,
"Tortilla": 1.50,
"Nada": 0.00,
}
# # Define item prices
# item_prices = {
# "Café con leche": 1.20,
# "Colacao": 1.50,
# "Café Descafeinado con leche": 1.20,
# "Cortado": 1.20,
# "Café Aguasusia": 1.20,
# "Café Aguasusia susia": 1.20,
# "Café descafeinado con leche desnatada": 1.20,
# "Café Italiano": 1.20,
# "Café con soja": 1.20,
# "Té": 0.90,
# "Manzanilla": 0.90,
# "Nada": 0.00,
# "Barrita con aceite": 0.65,
# "Barrita con tomate": 1.30,
# "Palmera de chocolate": 1.50,
# "Palmera de chocolate blanco": 1.50,
# "Yogurt": 1.00,
# "Pincho de tortilla": 1.50
# }
# Define combined prices for special combinations
combo_prices = {
"desayuno + café (aceite)": 1.85,
"desayuno + café (tomate)": 2.50,
"desayuno + café (napolitana)": 1.85,
}
# Use session state to persist ticket generation status
if "ticket_generated" not in st.session_state:
st.session_state.ticket_generated = False
# Generate Ticket Button and Logic
if st.button("Generate Ticket"):
ticket = []
# Iterate over each user's selections
drinks = []
foods = []
for _, row in current_df.iterrows():
drinks.append(row['Drinks'])
foods.append(row['Food'])
# Simplify items
drinks = ["Café" if x in item_prices["Cafés"].keys() else x for x in drinks]
drinks = ["Infusión" if x in item_prices["Infusiones"].keys() else x for x in drinks]
foods = ["Palmera" if x in item_prices["Palmeras"].keys() else x for x in foods]
# Display selections
st.write(f"Drinks: {drinks}")
st.write(f"Foods: {foods}")
# Count items
item_count = {
"Café": len([x for x in drinks if x == "Café"]),
"Infusión": len([x for x in drinks if x == "Infusión"]),
"Colacao": len([x for x in drinks if x == "Colacao"]),
"Barrita aceite": len([x for x in drinks if x == "Barrita aceite"]),
"Barrita tomate": len([x for x in drinks if x == "Barrita tomate"]),
"Napolitana": len([x for x in drinks if x == "Napolitana"]),
"Palmera": len([x for x in drinks if x == "Palmeras"]),
"Tortilla": len([x for x in drinks if x == "Tortilla"]),
"Croissant": len([x for x in drinks if x == "Croissant"]),
"Yogurt": len([x for x in drinks if x == "Yogurt"]),
}
# Make associations
item_association = {}
food_count = item_count["Barrita aceite"] + item_count["Barrita tomate"] + item_count["Napolitana"] + item_count["Croissant"]
if item_count["Café"] >= food_count:
item_association["Desayuno + Café (tomate)"] = item_count["Barrita tomate"]
item_association["Desayuno + Café (aceite)"] = item_count["Barrita aceite"]
item_association["Desayuno + Café (napolitana)"] = item_count["Napolitana"]
item_association["Desayuno + Café (croissant)"] = item_count["Croissant"]
item_association["Café"] = item_count["Café"] - food_count
else:
raise NotImplementedError
item_association["Colacaos"] = item_count["Colacao"]
item_association["Yogurts"] = item_count["Yogurt"]
item_association["Tortillas"] = item_count["Tortilla"]
item_association["Palmeras"] = item_count["Palmera"]
item_association["Infusiones"] = item_count["Infusión"]
st.write(item_association)
# drinks = row['Drinks'].split(", ") if isinstance(row['Drinks'], str) else []
# food = row['Food'].split(", ") if isinstance(row['Food'], str) else []
# used_drinks = set()
# used_food = set()
# # Handle combinations of café + barrita con aceite
# for drink in drinks:
# if "café" in drink.lower() and "Barrita con aceite" in food:
# ticket.append({"Item": "desayuno + café (aceite)", "Price": combo_prices["desayuno + café (aceite)"]})
# used_drinks.add(drink)
# used_food.add("Barrita con aceite")
# break
# # Handle combinations of café + barrita con tomate
# for drink in drinks:
# if "café" in drink.lower() and "Barrita con tomate" in food and drink not in used_drinks:
# ticket.append({"Item": "desayuno + café (tomate)", "Price": combo_prices["desayuno + café (tomate)"]})
# used_drinks.add(drink)
# used_food.add("Barrita con tomate")
# break
# # Add remaining individual drinks not used in combinations
# for drink in drinks:
# if drink not in used_drinks and drink in item_prices:
# ticket.append({"Item": drink, "Price": item_prices[drink]})
# used_drinks.add(drink)
# # Add remaining individual food not used in combinations
# for f in food:
# if f not in used_food and f in item_prices:
# ticket.append({"Item": f, "Price": item_prices[f]})
# used_food.add(f)
# Create a DataFrame to display the ticket
ticket_df = pd.DataFrame(ticket)
# Format prices to show only 2 decimals
ticket_df["Price"] = ticket_df["Price"].apply(lambda x: f"{x:.2f}")
st.subheader("Generated Ticket")
st.table(ticket_df)
# Calculate and display the total price
total_price = sum([float(price) for price in ticket_df["Price"]])
st.write(f"**Total Price:** {total_price:.2f} €")
# Set ticket_generated to True in session state
st.session_state.ticket_generated = True
# Only show the "Submit Summary to History" button if a ticket is generated
if st.session_state.ticket_generated:
if st.button("Submit Summary to History"):
timestamp = save_summary_to_history()
st.success(f"Summary saved to history at {timestamp}")
st.session_state.history = load_history()
# Clear local and remote current selections
if os.path.exists(TEMP_FILE):
os.remove(TEMP_FILE)
delete_file_from_repo(TEMP_FILE)
# Create an empty CSV to replace the deleted one
pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False)
upload_temp_file_to_repo()
# Reset session state for current selections and ticket generation status
st.session_state.current_selections = []
st.session_state.ticket_generated = False
# Reload the current selections to show an empty table
current_df = pd.DataFrame(columns=["Name", "Drinks", "Food"])
st.table(current_df)
# NOTE: a second `elif menu == "History":` branch used to live here. It could
# never run, because the identical condition is already handled by the History
# branch earlier in this if/elif chain, so the duplicate was removed.
# Graph view to display a line chart of item selections over time
elif menu == "Graph":
st.title("Breakfast Poll History - Graph View")
# Load the history if not already loaded
if not st.session_state.history:
st.session_state.history = load_history()
# Prepare data for plotting
if st.session_state.history:
history_data = []
user_data = {} # Store user-specific data
for record in st.session_state.history:
# Extract only the date part (YYYY-MM-DD) for display
date = record['Date'].split("_")[0] # Use only the YYYY-MM-DD portion of the date
for index, row in record['Summary'].iterrows():
user = row['Name']
for drink in row['Drinks'].split(', '):
history_data.append({'Date': date, 'Item': drink, 'Type': 'Drink', 'User': user})
for food in row['Food'].split(', '):
history_data.append({'Date': date, 'Item': food, 'Type': 'Food', 'User': user})
# Append user data for selection
if user not in user_data:
user_data[user] = True # Initialize all users as visible by default
# Create a DataFrame from history data
history_df = pd.DataFrame(history_data)
# Count occurrences of each item per date
item_counts = history_df.groupby(['Date', 'Item', 'Type', 'User']).size().reset_index(name='Count')
# Separate items into Drinks and Food, and sort them alphabetically
drinks = sorted(item_counts[item_counts['Type'] == 'Drink']['Item'].unique())
foods = sorted(item_counts[item_counts['Type'] == 'Food']['Item'].unique())
# Create a dictionary to store the checkbox values for each item
item_visibility = {}
# Create interactive checkboxes for Drinks, Food, and Users in the sidebar
st.sidebar.header("Select Items to Display")
# Drinks Section
if drinks:
st.sidebar.subheader("Drinks")
for item in drinks:
# Add a unique key to each checkbox to avoid duplicate widget IDs
item_visibility[item] = st.sidebar.checkbox(item, value=True, key=f"checkbox_{item}_Drink")
# Food Section
if foods:
st.sidebar.subheader("Food")
for item in foods:
# Add a unique key to each checkbox to avoid duplicate widget IDs
item_visibility[item] = st.sidebar.checkbox(item, value=True, key=f"checkbox_{item}_Food")
# User Section: Create a checkbox for each user to toggle their visibility
st.sidebar.subheader("Users")
for user in user_data.keys():
user_data[user] = st.sidebar.checkbox(user, value=True, key=f"checkbox_user_{user}")
# Filter the data based on selected items and users
selected_items = [item for item, visible in item_visibility.items() if visible]
selected_users = [user for user, visible in user_data.items() if visible]
filtered_item_counts = item_counts[item_counts['Item'].isin(selected_items) & item_counts['User'].isin(selected_users)]
# Check if there is data to display
if not filtered_item_counts.empty:
# Create a line plot for each selected item over time
plt.figure(figsize=(12, 6))
sns.lineplot(data=filtered_item_counts, x='Date', y='Count', hue='Item', marker='o')
# Customize the y-axis to show only integer labels
y_max = max(filtered_item_counts['Count'].max() + 1, 1) # Set y_max to at least 1 to avoid errors
plt.yticks(range(0, y_max)) # Show only integer labels on the y-axis
# Customize the plot
plt.xticks(rotation=45)
plt.title('Item Selections Over Time')
plt.xlabel('Date')
plt.ylabel('Number of Selections')
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3)
# Display the plot
st.pyplot(plt.gcf())
else:
st.write("No data to display. Please select at least one user and one item.")
else:
st.write("No historical data available to plot.")