RFI / Home.py
Manoj
latest
fde220d
import sqlite3
import uuid
import streamlit as st
from utilities import (
load_local_css,
set_header,
)
import os
import datetime
import shutil
import pandas as pd
import pickle
from pathlib import Path
import re
# --- Page and database setup (runs on every script execution) ---
# NOTE(review): set_page_config is called before any other Streamlit command here;
# keep it first when editing — presumably required by the Streamlit version in use.
st.set_page_config(layout="wide")
# load_local_css / set_header come from the project's `utilities` module.
load_local_css("styles.css")
set_header()
# Path to the SQLite database file (relative to the working directory)
database_file = r"DB/User.db"
# Open the connection; check_same_thread=False lets the connection object be
# used from a thread other than the one that created it.
conn = sqlite3.connect(database_file, check_same_thread=False)
# Module-level cursor shared by every query below
c = conn.cursor()
def get_excel_files(directory="API_data"):
    """Map workbook names to the paths of the ``.xlsx`` files in *directory*.

    Only the top level of *directory* is scanned.

    Args:
        directory: Folder to look in. Defaults to ``"API_data"``.

    Returns:
        dict: ``{file name without the .xlsx extension: path to the file}``,
        ordered alphabetically by file name.

    Raises:
        OSError: Propagated from ``os.listdir`` when *directory* cannot be
            listed (for example, it does not exist).
    """
    excel_files = {}
    # os.listdir returns entries in arbitrary order; sort so the mapping (and
    # any widget built from it) is stable between runs.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".xlsx"):
            continue
        # "~$<name>.xlsx" are lock files Excel leaves next to an open workbook;
        # they are not readable data sets.
        if filename.startswith("~$"):
            continue
        file_key = os.path.splitext(filename)[0]  # drop the .xlsx extension
        excel_files[file_key] = os.path.join(directory, filename)
    return excel_files
def update_summary_df():
    """Rebuild the logged-in user's project summary table in session state.

    Reads ``st.session_state["username"]``, queries the ``sessions`` table for
    the projects owned by that user and stores a DataFrame under
    ``st.session_state["session_summary_df"]`` with these columns:

    * ``"Project Name"``
    * ``"Last Page Edited"`` - the page value as stored in the database
    * ``"Modified Date"`` - ``datetime.date`` of the latest update
    * ``"Last Page Modified"`` - display form of the page name
    * ``"selected"`` - ``False`` for every row

    Rows are ordered most recently updated first and the index is reset to
    ``0..n-1``. Called after login and whenever projects are created or
    deleted.

    Returns:
        None. The result is published through ``st.session_state``.
    """

    def _page_label(page):
        # Turn a stored page value such as "1_Home.py" into a display label.
        # Digits 0-9 are all blanked so two-digit page prefixes ("10_...") do
        # not leave a stray "0" behind; a NULL page becomes an empty label.
        if page is None:
            return ""
        return re.sub(r"[_0-9]", " ", page).replace(".py", "")

    # One row per project. With MAX() in an aggregate query SQLite takes the
    # bare columns (last_edited_page) from the row holding the maximum, i.e.
    # the most recent session row of each project. Sorting inside a subquery
    # and then grouping does not guarantee which row is picked.
    c.execute(
        """
        SELECT project_name, last_edited_page, MAX(updated_time) AS last_updated
        FROM sessions
        WHERE owner = ?
        GROUP BY project_name
        """,
        (st.session_state["username"],),
    )
    summary_df = pd.DataFrame(
        c.fetchall(),
        columns=["Project Name", "Last Page Edited", "Modified Date"],
    )

    # Order on the full timestamp first, then truncate to a date for display;
    # sorting on the date alone would lose the order of same-day edits.
    summary_df["Modified Date"] = summary_df["Modified Date"].map(pd.to_datetime)
    summary_df = summary_df.sort_values(
        by=["Modified Date"], ascending=False, kind="stable"
    )
    summary_df["Modified Date"] = summary_df["Modified Date"].map(
        lambda ts: ts.date()
    )

    summary_df["Last Page Modified"] = summary_df["Last Page Edited"].map(_page_label)

    # Checkbox column for the data editor; nothing is selected after a refresh.
    summary_df["selected"] = [False] * len(summary_df)

    st.session_state["session_summary_df"] = summary_df.reset_index(drop=True)
