# Importing necessary libraries
import pickle

import numpy as np
import pandas as pd
import streamlit as st

st.set_page_config(
    page_title="Model Build",
    page_icon=":shark:",
    layout="wide",
    initial_sidebar_state="collapsed",
)

from utilities import load_authenticator, load_local_css, set_header

load_local_css("styles.css")
set_header()

# Re-assign each session-state entry to itself so values persist across page
# switches; auth and form-submit keys are excluded because the authenticator
# manages them itself.
for k, v in st.session_state.items():
    if k not in ["logout", "login", "config"] and not k.startswith("FormSubmitter"):
        st.session_state[k] = v

authenticator = st.session_state.get("authenticator")
if authenticator is None:
    authenticator = load_authenticator()

name, authentication_status, username = authenticator.login("Login", "main")

auth_status = st.session_state.get("authentication_status")
if auth_status:
    is_state_initialized = st.session_state.get("initialized", False)
    if not is_state_initialized:
        pass  # Placeholder: one-time initialization hook, currently unused
# Function to expand a DataFrame to daily granularity
def expand_to_daily(df, granularity, start_date, end_date):
    # Create a new DataFrame with a row for each day
    all_dates = pd.date_range(start=start_date, end=end_date, freq="D")
    daily_df = pd.DataFrame(all_dates, columns=["Date"])

    if granularity == "daily":
        # For daily data, simply merge to fill missing dates
        daily_df = daily_df.merge(df, on="Date", how="left")
    else:
        # For weekly or monthly data, distribute each period's total evenly
        # across the days spanned by that period's rows
        for column in df.columns:
            if column != "Date":  # Skip 'Date' column
                daily_df[column] = np.nan  # Initialize with NaNs

        # Group by the required frequency and distribute values
        freq = "W-MON" if granularity == "weekly" else "MS"
        for _, group in df.groupby(pd.Grouper(key="Date", freq=freq)):
            num_days = len(
                pd.date_range(group["Date"].min(), group["Date"].max(), freq="D")
            )
            for column in group.columns:
                if column == "Date":  # Skip 'Date' column
                    continue
                value = group[column].sum() / num_days
                date_range = pd.date_range(
                    group["Date"].min(), periods=num_days, freq="D"
                )
                daily_df.loc[daily_df["Date"].isin(date_range), column] = value

    return daily_df
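# Illustrative usage (hypothetical toy frame; commented out so it does not
# execute on page load). With "daily" granularity the function left-merges
# onto a full date range, so gaps surface as NaN:
# _demo = pd.DataFrame(
#     {"Date": pd.to_datetime(["2024-01-01", "2024-01-03"]), "Spend": [5.0, 7.0]}
# )
# expand_to_daily(_demo, "daily", "2024-01-01", "2024-01-03")
# # -> 3 rows; Spend is NaN on 2024-01-02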
# Function to validate the 'Date' column of a DataFrame
def validate_date_column(df):
    try:
        # Attempt to convert the 'Date' column to datetime (in place)
        df["Date"] = pd.to_datetime(df["Date"], format="%d-%m-%Y")
        return True
    except (ValueError, TypeError):
        return False
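# Quick sanity checks (hypothetical frames, commented out). Note the function
# also converts the column in place when validation succeeds:
# validate_date_column(pd.DataFrame({"Date": ["31-01-2024"]}))  # True
# validate_date_column(pd.DataFrame({"Date": ["2024/01/31"]}))  # False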
# Function to classify the data interval from the modal day gap
def determine_data_interval(common_freq):
    if common_freq == 1:
        return "daily"
    elif common_freq == 7:
        return "weekly"
    elif 28 <= common_freq <= 31:
        return "monthly"
    else:
        return "irregular"
# Function to convert a DataFrame to daily granularity over a fixed date range
def convert_and_fill_dates(df, start_date, end_date, interval):
    # Create a date range for the desired period
    all_dates = pd.date_range(start=start_date, end=end_date, freq="D")
    new_df = pd.DataFrame(all_dates, columns=["Date"])

    # Preprocess and aggregate data based on the original interval
    if interval != "daily":
        # Resample to one row per week/month, summing values within each period
        if interval == "weekly":
            df = df.resample("W-MON", on="Date").sum().reset_index()
        elif interval == "monthly":
            df = df.resample("MS", on="Date").sum().reset_index()

        # Distribute values equally across the days in each week/month
        expanded_rows = []
        for _, row in df.iterrows():
            if interval == "weekly":
                period_dates = pd.date_range(row["Date"], periods=7)
            elif interval == "monthly":
                period_end = row["Date"] + pd.offsets.MonthEnd(1)
                period_dates = pd.date_range(row["Date"], period_end)
            for date in period_dates:
                new_row = row.copy()
                new_row["Date"] = date
                for col in df.columns:
                    if col != "Date":  # Skip 'Date' column
                        new_row[col] = row[col] / len(period_dates)
                expanded_rows.append(new_row)

        # Create a DataFrame from the expanded rows and merge onto the full range
        expanded_df = pd.DataFrame(expanded_rows)
        new_df = pd.merge(new_df, expanded_df, how="left", on="Date")
    else:
        # Daily data: aggregate if there are multiple entries for the same day
        df = df.groupby("Date").sum().reset_index()
        new_df = pd.merge(new_df, df, how="left", on="Date")

    # Ensure all dates from start to end are present, filling missing values with NaN
    new_df["Date"] = pd.to_datetime(new_df["Date"])  # Ensure 'Date' is datetime type
    new_df = new_df.set_index("Date").reindex(all_dates).reset_index()
    new_df.rename(columns={"index": "Date"}, inplace=True)

    return new_df
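# Illustrative usage (hypothetical toy frame, commented out): a single weekly
# total of 70 stamped on Monday 2024-01-01 is spread as 10.0 per day across
# the seven days starting at that label:
# _wk = pd.DataFrame({"Date": [pd.Timestamp("2024-01-01")], "Spend": [70.0]})
# convert_and_fill_dates(_wk, "2024-01-01", "2024-01-07", "weekly")
# # -> 7 rows, each with Spend == 10.0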
# Function to convert a daily-level DataFrame to weekly or monthly granularity
def convert_to_higher_granularity(df, required_granularity):
    if required_granularity == "daily":
        return df

    # Ensure 'Date' is datetime and set it as the index
    if not pd.api.types.is_datetime64_any_dtype(df["Date"]):
        df["Date"] = pd.to_datetime(df["Date"])
    df.set_index("Date", inplace=True)

    # Resample and aggregate
    if required_granularity == "weekly":
        # 'W-MON' labels each week by the Monday on which it ends
        df = df.resample("W-MON").sum()
    elif required_granularity == "monthly":
        # 'MS' labels each month by its first day
        df = df.resample("MS").sum()

    # Reset index to move 'Date' back to a column
    df.reset_index(inplace=True)
    return df
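# Illustrative usage (hypothetical toy frame, commented out): two daily rows
# in the same W-MON week collapse into one weekly row holding their sum:
# _daily = pd.DataFrame(
#     {"Date": pd.to_datetime(["2024-01-02", "2024-01-03"]), "Spend": [1.0, 2.0]}
# )
# convert_to_higher_granularity(_daily, "weekly")
# # -> one row labelled 2024-01-08 (the Monday ending that week), Spend == 3.0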
# Read the CSV file, parsing the 'Date' column as datetime
main_df = pd.read_csv(
    "Media_data_for_model_dma_level.csv", dayfirst=True, parse_dates=["Date"]
)

# Get the start date (minimum) and end date (maximum) from the 'Date' column
api_start_date = main_df["Date"].min()
api_end_date = main_df["Date"].max()

# Infer the granularity from the most common difference between consecutive dates
date_diffs = main_df["Date"].diff().dt.days.dropna()
common_diff = date_diffs.mode()[0]
api_granularity = determine_data_interval(common_diff)

# Convert the DataFrame to daily-level granularity
main_df = expand_to_daily(main_df, api_granularity, api_start_date, api_end_date)
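# For reference (hypothetical dates, commented out), the inference above keys
# off the modal gap between consecutive rows:
# _dates = pd.Series(pd.to_datetime(["2024-01-01", "2024-01-08", "2024-01-15"]))
# _dates.diff().dt.days.dropna().mode()[0]  # -> 7.0, i.e. "weekly"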
# Page Title
st.title("Data Import")

# File uploader
uploaded_files = st.file_uploader(
    "Upload additional data", type=["xlsx"], accept_multiple_files=True
)

# Custom HTML for upload instructions
recommendation_html = f"""
<div style="text-align: justify;">
<strong>Recommendation:</strong> For optimal processing, please ensure that every uploaded dataset (media, internal, and exogenous data) includes a <code>Date</code> column formatted as <code>DD-MM-YYYY</code>, is free of missing values, and is aggregated to a {api_granularity} level.
</div>
"""
st.markdown(recommendation_html, unsafe_allow_html=True)
# Initialize a list to collect all processed DataFrames
all_data_dfs = []

if uploaded_files:
    for uploaded_file in uploaded_files:
        # Extract the file name
        file_name = uploaded_file.name

        # Load the file into a DataFrame
        data_df = pd.read_excel(uploaded_file)

        # Identify numeric columns in the DataFrame
        numeric_columns = data_df.select_dtypes(include="number").columns.tolist()

        # Validate the 'Date' column and ensure there's at least one numeric column
        if validate_date_column(data_df) and len(numeric_columns) > 0:
            data_df = data_df[["Date"] + numeric_columns]

            # Ensure the 'Date' column is in datetime format and sorted
            data_df["Date"] = pd.to_datetime(data_df["Date"], dayfirst=True)
            data_df.sort_values("Date", inplace=True)

            # Use the most common day difference between dates to determine frequency
            common_freq = data_df["Date"].diff().dt.days.dropna().mode()[0]

            # Classify the data interval (daily, weekly, monthly, or irregular)
            interval = determine_data_interval(common_freq)
            if interval == "irregular":
                # Warn the user if the dates do not follow a supported interval
                st.warning(
                    f"File Name: {file_name} - Please upload data at a daily, weekly, or monthly interval."
                )
                continue

            # Aggregate to the detected interval and redistribute to daily rows
            data_df = convert_and_fill_dates(
                data_df, api_start_date, api_end_date, interval
            )

            # Add the processed DataFrame to the list
            all_data_dfs.append(data_df)
        else:
            # Warn the user if the 'Date' column doesn't meet the format requirements
            st.warning(
                f"File Name: {file_name} - Please upload data with a 'Date' column in 'DD-MM-YYYY' format and at least one media/exogenous column."
            )

# Sequentially left-merge each processed DataFrame into the main DataFrame on 'Date'
for df in all_data_dfs:
    main_df = pd.merge(main_df, df, on="Date", how="left")
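# Merge semantics, for reference (hypothetical frames, commented out): the
# left join keeps every main_df row and leaves NaN where an upload has no
# matching date, which the imputation step below then handles:
# _left = pd.DataFrame({"Date": pd.to_datetime(["2024-01-01", "2024-01-02"])})
# _right = pd.DataFrame({"Date": pd.to_datetime(["2024-01-02"]), "X": [1.0]})
# pd.merge(_left, _right, on="Date", how="left")
# # -> X is NaN on 2024-01-01 and 1.0 on 2024-01-02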
# Function to calculate missing-value stats and prepare an editable DataFrame
def prepare_missing_stats_df(df):
    missing_stats = []
    for column in df.columns:
        # Skip the Date and Revenue columns
        if column == "Date" or column == "Total Approved Accounts - Revenue":
            continue
        missing = df[column].isnull().sum()
        pct_missing = round((missing / len(df)) * 100, 2)
        missing_stats.append(
            {
                "Column": column,
                "Missing Values": missing,
                "Missing Percentage": pct_missing,
                "Impute Method": "Fill with 0",  # Default value
                "Category": "Media",  # Default value
            }
        )
    stats_df = pd.DataFrame(missing_stats)
    return stats_df
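# Illustrative output (hypothetical toy frame, commented out):
# _df = pd.DataFrame({"Date": pd.to_datetime(["2024-01-01"]), "TV": [np.nan]})
# prepare_missing_stats_df(_df)
# # -> one row: Column="TV", Missing Values=1, Missing Percentage=100.0,
# #    Impute Method="Fill with 0", Category="Media"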
# Prepare the missing-value stats DataFrame for editing
missing_stats_df = prepare_missing_stats_df(main_df)

# Create an editable DataFrame in Streamlit
st.markdown("#### Select Variables Category & Impute Missing Values")
edited_stats_df = st.data_editor(
    missing_stats_df,
    column_config={
        "Impute Method": st.column_config.SelectboxColumn(
            options=[
                "Drop Column",
                "Fill with Mean",
                "Fill with Median",
                "Fill with 0",
            ],
            required=True,
            default="Fill with 0",
        ),
        "Category": st.column_config.SelectboxColumn(
            options=[
                "Date",
                "Media",
                "Exogenous",
                "Internal",
                "DMA/Panel",
                "Response_Metric",
            ],
            required=True,
            default="Media",
        ),
    },
    disabled=["Column", "Missing Values", "Missing Percentage"],
    hide_index=True,
    use_container_width=True,
)
# Apply the chosen imputation method for each column
for i, row in edited_stats_df.iterrows():
    column = row["Column"]
    if row["Impute Method"] == "Drop Column":
        main_df.drop(columns=[column], inplace=True)
    elif row["Impute Method"] == "Fill with Mean":
        main_df[column] = main_df[column].fillna(main_df[column].mean())
    elif row["Impute Method"] == "Fill with Median":
        main_df[column] = main_df[column].fillna(main_df[column].median())
    elif row["Impute Method"] == "Fill with 0":
        main_df[column] = main_df[column].fillna(0)

# Convert the final DataFrame back to the required granularity
main_df = convert_to_higher_granularity(main_df, api_granularity)
# Display the final DataFrame
st.markdown("#### Final DataFrame:")
st.dataframe(main_df)

# Build a dictionary mapping each category to its variables
category_dict = {}
for i, row in edited_stats_df.iterrows():
    column = row["Column"]
    category = row["Category"]  # The category chosen by the user for this variable
    # Append the column to its category, creating the list on first use
    if category not in category_dict:
        category_dict[category] = [column]
    else:
        category_dict[category].append(column)

# Display the dictionary
st.markdown("#### Variable Category:")
for category, variables in category_dict.items():
    if len(variables) > 1:
        # Join all but the last variable with ", ", then add " and " before the last
        variables_str = ", ".join(variables[:-1]) + " and " + variables[-1]
    else:
        # A single variable needs no "and"
        variables_str = variables[0]
    # Display the category and its variables
    st.markdown(f"**{category}:** {variables_str}\n\n", unsafe_allow_html=True)
# Persist the cleaned DataFrame and the category mapping for downstream pages
if st.button("Save Changes"):
    with open("Pickle_files/main_df", "wb") as f:
        pickle.dump(main_df, f)
    with open("Pickle_files/category_dict", "wb") as c:
        pickle.dump(category_dict, c)
    st.success("Changes Saved!")
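# A downstream page could reload these artifacts like so (sketch, commented
# out; assumes the same working directory and that "Save Changes" was clicked):
# with open("Pickle_files/main_df", "rb") as f:
#     cleaned_df = pickle.load(f)
# with open("Pickle_files/category_dict", "rb") as c:
#     category_dict = pickle.load(c)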