# MediaMixOptimization / pages/10_Saved_Scenarios.py
# Uploaded by samkeet ("Upload 40 files", commit 00b00eb, verified).
# Importing necessary libraries
import streamlit as st
# Page configuration is issued right after importing Streamlit, ahead of the
# remaining imports, so that it runs before any other Streamlit command.
st.set_page_config(
    page_title="Saved Scenarios",
    page_icon="⚖️",  # restored from a mis-decoded (mojibake) literal
    layout="wide",
    initial_sidebar_state="collapsed",
)
import io
import sys
import json
import pickle
import zipfile
import traceback
import numpy as np
import pandas as pd
from scenario import numerize
from openpyxl import Workbook
from post_gres_cred import db_cred
from log_application import log_message
from utilities import (
project_selection,
update_db,
set_header,
load_local_css,
name_formating,
)
# Database schema name taken from the imported credentials mapping
schema = db_cred["schema"]

# Apply the shared stylesheet and render the common header
load_local_css("styles.css")
set_header()

# Guarantee that the project-name key exists in session state
if "project_name" not in st.session_state:
    st.session_state["project_name"] = None

# No project dictionary loaded yet: hand over to project_selection() and
# end this script run
if "project_dct" not in st.session_state:
    project_selection()
    st.stop()

# Show the current user and project side by side (only when a user is set)
username = st.session_state.get("username")
if username is not None:
    welcome_col, project_col = st.columns([2, 1])
    welcome_col.markdown(f"**Welcome {username}**")
    project_col.markdown(
        f"**Current Project: {st.session_state['project_name']}**"
    )
# Function to get saved scenarios dictionary
def get_saved_scenarios_dict():
    """Return the saved-scenarios mapping stored in the project dictionary.

    The mapping lives at
    ``st.session_state["project_dct"]["saved_scenarios"]["saved_scenarios_dict"]``
    and is returned by reference, not copied.
    """
    saved_scenarios_section = st.session_state["project_dct"]["saved_scenarios"]
    return saved_scenarios_section["saved_scenarios_dict"]
# Function to format values based on their size
def format_value(value):
    """Round ``value`` with a precision chosen from its magnitude.

    Values whose absolute value is below 1 keep four decimal places; all
    other values keep one.

    The magnitude (``abs(value)``) is compared rather than the signed value,
    so that large negative numbers are not rounded to four decimals just
    because they are smaller than 1.

    Args:
        value: A real number (anything supporting ``abs`` and ``round``).

    Returns:
        The rounded value.
    """
    decimals = 4 if abs(value) < 1 else 1
    return round(value, decimals)
# Types that ``json.dumps`` can emit directly, both as values and as dict keys
_JSON_SCALAR_TYPES = (bool, int, float, str, type(None))


def _to_json_key(key):
    """Return a dict key that ``json.dumps`` accepts.

    Keys that convert to a plain scalar (including NumPy scalars) keep their
    converted value; anything else is replaced by ``str(key)``.
    """
    converted = convert_to_serializable(key)
    return converted if isinstance(converted, _JSON_SCALAR_TYPES) else str(key)


# Function to recursively convert non-serializable types to serializable ones
def convert_to_serializable(obj):
    """Recursively convert ``obj`` into something ``json.dumps`` can encode.

    Conversions applied:
        * plain ``bool`` / ``int`` / ``float`` / ``str`` / ``None`` are
          returned unchanged;
        * NumPy boolean, integer and floating scalars become the matching
          built-in ``bool`` / ``int`` / ``float`` instead of being
          stringified;
        * NumPy arrays become (nested) lists, and their elements are
          converted as well, which matters for object arrays;
        * dicts are rebuilt with converted values and JSON-compatible keys;
        * lists and tuples become lists of converted elements;
        * anything else falls back to ``str(obj)``.

    Args:
        obj: Arbitrary (possibly nested) Python / NumPy object.

    Returns:
        A structure made only of dicts, lists and JSON scalars.
    """
    if isinstance(obj, _JSON_SCALAR_TYPES):
        return obj
    # np.bool_ is not a subclass of bool, and np.integer is not a subclass of
    # int, so they need explicit handling to stay booleans/numbers in JSON.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return convert_to_serializable(obj.tolist())
    if isinstance(obj, dict):
        return {
            _to_json_key(key): convert_to_serializable(value)
            for key, value in obj.items()
        }
    if isinstance(obj, (list, tuple)):
        return [convert_to_serializable(element) for element in obj]
    # Fallback: convert the object to a string
    return str(obj)
