samkeet's picture
Upload 40 files
00b00eb verified
# importing pacakages
import streamlit as st
st.set_page_config(layout="wide")
from utilities import (
load_local_css,
set_header,
ensure_project_dct_structure,
store_hashed_password,
verify_password,
is_pswrd_flag_set,
set_pswrd_flag,
)
import os
from datetime import datetime
import pandas as pd
import pickle
import psycopg2
#
import numbers
from collections import OrderedDict
import re
from ppt_utils import create_ppt
from constants import default_dct
import time
from log_application import log_message, delete_old_log_files
import sqlite3
# setting page config
load_local_css("styles.css")
set_header()
db_cred = None
# --------------Functions----------------------#
# # schema = db_cred["schema"]
##API DATA#######################
# Function to load gold layer data
# @st.cache_data(show_spinner=False)
def load_gold_layer_data(table_name):
# Fetch Table
query = f"""
SELECT * FROM {table_name};
"""
# Execute the query and get the results
results = query_excecuter_postgres(
query, db_cred, insert=False, return_dataframe=True
)
if results is not None and not results.empty:
# Create a DataFrame
gold_layer_df = results
else:
st.warning("No data found for the selected table.")
st.stop()
# Columns to be removed
columns_to_remove = [
"clnt_nam",
"crte_dt_tm",
"crte_by_uid",
"updt_dt_tm",
"updt_by_uid",
"campgn_id",
"campgn_nam",
"ad_id",
"ad_nam",
"tctc_id",
"tctc_nam",
"campgn_grp_id",
"campgn_grp_nam",
"ad_grp_id",
"ad_grp_nam",
]
# TEMP CODE
gold_layer_df = gold_layer_df.rename(
columns={
"imprssns_cnt": "mda_imprssns_cnt",
"clcks_cnt": "mda_clcks_cnt",
"vd_vws_cnt": "mda_vd_vws_cnt",
}
)
# Remove specific columns
gold_layer_df = gold_layer_df.drop(columns=columns_to_remove, errors="ignore")
# Convert columns to numeric or datetime as appropriate
for col in gold_layer_df.columns:
if (
col.startswith("rspns_mtrc_")
or col.startswith("mda_")
or col.startswith("exogenous_")
or col.startswith("internal_")
or col in ["spnd_amt"]
):
gold_layer_df[col] = pd.to_numeric(gold_layer_df[col], errors="coerce")
elif col == "rcrd_dt":
gold_layer_df[col] = pd.to_datetime(gold_layer_df[col], errors="coerce")
# Replace columns starting with 'mda_' to 'media_'
gold_layer_df.columns = [
(col.replace("mda_", "media_") if col.startswith("mda_") else col)
for col in gold_layer_df.columns
]
# Identify non-numeric columns
non_numeric_columns = gold_layer_df.select_dtypes(exclude=["number"]).columns
allow_non_numeric_columns = ["rcrd_dt", "aggrgtn_lvl", "sub_chnnl_nam", "panl_nam"]
# Remove non-numeric columns except for allowed non-numeric columns
non_numeric_columns_to_remove = [
col for col in non_numeric_columns if col not in allow_non_numeric_columns
]
gold_layer_df = gold_layer_df.drop(
columns=non_numeric_columns_to_remove, errors="ignore"
)
# Remove specific columns
allow_columns = ["rcrd_dt", "aggrgtn_lvl", "sub_chnnl_nam", "panl_nam", "spnd_amt"]
for col in gold_layer_df.columns:
if (
col.startswith("rspns_mtrc_")
or col.startswith("media_")
or col.startswith("exogenous_")
or col.startswith("internal_")
):
allow_columns.append(col)
gold_layer_df = gold_layer_df[allow_columns]
# Rename columns
gold_layer_df = gold_layer_df.rename(
columns={
"rcrd_dt": "date",
"sub_chnnl_nam": "channels",
"panl_nam": "panel",
"spnd_amt": "spends",
}
)
# Clean column values
gold_layer_df["panel"] = (
gold_layer_df["panel"].astype(str).str.lower().str.strip().str.replace(" ", "_")
)
gold_layer_df["channels"] = (
gold_layer_df["channels"]
.astype(str)
.str.lower()
.str.strip()
.str.replace(" ", "_")
)
# Replace columns starting with 'rspns_mtrc_' to 'response_metric_'
gold_layer_df.columns = [
(
col.replace("rspns_mtrc_", "response_metric_")
if col.startswith("rspns_mtrc_")
else col
)
for col in gold_layer_df.columns
]
# Get the minimum date from the main dataframe
min_date = gold_layer_df["date"].min()
# Get maximum dates for daily and weekly data
max_date_daily = None
max_date_weekly = None
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "daily"].empty:
max_date_daily = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "daily"][
"date"
].max()
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].empty:
max_date_weekly = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"][
"date"
].max() + pd.DateOffset(days=6)
# Determine final maximum date
if max_date_daily is not None and max_date_weekly is not None:
final_max_date = max(max_date_daily, max_date_weekly)
elif max_date_daily is not None:
final_max_date = max_date_daily
elif max_date_weekly is not None:
final_max_date = max_date_weekly
# Create a date range with daily frequency
date_range = pd.date_range(start=min_date, end=final_max_date, freq="D")
# Create a base DataFrame with all channels and all panels for each channel
unique_channels = gold_layer_df["channels"].unique()
unique_panels = gold_layer_df["panel"].unique()
base_data = [
(channel, panel, date)
for channel in unique_channels
for panel in unique_panels
for date in date_range
]
base_df = pd.DataFrame(base_data, columns=["channels", "panel", "date"])
# Process weekly data to convert it to daily
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].empty:
weekly_data = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "weekly"].copy()
daily_data = []
for index, row in weekly_data.iterrows():
week_start = pd.to_datetime(row["date"]) - pd.to_timedelta(
pd.to_datetime(row["date"]).weekday(), unit="D"
)
for i in range(7):
daily_date = week_start + pd.DateOffset(days=i)
new_row = row.copy()
new_row["date"] = daily_date
for col in new_row.index:
if isinstance(new_row[col], numbers.Number):
new_row[col] = new_row[col] / 7
daily_data.append(new_row)
daily_data_df = pd.DataFrame(daily_data)
daily_data_df["aggrgtn_lvl"] = "daily"
gold_layer_df = pd.concat(
[gold_layer_df[gold_layer_df["aggrgtn_lvl"] != "weekly"], daily_data_df],
ignore_index=True,
)
# Process monthly data to convert it to daily
if not gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "monthly"].empty:
monthly_data = gold_layer_df[gold_layer_df["aggrgtn_lvl"] == "monthly"].copy()
daily_data = []
for index, row in monthly_data.iterrows():
month_start = pd.to_datetime(row["date"]).replace(day=1)
next_month_start = (month_start + pd.DateOffset(months=1)).replace(day=1)
days_in_month = (next_month_start - month_start).days
for i in range(days_in_month):
daily_date = month_start + pd.DateOffset(days=i)
new_row = row.copy()
new_row["date"] = daily_date
for col in new_row.index:
if isinstance(new_row[col], numbers.Number):
new_row[col] = new_row[col] / days_in_month
daily_data.append(new_row)
daily_data_df = pd.DataFrame(daily_data)
daily_data_df["aggrgtn_lvl"] = "daily"
gold_layer_df = pd.concat(
[gold_layer_df[gold_layer_df["aggrgtn_lvl"] != "monthly"], daily_data_df],
ignore_index=True,
)
# Remove aggrgtn_lvl column
gold_layer_df = gold_layer_df.drop(columns=["aggrgtn_lvl"], errors="ignore")
# Group by 'panel', and 'date'
gold_layer_df = gold_layer_df.groupby(["channels", "panel", "date"]).sum()
# Merge gold_layer_df to base_df on channels, panel and date
gold_layer_df_cleaned = pd.merge(
base_df, gold_layer_df, on=["channels", "panel", "date"], how="left"
)
# Pivot the dataframe and rename columns
pivot_columns = [
col
for col in gold_layer_df_cleaned.columns
if col not in ["channels", "panel", "date"]
]
gold_layer_df_cleaned = gold_layer_df_cleaned.pivot_table(
index=["date", "panel"], columns="channels", values=pivot_columns, aggfunc="sum"
).reset_index()
# Flatten the columns
gold_layer_df_cleaned.columns = [
"_".join(col).strip() if col[1] else col[0]
for col in gold_layer_df_cleaned.columns.values
]
# Replace columns ending with '_all' to '_total'
gold_layer_df_cleaned.columns = [
col.replace("_all", "_total") if col.endswith("_all") else col
for col in gold_layer_df_cleaned.columns
]
# Clean panel column values
gold_layer_df_cleaned["panel"] = (
gold_layer_df_cleaned["panel"]
.astype(str)
.str.lower()
.str.strip()
.str.replace(" ", "_")
)
# Drop all columns that end with '_total' except those starting with 'response_metric_'
cols_to_drop = [
col
for col in gold_layer_df_cleaned.columns
if col.endswith("_total") and not col.startswith("response_metric_")
]
gold_layer_df_cleaned.drop(columns=cols_to_drop, inplace=True)
return gold_layer_df_cleaned
def check_valid_name():
if (
not st.session_state["project_name_box"]
.lower()
.startswith(defualt_project_prefix)
):
st.session_state["disable_create_project"] = True
with warning_box:
st.warning("Project Name should follow naming conventions")
st.session_state["warning"] = (
"Project Name should follow naming conventions!"
)
with warning_box:
st.warning("Project Name should follow naming conventions")
st.button("Reset Name", on_click=reset_project_text_box, key="2")
if st.session_state["project_name_box"] == defualt_project_prefix:
with warning_box:
st.warning("Cannot Name only with Prefix")
st.session_state["warning"] = "Cannot Name only with Prefix"
st.session_state["disable_create_project"] = True
if st.session_state["project_name_box"] in user_projects:
with warning_box:
st.warning("Project already exists please enter new name")
st.session_state["warning"] = "Project already exists please enter new name"
st.session_state["disable_create_project"] = True
else:
st.session_state["disable_create_project"] = False
def query_excecuter_postgres(
query,
db_path=None,
params=None,
insert=True,
insert_retrieve=False,
db_cred=None,
):
"""
Executes a SQL query on a SQLite database, handling both insert and select operations.
Parameters:
query (str): The SQL query to be executed.
db_path (str): Path to the SQLite database file.
params (tuple, optional): Parameters to pass into the SQL query for parameterized execution.
insert (bool, default=True): Flag to determine if the query is an insert operation (default) or a select operation.
insert_retrieve (bool, default=False): Flag to determine if the query should insert and then return the inserted ID.
"""
try:
# Construct a cross-platform path to the database
db_dir = os.path.join("db")
os.makedirs(db_dir, exist_ok=True) # Make sure the directory exists
db_path = os.path.join(db_dir, "imp_db.db")
# Establish connection to the SQLite database
conn = sqlite3.connect(db_path)
except sqlite3.Error as e:
st.warning(f"Unable to connect to the SQLite database: {e}")
st.stop()
# Create a cursor object to interact with the database
c = conn.cursor()
try:
# Execute the query with or without parameters
if params:
params = tuple(params)
query = query.replace("IN (?)", f"IN ({','.join(['?' for _ in params])})")
c.execute(query, params)
else:
c.execute(query)
if not insert:
# If not an insert operation, fetch and return the results
results = c.fetchall()
return results
elif insert_retrieve:
# If insert and retrieve operation, commit and return the last inserted row ID
conn.commit()
return c.lastrowid
else:
# For standard insert operations, commit the transaction
conn.commit()
except Exception as e:
st.write(f"Error executing query: {e}")
finally:
conn.close()
# Function to check if the input contains any SQL keywords
def contains_sql_keywords_check(user_input):
sql_keywords = [
"SELECT",
"INSERT",
"UPDATE",
"DELETE",
"DROP",
"ALTER",
"CREATE",
"GRANT",
"REVOKE",
"UNION",
"JOIN",
"WHERE",
"HAVING",
"EXEC",
"TRUNCATE",
"REPLACE",
"MERGE",
"DECLARE",
"SHOW",
"FROM",
]
pattern = "|".join(re.escape(keyword) for keyword in sql_keywords)
return re.search(pattern, user_input, re.IGNORECASE)
# def get_table_names(schema):
# query = f"""
# SELECT table_name
# FROM information_schema.tables
# WHERE table_schema = '{schema}'
# AND table_type = 'BASE TABLE'
# AND table_name LIKE '%_mmo_gold';
# """
# table_names = query_excecuter_postgres(query, db_cred, insert=False)
# table_names = [table[0] for table in table_names]
# return table_names
def update_summary_df():
"""
Updates the 'project_summary_df' in the session state with the latest project
summary information based on the most recent updates.
This function executes a SQL query to retrieve project metadata from a database
and stores the result in the session state.
Uses:
- query_excecuter_postgres(query, params=params, insert=False): A function that
executes the provided SQL query on a PostgreSQL database.
Modifies:
- st.session_state['project_summary_df']: Updates the dataframe with columns:
'Project Number', 'Project Name', 'Last Modified Page', 'Last Modified Time'.
"""
query = f"""
WITH LatestUpdates AS (
SELECT
prj_id,
page_nam,
updt_dt_tm,
ROW_NUMBER() OVER (PARTITION BY prj_id ORDER BY updt_dt_tm DESC) AS rn
FROM
mmo_project_meta_data
)
SELECT
p.prj_id,
p.prj_nam AS prj_nam,
lu.page_nam,
lu.updt_dt_tm
FROM
LatestUpdates lu
RIGHT JOIN
mmo_projects p ON lu.prj_id = p.prj_id
WHERE
p.prj_ownr_id = ? AND lu.rn = 1
"""
params = (st.session_state["emp_id"],) # Parameters for the SQL query
# Execute the query and retrieve project summary data
project_summary = query_excecuter_postgres(
query, db_cred, params=params, insert=False
)
# Update the session state with the project summary dataframe
st.session_state["project_summary_df"] = pd.DataFrame(
project_summary,
columns=[
"Project Number",
"Project Name",
"Last Modified Page",
"Last Modified Time",
],
)
st.session_state["project_summary_df"] = st.session_state[
"project_summary_df"
].sort_values(by=["Last Modified Time"], ascending=False)
def reset_project_text_box():
st.session_state["project_name_box"] = defualt_project_prefix
st.session_state["disable_create_project"] = True
def query_excecuter_sqlite(
insert_projects_query,
insert_meta_data_query,
db_path=None,
params_projects=None,
params_meta=None,
):
"""
Executes the project insert and associated metadata insert in an SQLite database.
Parameters:
insert_projects_query (str): SQL query for inserting into the mmo_projects table.
insert_meta_data_query (str): SQL query for inserting into the mmo_project_meta_data table.
db_path (str): Path to the SQLite database file.
params_projects (tuple, optional): Parameters for the mmo_projects table insert.
params_meta (tuple, optional): Parameters for the mmo_project_meta_data table insert.
Returns:
bool: True if successful, False otherwise.
"""
try:
# Construct a cross-platform path to the database
db_dir = os.path.join("db")
os.makedirs(db_dir, exist_ok=True) # Make sure the directory exists
db_path = os.path.join(db_dir, "imp_db.db")
# Establish connection to the SQLite database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Execute the first insert query into the mmo_projects table
cursor.execute(insert_projects_query, params_projects)
# Get the last inserted project ID
prj_id = cursor.lastrowid
# Modify the parameters for the metadata table with the inserted prj_id
params_meta = (prj_id,) + params_meta
# Execute the second insert query into the mmo_project_meta_data table
cursor.execute(insert_meta_data_query, params_meta)
# Commit the transaction
conn.commit()
except sqlite3.Error as e:
st.warning(f"Error executing query: {e}")
return False
finally:
# Close the connection
conn.close()
return True
def new_project():
"""
Cleans the project name input and inserts project data into the SQLite database,
updating session state and triggering UI rerun if successful.
"""
# Define a dictionary containing project data
project_dct = default_dct.copy()
gold_layer_df = pd.DataFrame()
if str(api_name).strip().lower() != "na":
try:
gold_layer_df = load_gold_layer_data(api_name)
except Exception as e:
st.toast(
"Failed to load gold layer data. Please check the gold layer structure and connection.",
icon="⚠️",
)
log_message(
"error",
f"Error loading gold layer data: {str(e)}",
"Home",
)
project_dct["data_import"]["gold_layer_df"] = gold_layer_df
# Get current time for database insertion
inserted_time = datetime.now().isoformat()
# Define SQL queries for inserting project and metadata into the SQLite database
insert_projects_query = """
INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_dt_tm, crte_by_uid)
VALUES (?, ?, ?, ?, ?);
"""
insert_meta_data_query = """
INSERT INTO mmo_project_meta_data (prj_id, page_nam, file_nam, pkl_obj, crte_dt_tm, crte_by_uid, updt_dt_tm)
VALUES (?, ?, ?, ?, ?, ?, ?);
"""
# Get current time for metadata update
updt_dt_tm = datetime.now().isoformat()
# Serialize project_dct using pickle
project_pkl = pickle.dumps(project_dct)
# Prepare data for database insertion
projects_data = (
st.session_state["emp_id"], # prj_ownr_id
project_name, # prj_nam
",".join(matching_user_id), # alwd_emp_id
inserted_time, # crte_dt_tm
st.session_state["emp_id"], # crte_by_uid
)
project_meta_data = (
"Home", # page_nam
"project_dct", # file_nam
project_pkl, # pkl_obj
inserted_time, # crte_dt_tm
st.session_state["emp_id"], # crte_by_uid
updt_dt_tm, # updt_dt_tm
)
# Execute the insertion query for SQLite
success = query_excecuter_sqlite(
insert_projects_query,
insert_meta_data_query,
params_projects=projects_data,
params_meta=project_meta_data,
)
if success:
st.success("Project Created")
update_summary_df()
else:
st.error("Failed to create project.")
def validate_password(user_input):
# List of SQL keywords to check for
sql_keywords = [
"SELECT",
"INSERT",
"UPDATE",
"DELETE",
"DROP",
"ALTER",
"CREATE",
"GRANT",
"REVOKE",
"UNION",
"JOIN",
"WHERE",
"HAVING",
"EXEC",
"TRUNCATE",
"REPLACE",
"MERGE",
"DECLARE",
"SHOW",
"FROM",
]
# Create a regex pattern for SQL keywords
pattern = "|".join(re.escape(keyword) for keyword in sql_keywords)
# Check if input contains any SQL keywords
if re.search(pattern, user_input, re.IGNORECASE):
return "SQL keyword detected."
# Password validation criteria
if len(user_input) < 8:
return "Password should be at least 8 characters long."
if not re.search(r"[A-Z]", user_input):
return "Password should contain at least one uppercase letter."
if not re.search(r"[0-9]", user_input):
return "Password should contain at least one digit."
if not re.search(r"[a-z]", user_input):
return "Password should contain at least one lowercase letter."
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', user_input):
return "Password should contain at least one special character."
# If all checks pass
return "Valid input."
def fetch_and_process_projects(emp_id):
query = f"""
WITH ProjectAccess AS (
SELECT
p.prj_id,
p.prj_nam,
p.alwd_emp_id,
u.emp_nam AS project_owner
FROM mmo_projects p
JOIN mmo_users u ON p.prj_ownr_id = u.emp_id
)
SELECT
pa.prj_id,
pa.prj_nam,
pa.project_owner
FROM
ProjectAccess pa
WHERE
pa.alwd_emp_id LIKE ?
ORDER BY
pa.prj_id;
"""
params = (f"%{emp_id}%",)
results = query_excecuter_postgres(query, db_cred, params=params, insert=False)
# Process the results to create the desired dictionary structure
clone_project_dict = {}
for row in results:
project_id, project_name, project_owner = row
if project_owner not in clone_project_dict:
clone_project_dict[project_owner] = []
clone_project_dict[project_owner].append(
{"project_name": project_name, "project_id": project_id}
)
return clone_project_dict
def get_project_id_from_dict(projects_dict, owner_name, project_name):
if owner_name in projects_dict:
for project in projects_dict[owner_name]:
if project["project_name"] == project_name:
return project["project_id"]
return None
# def fetch_project_metadata(prj_id):
# query = f"""
# SELECT
# prj_id, page_nam, file_nam, pkl_obj, dshbrd_ts
# FROM
# mmo_project_meta_data
# WHERE
# prj_id = ?;
# """
# params = (prj_id,)
# return query_excecuter_postgres(query, db_cred, params=params, insert=False)
def fetch_project_metadata(prj_id):
# Query to select project metadata
query = """
SELECT
prj_id, page_nam, file_nam, pkl_obj, dshbrd_ts
FROM
mmo_project_meta_data
WHERE
prj_id = ?;
"""
params = (prj_id,)
return query_excecuter_postgres(query, db_cred, params=params, insert=False)
# def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id):
# query = f"""
# INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm)
# VALUES (?, ?, ?, ?, NOW())
# RETURNING prj_id;
# """
# params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id)
# result = query_excecuter_postgres(
# query, db_cred, params=params, insert=True, insert_retrieve=True
# )
# return result[0][0]
# def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id):
# # Query to insert a new project
# insert_query = """
# INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm)
# VALUES (?, ?, ?, ?, DATETIME('now'));
# """
# params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id)
# # Execute the insert query
# query_excecuter_postgres(insert_query, db_cred, params=params, insert=True)
# # Retrieve the last inserted prj_id
# retrieve_id_query = "SELECT last_insert_rowid();"
# result = query_excecuter_postgres(retrieve_id_query, db_cred, insert_retrieve=True)
# return result[0][0]
def create_new_project(prj_ownr_id, prj_nam, alwd_emp_id, emp_id):
# Query to insert a new project
insert_query = """
INSERT INTO mmo_projects (prj_ownr_id, prj_nam, alwd_emp_id, crte_by_uid, crte_dt_tm)
VALUES (?, ?, ?, ?, DATETIME('now'));
"""
params = (prj_ownr_id, prj_nam, alwd_emp_id, emp_id)
# Execute the insert query and retrieve the last inserted prj_id directly
last_inserted_id = query_excecuter_postgres(
insert_query, params=params, insert_retrieve=True
)
return last_inserted_id
def insert_project_metadata(new_prj_id, metadata, created_emp_id):
# query = f"""
# INSERT INTO mmo_project_meta_data (
# prj_id, page_nam, crte_dt_tm, file_nam, pkl_obj, dshbrd_ts, crte_by_uid
# )
# VALUES (?, ?, NOW(), ?, ?, ?, ?);
# """
query = """
INSERT INTO mmo_project_meta_data (
prj_id, page_nam, crte_dt_tm, file_nam, pkl_obj, dshbrd_ts, crte_by_uid
)
VALUES (?, ?, DATETIME('now'), ?, ?, ?, ?);
"""
for row in metadata:
params = (new_prj_id, row[1], row[2], row[3], row[4], created_emp_id)
query_excecuter_postgres(query, db_cred, params=params, insert=True)
# def delete_projects_by_ids(prj_ids):
# # Ensure prj_ids is a tuple to use with the IN clause
# prj_ids_tuple = tuple(prj_ids)
# # Query to delete project metadata
# delete_metadata_query = f"""
# DELETE FROM mmo_project_meta_data
# WHERE prj_id IN ?;
# """
# delete_projects_query = f"""
# DELETE FROM mmo_projects
# WHERE prj_id IN ?;
# """
# try:
# # Delete from metadata table
# query_excecuter_postgres(
# delete_metadata_query, db_cred, params=(prj_ids_tuple,), insert=True
# )
# # Delete from projects table
# query_excecuter_postgres(
# delete_projects_query, db_cred, params=(prj_ids_tuple,), insert=True
# )
# except Exception as e:
# st.write(f"Error deleting projects: {e}")
def delete_projects_by_ids(prj_ids):
# Ensure prj_ids is a tuple to use with the IN clause
prj_ids_tuple = tuple(prj_ids)
# Dynamically generate placeholders for SQLite
placeholders = ", ".join(["?"] * len(prj_ids_tuple))
# Query to delete project metadata with dynamic placeholders
delete_metadata_query = f"""
DELETE FROM mmo_project_meta_data
WHERE prj_id IN ({placeholders});
"""
delete_projects_query = f"""
DELETE FROM mmo_projects
WHERE prj_id IN ({placeholders});
"""
try:
# Delete from metadata table
query_excecuter_postgres(
delete_metadata_query, db_cred, params=prj_ids_tuple, insert=True
)
# Delete from projects table
query_excecuter_postgres(
delete_projects_query, db_cred, params=prj_ids_tuple, insert=True
)
except Exception as e:
st.write(f"Error deleting projects: {e}")
def fetch_users_with_access(prj_id):
# Query to get allowed employee IDs for the project
get_allowed_emps_query = """
SELECT alwd_emp_id
FROM mmo_projects
WHERE prj_id = ?;
"""
# Fetch the allowed employee IDs
allowed_emp_ids_result = query_excecuter_postgres(
get_allowed_emps_query, db_cred, params=(prj_id,), insert=False
)
if not allowed_emp_ids_result:
return []
# Extract the allowed employee IDs (Assuming alwd_emp_id is a comma-separated string)
allowed_emp_ids_str = allowed_emp_ids_result[0][0]
allowed_emp_ids = allowed_emp_ids_str.split(",")
# Convert to tuple for the IN clause
allowed_emp_ids_tuple = tuple(allowed_emp_ids)
# Query to get user details for the allowed employee IDs
get_users_query = """
SELECT emp_id, emp_nam, emp_typ
FROM mmo_users
WHERE emp_id IN ({});
""".format(
",".join("?" * len(allowed_emp_ids_tuple))
) # Dynamically construct the placeholder list
# Fetch user details
user_details = query_excecuter_postgres(
get_users_query, db_cred, params=allowed_emp_ids_tuple, insert=False
)
return user_details
# def update_project_access(prj_id, user_names, new_user_ids):
# # Convert the list of new user IDs to a comma-separated string
# new_user_ids_str = ",".join(new_user_ids)
# # Query to update the alwd_emp_id for the specified project
# update_access_query = f"""
# UPDATE mmo_projects
# SET alwd_emp_id = ?
# WHERE prj_id = ?;
# """
# # Execute the update query
# query_excecuter_postgres(
# update_access_query, db_cred, params=(new_user_ids_str, prj_id), insert=True
# )
# st.success(f"Project {prj_id} access updated successfully")
def fetch_user_ids_from_dict(user_dict, user_names):
user_ids = []
# Iterate over the user_dict to find matching user names
for user_id, details in user_dict.items():
if details[0] in user_names:
user_ids.append(user_id)
return user_ids
def update_project_access(prj_id, user_names, user_dict):
# Fetch the new user IDs based on the provided user names from the dictionary
new_user_ids = fetch_user_ids_from_dict(user_dict, user_names)
# Convert the list of new user IDs to a comma-separated string
new_user_ids_str = ",".join(new_user_ids)
# Query to update the alwd_emp_id for the specified project
update_access_query = f"""
UPDATE mmo_projects
SET alwd_emp_id = ?
WHERE prj_id = ?;
"""
# Execute the update query
query_excecuter_postgres(
update_access_query, db_cred, params=(new_user_ids_str, prj_id), insert=True
)
st.write(f"Project {prj_id} access updated successfully")
def validate_emp_id():
if st.session_state.sign_up not in st.session_state["unique_ids"].keys():
st.warning("You dont have access to the tool please contact admin")
# -------------------Front END-------------------------#
st.header("Manage Projects")
unique_users_query = f"""
SELECT DISTINCT emp_id, emp_nam, emp_typ
FROM mmo_users;
"""
if "unique_ids" not in st.session_state:
unique_users_result = query_excecuter_postgres(
unique_users_query, db_cred, insert=False
) # retrieves all the users who has access to MMO TOOL
if len(unique_users_result) == 0:
st.warning("No users data present in db, please contact admin!")
st.stop()
st.session_state["unique_ids"] = {
emp_id: (emp_nam, emp_type) for emp_id, emp_nam, emp_type in unique_users_result
}
if "toggle" not in st.session_state:
st.session_state["toggle"] = 0
if "emp_id" not in st.session_state:
reset_password = st.radio(
"Select An Option",
options=["Login", "Reset Password"],
index=st.session_state["toggle"],
horizontal=True,
)
if reset_password == "Login":
emp_id = st.text_input("Employee id").lower() # emp id
password = st.text_input("Password", max_chars=15, type="password")
login_button = st.button("Login", use_container_width=True)
else:
emp_id = st.text_input(
"Employee id", key="sign_up", on_change=validate_emp_id
).lower()
current_password = st.text_input(
"Enter Current Password and Press Enter to Validate",
max_chars=15,
type="password",
key="current_password",
)
if emp_id:
if emp_id not in st.session_state["unique_ids"].keys():
st.write("Invalid id!")
st.stop()
else:
if not is_pswrd_flag_set(emp_id):
if verify_password(emp_id, current_password):
st.success("Your password key has been successfully validated!")
elif (
not verify_password(emp_id, current_password)
and len(current_password) > 1
):
st.write("Wrong Password Key Please Try Again")
st.stop()
elif verify_password(emp_id, current_password):
st.success("Your password has been successfully validated!")
elif (
not verify_password(emp_id, current_password)
and len(current_password) > 1
):
st.write("Wrong Password Please Try Again")
st.stop()
new_password = st.text_input(
"Enter New Password", max_chars=15, type="password", key="new_password"
)
st.markdown(
"**Password must be at least 8 to 15 characters long and contain at least one uppercase letter, one lowercase letter, one digit, and one special character. No SQL commands allowed.**"
)
validation_result = validate_password(new_password)
confirm_new_password = st.text_input(
"Confirm New Password",
max_chars=15,
type="password",
key="confirm_new_password",
)
reset_button = st.button("Reset Password", use_container_width=True)
if reset_button:
validation_result = validate_password(new_password)
if validation_result != "Valid input.":
st.warning(validation_result)
st.stop()
elif new_password != confirm_new_password:
st.warning(
"The new password and confirmation password do not match. Please try again."
)
st.stop()
else:
store_hashed_password(emp_id, confirm_new_password)
set_pswrd_flag(emp_id)
st.success("Password Reset Successful!")
with st.spinner("Redirecting to Login"):
time.sleep(3)
st.session_state["toggle"] = 0
st.rerun()
st.stop()
if login_button:
if emp_id not in st.session_state["unique_ids"].keys() or len(password) == 0:
st.warning("invalid id or password!")
st.stop()
if not is_pswrd_flag_set(emp_id):
st.warning("Reset password to continue")
with st.spinner("Redirecting"):
st.session_state["toggle"] = 1
time.sleep(2)
st.rerun()
st.stop()
elif verify_password(emp_id, password):
with st.spinner("Loading Saved Projects"):
st.session_state["emp_id"] = emp_id
update_summary_df() # function call to fetch user saved projects
st.session_state["clone_project_dict"] = fetch_and_process_projects(
st.session_state["emp_id"]
)
if "project_dct" in st.session_state:
del st.session_state["project_dct"]
st.session_state["project_name"] = None
delete_old_log_files()
st.rerun()
if (
len(st.session_state["emp_id"]) == 0
or st.session_state["emp_id"]
not in st.session_state["unique_ids"].keys()
):
st.stop()
else:
st.warning("Invalid user name or password")
st.stop()
if st.button("Logout"):
if "emp_id" in st.session_state:
del st.session_state["emp_id"]
st.rerun()
if st.session_state["emp_id"] in st.session_state["unique_ids"].keys():
if "project_name" not in st.session_state:
st.session_state["project_name"] = None
cols1 = st.columns([2, 1])
st.session_state["username"] = st.session_state["unique_ids"][
st.session_state["emp_id"]
][0]
with cols1[0]:
st.markdown(f"**Welcome {st.session_state['username']}**")
with cols1[1]:
st.markdown(f"**Current Project: {st.session_state['project_name']}**")
st.markdown(
"""
Enter project number in the text below and click on load project to load the project.
"""
)
st.markdown("Select Project")
# st.write(type(st.session_state.keys))
if len(st.session_state["project_summary_df"]) != 0:
# Display an editable data table using Streamlit's data editor component
table = st.dataframe(
st.session_state["project_summary_df"],
use_container_width=True,
hide_index=True,
)
project_number = st.selectbox(
"Enter Project number",
options=st.session_state["project_summary_df"]["Project Number"],
)
log_message(
"info",
f"Project number {project_number} selected by employee {st.session_state['emp_id']}.",
"Home",
)
project_col = st.columns(2)
# if "load_project_key" not in st.session_state:
# st.session_state["load_project_key"] = None\
def load_project_fun():
st.session_state["project_name"] = (
st.session_state["project_summary_df"]
.loc[
st.session_state["project_summary_df"]["Project Number"]
== project_number,
"Project Name",
]
.values[0]
) # fetching project name from project number stored in summary df
project_dct_query = f"""
SELECT pkl_obj
FROM mmo_project_meta_data
WHERE prj_id = ? AND file_nam = ?;
"""
# Execute the query and retrieve the result
project_dct_retrieved = query_excecuter_postgres(
project_dct_query,
db_cred,
params=(project_number, "project_dct"),
insert=False,
)
# retrieves project dict (meta data) stored in db
st.session_state["project_dct"] = pickle.loads(
project_dct_retrieved[0][0]
) # converting bytes data to original objet using pickle
st.session_state["project_number"] = project_number
keys_to_keep = [
"unique_ids",
"emp_id",
"project_dct",
"project_name",
"project_number",
"username",
"project_summary_df",
"clone_project_dict",
]
# Clear all keys in st.session_state except the ones to keep
for key in list(st.session_state.keys()):
if key not in keys_to_keep:
del st.session_state[key]
ensure_project_dct_structure(st.session_state["project_dct"], default_dct)
if st.button(
"Load Project",
use_container_width=True,
key="load_project_key",
on_click=load_project_fun,
):
st.success("Project Loded")
# st.rerun() # refresh the page
# st.write(st.session_state['project_dct'])
if "radio_box_index" not in st.session_state:
st.session_state["radio_box_index"] = 0
projct_radio = st.radio(
"Select Options",
[
"Create New Project",
"Modify Project Access",
"Clone Saved Projects",
"Delete Projects",
],
horizontal=True,
index=st.session_state["radio_box_index"],
)
if projct_radio == "Modify Project Access":
with st.expander("Modify Project Access"):
project_number_for_access = st.selectbox(
"Select Project Number",
st.session_state["project_summary_df"]["Project Number"],
)
with st.spinner("Loading"):
users_who_has_access = fetch_users_with_access(
project_number_for_access
)
users_name_who_has_access = [user[1] for user in users_who_has_access]
modified_users_for_access_options = [
details[0]
for user_id, details in st.session_state["unique_ids"].items()
if user_id != st.session_state["emp_id"]
]
users_name_who_has_access = [
name
for name in users_name_who_has_access
if name in modified_users_for_access_options
]
modified_users_for_access = st.multiselect(
"Select or deselect users to grant or revoke access, then click the 'Modify Access' button to submit changes.",
options=modified_users_for_access_options,
default=users_name_who_has_access,
)
if st.button("Modify Access", use_container_width=True):
with st.spinner("Modifying Access"):
update_project_access(
project_number_for_access,
modified_users_for_access,
st.session_state["unique_ids"],
)
if projct_radio == "Create New Project":
with st.expander("Create New Project", expanded=False):
st.session_state["is_create_project_open"] = True
unique_users = [
user[0] for user in st.session_state["unique_ids"].values()
] # fetching unique users who has access to the tool
user_projects = list(
set(st.session_state["project_summary_df"]["Project Name"])
) # fetching corressponding user's projects
st.markdown(
"""
To create a new project, follow the instructions below:
1. **Project Name**:
- It should start with the client name, followed by the username.
- It should not contain special characters except for underscores (`_`) and should not contain spaces.
- Example format: `<client_name>_<username>_<project_name>`
2. **Select User**: Select the user you want to give access to this project.
3. **Create New Project**: Click **Create New Project** once the above details are entered.
**Example**:
- For a client named "ClientA" and a user named "UserX" with a project named "NewCampaign", the project name should be:
`ClientA_UserX_NewCampaign`
"""
)
project_col1 = st.columns(3)
with project_col1[0]:
# API_tables = get_table_names(schema) # load API files
slection_tables = ["NA"]
api_name = st.selectbox("Select API data", slection_tables, index=0)
# data availabe through API
# api_path = API_path_dict[api_name]
with project_col1[1]:
defualt_project_prefix = f"{api_name.split('_mmo_')[0]}_{st.session_state['unique_ids'][st.session_state['emp_id']][0]}_".replace(
" ", "_"
).lower()
if "project_name_box" not in st.session_state:
st.session_state["project_name_box"] = defualt_project_prefix
project_name = st.text_input(
"Enter Project Name", key="project_name_box"
)
warning_box = st.empty()
with project_col1[2]:
allowed_users = st.multiselect(
"Select Users who can access to this Project",
[val for val in unique_users],
)
allowed_users = list(allowed_users)
matching_user_id = []
if len(allowed_users) > 0:
# converting the selection to comma seperated values to store in db
for emp_id, details in st.session_state["unique_ids"].items():
for name in allowed_users:
if name in details:
matching_user_id.append(emp_id)
break
st.button(
"Reset Project Name",
on_click=reset_project_text_box,
help="",
use_container_width=True,
)
create = st.button(
"Create New Project",
use_container_width=True,
help="Project Name should follow naming convention",
)
if create:
if not project_name.lower().startswith(defualt_project_prefix):
with warning_box:
st.warning("Project Name should follow naming convention")
st.stop()
if project_name == defualt_project_prefix:
with warning_box:
st.warning("Cannot name only with prefix")
st.stop()
if project_name in user_projects:
with warning_box:
st.warning("Project already exists please enter new name")
st.stop()
if not (
2 <= len(project_name) <= 50
and bool(re.match("^[A-Za-z0-9_]*$", project_name))
):
# Store the warning message details in session state
with warning_box:
st.warning(
"Please provide a valid project name (2-50 characters, only A-Z, a-z, 0-9, and _)."
)
st.stop()
if contains_sql_keywords_check(project_name):
with warning_box:
st.warning(
"Input contains SQL keywords. Please avoid using SQL commands."
)
st.stop()
else:
pass
with st.spinner("Creating Project"):
new_project()
with warning_box:
st.write("Project Created")
st.session_state["radio_box_index"] = 1
log_message(
"info",
f"Employee {st.session_state['emp_id']} created new project {project_name}.",
"Home",
)
st.rerun()
if projct_radio == "Clone Saved Projects":
with st.expander("Clone Saved Projects", expanded=False):
if len(st.session_state["clone_project_dict"]) == 0:
st.warning("You dont have access to any saved projects")
st.stop()
cols = st.columns(2)
with cols[0]:
owners = list(st.session_state["clone_project_dict"].keys())
owner_name = st.selectbox("Select Owner", owners)
with cols[1]:
project_names = [
project["project_name"]
for project in st.session_state["clone_project_dict"][owner_name]
]
project_name_owner = st.selectbox(
"Select a saved Project available for you",
project_names,
)
defualt_project_prefix = f"{project_name_owner.split('_')[0]}_{st.session_state['unique_ids'][st.session_state['emp_id']][0]}_".replace(
" ", "_"
).lower()
user_projects = list(
set(st.session_state["project_summary_df"]["Project Name"])
)
cloned_project_name = st.text_input(
"Enter Project Name",
value=defualt_project_prefix,
)
warning_box = st.empty()
if st.button(
"Load Project", use_container_width=True, key="load_project_button_key"
):
if not cloned_project_name.lower().startswith(defualt_project_prefix):
with warning_box:
st.warning("Project Name should follow naming conventions")
st.stop()
if cloned_project_name == defualt_project_prefix:
with warning_box:
st.warning("Cannot Name only with Prefix")
st.stop()
if cloned_project_name in user_projects:
with warning_box:
st.warning("Project already exists please enter new name")
st.stop()
with st.spinner("Cloning Project"):
old_prj_id = get_project_id_from_dict(
st.session_state["clone_project_dict"],
owner_name,
project_name_owner,
)
old_metadata = fetch_project_metadata(old_prj_id)
new_prj_id = create_new_project(
st.session_state["emp_id"],
cloned_project_name,
"",
st.session_state["emp_id"],
)
insert_project_metadata(
new_prj_id, old_metadata, st.session_state["emp_id"]
)
update_summary_df()
st.success("Project Cloned")
st.rerun()
if projct_radio == "Delete Projects":
if len(st.session_state["project_summary_df"]) != 0:
with st.expander("Delete Projects", expanded=True):
delete_projects = st.multiselect(
"Select all the projects number who want to delete",
st.session_state["project_summary_df"]["Project Number"],
)
st.warning(
"Projects will be permanently deleted. Other users will not be able to clone them if they have not already done so."
)
if st.button("Delete Projects", use_container_width=True):
if len(delete_projects) > 0:
with st.spinner("Deleting Projects"):
delete_projects_by_ids(delete_projects)
update_summary_df()
st.success("Projects Deleted")
st.rerun()
else:
st.warning("Please select atleast one project number to delete")
if projct_radio == "Download Project PPT":
try:
ppt = create_ppt(
st.session_state["project_name"],
st.session_state["username"],
"panel", # new
)
if ppt is not False:
st.download_button(
"Download",
data=ppt.getvalue(),
file_name=st.session_state["project_name"]
+ " Project Summary.pptx",
use_container_width=True,
)
else:
st.warning("Please make some progress before downloading PPT.")
except Exception as e:
st.warning("PPT Download Faild ")
# new
log_message(
log_type="error", message=f"Error in PPT build: {e}", page_name="Home"
)