# --- Login section ---
st.header("Manage Projects")
# Known users: login key (lower-case first name) -> full display name.
# The full name is what gets stored in st.session_state["username"] and is
# used as the `owner` / `username` value in the SQL queries of this page.
users = {
"ioannis": "Ioannis Papadopoulos",
"sharon": "Sharon Sheng",
"herman": "Herman Kwong",
"ismail": "Ismail Mohammed",
"geetha": "Geetha Krishna",
"srishti": "Srishti Verma",
"samkeet": "Samkeet Sangai",
"manoj": "Manoj P",
"loveesh": "Loveesh Bhatt",
"bhavya": "Bhavya Jayantilal Kanzariya",
"pritisha": "Pritisha Punukollu",
"ashish": "Ashish Sharma",
"swarupa": "Swarupa Parepalli",
}
# Default to "no user logged in" on the first run of a browser session
if "username" not in st.session_state:
st.session_state["username"] = ""
# first_name_value = [key for key, value in users.items() if value == st.session_state['username']]
# # Extract the first key from the list if the list is not empty
# first_name_value = first_name_value[0] if first_name_value else ''
# The typed name is lower-cased so it can be matched against the keys of `users`
first_name = st.text_input("Enter Name").lower()
# On "Login": reject unknown names, otherwise store the full name in session
# state and load that user's saved projects.
if st.button("Login"):
if first_name not in users.keys():
st.warning("Please enter a valid name")
st.stop()
name = users[first_name]
st.session_state.name = name # storing in session state
st.session_state["username"] = name
update_summary_df() # function call to fetch user saved projects
# st.success('Projects sucessfully loaded')
# Nothing below is executed until a known name is present in the text box
if len(first_name) == 0 or first_name not in users.keys():
st.stop()
# name=st.session_state['Username']
# c.execute('Delete from sessions')
# conn.commit()
# c.execute("PRAGMA table_info(users);")
# c.execute("SELECT * FROM users;")
# st.write(c.fetchall())
# Delete all existing data in the users table
# c.execute("DELETE FROM users")
# # Insert new data into the users table
# for idx, (username, full_name) in enumerate(users.items(), start=1):
# user_id = idx
# email = f"{username}@mastercard.com"
# user_type = "technical"
# c.execute(
# "INSERT INTO users (user_id, username, email, user_type) VALUES (?, ?, ?, ?)",
# (user_id, full_name, email, user_type)
# )
# # Commit the changes and close the connection
# conn.commit()
# --- Per-user context: runs only once a known user is stored in session state ---
if st.session_state["username"] in users.values():
# Make sure the session-state entries used further down exist
if "session_summary_df" not in st.session_state:
st.session_state.session_summary_df = pd.DataFrame()
if "project_name" not in st.session_state:
st.session_state["project_name"] = None
# Two-column banner: greeting on the left, currently loaded project on the right
cols1 = st.columns([2, 1])
with cols1[0]:
st.markdown(f"**Welcome {st.session_state['username']}**")
with cols1[1]:
st.markdown(f"**Current Project: {st.session_state['project_name']}**")
# Execute a SQL query to select all project names from the 'sessions' table
# where the owner matches the user's name
c.execute(
"SELECT project_name FROM sessions WHERE owner=?",
(st.session_state["username"],),
)
# Fetch all the results and create a list of project names
# (one entry per matching row, so a name can appear more than once)
user_projects = [project[0] for project in c.fetchall()]
c.execute("SELECT DISTINCT username FROM users")
# Fetch all the results and create a list of usernames excluding the current user's name
allowed_users_db = [
user[0] for user in c.fetchall() if user[0] != st.session_state["username"]
]
# Page label written to the sessions table by dump_session_details_db
page_name = "Home Page"
# Execute a SQL query to select the email, user_id, and user_type from the 'users' table
# where the username matches the current user's name
c.execute(
"SELECT email, user_id, user_type FROM users WHERE username = ?",
(st.session_state["username"],),
)
# Fetch the result of the query (assume there is only one matching row)
# NOTE(review): fetchone() returns None when no row matches; the unpacking
# below would then raise TypeError — confirm every entry of `users` has a row
# in the `users` table.
user_data = c.fetchone()
# Unpack the fetched data into corresponding variables
email, user_id, user_type = user_data
# Each user's projects are stored under Users/<email>/
folder_path = r"Users"
user_folder_path = os.path.join(folder_path, email)
# Create the user's folder on first use
if not os.path.exists(user_folder_path):
os.makedirs(user_folder_path)
def dump_session_details_db(allowed_users, project_name):
    """Record a project in the ``sessions`` table when it is created, shared or cloned.

    Inserts one row per entry of *allowed_users*, or a single row with no
    allowed user when the collection is empty. All rows of one call share a
    freshly generated session id and the same created/updated timestamp
    (``YYYY-MM-DD HH:MM``, local time). The owner is
    ``st.session_state["username"]``; ``user_id`` and ``page_name`` are read
    from module scope.

    Args:
        allowed_users: Usernames that get access to the project. May be empty
            (or ``None``) for a project that is not shared.
        project_name: Name of the project the rows belong to.

    Raises:
        sqlite3.Error: If the insert fails. The transaction is rolled back
            first, so a multi-user share is stored completely or not at all.
    """
    created_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
    session_id = str(uuid.uuid4())  # one id shared by all rows of this call
    owner = st.session_state["username"]

    if allowed_users:
        # NOTE(review): shared rows record "1_Home.py" as the last edited page
        # while the unshared row below records `page_name`; kept exactly as it
        # was — confirm which value the other pages expect.
        rows = [
            (
                user_id,
                owner,
                session_id,
                project_name,
                "1_Home.py",
                created_time,
                created_time,
                allowed_user,
            )
            for allowed_user in allowed_users
        ]
    else:
        rows = [
            (
                user_id,
                owner,
                session_id,
                project_name,
                page_name,
                created_time,
                created_time,
                None,
            )
        ]

    try:
        c.executemany("INSERT INTO sessions VALUES (?, ?, ?, ?, ?, ?, ?,?)", rows)
        conn.commit()
    except sqlite3.Error:
        # Do not leave a half-written share behind
        conn.rollback()
        raise