# Function to generate zip file of current scenario
@st.cache_data(show_spinner=False)
def download_as_zip(
    df,
    scenario_data,
    excel_name="optimization_results.xlsx",
    json_name="scenario_params.json",
):
    """Bundle a results table and scenario parameters into an in-memory ZIP.

    The archive holds two members: an Excel workbook with a single sheet
    named "Results" (header row followed by the rows of ``df``) and a JSON
    file with ``scenario_data`` passed through ``convert_to_serializable``.
    Results are cached through ``st.cache_data``.

    Args:
        df: DataFrame whose columns and values are written to the sheet.
        scenario_data: Scenario dictionary to export as JSON.
        excel_name: Name of the workbook inside the archive.
        json_name: Name of the JSON file inside the archive.

    Returns:
        An ``io.BytesIO`` positioned at the start of the ZIP data.
    """
    # Build the workbook: row 1 holds the column titles, data starts at row 2
    workbook = Workbook()
    results_sheet = workbook.active
    results_sheet.title = "Results"
    table_rows = [df.columns, *df.values]
    for row_idx, row_values in enumerate(table_rows, start=1):
        for col_idx, cell_value in enumerate(row_values, start=1):
            results_sheet.cell(row=row_idx, column=col_idx, value=cell_value)

    # Serialize the workbook into memory
    excel_bytes = io.BytesIO()
    workbook.save(excel_bytes)

    # Serialize the scenario parameters
    params_json = json.dumps(convert_to_serializable(scenario_data), indent=4)

    # Write both members into the archive
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as zip_file:
        zip_file.writestr(excel_name, excel_bytes.getvalue())
        zip_file.writestr(json_name, params_json.encode("utf-8"))

    archive.seek(0)  # Rewind so the caller reads from the first byte
    return archive
# Function to delete the selected scenario from the saved scenarios dictionary
def delete_selected_scenarios(selected_scenario):
    """Remove ``selected_scenario`` from the saved scenarios in session state.

    Does nothing when no scenario with that name is stored. Only the
    in-session project dictionary is modified.

    Args:
        selected_scenario: Key of the scenario to remove.
    """
    stored_scenarios = st.session_state["project_dct"]["saved_scenarios"][
        "saved_scenarios_dict"
    ]
    if selected_scenario not in stored_scenarios:
        return
    del stored_scenarios[selected_scenario]
def _spends_per_metric(spends, metric_total):
    """Return ``spends / metric_total``, or NaN when the metric total is zero.

    Guards the per-channel ratio so a channel with a zero metric total does
    not raise ``ZeroDivisionError`` and abort rendering of the whole page.
    """
    if metric_total == 0:
        return float("nan")
    return spends / metric_total


