# Importing necessary libraries
import streamlit as st

st.set_page_config(
    page_title="Data Import",
    page_icon="⚖️",
    layout="wide",
    initial_sidebar_state="collapsed",
)

import re
import sys
import pickle
import numbers
import traceback
import pandas as pd

from scenario import numerize
from post_gres_cred import db_cred
from collections import OrderedDict
from log_application import log_message
from utilities import set_header, load_local_css, update_db, project_selection
from constants import (
    upload_rows_limit,
    upload_column_limit,
    word_length_limit_lower,
    word_length_limit_upper,
    minimum_percent_overlap,
    minimum_row_req,
    percent_drop_col_threshold,
)

schema = db_cred["schema"]

load_local_css("styles.css")
set_header()
# Initialize project name session state
if "project_name" not in st.session_state:
    st.session_state["project_name"] = None

# Fetch project dictionary
if "project_dct" not in st.session_state:
    project_selection()
    st.stop()

# Display username and project name
if "username" in st.session_state and st.session_state["username"] is not None:
    cols1 = st.columns([2, 1])

    with cols1[0]:
        st.markdown(f"**Welcome {st.session_state['username']}**")
    with cols1[1]:
        st.markdown(f"**Current Project: {st.session_state['project_name']}**")

# Initialize session state keys
if "granularity_selection_key" not in st.session_state:
    st.session_state["granularity_selection_key"] = st.session_state["project_dct"][
        "data_import"
    ]["granularity_selection"]
# Function to format name
def name_format_func(name):
    return str(name).strip().title()
# Function to get columns with specified prefix and remove the prefix
def get_columns_with_prefix(df, prefix):
    return [
        col.replace(prefix, "")
        for col in df.columns
        if col.startswith(prefix) and str(col) != str(prefix)
    ]
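# Illustrative example (hypothetical column names, not from any real upload):
#   get_columns_with_prefix(pd.DataFrame(columns=["spends_facebook", "spends_tv", "date"]), "spends_")
#   would return ["facebook", "tv"] -- the prefix is stripped, non-matching columns are
#   ignored, and a column named exactly "spends_" (the bare prefix) would be skipped.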
# Function to fetch columns info
def fetch_columns(gold_layer_df, data_upload_df):
    # Get lists of columns starting with 'spends_' and 'response_metric_' from gold_layer_df
    spends_columns_gold_layer = get_columns_with_prefix(gold_layer_df, "spends_")
    response_metric_columns_gold_layer = get_columns_with_prefix(
        gold_layer_df, "response_metric_"
    )

    # Get lists of columns starting with 'spends_' and 'response_metric_' from data_upload_df
    spends_columns_upload = get_columns_with_prefix(data_upload_df, "spends_")
    response_metric_columns_upload = get_columns_with_prefix(
        data_upload_df, "response_metric_"
    )

    # Combine lists from both DataFrames
    spends_columns = spends_columns_gold_layer + spends_columns_upload

    # Remove 'total' from the spends_columns list if it exists
    spends_columns = list(
        set([col for col in spends_columns if not col.endswith("_total")])
    )

    response_metric_columns = (
        response_metric_columns_gold_layer + response_metric_columns_upload
    )

    # Filter columns ending with '_total' and remove the '_total' suffix
    response_metric_columns = list(
        set(
            [
                col[:-6]
                for col in response_metric_columns
                if col.endswith("_total") and len(col[:-6]) != 0
            ]
        )
    )

    # Get list of all columns from both DataFrames
    gold_layer_columns = list(gold_layer_df.columns)
    data_upload_columns = list(data_upload_df.columns)

    # Combine all columns and get unique columns
    all_columns = list(set(gold_layer_columns + data_upload_columns))

    return (
        spends_columns,
        response_metric_columns,
        all_columns,
        gold_layer_columns,
        data_upload_columns,
    )
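# Sketch of the expected behaviour (assumed column names, for illustration only):
# if the gold layer holds ["spends_facebook", "response_metric_revenue_total"] and the
# upload holds ["spends_tv"], then spends_columns resolves to {"facebook", "tv"} and
# response_metric_columns to {"revenue"} (the "_total" suffix is stripped); the
# remaining return values are the plain column lists used later for validation.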
# Function to format values for display
def format_values_for_display(values_list):
    # Format values
    formatted_list = [value.lower().strip() for value in values_list]

    # Join values with commas and 'and' before the last value
    if len(formatted_list) > 1:
        return ", ".join(formatted_list[:-1]) + ", and " + formatted_list[-1]
    elif formatted_list:
        return formatted_list[0]
    return "No values available"
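# Illustrative example (hypothetical values): format_values_for_display(["Facebook ", "TV", "email"])
# returns "facebook, tv, and email"; a single-item list returns that item, and an
# empty list returns "No values available".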
# Function to validate input DataFrame
def valid_input_df(
    df,
    spends_columns,
    response_metric_columns,
    total_columns,
    gold_layer_columns,
    data_upload_columns,
):
    # Check if DataFrame is empty
    if df.empty or len(df) < 1:
        return (True, None)

    # Check for invalid column names
    invalid_columns = [
        col
        for col in df.columns
        if not re.match(r"^[A-Za-z0-9_]+$", col)
        or not (word_length_limit_lower <= len(col) <= word_length_limit_upper)
    ]
    if invalid_columns:
        return (
            False,
            f"Invalid column names: {format_values_for_display(invalid_columns)}. Use only letters, numbers, and underscores. Column name length should be {word_length_limit_lower} to {word_length_limit_upper} characters long.",
        )

    # Ensure 'panel' column values are strings and conform to the specified pattern and length
    if "panel" in df.columns:
        df["panel"] = df["panel"].astype(str).str.strip()
        invalid_panel_values = [
            val
            for val in df["panel"].unique()
            if not re.match(r"^[A-Za-z0-9_]+$", val)
            or not (word_length_limit_lower <= len(val) <= word_length_limit_upper)
        ]
        if invalid_panel_values:
            return (
                False,
                f"Invalid panel values: {format_values_for_display(invalid_panel_values)}. Use only letters, numbers, and underscores. Panel name length should be {word_length_limit_lower} to {word_length_limit_upper} characters long.",
            )

    # Check for missing required columns
    required_columns = ["date", "panel"]
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        return (
            False,
            f"Missing compulsory columns: {format_values_for_display(missing_columns)}.",
        )

    # Check if all other columns are numeric
    non_numeric_columns = [
        col
        for col in df.columns
        if col not in required_columns and not pd.api.types.is_numeric_dtype(df[col])
    ]
    if non_numeric_columns:
        return (
            False,
            f"Non-numeric columns: {format_values_for_display(non_numeric_columns)}. All columns except {format_values_for_display(required_columns)} should be numeric.",
        )

    # Ensure all columns in data_upload_columns are unique
    duplicate_columns_in_upload = [
        col for col in data_upload_columns if data_upload_columns.count(col) > 1
    ]
    if duplicate_columns_in_upload:
        return (
            False,
            f"Duplicate columns found in the uploaded data: {format_values_for_display(set(duplicate_columns_in_upload))}.",
        )

    # Convert 'date' column to datetime format
    try:
        df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
    except Exception:
        return False, "The 'date' column is not in the correct format 'YYYY-MM-DD'."

    # Check date frequency
    unique_panels = df["panel"].unique()
    for panel in unique_panels:
        date_diff = df[df["panel"] == panel]["date"].diff().dropna()
        if not (
            (date_diff == pd.Timedelta(days=1)).all()
            or (date_diff == pd.Timedelta(weeks=1)).all()
        ):
            return False, "The 'date' column does not have a daily or weekly frequency."

    # Check for null values in 'date' or 'panel' columns
    if df[required_columns].isnull().any().any():
        return (
            False,
            f"The {format_values_for_display(required_columns)} should not contain null values.",
        )

    # Check for panels with less than the minimum required date overlap
    if not gold_layer_df.empty:
        panels_with_low_overlap = []
        unique_panels = list(
            set(df["panel"].unique()).union(set(gold_layer_df["panel"].unique()))
        )
        for panel in unique_panels:
            gold_layer_dates = set(
                gold_layer_df[gold_layer_df["panel"] == panel]["date"]
            )
            data_upload_dates = set(df[df["panel"] == panel]["date"])

            if gold_layer_dates and data_upload_dates:
                overlap = len(gold_layer_dates & data_upload_dates) / len(
                    gold_layer_dates | data_upload_dates
                )
            else:
                overlap = 0

            if overlap < (minimum_percent_overlap / 100):
                panels_with_low_overlap.append(panel)

        if panels_with_low_overlap:
            return (
                False,
                f"Date columns in the gold layer and uploaded data do not have at least {minimum_percent_overlap}% overlap for panels: {format_values_for_display(panels_with_low_overlap)}.",
            )

    # Check if spends_columns has fewer than two entries
    if len(spends_columns) < 2:
        return False, "Please add at least two spends columns."

    # Check if response_metric_columns is empty
    if len(response_metric_columns) < 1:
        return False, "Please add response metric columns."

    # Check that all numeric columns are non-negative except those starting with 'exogenous_' or 'internal_'
    valid_prefixes = ["exogenous_", "internal_"]
    negative_values_columns = [
        col
        for col in df.select_dtypes(include=[float, int]).columns
        if not any(col.startswith(prefix) for prefix in valid_prefixes)
        and (df[col] < 0).any()
    ]
    if negative_values_columns:
        return (
            False,
            f"Negative values detected in columns: {format_values_for_display(negative_values_columns)}. Ensure all media and response metric columns are positive.",
        )

    # Check for unassociated columns
    detected_channels = spends_columns + ["total"]
    unassociated_columns = []
    for col in df.columns:
        if (col.startswith("_") or col.endswith("_")) or not (
            col.startswith("exogenous_")  # Column starts with "exogenous_"
            or col.startswith("internal_")  # Column starts with "internal_"
            or any(
                col == f"spends_{channel}" for channel in spends_columns
            )  # Column is in the format "spends_<channel>"
            or any(
                col == f"response_metric_{metric}_{channel}"
                for metric in response_metric_columns
                for channel in detected_channels
            )  # Column is in the format "response_metric_<metric>_<channel>"
            or any(
                col.startswith("media_")
                and col.endswith(f"_{channel}")
                and len(col) > len(f"media__{channel}")
                for channel in spends_columns
            )  # Column is in the format "media_<media_variable_name>_<channel>"
            or col in ["date", "panel"]
        ):
            unassociated_columns.append(col)

    if unassociated_columns:
        return (
            False,
            f"Columns with incorrect format detected: {format_values_for_display(unassociated_columns)}.",
        )

    return True, "The data is valid and meets all requirements."
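# Minimal sketch of an upload that should pass these checks (column names assumed for
# illustration): a DataFrame with daily or weekly "date" values, a "panel" column,
# at least two spends columns such as "spends_facebook" and "spends_tv", one total
# response metric such as "response_metric_revenue_total", and optional
# "media_clicks_facebook" / "exogenous_unemployment_rate" style columns -- all numeric
# apart from "date" and "panel", and non-negative outside the exogenous_/internal_ prefixes.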
# Function to load the uploaded Excel file into a DataFrame
def load_and_transform_data(uploaded_file):
    # Load the uploaded file into a DataFrame if a file is uploaded
    if uploaded_file is not None:
        df = pd.read_excel(uploaded_file)
    else:
        df = pd.DataFrame()
        return df

    # Check if the DataFrame exceeds the row or column limits
    if len(df) > upload_rows_limit or len(df.columns) > upload_column_limit:
        st.warning(
            f"Data exceeds the row limit of {numerize(upload_rows_limit)} or column limit of {numerize(upload_column_limit)}. Please upload a smaller file.",
            icon="⚠️",
        )

        # Log message
        log_message(
            "warning",
            f"Data exceeds the row limit of {numerize(upload_rows_limit)} or column limit of {numerize(upload_column_limit)}. Please upload a smaller file.",
            "Data Import",
        )

        return pd.DataFrame()

    # If the DataFrame contains only 'panel' and 'date' columns, return an empty DataFrame
    if set(df.columns) == {"date", "panel"}:
        return pd.DataFrame()

    # Transform column names: lowercase, strip leading/trailing whitespace, replace spaces with underscores
    df.columns = [str(col).strip().lower().replace(" ", "_") for col in df.columns]

    # If a 'panel' column exists, clean its values
    try:
        if "panel" in df.columns:
            df["panel"] = (
                df["panel"].astype(str).str.lower().str.strip().str.replace(" ", "_")
            )
    except Exception:
        return df

    try:
        df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d")
    except Exception:
        # The 'date' column is not in the correct format 'YYYY-MM-DD'
        return df

    # Check date frequency and convert to daily if needed
    date_diff = df["date"].diff().dropna()
    if (date_diff == pd.Timedelta(days=1)).all():
        # Data is already at daily level
        return df
    elif (date_diff == pd.Timedelta(weeks=1)).all():
        # Data is at weekly level, convert to daily
        weekly_data = df.copy()
        daily_data = []

        for index, row in weekly_data.iterrows():
            week_start = row["date"] - pd.to_timedelta(row["date"].weekday(), unit="D")
            for i in range(7):
                daily_date = week_start + pd.DateOffset(days=i)
                new_row = row.copy()
                new_row["date"] = daily_date

                for col in df.columns:
                    if isinstance(new_row[col], numbers.Number):
                        new_row[col] = new_row[col] / 7

                daily_data.append(new_row)

        daily_data_df = pd.DataFrame(daily_data)
        return daily_data_df
    else:
        # The 'date' column does not have a daily or weekly frequency
        return df
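# Illustrative weekly-to-daily expansion (assumed numbers): a weekly row dated
# 2024-08-05 (a Monday) with spends_facebook = 700 becomes seven daily rows dated
# 2024-08-05 through 2024-08-11, each carrying spends_facebook = 100, since every
# numeric value in the row is divided evenly across the seven days.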
# Function to merge DataFrames if present
def merge_dataframes(gold_layer_df, data_upload_df):
    if gold_layer_df.empty and data_upload_df.empty:
        return pd.DataFrame()

    if not gold_layer_df.empty and not data_upload_df.empty:
        # Merge gold_layer_df and data_upload_df on 'panel' and 'date'
        merged_df = pd.merge(
            gold_layer_df,
            data_upload_df,
            on=["panel", "date"],
            how="outer",
            suffixes=("_gold", "_upload"),
        )

        # Handle duplicate columns
        for col in merged_df.columns:
            if col.endswith("_gold"):
                base_col = col[:-5]  # Remove '_gold' suffix
                upload_col = base_col + "_upload"  # Column name in data_upload_df

                if upload_col in merged_df.columns:
                    # Prefer values from data_upload_df
                    merged_df[base_col] = merged_df[upload_col].combine_first(
                        merged_df[col]
                    )
                    merged_df.drop(columns=[col, upload_col], inplace=True)
                else:
                    # Rename column to remove the suffix
                    merged_df.rename(columns={col: base_col}, inplace=True)
    elif data_upload_df.empty:
        merged_df = gold_layer_df.copy()
    elif gold_layer_df.empty:
        merged_df = data_upload_df.copy()

    return merged_df
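# Sketch of the conflict rule (hypothetical column): if both inputs contain
# "spends_facebook", the outer merge yields "spends_facebook_gold" and
# "spends_facebook_upload"; combine_first keeps the uploaded value wherever it is
# non-null, falls back to the gold-layer value otherwise, and the suffixed columns
# are then dropped so a single "spends_facebook" column remains.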
# Function to check if all required columns are present in the uploaded DataFrame
def check_required_columns(df, detected_channels, detected_response_metric):
    required_columns = []

    # Add all channels with 'spends_' + detected channel name
    for channel in detected_channels:
        required_columns.append(f"spends_{channel}")

    # Add all channels with 'response_metric_' + detected channel name
    for response_metric in detected_response_metric:
        for channel in detected_channels + ["total"]:
            required_columns.append(f"response_metric_{response_metric}_{channel}")

    # Check for missing columns
    missing_columns = [col for col in required_columns if col not in df.columns]

    # Channel groupings
    no_media_data = []
    channel_columns_dict = {}
    for channel in detected_channels:
        channel_columns = [
            col
            for col in df.columns
            if channel in col
            and not (
                col.startswith("response_metric_")
                or col.startswith("exogenous_")
                or col.startswith("internal_")
            )
            and col.endswith(channel)
        ]
        channel_columns_dict[channel] = channel_columns

        if len(channel_columns) <= 1:
            no_media_data.append(channel)

    return missing_columns, no_media_data, channel_columns_dict
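# Illustrative call (assumed inputs): with detected_channels = ["facebook", "tv"] and
# detected_response_metric = ["revenue"], the required columns are spends_facebook,
# spends_tv, response_metric_revenue_facebook, response_metric_revenue_tv and
# response_metric_revenue_total; a channel whose only associated column is its spends
# column (one or fewer matches) is reported in no_media_data.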
# Function to prepare tool DataFrame
def prepare_tool_df(merged_df, granularity_selection):
    # Drop all response metric columns that do not end with '_total'
    cols_to_drop = [
        col
        for col in merged_df.columns
        if col.startswith("response_metric_") and not col.endswith("_total")
    ]

    # Create a DataFrame to be used for the tool
    tool_df = merged_df.drop(columns=cols_to_drop)

    # Convert to weekly granularity by aggregating all data for a given panel and week
    if granularity_selection.lower() == "weekly":
        tool_df.set_index("date", inplace=True)
        tool_df = (
            tool_df.groupby(
                [pd.Grouper(freq="W-MON", closed="left", label="left"), "panel"]
            )
            .sum()
            .reset_index()
        )

    return tool_df
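# Note on the weekly grouper (behavioural sketch, not project-specific): with
# freq="W-MON", closed="left" and label="left", the daily rows for each panel from a
# Monday through the following Sunday are summed into a single row labelled with that
# Monday, matching the weekly-date convention described in the upload guidelines.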
# Function to generate imputation DataFrame
def generate_imputation_df(tool_df):
    # Initialize lists to store the column details
    column_names = []
    categories = []
    missing_values_info = []
    zero_values_info = []
    imputation_methods = []

    # Define the function to calculate the count and percentage of missing values
    def calculate_missing_percentage(series):
        return series.isnull().sum(), (series.isnull().mean() * 100)

    # Define the function to calculate the count and percentage of zero values
    def calculate_zero_percentage(series):
        return (series == 0).sum(), ((series == 0).mean() * 100)

    # Iterate over each column to categorize and calculate missing and zero values
    for col in tool_df.columns:
        # Determine category based on column name prefix
        if col == "date" or col == "panel":
            continue
        elif col.startswith("response_metric_"):
            categories.append("Response Metrics")
        elif col.startswith("spends_"):
            categories.append("Spends")
        elif col.startswith("exogenous_"):
            categories.append("Exogenous")
        elif col.startswith("internal_"):
            categories.append("Internal")
        else:
            categories.append("Media")

        # Calculate missing values and percentage
        missing_count, missing_percentage = calculate_missing_percentage(tool_df[col])
        missing_values_info.append(f"{missing_count} ({missing_percentage:.1f}%)")

        # Calculate zero values and percentage
        zero_count, zero_percentage = calculate_zero_percentage(tool_df[col])
        zero_values_info.append(f"{zero_count} ({zero_percentage:.1f}%)")

        # Determine default imputation method based on conditions
        if col.startswith("spends_"):
            imputation_methods.append("Fill with 0")
        elif col.startswith("response_metric_"):
            imputation_methods.append("Fill with Mean")
        elif zero_percentage + missing_percentage > percent_drop_col_threshold:
            imputation_methods.append("Drop Column")
        else:
            imputation_methods.append("Fill with Mean")

        column_names.append(col)

    # Create the DataFrame
    imputation_df = pd.DataFrame(
        {
            "Column Name": column_names,
            "Category": categories,
            "Missing Values": missing_values_info,
            "Zero Values": zero_values_info,
            "Imputation Method": imputation_methods,
        }
    )

    # Define the category order for sorting
    category_order = {
        "Response Metrics": 1,
        "Spends": 2,
        "Media": 3,
        "Exogenous": 4,
        "Internal": 5,
    }

    # Add a temporary column for sorting based on the category order
    imputation_df["Category Order"] = imputation_df["Category"].map(category_order)

    # Sort the DataFrame based on the category order and then drop the temporary column
    imputation_df = imputation_df.sort_values(
        by=["Category Order", "Column Name"]
    ).drop(columns=["Category Order"])

    return imputation_df
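# Example of one resulting row (hypothetical counts): a column "media_clicks_facebook"
# with 3 missing values (2.1%) and 10 zeros (7.0%) would appear under Category "Media"
# with Imputation Method "Fill with Mean", because it is neither a spends nor a
# response metric column and its combined zero/missing share stays below
# percent_drop_col_threshold.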
# Function to perform imputation as per user requests
def perform_imputation(imputation_df, tool_df):
    # Detect channels associated with spends
    detected_channels = [
        col.replace("spends_", "")
        for col in tool_df.columns
        if col.startswith("spends_")
    ]

    # Create a dictionary with keys as channels and values as associated columns
    group_dict = {
        channel: [
            col
            for col in tool_df.columns
            if channel in col
            and not (
                col.startswith("response_metric_")
                or col.startswith("exogenous_")
                or col.startswith("internal_")
            )
        ]
        for channel in detected_channels
    }

    # Create a reverse dictionary with keys as columns and values as channels
    column_to_channel_dict = {
        col: channel for channel, cols in group_dict.items() for col in cols
    }

    # Perform imputation
    already_dropped = []
    for index, row in imputation_df.iterrows():
        col_name = row["Column Name"]
        impute_method = row["Imputation Method"]

        # Skip already dropped columns
        if col_name in already_dropped:
            continue

        # Skip imputation if dropping a response metric column and add warning
        if impute_method == "Drop Column" and col_name.startswith("response_metric_"):
            return None, {}, f"Cannot drop response metric column: {col_name}"

        # Drop column if requested
        if impute_method == "Drop Column":
            # If a spends column is dropped, drop all related columns
            if col_name.startswith("spends_"):
                tool_df.drop(
                    columns=group_dict[col_name.replace("spends_", "")],
                    inplace=True,
                )
                already_dropped += group_dict[col_name.replace("spends_", "")]
                del group_dict[col_name.replace("spends_", "")]
            else:
                tool_df.drop(columns=[col_name], inplace=True)
                if not (
                    col_name.startswith("exogenous_")
                    or col_name.startswith("internal_")
                ):
                    group_name = column_to_channel_dict[col_name]
                    group_dict[group_name].remove(col_name)

                    # Check for channels with one or fewer associated columns and add warning if needed
                    if len(group_dict[group_name]) <= 1:
                        return (
                            None,
                            {},
                            f"No media variable associated with category {col_name.replace('spends_', '')}.",
                        )
            continue

        # Check each panel
        for panel in tool_df["panel"].unique():
            panel_df = tool_df[tool_df["panel"] == panel]
            # Check if the column is entirely null or empty for the current panel
            if panel_df[col_name].isnull().all():
                if impute_method in ["Fill with Mean", "Fill with Median"]:
                    return (
                        None,
                        {},
                        f"Cannot impute for empty column(s) with mean or median. Select 'Fill with 0'. Details: Panel: {panel}, Column: {col_name}",
                    )

        # Fill missing values as requested
        if impute_method == "Fill with Mean":
            tool_df[col_name] = tool_df.groupby("panel")[col_name].transform(
                lambda x: x.fillna(x.mean())
            )
        elif impute_method == "Fill with Median":
            tool_df[col_name] = tool_df.groupby("panel")[col_name].transform(
                lambda x: x.fillna(x.median())
            )
        elif impute_method == "Fill with 0":
            tool_df[col_name] = tool_df[col_name].fillna(0)

    # Check if the final DataFrame has at least one response metric and two spends categories
    response_metrics = [
        col for col in tool_df.columns if col.startswith("response_metric_")
    ]
    spends_categories = [col for col in tool_df.columns if col.startswith("spends_")]

    if len(response_metrics) < 1:
        return (None, {}, "The final DataFrame must have at least one response metric.")
    if len(spends_categories) < 2:
        return (
            None,
            {},
            "The final DataFrame must have at least two spends categories.",
        )

    return tool_df, group_dict, "Imputed Successfully!"
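# Sketch of the grouping this relies on (assumed columns): with spends_facebook,
# media_clicks_facebook, spends_tv and media_impressions_tv present, group_dict is
# roughly {"facebook": ["spends_facebook", "media_clicks_facebook"],
# "tv": ["spends_tv", "media_impressions_tv"]}; choosing "Drop Column" for
# spends_facebook therefore removes every column in the facebook group at once.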
# Function to display groups with custom styling
def display_groups(input_dict):
    # Define custom CSS for a pastel rounded rectangle
    custom_css = """
    <style>
        .group-box {
            background-color: #ffdaab;
            border-radius: 10px;
            padding: 10px;
            margin: 5px 0;
        }
    </style>
    """
    st.markdown(custom_css, unsafe_allow_html=True)

    for group_name, values in input_dict.items():
        group_html = f"<div class='group-box'><strong>{group_name}:</strong> {format_values_for_display(values)}</div>"
        st.markdown(group_html, unsafe_allow_html=True)
# Function to categorize columns and create an ordered dictionary
def create_ordered_category_dict(df):
    category_dict = {
        "Response Metrics": [],
        "Spends": [],
        "Media": [],
        "Exogenous": [],
        "Internal": [],
    }

    # Define the category order for sorting
    category_order = {
        "Response Metrics": 1,
        "Spends": 2,
        "Media": 3,
        "Exogenous": 4,
        "Internal": 5,
    }

    for column in df.columns:
        if column == "date" or column == "panel":
            continue  # Skip 'date' and 'panel' columns

        if column.startswith("response_metric_"):
            category_dict["Response Metrics"].append(column)
        elif column.startswith("spends_"):
            category_dict["Spends"].append(column)
        elif column.startswith("exogenous_"):
            category_dict["Exogenous"].append(column)
        elif column.startswith("internal_"):
            category_dict["Internal"].append(column)
        else:
            category_dict["Media"].append(column)

    # Sort the dictionary based on the defined category order
    sorted_category_dict = OrderedDict(
        sorted(category_dict.items(), key=lambda item: category_order[item[0]])
    )

    return sorted_category_dict
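# Illustrative output (assumed columns): for a frame holding spends_facebook,
# media_clicks_facebook, exogenous_unemployment_rate and response_metric_revenue_total,
# the OrderedDict lists "Response Metrics", then "Spends", "Media", "Exogenous" and
# "Internal", with each column placed under the category implied by its prefix.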
try:
    # Page Title
    st.title("Data Import")

    # Create file uploader
    uploaded_file = st.file_uploader(
        "Upload Data", type=["xlsx"], accept_multiple_files=False
    )

    # Expander with markdown for upload rules
    with st.expander("Upload Rules and Guidelines"):
        st.markdown(
            """
### Upload Guidelines
Please ensure your data adheres to the following rules:

1. **File Format**:
   - Upload all data in a single Excel file.

2. **Compulsory Columns**:
   - **Date**: Must be in the format `YYYY-MM-DD` only.
   - **Panel**: If no panel data exists, use `aggregated` as a single panel.

3. **Column Naming Conventions**:
   - All columns should start with the associated category prefix.

**Examples**:
- **Response Metric Column**:
  - Format: `response_metric_<response_metric_name>_<channel_name>`
  - Example: `response_metric_revenue_facebook`
- **Total Response Metric**:
  - Format: `response_metric_<response_metric_name>_total`
  - Example: `response_metric_revenue_total`
- **Spend Column**:
  - Format: `spends_<channel_name>`
  - Example: `spends_facebook`
- **Media Column**:
  - Format: `media_<media_variable_name>_<channel_name>`
  - Example: `media_clicks_facebook`
- **Exogenous Column**:
  - Format: `exogenous_<variable_name>`
  - Example: `exogenous_unemployment_rate`
- **Internal Column**:
  - Format: `internal_<variable_name>`
  - Example: `internal_discount`

**Notes**:
- The `total` response metric should represent the total for a particular date and panel, including all channels and organic contributions.
- The `date` column for weekly data should be the Monday of that week, representing the data from that Monday to the following Sunday. Example: If the week starts on Monday, August 5th, 2024, and ends on Sunday, August 11th, 2024, the date column for that week should display 2024-08-05.
"""
        )
    # Upload warning placeholder
    upload_warning_placeholder = st.container()

    # Load the uploaded file into a DataFrame if a file is uploaded
    data_upload_df = load_and_transform_data(uploaded_file)

    # Columns for user input
    granularity_col, validate_process_col = st.columns(2)

    # Dropdown for data granularity
    granularity_selection = granularity_col.selectbox(
        "Select data granularity",
        options=["daily", "weekly"],
        format_func=name_format_func,
        key="granularity_selection_key",
    )

    # Gold Layer DataFrame
    gold_layer_df = st.session_state["project_dct"]["data_import"]["gold_layer_df"]

    if not gold_layer_df.empty:
        st.subheader("Gold Layer DataFrame")
        with st.expander("Gold Layer DataFrame"):
            st.dataframe(
                gold_layer_df,
                hide_index=True,
                column_config={
                    "date": st.column_config.DateColumn("date", format="YYYY-MM-DD")
                },
            )
    else:
        st.info(
            "No gold layer data is selected for this project. Please upload data manually.",
            icon="📊",
        )
    # Check input data
    with validate_process_col:
        st.write("##")  # Padding

        if validate_process_col.button(
            "Validate and Process", use_container_width=True
        ):
            with st.spinner("Processing ..."):
                # Check if both DataFrames are empty
                valid_input = True
                if gold_layer_df.empty and data_upload_df.empty:
                    # If both gold_layer_df and data_upload_df are empty, display a warning and stop the script
                    st.warning(
                        "Both the Gold Layer data and the uploaded data are empty. Please provide at least one data source.",
                        icon="⚠️",
                    )

                    # Log message
                    log_message(
                        "warning",
                        "Both the Gold Layer data and the uploaded data are empty. Please provide at least one data source.",
                        "Data Import",
                    )
                    valid_input = False

                # If the uploaded DataFrame is empty and the Gold Layer is not, swap them to ensure all validation conditions are checked
                elif not gold_layer_df.empty and data_upload_df.empty:
                    data_upload_df, gold_layer_df = (
                        gold_layer_df.copy(),
                        data_upload_df.copy(),
                    )
                    valid_input = True

                if valid_input:
                    # Fetch all necessary column lists
                    (
                        spends_columns,
                        response_metric_columns,
                        total_columns,
                        gold_layer_columns,
                        data_upload_columns,
                    ) = fetch_columns(gold_layer_df, data_upload_df)

                    with upload_warning_placeholder:
                        valid_input, message = valid_input_df(
                            data_upload_df,
                            spends_columns,
                            response_metric_columns,
                            total_columns,
                            gold_layer_columns,
                            data_upload_columns,
                        )

                        if not valid_input:
                            st.warning(message, icon="⚠️")

                            # Log message
                            log_message("warning", message, "Data Import")
                # Merge gold_layer_df and data_upload_df on 'panel' and 'date'
                if valid_input:
                    merged_df = merge_dataframes(gold_layer_df, data_upload_df)

                    missing_columns, no_media_data, channel_columns_dict = (
                        check_required_columns(
                            merged_df, spends_columns, response_metric_columns
                        )
                    )

                    with upload_warning_placeholder:
                        # Warning for categories with no media data
                        if no_media_data:
                            st.warning(
                                f"Categories without media data: {format_values_for_display(no_media_data)}. Please upload at least one media column to proceed.",
                                icon="⚠️",
                            )
                            valid_input = False

                            # Log message
                            log_message(
                                "warning",
                                f"Categories without media data: {format_values_for_display(no_media_data)}. Please upload at least one media column to proceed.",
                                "Data Import",
                            )

                        # Warning for insufficient rows
                        elif any(
                            granularity_selection == "daily"
                            and len(merged_df[merged_df["panel"] == panel])
                            < minimum_row_req
                            for panel in merged_df["panel"].unique()
                        ):
                            st.warning(
                                f"Insufficient data. Please provide at least {minimum_row_req} days of data for every panel.",
                                icon="⚠️",
                            )
                            valid_input = False

                            # Log message
                            log_message(
                                "warning",
                                f"Insufficient data. Please provide at least {minimum_row_req} days of data for every panel.",
                                "Data Import",
                            )
                        elif any(
                            granularity_selection == "weekly"
                            and len(merged_df[merged_df["panel"] == panel])
                            < minimum_row_req * 7
                            for panel in merged_df["panel"].unique()
                        ):
                            st.warning(
                                f"Insufficient data. Please provide at least {minimum_row_req} weeks of data for every panel.",
                                icon="⚠️",
                            )
                            valid_input = False

                            # Log message
                            log_message(
                                "warning",
                                f"Insufficient data. Please provide at least {minimum_row_req} weeks of data for every panel.",
                                "Data Import",
                            )

                        # Info for missing columns
                        elif missing_columns:
                            st.info(
                                f"Missing columns: {format_values_for_display(missing_columns)}. Please upload all required columns.",
                                icon="💡",
                            )
                if valid_input:
                    # Keep the merged DataFrame for dashboard purposes
                    dashboard_df = merged_df

                    # Create a DataFrame for tool purposes
                    tool_df = prepare_tool_df(merged_df, granularity_selection)

                    # Create Imputation DataFrame
                    imputation_df = generate_imputation_df(tool_df)

                    # Save data to project dictionary
                    st.session_state["project_dct"]["data_import"][
                        "granularity_selection"
                    ] = st.session_state["granularity_selection_key"]
                    st.session_state["project_dct"]["data_import"][
                        "dashboard_df"
                    ] = dashboard_df
                    st.session_state["project_dct"]["data_import"]["tool_df"] = tool_df
                    st.session_state["project_dct"]["data_import"]["unique_panels"] = (
                        tool_df["panel"].unique()
                    )
                    st.session_state["project_dct"]["data_import"][
                        "imputation_df"
                    ] = imputation_df

                    # Success message
                    with upload_warning_placeholder:
                        st.success("Processed Successfully!", icon="🗂️")
                        st.toast("Processed Successfully!", icon="🗂️")

                        # Log message
                        log_message("info", "Processed Successfully!", "Data Import")
    # Load saved data from project dictionary
    if st.session_state["project_dct"]["data_import"]["tool_df"] is None:
        st.stop()
    else:
        tool_df = st.session_state["project_dct"]["data_import"]["tool_df"]
        imputation_df = st.session_state["project_dct"]["data_import"]["imputation_df"]
        unique_panels = st.session_state["project_dct"]["data_import"]["unique_panels"]

    # Unique Panel
    st.subheader("Unique Panel")

    # Get unique panels count
    total_count = len(unique_panels)

    # Define custom CSS for a pastel rounded rectangle
    custom_css = """
    <style>
        .panel-box {
            background-color: #ffdaab;
            border-radius: 10px;
            padding: 10px;
            margin: 0 0;
        }
    </style>
    """

    # Display unique panels with total count
    st.markdown(custom_css, unsafe_allow_html=True)
    panel_html = f"<div class='panel-box'><strong>Unique Panels:</strong> {format_values_for_display(unique_panels)}<br><strong>Total Count:</strong> {total_count}</div>"
    st.markdown(panel_html, unsafe_allow_html=True)

    st.write("##")  # Padding
    # Impute Missing Values
    st.subheader("Impute Missing Values")

    edited_imputation_df = st.data_editor(
        imputation_df,
        column_config={
            "Imputation Method": st.column_config.SelectboxColumn(
                options=[
                    "Drop Column",
                    "Fill with Mean",
                    "Fill with Median",
                    "Fill with 0",
                ],
                required=True,
                default="Fill with 0",
            ),
        },
        column_order=[
            "Column Name",
            "Category",
            "Missing Values",
            "Zero Values",
            "Imputation Method",
        ],
        disabled=["Column Name", "Category", "Missing Values", "Zero Values"],
        hide_index=True,
        use_container_width=True,
        key="imputation_df_key",
    )
    # Expander with markdown for imputation rules
    with st.expander("Impute Missing Values Guidelines"):
        st.markdown(
            f"""
### Imputation Guidelines
Please adhere to the following rules when handling missing values:

1. **Default Imputation Strategies**:
   - **Response Metrics**: Imputed using the **mean** value of the column.
   - **Spends**: Imputed with **zero** values.
   - **Media, Exogenous, Internal**: Imputation strategy is **dynamic** based on the data.

2. **Drop Threshold**:
   - If the combined percentage of **zeros** and **null values** in any column exceeds `{percent_drop_col_threshold}%`, the column will be **marked to drop** by default, which the user can change manually.
   - **Example**: If `spends_facebook` has more than `{percent_drop_col_threshold}%` of zeros and nulls combined, it will be marked for dropping.

3. **Category Generation and Association**:
   - Categories are automatically generated from the **Spends** columns.
   - **Example**: The column `spends_facebook` will generate the **facebook** category. This means columns like `spends_facebook`, `media_impression_facebook` and `media_clicks_facebook` will also be associated with this category.

4. **Column Association and Imputation**:
   - Each category must have at least **one Media column** associated with it for imputation to proceed.
   - **Example**: If the **facebook** category does not have any media columns like `media_impression_facebook`, imputation will not be allowed for that category.
   - Solution: Either **drop the entire category** if it is empty, or **impute the columns** associated with the category instead of dropping them.

5. **Response Metrics and Category Count**:
   - Dropping **Response Metric** columns is **not allowed** under any circumstances.
   - At least **two categories** must exist after imputation, or the imputation will not proceed.
   - **Example**: If only **facebook** remains after selection, imputation will be halted.

**Notes**:
- The decision to drop a spends column will result in all associated columns being dropped.
- **Example**: Dropping `spends_facebook` will also drop all related columns like `media_impression_facebook` and `media_clicks_facebook`.
"""
        )
    # Imputation Warning Placeholder
    imputation_warning_placeholder = st.container()

    # Save the DataFrame and dictionary from the current session
    if st.button("Impute and Save", use_container_width=True):
        with st.spinner("Imputing ..."):
            with imputation_warning_placeholder:
                # Perform Imputation
                imputed_tool_df, group_dict, message = perform_imputation(
                    edited_imputation_df.copy(), tool_df.copy()
                )

                if imputed_tool_df is None:
                    st.warning(message, icon="⚠️")

                    # Log message
                    log_message("warning", message, "Data Import")
                else:
                    st.session_state["project_dct"]["data_import"][
                        "imputed_tool_df"
                    ] = imputed_tool_df
                    st.session_state["project_dct"]["data_import"][
                        "imputation_df"
                    ] = edited_imputation_df
                    st.session_state["project_dct"]["data_import"][
                        "group_dict"
                    ] = group_dict
                    st.session_state["project_dct"]["data_import"]["category_dict"] = (
                        create_ordered_category_dict(imputed_tool_df)
                    )

            if imputed_tool_df is not None:
                # Update DB
                update_db(
                    prj_id=st.session_state["project_number"],
                    page_nam="Data Import",
                    file_nam="project_dct",
                    pkl_obj=pickle.dumps(st.session_state["project_dct"]),
                    schema=schema,
                )

                # Success message
                st.success("Saved Successfully!", icon="💾")
                st.toast("Saved Successfully!", icon="💾")

                # Log message
                log_message("info", "Saved Successfully!", "Data Import")
    # Load saved data from project dictionary
    if st.session_state["project_dct"]["data_import"]["imputed_tool_df"] is None:
        st.stop()
    else:
        imputed_tool_df = st.session_state["project_dct"]["data_import"][
            "imputed_tool_df"
        ]
        group_dict = st.session_state["project_dct"]["data_import"]["group_dict"]
        category_dict = st.session_state["project_dct"]["data_import"]["category_dict"]

    # Channel Groupings
    st.subheader("Channel Groupings")
    display_groups(group_dict)
    st.write("##")  # Padding

    # Variable Categorization
    st.subheader("Variable Categorization")
    display_groups(category_dict)
    st.write("##")  # Padding

    # Final DataFrame
    st.subheader("Final DataFrame")
    st.dataframe(
        imputed_tool_df,
        hide_index=True,
        column_config={
            "date": st.column_config.DateColumn("date", format="YYYY-MM-DD")
        },
    )
    st.write("##")  # Padding
except Exception as e:
    # Capture the error details
    exc_type, exc_value, exc_traceback = sys.exc_info()
    error_message = "".join(
        traceback.format_exception(exc_type, exc_value, exc_traceback)
    )

    # Log message
    log_message("error", f"An error occurred: {error_message}.", "Data Import")

    # Display a warning message
    st.warning(
        "Oops! Something went wrong. Please try refreshing the tool or creating a new project.",
        icon="⚠️",
    )