# Short usage guide shown above the project table.
# (A stray "push" line that had slipped into this user-facing text was removed.)
st.markdown(
    """
* **Delete Project:** If you wish to delete a project, select it and click 'Delete Project'.
* **Modify User Access:** Make changes to user access permissions as needed.
"""
)
session_col = st.columns([5, 5])
# Row index of the project ticked in the data editor; None means no selection.
if "selected_row_index" not in st.session_state:
    st.session_state["selected_row_index"] = None
def selection_change():
    """Apply data-editor edits to the project summary table (``on_change`` callback).

    Reads the ``edited_rows`` mapping (row index -> changed column values) of
    the ``"project_selection"`` data editor, remembers the first edited row
    index in ``st.session_state["selected_row_index"]`` and writes the edited
    values back into ``st.session_state["session_summary_df"]``.

    Does nothing when the editor reports no edited rows.
    """
    edited_rows: dict = st.session_state["project_selection"]["edited_rows"]
    if not edited_rows:
        # next(iter({})) would raise StopIteration inside the callback
        return

    st.session_state["selected_row_index"] = next(iter(edited_rows))

    # Copy the edited cell values into the stored table (matched on row index)
    st.session_state["session_summary_df"].update(
        pd.DataFrame.from_dict(edited_rows, orient="index")
    )
    st.session_state["session_summary_df"] = st.session_state[
        "session_summary_df"
    ].reset_index(drop=True)
