Spaces:
Sleeping
Sleeping
# importing pacakages | |
import streamlit as st | |
st.set_page_config(layout="wide") | |
from utilities import ( | |
load_local_css, | |
set_header, | |
ensure_project_dct_structure, | |
store_hashed_password, | |
verify_password, | |
is_pswrd_flag_set, | |
set_pswrd_flag, | |
) | |
import os | |
from datetime import datetime | |
import pandas as pd | |
import pickle | |
import psycopg2 | |
# | |
import numbers | |
from collections import OrderedDict | |
import re | |
from ppt_utils import create_ppt | |
from constants import default_dct | |
import time | |
from log_application import log_message, delete_old_log_files | |
import sqlite3 | |
# setting page config | |
load_local_css("styles.css") | |
set_header() | |
db_cred = None | |
# --------------Functions----------------------# | |
# # schema = db_cred["schema"] | |
##API DATA####################### | |
# Function to load gold layer data | |
# @st.cache_data(show_spinner=False) | |
def load_gold_layer_data(table_name): | |
# Fetch Table | |
query = f""" | |
SELECT * FROM {table_name}; | |
""" | |
# Execute the query and get the results | |
results = query_excecuter_postgres( | |
query, db_cred, insert=False, return_dataframe=True | |
) | |
if results is not None and not results.empty: | |
# Create a DataFrame | |
gold_layer_df = results | |
else: | |
st.warning("No data found for the selected table.") | |
st.stop() | |
# Columns to be removed | |
columns_to_remove = [ | |
"clnt_nam", | |
"crte_dt_tm", | |
"crte_by_uid", | |
"updt_dt_tm", | |
"updt_by_uid", | |
"campgn_id", | |
"campgn_nam", | |
"ad_id", | |
"ad_nam", | |
"tctc_id", | |
"tctc_nam", | |
"campgn_grp_id", | |
"campgn_grp_nam", | |
"ad_grp_id", | |
"ad_grp_nam", | |
] | |
# TEMP CODE | |
gold_layer_df = gold_layer_df.rename( | |
columns={ | |
"imprssns_cnt": "mda_imprssns_cnt", | |
"clcks_cnt": "mda_clcks_cnt", | |
"vd_vws_cnt": "mda_vd_vws_cnt", | |
} | |
) | |
# Remove specific columns | |
gold_layer_df = gold_layer_df.drop(columns=columns_to_remove, errors="ignore") | |
# Convert columns to numeric or datetime as appropriate | |
for col in gold_layer_df.columns: | |
if ( | |
col.startswith("rspns_mtrc_") | |
or col.startswith("mda_") | |
or col.startswith("exogenous_") | |
or col.startswith("internal_") | |
or col in ["spnd_amt"] | |
): | |
gold_layer_df[col] = pd.to_numeric(gold_layer_df[col], errors="coerce") | |
elif col == "rcrd_dt": | |
gold_layer_df[col] = pd.to_datetime(gold_layer_df[col], errors="coerce") | |
# Replace columns starting with 'mda_' to 'media_' | |
gold_layer_df.columns = [ | |
(col.replace("mda_", "media_") if col.startswith("mda_") else col) | |
for col in gold_layer_df.columns | |
] | |
# Identify non-numeric columns | |
non_numeric_columns = gold_layer_df.select_dtypes(exclude=["number"]).columns | |
allow_non_numeric_columns = ["rcrd_dt", "aggrgtn_lvl", "sub_chnnl_nam", "panl_nam"] | |
# Remove non-numeric columns except for allowed non-numeric columns | |
non_numeric_columns_to_remove = [ | |
col for col in non_numeric_columns if col not in allow_non_numeric_columns | |
] | |
gold_layer_df = gold_layer_df.drop( | |
columns=non_numeric_columns_to_remove, errors="ignore" | |
) | |
# Remove specific columns | |
allow_columns = ["rcrd_dt", "aggrgtn_lvl", "sub_chnnl_nam", "panl_nam", "spnd_amt"] | |
for col in gold_layer_df.columns: | |
if ( | |
col.startswith("rspns_mtrc_") | |
or col.startswith("media_") | |
or col.startswith("exogenous_") | |
or col.startswith("internal_") | |
): | |
allow_columns.append(col) | |
gold_layer_df = gold_layer_df[allow_columns] | |
# Rename columns | |
gold_layer_df = gold_layer_df.rename( | |
columns={ | |
"rcrd_dt": "date", | |
"sub_chnnl_nam": "channels", | |
"panl_nam": "panel", | |
"spnd_amt": "spends", | |
} | |
) | |
# Clean column values | |
gold_layer_df["panel"] = ( | |
gold_layer_df["panel"].astype(str).str.lower().str.strip().str.replace(" ", "_") | |
) | |
gold_layer_df["channels"] = ( | |
gold_layer_df["channels"] | |
.astype(str) | |
.str.lower() | |
.str.strip() | |
.str.replace(" ", "_") | |
) | |
# Replace columns starting with 'rspns_mtrc_' to 'response_metric_' | |
gold_layer_df.columns = [ | |
( | |
col.replace("rspns_mtrc_", "response_metric_") | |
if col.startswith("rspns_mtrc_") | |
else col | |
) | |
for col in gold_layer_df.columns | |
] | |
# Get the minimum date from the main dataframe | |
min_date = gold_layer_df["date"].min() | |
# Get maximum dates for daily and weekly data | |
max_date_daily = None | |
max_date_weekly = None | |
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "daily"].empty: | |
max_date_daily = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "daily"][ | |
"date" | |
].max() | |
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].empty: | |
max_date_weekly = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"][ | |
"date" | |
].max() + pd.DateOffset(days=6) | |
# Determine final maximum date | |
if max_date_daily is not None and max_date_weekly is not None: | |
final_max_date = max(max_date_daily, max_date_weekly) | |
elif max_date_daily is not None: | |
final_max_date = max_date_daily | |
elif max_date_weekly is not None: | |
final_max_date = max_date_weekly | |
# Create a date range with daily frequency | |
date_range = pd.date_range(start=min_date, end=final_max_date, freq="D") | |
# Create a base DataFrame with all channels and all panels for each channel | |
unique_channels = gold_layer_df["channels"].unique() | |
unique_panels = gold_layer_df["panel"].unique() | |
base_data = [ | |
(channel, panel, date) | |
for channel in unique_channels | |
for panel in unique_panels | |
for date in date_range | |
] | |
base_df = pd.DataFrame(base_data, columns=["channels", "panel", "date"]) | |
# Process weekly data to convert it to daily | |
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].empty: | |
weekly_data = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].copy() | |
daily_data = [] | |
for index, row in weekly_data.iterrows(): | |
week_start = pd.to_datetime(row["date"]) - pd.to_timedelta( | |
pd.to_datetime(row["date"]).weekday(), unit="D" | |
) | |
for i in range(7): | |
daily_date = week_start + pd.DateOffset(days=i) | |
new_row = row.copy() | |
new_row["date"] = daily_date | |
for col in new_row.index: | |
if isinstance(new_row[col], numbers.Number): | |
new_row[col] = new_row[col] / 7 | |
daily_data.append(new_row) | |
daily_data_df = pd.DataFrame(daily_data) | |
daily_data_df["aggrgtn_lvl"] = "daily" | |
gold_layer_df = pd.concat( | |
[gold_layer_df[gold_layer_df["aggrgtn_lvl"] != "weekly"], daily_data_df], | |
ignore_index=True, | |
) | |
# Process monthly data to convert it to daily | |
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "monthly"].empty: | |
monthly_data = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "monthly"].copy() | |
daily_data = [] | |
for index, row in monthly_data.iterrows(): | |
month_start = pd.to_datetime(row["date"]).replace(day=1) | |
next_month_start = (month_start + pd.DateOffset(months=1)).replace(day=1) | |
days_in_month = (next_month_start - month_start).days | |
for i in range(days_in_month): | |
daily_date = month_start + pd.DateOffset(days=i) | |
new_row = row.copy() | |
new_row["date"] = daily_date | |
for col in new_row.index: | |
if isinstance(new_row[col], numbers.Number): | |
new_row[col] = new_row[col] / days_in_month | |
daily_data.append(new_row) | |
daily_data_df = pd.DataFrame(daily_data) | |
daily_data_df["aggrgtn_lvl"] = "daily" | |
gold_layer_df = pd.concat( | |
[gold_layer_df[gold_layer_df["aggrgtn_lvl"] != "monthly"], daily_data_df], | |
ignore_index=True, | |
) | |
# Remove aggrgtn_lvl column | |
gold_layer_df = gold_layer_df.drop(columns=["aggrgtn_lvl"], errors="ignore") | |
# Group by 'panel', and 'date' | |
gold_layer_df = gold_layer_df.groupby(["channels", "panel", "date"]).sum() | |
# Merge gold_layer_df to base_df on channels, panel and date | |
gold_layer_df_cleaned = pd.merge( | |
base_df, gold_layer_df, on=["channels", "panel", "date"], how="left" | |
) | |
# Pivot the dataframe and rename columns | |
pivot_columns = [ | |
col | |
for col in gold_layer_df_cleaned.columns | |
if col not in ["channels", "panel", "date"] | |
] | |
gold_layer_df_cleaned = gold_layer_df_cleaned.pivot_table( | |
index=["date", "panel"], columns="channels", values=pivot_columns, aggfunc="sum" | |
).reset_index() | |
# Flatten the columns | |
gold_layer_df_cleaned.columns = [ | |
"_".join(col).strip() if col[1] else col[0] | |
for col in gold_layer_df_cleaned.columns.values | |
] | |
# Replace columns ending with '_all' to '_total' | |
gold_layer_df_cleaned.columns = [ | |
col.replace("_all", "_total") if col.endswith("_all") else col | |
for col in gold_layer_df_cleaned.columns | |
] | |
# Clean panel column values | |
gold_layer_df_cleaned["panel"] = ( | |
gold_layer_df_cleaned["panel"] | |
.astype(str) | |
.str.lower() | |
.str.strip() | |
.str.replace(" ", "_") | |
) | |
# Drop all columns that end with '_total' except those starting with 'response_metric_' | |
cols_to_drop = [ | |
col | |
for col in gold_layer_df_cleaned.columns | |
if col.endswith("_total") and not col.startswith("response_metric_") | |
] | |
gold_layer_df_cleaned.drop(columns=cols_to_drop, inplace=True) | |
return gold_layer_df_cleaned | |
def check_valid_name(): | |
if ( | |
not st.session_state["project_name_box"] | |
.lower() | |
.startswith(defualt_project_prefix) | |
): | |
st.session_state["disable_create_project"] = True | |
with warning_box: | |
st.warning("Project Name should follow naming conventions") | |
st.session_state["warning"] = ( | |
"Project Name should follow naming conventions!" | |
) | |
with warning_box: | |
st.warning("Project Name should follow naming conventions") | |
st.button("Reset Name", on_click=reset_project_text_box, key="2") | |
if st.session_state["project_name_box"] == defualt_project_prefix: | |
with warning_box: | |
st.warning("Cannot Name only with Prefix") | |
st.session_state["warning"] = "Cannot Name only with Prefix" | |
st.session_state["disable_create_project"] = True | |
if st.session_state["project_name_box"] in user_projects: | |
with warning_box: | |
st.warning("Project already exists please enter new name") | |
st.session_state["warning"] = "Project already exists please enter new name" | |
st.session_state["disable_create_project"] = True | |
else: | |
st.session_state["disable_create_project"] = False | |
def query_excecuter_postgres( | |
query, | |
db_path=None, | |
params=None, | |
insert=True, | |
insert_retrieve=False, | |
db_cred=None, | |
): | |
""" | |
Executes a SQL query on a SQLite database, handling both insert and select operations. | |
Parameters: | |
query (str): The SQL query to be executed. | |
db_path (str): Path to the SQLite database file. | |
params (tuple, optional): Parameters to pass into the SQL query for parameterized execution. | |
insert (bool, default=True): Flag to determine if the query is an insert operation (default) or a select operation. | |
insert_retrieve (bool, default=False): Flag to determine if the query should insert and then return the inserted ID. | |
""" | |
try: | |
# Construct a cross-platform path to the database | |
db_dir = os.path.join("db") | |
os.makedirs(db_dir, exist_ok=True) # Make sure the directory exists | |
db_path = os.path.join(db_dir, "imp_db.db") | |
# Establish connection to the SQLite database | |
conn = sqlite3.connect(db_path) | |
except sqlite3.Error as e: | |
st.warning(f"Unable to connect to the SQLite database: {e}") | |
st.stop() | |
# Create a cursor object to interact with the database | |
c = conn.cursor() | |
try: | |
# Execute the query with or without parameters | |
if params: | |
params = tuple(params) | |
query = query.replace("IN (?)", f"IN ({','.join(['?' for _ in params])})") | |
c.execute(query, params) | |
else: | |
c.execute(query) | |
if not insert: | |
# If not an insert operation, fetch and return the results | |
results = c.fetchall() | |
return results | |
elif insert_retrieve: | |
# If insert and retrieve operation, commit and return the last inserted row ID | |
conn.commit() | |
return c.lastrowid | |
else: | |
# For standard insert operations, commit the transaction | |
conn.commit() | |
except Exception as e: | |
st.write(f"Error executing query: {e}") | |
finally: | |
conn.close() | |
# Function to check if the input contains any SQL keywords | |
def contains_sql_keywords_check(user_input): | |
sql_keywords = [ | |
"SELECT", | |
"INSERT", | |
"UPDATE", | |
"DELETE", | |
"DROP", | |
"ALTER", | |
"CREATE", | |
"GRANT", | |
"REVOKE", | |
"UNION", | |
"JOIN", | |
"WHERE", | |
"HAVING", | |
"EXEC", | |
"TRUNCATE", | |
"REPLACE", | |
"MERGE", | |
"DECLARE", | |
"SHOW", | |
"FROM", | |
] | |
pattern = "|".join(re.escape(keyword) for keyword in sql_keywords) | |
return re.search(pattern, user_input, re.IGNORECASE) | |
# def get_table_names(schema): | |
# query = f""" | |
# SELECT table_name | |
# FROM information_schema.tables | |
# WHERE table_schema = '{schema}' | |
# AND table_type = 'BASE TABLE' | |
# AND table_name LIKE '%_mmo_gold'; | |
# """ | |
# table_names = query_excecuter_postgres(query, db_cred, insert=False) | |
# table_names = [table[0] for table in table_names] | |
# return table_names | |
def update_summary_df(): | |
""" | |
Updates the 'project_summary_df' in the session state with the latest project | |
summary information based on the most recent updates. | |
This function executes a SQL query to retrieve project metadata from a database | |
and stores the result in the session state. | |
Uses: | |
- query_excecuter_postgres(query, params=params, insert=False): A function that | |
executes the provided SQL query on a PostgreSQL database. | |
Modifies: | |
- st.session_state['project_summary_df']: Updates the dataframe with columns: | |
'Project Number', 'Project Name', 'Last Modified Page', 'Last Modified Time'. | |
""" | |
query = f""" | |
WITH LatestUpdates AS ( | |
SELECT | |
prj_id, | |
page_nam, | |
updt_dt_tm, | |
ROW_NUMBER() OVER (PARTITION BY prj_id ORDER BY updt_dt_tm DESC) AS rn | |
FROM | |
mmo_project_meta_data | |
) | |
SELECT | |
p.prj_id, | |
p.prj_nam AS prj_nam, | |
lu.page_nam, | |
lu.updt_dt_tm | |
FROM | |
LatestUpdates lu | |
RIGHT JOIN | |
mmo_projects p ON lu.prj_id = p.prj_id | |
WHERE | |
p.prj_ownr_id = ? AND lu.rn = 1 | |
""" | |
params = (st.session_state["emp_id"],) # Parameters for the SQL query | |
# Execute the query and retrieve project summary data | |
project_summary = query_excecuter_postgres( | |
query, db_cred, params=params, insert=False | |
) | |
# Update the session state with the project summary dataframe | |
st.session_state["project_summary_df"] = pd.DataFrame( | |
project_summary, | |
columns=[ | |
"Project Number", | |
"Project Name", | |
"Last Modified Page", | |
"Last Modified Time", | |
], | |
) | |
st.session_state["project_summary_df"] = st.session_state[ | |
"project_summary_df" | |
].sort_values(by=["Last Modified Time"], ascending=False) | |
def reset_project_text_box(): | |
st.session_state["project_name_box"] = defualt_project_prefix | |
st.session_state["disable_create_project"] = True | |
def query_excecuter_sqlite( | |
insert_projects_query, | |
insert_meta_data_query, | |
db_path=None, | |
params_projects=None, | |
params_meta=None, | |
): | |
""" | |
Executes the project insert and associated metadata insert in an SQLite database. | |
Parameters: | |
insert_projects_query (str): SQL query for inserting into the mmo_projects table. | |
insert_meta_data_query (str): SQL query for inserting into the mmo_project_meta_data table. | |
db_path (str): Path to the SQLite database file. | |
params_projects (tuple, optional): Parameters for the mmo_projects table insert. | |
params_meta (tuple, optional): Parameters for the mmo_project_meta_data table insert. | |
Returns: | |
bool: True if successful, False otherwise. | |
""" | |
try: | |
# Construct a cross-platform path to the database | |
db_dir = os.path.join("db") | |
os.makedirs(db_dir, exist_ok=True) # Make sure the directory exists | |
db_path = os.path.join(db_dir, "imp_db.db") | |
# Establish connection to the SQLite database | |
conn = sqlite3.connect(db_path) | |
cursor = conn.cursor() | |
# Execute the first insert query into the mmo_projects table | |
cursor.execute(insert_projects_query, params_projects) | |
# Get the last inserted project ID | |
prj_id = cursor.lastrowid | |
# Modify the parameters for the metadata table with the inserted prj_id | |
params_meta = (prj_id,) + params_meta | |
# Execute the second insert query into the mmo_project_meta_data table | |
cursor.execute(insert_meta_data_query, params_meta) | |
# Commit the transaction | |
conn.commit() | |
except sqlite3.Error as e: | |
st.warning(f"Error executing query: {e}") | |
return False | |
finally: | |
# Close the connection | |
conn.close() | |
return True | |
def new_project(): | |
""" | |
Cleans the project name input and inserts project data into the SQLite database, | |
updating session state and triggering UI rerun if successful. | |
""" | |
# Define a dictionary containing project data | |
project_dct = default_dct.copy() | |
gold_layer_df = pd.DataFrame() | |
if str(api_name).strip().lower() != "na": | |
try: | |
gold_layer_df = load_gold_layer_data(api_name) | |
except Exception as e: | |
st.toast( | |
"Failed to load gold layer data. Please check the gold layer structure and connection.", | |
icon="⚠️", | |
) | |
log_message( | |
"error", | |
f"Error loading gold layer data: {str(e)}", | |
"Home", | |
) | |
project_dct["data_import"]["gold_layer_df"] = gold_layer_df | |
# Get current time for database insertion | |
inserted_time = datetime.now().isoformat() | |
# Define SQL queries for inserting project and metadata into the SQLite database | |
insert_projects_query = """ | |
INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_dt_tm, crte_by_uid) | |
VALUES (?, ?, ?, ?, ?); | |
""" | |
insert_meta_data_query = """ | |
INSERT INTO mmo_project_meta_data (prj_id, page_nam, file_nam, pkl_obj, crte_dt_tm, crte_by_uid, updt_dt_tm) | |
VALUES (?, ?, ?, ?, ?, ?, ?); | |
""" | |
# Get current time for metadata update | |
updt_dt_tm = datetime.now().isoformat() | |
# Serialize project_dct using pickle | |
project_pkl = pickle.dumps(project_dct) | |
# Prepare data for database insertion | |
projects_data = ( | |
st.session_state["emp_id"], # prj_ownr_id | |
project_name, # prj_nam | |
",".join(matching_user_id), # alwd_emp_id | |
inserted_time, # crte_dt_tm | |
st.session_state["emp_id"], # crte_by_uid | |
) | |
project_meta_data = ( | |
"Home", # page_nam | |
"project_dct", # file_nam | |
project_pkl, # pkl_obj | |
inserted_time, # crte_dt_tm | |
st.session_state["emp_id"], # crte_by_uid | |
updt_dt_tm, # updt_dt_tm | |
) | |
# Execute the insertion query for SQLite | |
success = query_excecuter_sqlite( | |
insert_projects_query, | |
insert_meta_data_query, | |
params_projects=projects_data, | |
params_meta=project_meta_data, | |
) | |
if success: | |
st.success("Project Created") | |
update_summary_df() | |
else: | |
st.error("Failed to create project.") | |
def validate_password(user_input): | |
# List of SQL keywords to check for | |
sql_keywords = [ | |
"SELECT", | |
"INSERT", | |
"UPDATE", | |
"DELETE", | |
"DROP", | |
"ALTER", | |
"CREATE", | |
"GRANT", | |
"REVOKE", | |
"UNION", | |
"JOIN", | |
"WHERE", | |
"HAVING", | |
"EXEC", | |
"TRUNCATE", | |
"REPLACE", | |
"MERGE", | |
"DECLARE", | |
"SHOW", | |
"FROM", | |
] | |
# Create a regex pattern for SQL keywords | |
pattern = "|".join(re.escape(keyword) for keyword in sql_keywords) | |
# Check if input contains any SQL keywords | |
if re.search(pattern, user_input, re.IGNORECASE): | |
return "SQL keyword detected." | |
# Password validation criteria | |
if len(user_input) < 8: | |
return "Password should be at least 8 characters long." | |
if not re.search(r"[A-Z]", user_input): | |
return "Password should contain at least one uppercase letter." | |
if not re.search(r"[0-9]", user_input): | |
return "Password should contain at least one digit." | |
if not re.search(r"[a-z]", user_input): | |
return "Password should contain at least one lowercase letter." | |
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', user_input): | |
return "Password should contain at least one special character." | |
# If all checks pass | |
return "Valid input." | |
def fetch_and_process_projects(emp_id): | |
query = f""" | |
WITH ProjectAccess AS ( | |
SELECT | |
p.prj_id, | |
p.prj_nam, | |
p.alwd_emp_id, | |
u.emp_nam AS project_owner | |
FROM mmo_projects p | |
JOIN mmo_users u ON p.prj_ownr_id = u.emp_id | |
) | |
SELECT | |
pa.prj_id, | |
pa.prj_nam, | |
pa.project_owner | |
FROM | |
ProjectAccess pa | |
WHERE | |
pa.alwd_emp_id LIKE ? | |
ORDER BY | |
pa.prj_id; | |
""" | |
params = (f"%{emp_id}%",) | |
results = query_excecuter_postgres(query, db_cred, params=params, insert=False) | |
# Process the results to create the desired dictionary structure | |
clone_project_dict = {} | |
for row in results: | |
project_id, project_name, project_owner = row | |
if project_owner not in clone_project_dict: | |
clone_project_dict[project_owner] = [] | |
clone_project_dict[project_owner].append( | |
{"project_name": project_name, "project_id": project_id} | |
) | |
return clone_project_dict | |
def get_project_id_from_dict(projects_dict, owner_name, project_name): | |
if owner_name in projects_dict: | |
for project in projects_dict[owner_name]: | |
if project["project_name"] == project_name: | |
return project["project_id"] | |
return None | |
# def fetch_project_metadata(prj_id): | |
# query = f""" | |
# SELECT | |
# prj_id, page_nam, file_nam, pkl_obj, dshbrd_ts | |
# FROM | |
# mmo_project_meta_data | |
# WHERE | |
# prj_id = ?; | |
# """ | |
# params = (prj_id,) | |
# return query_excecuter_postgres(query, db_cred, params=params, insert=False) | |
def fetch_project_metadata(prj_id): | |
# Query to select project metadata | |
query = """ | |
SELECT | |
prj_id, page_nam, file_nam, pkl_obj, dshbrd_ts | |
FROM | |
mmo_project_meta_data | |
WHERE | |
prj_id = ?; | |
""" | |
params = (prj_id,) | |
return query_excecuter_postgres(query, db_cred, params=params, insert=False) | |
# def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id): | |
# query = f""" | |
# INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm) | |
# VALUES (?, ?, ?, ?, NOW()) | |
# RETURNING prj_id; | |
# """ | |
# params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id) | |
# result = query_excecuter_postgres( | |
# query, db_cred, params=params, insert=True, insert_retrieve=True | |
# ) | |
# return result[0][0] | |
# def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id): | |
# # Query to insert a new project | |
# insert_query = """ | |
# INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm) | |
# VALUES (?, ?, ?, ?, DATETIME('now')); | |
# """ | |
# params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id) | |
# # Execute the insert query | |
# query_excecuter_postgres(insert_query, db_cred, params=params, insert=True) | |
# # Retrieve the last inserted prj_id | |
# retrieve_id_query = "SELECT last_insert_rowid();" | |
# result = query_excecuter_postgres(retrieve_id_query, db_cred, insert_retrieve=True) | |
# return result[0][0] | |
def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id): | |
# Query to insert a new project | |
insert_query = """ | |
INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm) | |
VALUES (?, ?, ?, ?, DATETIME('now')); | |
""" | |
params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id) | |
# Execute the insert query and retrieve the last inserted prj_id directly | |
last_inserted_id = query_excecuter_postgres( | |
insert_query, params=params, insert_retrieve=True | |
) | |
return last_inserted_id | |
def insert_project_metadata(new_prj_id, metadata, created_emp_id): | |
# query = f""" | |
# INSERT INTO mmo_project_meta_data ( | |
# prj_id, page_nam, crte_dt_tm, file_nam, pkl_obj, dshbrd_ts, crte_by_uid | |
# ) | |
# VALUES (?, ?, NOW(), ?, ?, ?, ?); | |
# """ | |
query = """ | |
INSERT INTO mmo_project_meta_data ( | |
prj_id, page_nam, crte_dt_tm, file_nam, pkl_obj, dshbrd_ts, crte_by_uid | |
) | |
VALUES (?, ?, DATETIME('now'), ?, ?, ?, ?); | |
""" | |
for row in metadata: | |
params = (new_prj_id, row[1], row[2], row[3], row[4], created_emp_id) | |
query_excecuter_postgres(query, db_cred, params=params, insert=True) | |
# def delete_projects_by_ids(prj_ids): | |
# # Ensure prj_ids is a tuple to use with the IN clause | |
# prj_ids_tuple = tuple(prj_ids) | |
# # Query to delete project metadata | |
# delete_metadata_query = f""" | |
# DELETE FROM mmo_project_meta_data | |
# WHERE prj_id IN ?; | |
# """ | |
# delete_projects_query = f""" | |
# DELETE FROM mmo_projects | |
# WHERE prj_id IN ?; | |
# """ | |
# try: | |
# # Delete from metadata table | |
# query_excecuter_postgres( | |
# delete_metadata_query, db_cred, params=(prj_ids_tuple,), insert=True | |
# ) | |
# # Delete from projects table | |
# query_excecuter_postgres( | |
# delete_projects_query, db_cred, params=(prj_ids_tuple,), insert=True | |
# ) | |
# except Exception as e: | |
# st.write(f"Error deleting projects: {e}") | |
def delete_projects_by_ids(prj_ids): | |
# Ensure prj_ids is a tuple to use with the IN clause | |
prj_ids_tuple = tuple(prj_ids) | |
# Dynamically generate placeholders for SQLite | |
placeholders = ", ".join(["?"] * len(prj_ids_tuple)) | |
# Query to delete project metadata with dynamic placeholders | |
delete_metadata_query = f""" | |
DELETE FROM mmo_project_meta_data | |
WHERE prj_id IN ({placeholders}); | |
""" | |
delete_projects_query = f""" | |
DELETE FROM mmo_projects | |
WHERE prj_id IN ({placeholders}); | |
""" | |
try: | |
# Delete from metadata table | |
query_excecuter_postgres( | |
delete_metadata_query, db_cred, params=prj_ids_tuple, insert=True | |
) | |
# Delete from projects table | |
query_excecuter_postgres( | |
delete_projects_query, db_cred, params=prj_ids_tuple, insert=True | |
) | |
except Exception as e: | |
st.write(f"Error deleting projects: {e}") | |
def fetch_users_with_access(prj_id): | |
# Query to get allowed employee IDs for the project | |
get_allowed_emps_query = """ | |
SELECT alwd_emp_id | |
FROM mmo_projects | |
WHERE prj_id = ?; | |
""" | |
# Fetch the allowed employee IDs | |
allowed_emp_ids_result = query_excecuter_postgres( | |
get_allowed_emps_query, db_cred, params=(prj_id,), insert=False | |
) | |
if not allowed_emp_ids_result: | |
return [] | |
# Extract the allowed employee IDs (Assuming alwd_emp_id is a comma-separated string) | |
allowed_emp_ids_str = allowed_emp_ids_result[0][0] | |
allowed_emp_ids = allowed_emp_ids_str.split(",") | |
# Convert to tuple for the IN clause | |
allowed_emp_ids_tuple = tuple(allowed_emp_ids) | |
# Query to get user details for the allowed employee IDs | |
get_users_query = """ | |
SELECT emp_id, emp_nam, emp_typ | |
FROM mmo_users | |
WHERE emp_id IN ({}); | |
""".format( | |
",".join("?" * len(allowed_emp_ids_tuple)) | |
) # Dynamically construct the placeholder list | |
# Fetch user details | |
user_details = query_excecuter_postgres( | |
get_users_query, db_cred, params=allowed_emp_ids_tuple, insert=False | |
) | |
return user_details | |
# def update_project_access(prj_id, user_names, new_user_ids): | |
# # Convert the list of new user IDs to a comma-separated string | |
# new_user_ids_str = ",".join(new_user_ids) | |
# # Query to update the alwd_emp_id for the specified project | |
# update_access_query = f""" | |
# UPDATE mmo_projects | |
# SET alwd_emp_id = ? | |
# WHERE prj_id = ?; | |
# """ | |
# # Execute the update query | |
# query_excecuter_postgres( | |
# update_access_query, db_cred, params=(new_user_ids_str, prj_id), insert=True | |
# ) | |
# st.success(f"Project {prj_id} access updated successfully") | |
def fetch_user_ids_from_dict(user_dict, user_names): | |
user_ids = [] | |
# Iterate over the user_dict to find matching user names | |
for user_id, details in user_dict.items(): | |
if details[0] in user_names: | |
user_ids.append(user_id) | |
return user_ids | |
def update_project_access(prj_id, user_names, user_dict): | |
# Fetch the new user IDs based on the provided user names from the dictionary | |
new_user_ids = fetch_user_ids_from_dict(user_dict, user_names) | |
# Convert the list of new user IDs to a comma-separated string | |
new_user_ids_str = ",".join(new_user_ids) | |
# Query to update the alwd_emp_id for the specified project | |
update_access_query = f""" | |
UPDATE mmo_projects | |
SET alwd_emp_id = ? | |
WHERE prj_id = ?; | |
""" | |
# Execute the update query | |
query_excecuter_postgres( | |
update_access_query, db_cred, params=(new_user_ids_str, prj_id), insert=True | |
) | |
st.write(f"Project {prj_id} access updated successfully") | |
def validate_emp_id(): | |
if st.session_state.sign_up not in st.session_state["unique_ids"].keys(): | |
st.warning("You dont have access to the tool please contact admin") | |
# -------------------Front END-------------------------# | |
st.header("Manage Projects") | |
unique_users_query = f""" | |
SELECT DISTINCT emp_id, emp_nam, emp_typ | |
FROM mmo_users; | |
""" | |
if "unique_ids" not in st.session_state: | |
unique_users_result = query_excecuter_postgres( | |
unique_users_query, db_cred, insert=False | |
) # retrieves all the users who has access to MMO TOOL | |
if len(unique_users_result) == 0: | |
st.warning("No users data present in db, please contact admin!") | |
st.stop() | |
st.session_state["unique_ids"] = { | |
emp_id: (emp_nam, emp_type) for emp_id, emp_nam, emp_type in unique_users_result | |
} | |
if "toggle" not in st.session_state: | |
st.session_state["toggle"] = 0 | |
if "emp_id" not in st.session_state: | |
reset_password = st.radio( | |
"Select An Option", | |
options=["Login", "Reset Password"], | |
index=st.session_state["toggle"], | |
horizontal=True, | |
) | |
if reset_password == "Login": | |
emp_id = st.text_input("Employee id").lower() # emp id | |
password = st.text_input("Password", max_chars=15, type="password") | |
login_button = st.button("Login", use_container_width=True) | |
else: | |
emp_id = st.text_input( | |
"Employee id", key="sign_up", on_change=validate_emp_id | |
).lower() | |
current_password = st.text_input( | |
"Enter Current Password and Press Enter to Validate", | |
max_chars=15, | |
type="password", | |
key="current_password", | |
) | |
if emp_id: | |
if emp_id not in st.session_state["unique_ids"].keys(): | |
st.write("Invalid id!") | |
st.stop() | |
else: | |
if not is_pswrd_flag_set(emp_id): | |
if verify_password(emp_id, current_password): | |
st.success("Your password key has been successfully validated!") | |
elif ( | |
not verify_password(emp_id, current_password) | |
and len(current_password) > 1 | |
): | |
st.write("Wrong Password Key Please Try Again") | |
st.stop() | |
elif verify_password(emp_id, current_password): | |
st.success("Your password has been successfully validated!") | |
elif ( | |
not verify_password(emp_id, current_password) | |
and len(current_password) > 1 | |
): | |
st.write("Wrong Password Please Try Again") | |
st.stop() | |
new_password = st.text_input( | |
"Enter New Password", max_chars=15, type="password", key="new_password" | |
) | |
st.markdown( | |
"**Password must be at least 8 to 15 characters long and contain at least one uppercase letter, one lowercase letter, one digit, and one special character. No SQL commands allowed.**" | |
) | |
validation_result = validate_password(new_password) | |
confirm_new_password = st.text_input( | |
"Confirm New Password", | |
max_chars=15, | |
type="password", | |
key="confirm_new_password", | |
) | |
reset_button = st.button("Reset Password", use_container_width=True) | |
if reset_button: | |
validation_result = validate_password(new_password) | |
if validation_result != "Valid input.": | |
st.warning(validation_result) | |
st.stop() | |
elif new_password != confirm_new_password: | |
st.warning( | |
"The new password and confirmation password do not match. Please try again." | |
) | |
st.stop() | |
else: | |
store_hashed_password(emp_id, confirm_new_password) | |
set_pswrd_flag(emp_id) | |
st.success("Password Reset Successful!") | |
with st.spinner("Redirecting to Login"): | |
time.sleep(3) | |
st.session_state["toggle"] = 0 | |
st.rerun() | |
st.stop() | |
if login_button: | |
if emp_id not in st.session_state["unique_ids"].keys() or len(password) == 0: | |
st.warning("invalid id or password!") | |
st.stop() | |
if not is_pswrd_flag_set(emp_id): | |
st.warning("Reset password to continue") | |
with st.spinner("Redirecting"): | |
st.session_state["toggle"] = 1 | |
time.sleep(2) | |
st.rerun() | |
st.stop() | |
elif verify_password(emp_id, password): | |
with st.spinner("Loading Saved Projects"): | |
st.session_state["emp_id"] = emp_id | |
update_summary_df() # function call to fetch user saved projects | |
st.session_state["clone_project_dict"] = fetch_and_process_projects( | |
st.session_state["emp_id"] | |
) | |
if "project_dct" in st.session_state: | |
del st.session_state["project_dct"] | |
st.session_state["project_name"] = None | |
delete_old_log_files() | |
st.rerun() | |
if ( | |
len(st.session_state["emp_id"]) == 0 | |
or st.session_state["emp_id"] | |
not in st.session_state["unique_ids"].keys() | |
): | |
st.stop() | |
else: | |
st.warning("Invalid user name or password") | |
st.stop() | |
if st.button("Logout"): | |
if "emp_id" in st.session_state: | |
del st.session_state["emp_id"] | |
st.rerun() | |
if st.session_state["emp_id"] in st.session_state["unique_ids"].keys(): | |
if "project_name" not in st.session_state: | |
st.session_state["project_name"] = None | |
cols1 = st.columns([2, 1]) | |
st.session_state["username"] = st.session_state["unique_ids"][ | |
st.session_state["emp_id"] | |
][0] | |
with cols1[0]: | |
st.markdown(f"**Welcome {st.session_state['username']}**") | |
with cols1[1]: | |
st.markdown(f"**Current Project: {st.session_state['project_name']}**") | |
st.markdown( | |
""" | |
Enter project number in the text below and click on load project to load the project. | |
""" | |
) | |
st.markdown("Select Project") | |
# st.write(type(st.session_state.keys)) | |
if len(st.session_state["project_summary_df"]) != 0: | |
# Display an editable data table using Streamlit's data editor component | |
table = st.dataframe( | |
st.session_state["project_summary_df"], | |
use_container_width=True, | |
hide_index=True, | |
) | |
project_number = st.selectbox( | |
"Enter Project number", | |
options=st.session_state["project_summary_df"]["Project Number"], | |
) | |
log_message( | |
"info", | |
f"Project number {project_number} selected by employee {st.session_state['emp_id']}.", | |
"Home", | |
) | |
project_col = st.columns(2) | |
# if "load_project_key" not in st.session_state: | |
# st.session_state["load_project_key"] = None\ | |
def load_project_fun(): | |
st.session_state["project_name"] = ( | |
st.session_state["project_summary_df"] | |
.loc[ | |
st.session_state["project_summary_df"]["Project Number"] | |
== project_number, | |
"Project Name", | |
] | |
.values[0] | |
) # fetching project name from project number stored in summary df | |
project_dct_query = f""" | |
SELECT pkl_obj | |
FROM mmo_project_meta_data | |
WHERE prj_id = ? AND file_nam = ?; | |
""" | |
# Execute the query and retrieve the result | |
project_dct_retrieved = query_excecuter_postgres( | |
project_dct_query, | |
db_cred, | |
params=(project_number, "project_dct"), | |
insert=False, | |
) | |
# retrieves project dict (meta data) stored in db | |
st.session_state["project_dct"] = pickle.loads( | |
project_dct_retrieved[0][0] | |
) # converting bytes data to original objet using pickle | |
st.session_state["project_number"] = project_number | |
keys_to_keep = [ | |
"unique_ids", | |
"emp_id", | |
"project_dct", | |
"project_name", | |
"project_number", | |
"username", | |
"project_summary_df", | |
"clone_project_dict", | |
] | |
# Clear all keys in st.session_state except the ones to keep | |
for key in list(st.session_state.keys()): | |
if key not in keys_to_keep: | |
del st.session_state[key] | |
ensure_project_dct_structure(st.session_state["project_dct"], default_dct) | |
if st.button( | |
"Load Project", | |
use_container_width=True, | |
key="load_project_key", | |
on_click=load_project_fun, | |
): | |
st.success("Project Loded") | |
# st.rerun() # refresh the page | |
# st.write(st.session_state['project_dct']) | |
if "radio_box_index" not in st.session_state: | |
st.session_state["radio_box_index"] = 0 | |
projct_radio = st.radio( | |
"Select Options", | |
[ | |
"Create New Project", | |
"Modify Project Access", | |
"Clone Saved Projects", | |
"Delete Projects", | |
], | |
horizontal=True, | |
index=st.session_state["radio_box_index"], | |
) | |
if projct_radio == "Modify Project Access": | |
with st.expander("Modify Project Access"): | |
project_number_for_access = st.selectbox( | |
"Select Project Number", | |
st.session_state["project_summary_df"]["Project Number"], | |
) | |
with st.spinner("Loading"): | |
users_who_has_access = fetch_users_with_access( | |
project_number_for_access | |
) | |
users_name_who_has_access = [user[1] for user in users_who_has_access] | |
modified_users_for_access_options = [ | |
details[0] | |
for user_id, details in st.session_state["unique_ids"].items() | |
if user_id != st.session_state["emp_id"] | |
] | |
users_name_who_has_access = [ | |
name | |
for name in users_name_who_has_access | |
if name in modified_users_for_access_options | |
] | |
modified_users_for_access = st.multiselect( | |
"Select or deselect users to grant or revoke access, then click the 'Modify Access' button to submit changes.", | |
options=modified_users_for_access_options, | |
default=users_name_who_has_access, | |
) | |
if st.button("Modify Access", use_container_width=True): | |
with st.spinner("Modifying Access"): | |
update_project_access( | |
project_number_for_access, | |
modified_users_for_access, | |
st.session_state["unique_ids"], | |
) | |
if projct_radio == "Create New Project": | |
with st.expander("Create New Project", expanded=False): | |
st.session_state["is_create_project_open"] = True | |
unique_users = [ | |
user[0] for user in st.session_state["unique_ids"].values() | |
] # fetching unique users who has access to the tool | |
user_projects = list( | |
set(st.session_state["project_summary_df"]["Project Name"]) | |
) # fetching corressponding user's projects | |
st.markdown( | |
""" | |
To create a new project, follow the instructions below: | |
1. **Project Name**: | |
- It should start with the client name, followed by the username. | |
- It should not contain special characters except for underscores (`_`) and should not contain spaces. | |
- Example format: `<client_name>_<username>_<project_name>` | |
2. **Select User**: Select the user you want to give access to this project. | |
3. **Create New Project**: Click **Create New Project** once the above details are entered. | |
**Example**: | |
- For a client named "ClientA" and a user named "UserX" with a project named "NewCampaign", the project name should be: | |
`ClientA_UserX_NewCampaign` | |
""" | |
) | |
project_col1 = st.columns(3) | |
with project_col1[0]: | |
# API_tables = get_table_names(schema) # load API files | |
slection_tables = ["NA"] | |
api_name = st.selectbox("Select API data", slection_tables, index=0) | |
# data availabe through API | |
# api_path = API_path_dict[api_name] | |
with project_col1[1]: | |
defualt_project_prefix = f"{api_name.split('_mmo_')[0]}_{st.session_state['unique_ids'][st.session_state['emp_id']][0]}_".replace( | |
" ", "_" | |
).lower() | |
if "project_name_box" not in st.session_state: | |
st.session_state["project_name_box"] = defualt_project_prefix | |
project_name = st.text_input( | |
"Enter Project Name", key="project_name_box" | |
) | |
warning_box = st.empty() | |
with project_col1[2]: | |
allowed_users = st.multiselect( | |
"Select Users who can access to this Project", | |
[val for val in unique_users], | |
) | |
allowed_users = list(allowed_users) | |
matching_user_id = [] | |
if len(allowed_users) > 0: | |
# converting the selection to comma seperated values to store in db | |
for emp_id, details in st.session_state["unique_ids"].items(): | |
for name in allowed_users: | |
if name in details: | |
matching_user_id.append(emp_id) | |
break | |
st.button( | |
"Reset Project Name", | |
on_click=reset_project_text_box, | |
help="", | |
use_container_width=True, | |
) | |
create = st.button( | |
"Create New Project", | |
use_container_width=True, | |
help="Project Name should follow naming convention", | |
) | |
if create: | |
if not project_name.lower().startswith(defualt_project_prefix): | |
with warning_box: | |
st.warning("Project Name should follow naming convention") | |
st.stop() | |
if project_name == defualt_project_prefix: | |
with warning_box: | |
st.warning("Cannot name only with prefix") | |
st.stop() | |
if project_name in user_projects: | |
with warning_box: | |
st.warning("Project already exists please enter new name") | |
st.stop() | |
if not ( | |
2 <= len(project_name) <= 50 | |
and bool(re.match("^[A-Za-z0-9_]*$", project_name)) | |
): | |
# Store the warning message details in session state | |
with warning_box: | |
st.warning( | |
"Please provide a valid project name (2-50 characters, only A-Z, a-z, 0-9, and _)." | |
) | |
st.stop() | |
if contains_sql_keywords_check(project_name): | |
with warning_box: | |
st.warning( | |
"Input contains SQL keywords. Please avoid using SQL commands." | |
) | |
st.stop() | |
else: | |
pass | |
with st.spinner("Creating Project"): | |
new_project() | |
with warning_box: | |
st.write("Project Created") | |
st.session_state["radio_box_index"] = 1 | |
log_message( | |
"info", | |
f"Employee {st.session_state['emp_id']} created new project {project_name}.", | |
"Home", | |
) | |
st.rerun() | |
if projct_radio == "Clone Saved Projects": | |
with st.expander("Clone Saved Projects", expanded=False): | |
if len(st.session_state["clone_project_dict"]) == 0: | |
st.warning("You dont have access to any saved projects") | |
st.stop() | |
cols = st.columns(2) | |
with cols[0]: | |
owners = list(st.session_state["clone_project_dict"].keys()) | |
owner_name = st.selectbox("Select Owner", owners) | |
with cols[1]: | |
project_names = [ | |
project["project_name"] | |
for project in st.session_state["clone_project_dict"][owner_name] | |
] | |
project_name_owner = st.selectbox( | |
"Select a saved Project available for you", | |
project_names, | |
) | |
defualt_project_prefix = f"{project_name_owner.split('_')[0]}_{st.session_state['unique_ids'][st.session_state['emp_id']][0]}_".replace( | |
" ", "_" | |
).lower() | |
user_projects = list( | |
set(st.session_state["project_summary_df"]["Project Name"]) | |
) | |
cloned_project_name = st.text_input( | |
"Enter Project Name", | |
value=defualt_project_prefix, | |
) | |
warning_box = st.empty() | |
if st.button( | |
"Load Project", use_container_width=True, key="load_project_button_key" | |
): | |
if not cloned_project_name.lower().startswith(defualt_project_prefix): | |
with warning_box: | |
st.warning("Project Name should follow naming conventions") | |
st.stop() | |
if cloned_project_name == defualt_project_prefix: | |
with warning_box: | |
st.warning("Cannot Name only with Prefix") | |
st.stop() | |
if cloned_project_name in user_projects: | |
with warning_box: | |
st.warning("Project already exists please enter new name") | |
st.stop() | |
with st.spinner("Cloning Project"): | |
old_prj_id = get_project_id_from_dict( | |
st.session_state["clone_project_dict"], | |
owner_name, | |
project_name_owner, | |
) | |
old_metadata = fetch_project_metadata(old_prj_id) | |
new_prj_id = create_new_project( | |
st.session_state["emp_id"], | |
cloned_project_name, | |
"", | |
st.session_state["emp_id"], | |
) | |
insert_project_metadata( | |
new_prj_id, old_metadata, st.session_state["emp_id"] | |
) | |
update_summary_df() | |
st.success("Project Cloned") | |
st.rerun() | |
if projct_radio == "Delete Projects": | |
if len(st.session_state["project_summary_df"]) != 0: | |
with st.expander("Delete Projects", expanded=True): | |
delete_projects = st.multiselect( | |
"Select all the projects number who want to delete", | |
st.session_state["project_summary_df"]["Project Number"], | |
) | |
st.warning( | |
"Projects will be permanently deleted. Other users will not be able to clone them if they have not already done so." | |
) | |
if st.button("Delete Projects", use_container_width=True): | |
if len(delete_projects) > 0: | |
with st.spinner("Deleting Projects"): | |
delete_projects_by_ids(delete_projects) | |
update_summary_df() | |
st.success("Projects Deleted") | |
st.rerun() | |
else: | |
st.warning("Please select atleast one project number to delete") | |
if projct_radio == "Download Project PPT": | |
try: | |
ppt = create_ppt( | |
st.session_state["project_name"], | |
st.session_state["username"], | |
"panel", # new | |
) | |
if ppt is not False: | |
st.download_button( | |
"Download", | |
data=ppt.getvalue(), | |
file_name=st.session_state["project_name"] | |
+ " Project Summary.pptx", | |
use_container_width=True, | |
) | |
else: | |
st.warning("Please make some progress before downloading PPT.") | |
except Exception as e: | |
st.warning("PPT Download Faild ") | |
# new | |
log_message( | |
log_type="error", message=f"Error in PPT build: {e}", page_name="Home" | |
) | |