# Main page flow: pick a saved scenario, show its per-channel summary table,
# and offer "Save Progress", "Download" and "Delete" actions. Any unexpected
# error is logged with its traceback and replaced by a generic warning.
try:
    # Page Title
    st.title("Saved Scenarios")

    # Placeholder to display scenarios name
    scenarios_name_placeholder = st.empty()

    # Get saved scenarios dictionary and scenario name list
    saved_scenarios_dict = get_saved_scenarios_dict()
    scenarios_list = list(saved_scenarios_dict.keys())

    # Nothing to show when no scenario has been saved yet
    if not scenarios_list:
        # Display a warning message if no scenarios are saved
        st.warning("No scenarios saved. Please save a scenario to load.", icon="⚠️")

        # Log message
        log_message(
            "warning",
            "No scenarios saved. Please save a scenario to load.",
            "Saved Scenarios",
        )
        st.stop()

    # Columns for scenario selection and save progress
    select_scenario_col, save_progress_col = st.columns(2)
    save_message_display_placeholder = st.container()

    # Display a dropdown saved scenario list
    selected_scenario = select_scenario_col.selectbox(
        "Pick a Scenario", sorted(scenarios_list), key="selected_scenario"
    )

    # Save page progress
    with save_progress_col:
        # Spacer so the button lines up with the select box
        st.write("###")
        with save_message_display_placeholder, st.spinner("Saving Progress ..."):
            if save_progress_col.button("Save Progress", use_container_width=True):
                # Update DB
                update_db(
                    prj_id=st.session_state["project_number"],
                    page_nam="Saved Scenarios",
                    file_nam="project_dct",
                    pkl_obj=pickle.dumps(st.session_state["project_dct"]),
                    schema=schema,
                )

                # Display success message
                st.success("Progress saved successfully!", icon="💾")
                st.toast("Progress saved successfully!", icon="💾")

                # Log message
                log_message("info", "Progress saved successfully!", "Saved Scenarios")

    selected_scenario_data = saved_scenarios_dict[selected_scenario]

    # Scenarios Name
    metrics_name = selected_scenario_data["metrics_selected"]
    panel_name = selected_scenario_data["panel_selected"]
    optimization_name = selected_scenario_data["optimization"]
    multiplier = selected_scenario_data["multiplier"]
    timeframe = selected_scenario_data["timeframe"]

    # The formatted metric name is reused in several column titles below
    metric_label = name_formating(metrics_name)

    # Display the scenario details with bold "Metric," "Panel," and "Optimization"
    scenarios_name_placeholder.markdown(
        f"**Metric**: {metric_label}; **Panel**: {name_formating(panel_name)}; **Fix**: {name_formating(optimization_name)}; **Timeframe**: {name_formating(timeframe)}"
    )

    # Create columns for download and delete buttons
    download_col, delete_col = st.columns(2)
    # Fresh container below the action buttons for the delete confirmation
    save_message_display_placeholder = st.container()

    # List to hold data for all channels
    channels_data = []

    # Iterate through each channel and gather required data
    for channel, channel_data in selected_scenario_data["channels"].items():
        channel_conversion_rate = channel_data["conversion_rate"]

        # Stored spends are multiplied by the channel's conversion rate
        channel_actual_spends = (
            channel_data["actual_total_spends"] * channel_conversion_rate
        )
        channel_optimized_spends = (
            channel_data["modified_total_spends"] * channel_conversion_rate
        )

        channel_actual_metrics = channel_data["actual_total_sales"]
        channel_optimized_metrics = channel_data["modified_total_sales"]

        # Extract the ROI and MROI data
        channel_roi_mroi_data = selected_scenario_data["channel_roi_mroi"][channel]
        actual_roi = channel_roi_mroi_data["actual_roi"]
        optimized_roi = channel_roi_mroi_data["optimized_roi"]
        actual_mroi = channel_roi_mroi_data["actual_mroi"]
        optimized_mroi = channel_roi_mroi_data["optimized_mroi"]

        # Calculate spends per metric (NaN when the metric total is zero)
        spends_per_metrics_actual = _spends_per_metric(
            channel_actual_spends, channel_actual_metrics
        )
        spends_per_metrics_optimized = _spends_per_metric(
            channel_optimized_spends, channel_optimized_metrics
        )

        # Append the collected data as a dictionary to the list
        channels_data.append(
            {
                "Channel Name": channel,
                "Spends Actual": numerize(channel_actual_spends / multiplier),
                "Spends Optimized": numerize(channel_optimized_spends / multiplier),
                f"{metric_label} Actual": numerize(
                    channel_actual_metrics / multiplier
                ),
                f"{metric_label} Optimized": numerize(
                    channel_optimized_metrics / multiplier
                ),
                "ROI Actual": format_value(actual_roi),
                "ROI Optimized": format_value(optimized_roi),
                "MROI Actual": format_value(actual_mroi),
                "MROI Optimized": format_value(optimized_mroi),
                f"Spends per {metric_label} Actual": round(
                    spends_per_metrics_actual, 2
                ),
                f"Spends per {metric_label} Optimized": round(
                    spends_per_metrics_optimized, 2
                ),
            }
        )

    # Create a DataFrame from the collected data
    df = pd.DataFrame(channels_data)

    # Display the DataFrame
    st.dataframe(df, hide_index=True)

    # Generate downloadable data for selected scenario
    buffer = download_as_zip(
        df,
        selected_scenario_data,
        excel_name="optimization_results.xlsx",
        json_name="scenario_params.json",
    )

    # Provide the buffer as a downloadable ZIP file
    file_name = f"{selected_scenario}_scenario_data.zip"
    if download_col.download_button(
        label="Download",
        data=buffer,
        file_name=file_name,
        mime="application/zip",
        use_container_width=True,
    ):
        # Log message
        log_message(
            "info",
            f"FILE_NAME: {file_name} has been successfully downloaded.",
            "Saved Scenarios",
        )

    # Button to trigger the deletion of the selected scenario; the removal
    # itself happens in the on_click callback
    if delete_col.button(
        "Delete",
        use_container_width=True,
        on_click=delete_selected_scenarios,
        args=(selected_scenario,),
    ):
        # Display success message
        with save_message_display_placeholder:
            st.success(
                "Selected scenario successfully deleted. Click the 'Save Progress' button to ensure your changes are updated!",
                icon="🗑️",
            )
            st.toast(
                "Selected scenario successfully deleted. Click the 'Save Progress' button to ensure your changes are updated!",
                icon="🗑️",
            )

        # Log message
        log_message(
            "info", "Selected scenario successfully deleted.", "Saved Scenarios"
        )

except Exception:
    # Capture the full traceback of the error being handled
    error_message = traceback.format_exc()

    # Log message
    log_message("error", f"An error occurred: {error_message}.", "Saved Scenarios")

    # Display a warning message
    st.warning(
        "Oops! Something went wrong. Please try refreshing the tool or creating a new project.",
        icon="⚠️",
    )