st.markdown("Select Project")

# Project table: one row per project with a "selected" checkbox column.
# Edits are handled by selection_change via the on_change callback.
if len(st.session_state["session_summary_df"]) != 0:
    with st.container():
        table = st.data_editor(
            st.session_state["session_summary_df"]
            .drop(["Last Page Edited"], axis=1)
            .reindex(
                columns=[
                    "selected",
                    "Project Name",
                    "Last Page Modified",
                    "Modified Date",
                ]
            ),
            hide_index=True,
            on_change=selection_change,  # Function to call when data is edited
            key="project_selection",  # Key for the data editor component in the session state
            use_container_width=False,
        )

# Everything below acts on the project whose row was last edited in the table.
if (
    len(st.session_state["session_summary_df"]) > 0
    and st.session_state["selected_row_index"] is not None
):
    selected_row_index = st.session_state["session_summary_df"]["selected"]
    if len(selected_row_index) != 0:
        try:
            project_name = st.session_state["session_summary_df"].at[
                st.session_state["selected_row_index"], "Project Name"
            ]
        except Exception:
            # The remembered index no longer exists in the table: drop the
            # selection and start the run over.
            st.session_state["selected_row_index"] = None
            st.rerun()
        last_edited_page = st.session_state["session_summary_df"].at[
            st.session_state["selected_row_index"], "Last Page Edited"
        ]
        st.session_state["project_name"] = project_name
        # Needed by both the load and the delete column below
        project_path = os.path.join(user_folder_path, project_name)

        project_col = st.columns(2)
        with project_col[0]:
            if st.button("Load Project", use_container_width=True):
                st.session_state["project_name"] = project_name
                st.rerun()
            # The selected project's saved state is loaded on every run
            st.session_state["project_path"] = project_path
            project_dct_path = os.path.join(project_path, "project_dct.pkl")
            try:
                # open() is inside the try as well, so a missing file produces
                # the warning below and not an unhandled traceback.
                # NOTE(review): pickle.load executes arbitrary code from the
                # file; only safe while project folders are trusted.
                with open(project_dct_path, "rb") as f:
                    st.session_state["project_dct"] = pickle.load(f)
                st.success("Project Loded")
            except Exception:
                st.warning(
                    "Something went wrong unable to load saved details / data is lost due to app refresh. Please uncheck the check box and create a new project."
                )
                st.stop()

        with project_col[1]:
            if st.button(
                f"Delete Project - **{project_name}**", use_container_width=True
            ):
                project_name_to_delete = project_name
                st.warning(
                    f"{project_name_to_delete} will be deleted permanentaly and all the information regarding the project will be lost"
                )
                try:
                    c.execute(
                        "Delete FROM sessions WHERE project_name =? AND owner =?",
                        (
                            project_name_to_delete,
                            st.session_state["name"],
                        ),
                    )
                    if os.path.exists(project_path):
                        shutil.rmtree(project_path)
                    conn.commit()
                    update_summary_df()
                except Exception:
                    conn.rollback()
                    st.warning(
                        "Failed to Delete project try refreshing the page or try after some time"
                    )
                    st.stop()
                # The deleted row's index must not carry over to another project
                st.session_state["selected_row_index"] = None
                # st.rerun() signals the rerun by raising a control-flow
                # exception; it is called outside the try so that a successful
                # delete is never reported as a failure.
                st.rerun()

        with st.expander("Add users with access to the selected project"):
            # Users who already have access to this owner's project
            c.execute(
                "SELECT DISTINCT allowed_users FROM sessions WHERE project_name = ? AND owner = ?",
                (project_name, st.session_state["username"]),
            )
            present_users = [
                user[0]
                for user in c.fetchall()
                if user[0] != st.session_state["username"] and user[0] is not None
            ]
            present_users = None if len(present_users) == 0 else present_users
            # Offer only users that do not have access yet
            allowed_users = st.multiselect(
                "",
                list(set(allowed_users_db) - set(present_users or [])),
            )
            if st.button("Save Changes", use_container_width=True):
                if allowed_users:
                    dump_session_details_db(allowed_users, project_name)
                else:
                    # Saving an empty selection would only add a duplicate
                    # unshared session row for the project.
                    st.warning("Please select at least one user")
