# Mastercard / Data_Import.py
# Importing necessary libraries
import streamlit as st
import pickle
st.set_page_config(
page_title="Model Build",
page_icon=":shark:",
layout="wide",
initial_sidebar_state="collapsed",
)
from utilities import load_authenticator
import numpy as np
import pandas as pd
from utilities import set_header, load_local_css
load_local_css("styles.css")
set_header()
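# Re-assign each existing session_state entry to itself so Streamlit keeps the
# values alive when the user navigates between pages (a common multipage pattern)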
for k, v in st.session_state.items():
    if k not in ['logout', 'login', 'config'] and not k.startswith('FormSubmitter'):
st.session_state[k] = v
authenticator = st.session_state.get('authenticator')
if authenticator is None:
authenticator = load_authenticator()
name, authentication_status, username = authenticator.login('Login', 'main')
auth_status = st.session_state.get('authentication_status')
if auth_status == True:
    is_state_initialized = st.session_state.get('initialized', False)
    if not is_state_initialized:
        a = 1  # placeholder assignment; no page-specific initialization is needed yet
# Function to expand dataframe to daily
@st.cache_data(show_spinner=False)  # cache as data: a copy is returned on each call, so later in-place edits don't mutate the cache
def expand_to_daily(df, granularity, start_date, end_date):
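    """Expand df to one row per day between start_date and end_date.

    Daily data is left-merged onto the full date range; for weekly or monthly
    data, each group's total is divided by the number of days the group's
    dates span and written to those days.
    """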
# Create a new DataFrame with a row for each day
all_dates = pd.date_range(start=start_date, end=end_date, freq="D")
daily_df = pd.DataFrame(all_dates, columns=["Date"])
if granularity == "daily":
# For daily data, simply merge to fill missing dates
daily_df = daily_df.merge(df, on="Date", how="left")
else:
# For weekly or monthly, distribute values to daily rows
for column in df.columns:
if column != "Date": # Skip 'Date' column
daily_df[column] = np.nan # Initialize with NaNs
# Group by the required frequency and distribute values
freq = "W-MON" if granularity == "weekly" else "MS"
for _, group in df.groupby(pd.Grouper(key="Date", freq=freq)):
num_days = len(
pd.date_range(group["Date"].min(), group["Date"].max(), freq="D")
)
for column in group.columns:
if column == "Date": # Skip 'Date' column
continue
value = group[column].sum() / num_days
date_range = pd.date_range(
group["Date"].min(), periods=num_days, freq="D"
)
daily_df.loc[daily_df["Date"].isin(date_range), column] = value
return daily_df
# Function to validate date column in dataframe
def validate_date_column(df):
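    """Return True if the 'Date' column parses with format DD-MM-YYYY
    (the conversion happens in place), otherwise False."""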
try:
# Attempt to convert the 'Date' column to datetime
df["Date"] = pd.to_datetime(df["Date"], format="%d-%m-%Y")
return True
    except Exception:  # parsing failed (wrong format, non-date values, missing column, etc.)
return False
# Function to determine data interval
def determine_data_interval(common_freq):
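    """Translate the most common gap (in days) between consecutive dates into a
    granularity label: 1 -> 'daily', 7 -> 'weekly', 28-31 -> 'monthly',
    anything else -> 'irregular'."""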
if common_freq == 1:
return "daily"
elif common_freq == 7:
return "weekly"
elif 28 <= common_freq <= 31:
return "monthly"
else:
return "irregular"
# Function to convert and fill dates in dataframe
def convert_and_fill_dates(df, start_date, end_date, interval):
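    """Aggregate df at its native interval, spread each period's totals evenly
    over the days in that period, and reindex the result onto a complete daily
    range from start_date to end_date (days with no data become NaN)."""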
# Create a date range for the desired period
all_dates = pd.date_range(start=start_date, end=end_date, freq="D")
new_df = pd.DataFrame(all_dates, columns=["Date"])
# Preprocess and aggregate data based on the original interval
if interval != "daily":
# Resample to start of each week/month, then sum values for the same period
if interval == "weekly":
df = df.resample("W-MON", on="Date").sum().reset_index()
elif interval == "monthly":
df = df.resample("MS", on="Date").sum().reset_index()
# Distribute values equally across the days in each week/month
expanded_rows = []
for _, row in df.iterrows():
if interval == "weekly":
period_dates = pd.date_range(row["Date"], periods=7)
elif interval == "monthly":
period_end = row["Date"] + pd.offsets.MonthEnd(1)
period_dates = pd.date_range(row["Date"], period_end)
for date in period_dates:
new_row = row.copy()
new_row["Date"] = date
for col in df.columns:
if col != "Date": # Skip 'Date' column
new_row[col] = row[col] / len(period_dates)
expanded_rows.append(new_row)
# Create a DataFrame from expanded rows
expanded_df = pd.DataFrame(expanded_rows)
new_df = pd.merge(new_df, expanded_df, how="left", on="Date")
else:
# Daily data, aggregate if there are multiple entries for the same day
df = df.groupby("Date").sum().reset_index()
new_df = pd.merge(new_df, df, how="left", on="Date")
# Ensure all dates from start to end are present, filling missing values with NaN
new_df["Date"] = pd.to_datetime(new_df["Date"]) # Ensure 'Date' is datetime type
new_df = new_df.set_index("Date").reindex(all_dates).reset_index()
new_df.rename(columns={"index": "Date"}, inplace=True)
return new_df
# Function to convert a DataFrame from daily level granularity to either weekly or monthly level
def convert_to_higher_granularity(df, required_granularity):
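    """Aggregate a daily DataFrame up to weekly ('W-MON') or monthly ('MS') sums;
    if required_granularity is 'daily', return df unchanged."""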
if required_granularity == "daily":
return df
# Ensure 'Date' is the index and is in datetime format
if not pd.api.types.is_datetime64_any_dtype(df["Date"]):
df["Date"] = pd.to_datetime(df["Date"])
df.set_index("Date", inplace=True)
# Resample and aggregate
if required_granularity == "weekly":
# Resample to weekly, using 'W-MON' to indicate weeks starting on Monday
df = df.resample("W-MON").sum()
elif required_granularity == "monthly":
# Resample to monthly, using 'MS' to indicate month start
df = df.resample("MS").sum()
# Reset index to move 'Date' back to a column
df.reset_index(inplace=True)
return df
# Read the CSV file, parsing the 'Date' column as datetime
main_df = pd.read_csv("Media_data_for_model_dma_level.csv", dayfirst=True, parse_dates=["Date"])
# st.write(main_df)
# Get the start date (minimum) and end date (maximum) from the 'Date' column
api_start_date = main_df["Date"].min()
api_end_date = main_df["Date"].max()
# Infer the granularity from the most common difference between consecutive dates
date_diffs = main_df["Date"].diff().dt.days.dropna()
common_diff = date_diffs.mode()[0]
api_granularity = determine_data_interval(common_diff)
# Convert the DataFrame to daily level granularity
main_df = expand_to_daily(main_df, api_granularity, api_start_date, api_end_date)
# Page Title
st.title("Data Import")
# File uploader
uploaded_files = st.file_uploader(
"Upload additional data", type=["xlsx"], accept_multiple_files=True
)
# Custom HTML for upload instructions
recommendation_html = f"""
<div style="text-align: justify;">
<strong>Recommendation:</strong> For optimal processing, please ensure that all uploaded datasets (media, internal, and exogenous) follow these guidelines: each dataset must include a <code>Date</code> column formatted as <code>DD-MM-YYYY</code>, be free of missing values, and be aggregated at the {api_granularity} level.
</div>
"""
st.markdown(recommendation_html, unsafe_allow_html=True)
# Initialize a list to collect all processed DataFrames
all_data_dfs = []
if uploaded_files:
for uploaded_file in uploaded_files:
# Extract the file name
file_name = uploaded_file.name
# Load the file into a DataFrame
data_df = pd.read_excel(
uploaded_file,
)
# Identify numeric columns in the DataFrame
numeric_columns = data_df.select_dtypes(include="number").columns.tolist()
# Validate the 'Date' column and ensure there's at least one numeric column
if validate_date_column(data_df) and len(numeric_columns) > 0:
data_df = data_df[["Date"] + numeric_columns]
# Ensure the 'Date' column is in datetime format and sorted
data_df["Date"] = pd.to_datetime(data_df["Date"], dayfirst=True)
data_df.sort_values("Date", inplace=True)
# Calculate the most common day difference between dates to determine frequency
common_freq = data_df["Date"].diff().dt.days.dropna().mode()[0]
# Calculate the data interval (daily, weekly, monthly or irregular)
interval = determine_data_interval(common_freq)
if interval == "irregular":
# Warn the user if the 'Date' column doesn't meet the format requirements
st.warning(
f"File Name: {file_name} ➜ Please upload data in daily, weekly or monthly interval."
)
continue
# Convert data to specified interval and redistribute to daily
data_df = convert_and_fill_dates(
data_df, api_start_date, api_end_date, interval
)
# Add the processed DataFrame to the list
all_data_dfs.append(data_df)
else:
# Warn the user if the 'Date' column doesn't meet the format requirements
st.warning(
f"File Name: {file_name} ➜ Please upload data with Date column in 'DD-MM-YYYY' format and at least one media/exogenous column."
)
# Sequentially merge each of the other DataFrames with the main DataFrame on 'Date'
for df in all_data_dfs:
main_df = pd.merge(main_df, df, on="Date", how="left")
# Function to calculate missing stats and prepare for editable DataFrame
def prepare_missing_stats_df(df):
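    """Summarise missing values per column (count and percentage), skipping the
    Date and revenue columns, and attach default 'Impute Method' and 'Category'
    selections for the editable table."""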
missing_stats = []
for column in df.columns:
        if column in ("Date", "Total Approved Accounts - Revenue"):
            # Skip the Date and Revenue columns
            continue
missing = df[column].isnull().sum()
pct_missing = round((missing / len(df)) * 100, 2)
missing_stats.append(
{
"Column": column,
"Missing Values": missing,
"Missing Percentage": pct_missing,
"Impute Method": "Fill with 0", # Default value
"Category": "Media", # Default value
}
)
stats_df = pd.DataFrame(missing_stats)
return stats_df
# Prepare missing stats DataFrame for editing
missing_stats_df = prepare_missing_stats_df(main_df)
# Create an editable DataFrame in Streamlit
st.markdown("#### Select Variables Category & Impute Missing Values")
edited_stats_df = st.data_editor(
missing_stats_df,
column_config={
"Impute Method": st.column_config.SelectboxColumn(
options=[
"Drop Column",
"Fill with Mean",
"Fill with Median",
"Fill with 0",
],
required=True,
default="Fill with 0",
),
"Category": st.column_config.SelectboxColumn(
options=[
"Date",
"Media",
"Exogenous",
"Internal",
"DMA/Panel",
"Response_Metric"
],
required=True,
default="Media",
),
},
disabled=["Column", "Missing Values", "Missing Percentage"],
hide_index=True,
use_container_width=True,
)
# Apply changes based on edited DataFrame
for i, row in edited_stats_df.iterrows():
column = row["Column"]
if row["Impute Method"] == "Drop Column":
main_df.drop(columns=[column], inplace=True)
    elif row["Impute Method"] == "Fill with Mean":
        main_df[column] = main_df[column].fillna(main_df[column].mean())
    elif row["Impute Method"] == "Fill with Median":
        main_df[column] = main_df[column].fillna(main_df[column].median())
    elif row["Impute Method"] == "Fill with 0":
        main_df[column] = main_df[column].fillna(0)
# Convert the Final DataFrame to required granularity
main_df = convert_to_higher_granularity(main_df, api_granularity)
# Display the Final DataFrame and exogenous variables
st.markdown("#### Final DataFrame:")
st.dataframe(main_df)
# Initialize an empty dictionary to hold categories and their variables
category_dict = {}
# Iterate over each row in the edited DataFrame to populate the dictionary
for i, row in edited_stats_df.iterrows():
column = row["Column"]
category = row["Category"] # The category chosen by the user for this variable
# Check if the category already exists in the dictionary
if category not in category_dict:
# If not, initialize it with the current column as its first element
category_dict[category] = [column]
else:
# If it exists, append the current column to the list of variables under this category
category_dict[category].append(column)
# Display the dictionary
st.markdown("#### Variable Category:")
for category, variables in category_dict.items():
# Check if there are multiple variables to handle "and" insertion correctly
if len(variables) > 1:
# Join all but the last variable with ", ", then add " and " before the last variable
variables_str = ", ".join(variables[:-1]) + " and " + variables[-1]
else:
# If there's only one variable, no need for "and"
variables_str = variables[0]
# Display the category and its variables in the desired format
st.markdown(f"**{category}:** {variables_str}\n\n", unsafe_allow_html=True)
# storing maindf and categories in session_state
# st.write(main_df)
# st.session_state['Cleaned_data']=main_df
# st.session_state['category_dict']=category_dict
if st.button('Save Changes'):
    with open("Pickle_files/main_df", 'wb') as f:
        pickle.dump(main_df, f)
    with open("Pickle_files/category_dict", 'wb') as c:
        pickle.dump(category_dict, c)
    st.success('Changes Saved!')