# --- Create a new project: name, users to share with, API data set ---
with st.expander("Create New Project"):
    st.markdown(
        "To create a new project, Enter Project name below, select user who you want to give access of this project and click **Create New Project**"
    )
    project_col1 = st.columns(3)
    with project_col1[0]:
        project_name = st.text_input("Enter Project Name", key="project_name_box")
        if project_name in user_projects:
            st.warning("Project already exists please enter new name")
    with project_col1[1]:
        allowed_users = st.multiselect(
            "Select Users who can access to this Project", allowed_users_db
        )
        allowed_users = list(allowed_users)
    with project_col1[2]:
        API_path_dict = get_excel_files()
        api_name = st.selectbox("Select API data", API_path_dict.keys(), index=0)
        # .get(): with no workbook available the selectbox yields no valid key
        api_path = API_path_dict.get(api_name)

    Create = st.button("Create New Project", use_container_width=True)
    if Create:
        if not project_name.strip():
            st.error("Plase enter a valid project name")
            st.stop()
        # The name becomes a folder name below; reject anything that would
        # resolve outside (or to) the user's own folder.
        if os.path.basename(project_name) != project_name or project_name in (
            ".",
            "..",
        ):
            st.error("Project name must not contain path separators")
            st.stop()
        if api_path is None:
            st.error("No API data file is available to create a project")
            st.stop()
        if project_name in user_projects:
            st.warning("Project already exists please enter new name")
            st.stop()
        project_path = os.path.join(user_folder_path, project_name)
        if os.path.exists(project_path):
            st.warning("Project already exists please enter new name")
            st.stop()
        os.makedirs(project_path)

        dump_session_details_db(allowed_users, project_name)

        # Initial per-page state of a new project; saved to project_dct.pkl.
        project_dct = {
            "data_import": {
                "granularity_selection": 0,
                "cat_dct": {},
                "merged_df": None,
                "edited_df": None,
                "numeric_columns": None,
                "files_dict": None,
                "formatted_panel1_values": None,
                "formatted_panel2_values": None,
                "missing_stats_df": None,
                "edited_stats_df": None,
                "default_df": None,
                "final_df": None,
                "api_path": api_path,
                "api_name": api_name,
            },
            "data_validation": {
                "target_column": 0,
                "selected_panels": None,
                "selected_feature": 0,
                "validated_variables": [],
                "Non_media_variables": 0,
            },
            "transformations": {"Media": {}, "Exogenous": {}},
            "model_build": {
                "sel_target_col": None,
                "all_iters_check": False,
                "iterations": 0,
                "build_button": False,
                "show_results_check": False,
                "session_state_saved": {},
            },
            "model_tuning": {
                "sel_target_col": None,
                "sel_model": {},
                "flag_expander": False,
                "start_date_default": None,
                "end_date_default": None,
                "repeat_default": "No",
                "flags": {},
                "select_all_flags_check": {},
                "selected_flags": {},
                "trend_check": False,
                "week_num_check": False,
                "sine_cosine_check": False,
                "session_state_saved": {},
            },
            "saved_model_results": {
                "selected_options": None,
                "model_grid_sel": [1],
            },
            "model_result_overview": {},
            "build_response_curves": {
                "response_metrics_selectbox": 0,
                "panel_selected_selectbox": 0,
                "selected_channel_name_selectbox": 0,
                "K_number_input": "default",
                "b_number_input": "default",
                "a_number_input": "default",
                "x0_number_input": "default",
            },
            "scenario_planner": {
                "panel_selected": 0,
                "metrics_selected": 0,
                "scenario": None,
                "optimization_key_value": None,
                "total_spends_change": None,
                "optimze_all_channels": False,
            },
            "saved_scenarios": {
                "selected_scenario_selectbox_key": 0,
            },
            "optimized_result_analysis": {
                "selected_scenario_selectbox_visualize": 0,
                "metric_selectbox_visualize": 0,
            },
        }

        # Make the new project the current one and persist its state
        st.session_state["project_dct"] = project_dct
        st.session_state["project_path"] = project_path
        st.session_state["project_name"] = project_name
        project_dct_path = os.path.join(project_path, "project_dct.pkl")
        with open(project_dct_path, "wb") as f:
            pickle.dump(project_dct, f)
        st.success("Project Created")
        # Refresh the project table, then rerun so the page shows the new project
        update_summary_df()
        st.rerun()
# st.header('Clone Project')
# --- Clone a project that another user has shared with the current user ---
with st.expander("**Clone saved projects**"):
    # Owners who have shared at least one project with the current user
    c.execute(
        "SELECT DISTINCT owner FROM sessions WHERE allowed_users=?",
        (st.session_state["username"],),
    )
    owners = [
        row[0] for row in c.fetchall() if row[0] != st.session_state["username"]
    ]
    if len(owners) == 0:
        st.warning("You dont have any shared project yet!")
        st.stop()

    cols = st.columns(2)
    with cols[0]:
        owner = st.selectbox("Select Owner", owners)
        c.execute("SELECT email FROM users WHERE username=?", (owner,))
        owner_row = c.fetchone()
        if owner_row is None:
            # The sessions table names an owner that has no users-table entry
            st.warning("Could not find the account of the selected owner")
            st.stop()
        owner_email = owner_row[0]
        owner_folder_path = os.path.join(folder_path, owner_email)
    with cols[1]:
        # Projects of that owner which are shared with the current user
        c.execute(
            "SELECT project_name FROM sessions WHERE owner=? AND allowed_users = ?",
            (owner, st.session_state["username"]),
        )
        project_names = c.fetchall()
        project_name_owner = st.selectbox(
            "Select a saved Project available for you",
            [row[0] for row in project_names],
        )

    # Name of the copy in the current user's folder (defaults to the owner's name)
    project_name_user = st.text_input("Enter Project Name", value=project_name_owner)
    # Check the name typed here, not the name from the "Create New Project" box
    if project_name_user in user_projects:
        st.warning(
            "This Project name already exists in your directory Please enter a different name"
        )
    project_path = os.path.join(user_folder_path, project_name_user)
    owner_project_path = os.path.join(owner_folder_path, project_name_owner)

    if st.button(
        "Load Project", use_container_width=True, key="load_project_button_key"
    ):
        # The name becomes a folder name; reject anything that would resolve
        # outside the user's own folder.
        if os.path.basename(project_name_user) != project_name_user:
            st.warning("Project name must not contain path separators")
            st.stop()
        if os.path.exists(project_path):
            st.warning(
                "This Project name already exists in your directory Please enter a different name"
            )
            st.stop()
        shutil.copytree(owner_project_path, project_path)
        project_dct_path = os.path.join(project_path, "project_dct.pkl")
        # NOTE(review): pickle.load executes arbitrary code from the file, and
        # this file was written by another user; only safe while all users of
        # the app are trusted.
        with open(project_dct_path, "rb") as f:
            st.session_state["project_dct"] = pickle.load(f)
        st.session_state["project_path"] = project_path
        # Same bookkeeping as project creation: the clone becomes the current
        # project and appears in the project table.
        st.session_state["project_name"] = project_name_user
        dump_session_details_db(
            [], project_name_user
        )  # passing empty list for allowed users
        st.success("Project Cloned")
        update_summary_df()
        st.rerun()