MediaMixOptimization / pages /14_User_Management.py
samkeet's picture
Upload 40 files
00b00eb verified
import streamlit as st
st.set_page_config(layout="wide")
import pandas as pd
from utilities import (
load_local_css,
set_header,
query_excecuter_postgres,
store_hashed_password,
verify_password,
is_pswrd_flag_set,
)
import logging
import random
import re
import secrets
import string
import time
#
import bcrypt
import psycopg2

from log_application import log_message
# Apply the shared stylesheet and render the common page header
# (the page config itself is set right after the streamlit import above).
load_local_css("styles.css")
set_header()
# schema=db_cred['schema']
# db_cred is handed to the query_excecuter_postgres calls in this file. It is
# None here -- presumably the helper then falls back to its own connection
# settings (TODO confirm against utilities.query_excecuter_postgres).
db_cred = None
def fetch_password_hash_key():
    """Select the ``emp_id`` of the ``mmo_users`` row whose ``emp_nam`` is 'admin'.

    Returns:
        The result of ``query_excecuter_postgres`` for this read-only query,
        unchanged (presumably a list of row tuples -- confirm against the
        helper in ``utilities``).
    """
    admin_lookup_sql = (
        "\n"
        "SELECT emp_id\n"
        "FROM mmo_users\n"
        "WHERE emp_nam ='admin';\n"
    )
    return query_excecuter_postgres(admin_lookup_sql, db_cred, insert=False)
def fetch_users_with_access():
    """Load all users from ``mmo_users`` into ``st.session_state["df_users"]``.

    The stored DataFrame always carries the columns ``User ID``,
    ``User Name`` and ``User Type``. When the table is empty or the query
    fails, an empty frame with those columns is stored instead, so callers
    can still index ``df_users["User ID"]`` without a ``KeyError``.
    """
    user_columns = ["User ID", "User Name", "User Type"]
    unique_users_query = (
        "\n"
        "SELECT DISTINCT emp_id, emp_nam, emp_typ\n"
        "FROM mmo_users;\n"
    )
    try:
        rows = query_excecuter_postgres(unique_users_query, db_cred, insert=False)
        users_df = pd.DataFrame(rows)
        if users_df.empty:
            # No rows: renaming columns would fail on a 0-column frame.
            users_df = pd.DataFrame(columns=user_columns)
        else:
            # Columns are assigned by position, matching the SELECT order.
            users_df.columns = user_columns
    except Exception:
        # Best effort: the page still renders with an empty user table.
        users_df = pd.DataFrame(columns=user_columns)
    st.session_state["df_users"] = users_df
def add_user(emp_id, emp_name, emp_type, admin_id, plain_text_password):
    """Insert a new user into ``mmo_users`` unless ``emp_id`` already exists.

    Parameters:
        emp_id (str): Employee ID of the user.
        emp_name (str): Name of the user.
        emp_type (str): Type of the user.
        admin_id (str): ID written to ``crte_by_uid`` for the new row.
        plain_text_password (str): Initial password; only its bcrypt hash
            is written to ``pswrd_key``.

    Returns:
        bool: True when the row was inserted. False when a row with this
        ``emp_id`` already exists or when a database/hashing error occurred
        (the error is logged rather than raised).
    """
    check_user_query = (
        "\n"
        "SELECT emp_id FROM mmo_users\n"
        "WHERE emp_id = ?;\n"
    )
    add_user_query = (
        "\n"
        "INSERT INTO mmo_users\n"
        "(emp_id, emp_nam, emp_typ, crte_dt_tm, crte_by_uid,pswrd_key)\n"
        "VALUES (?, ?, ?,datetime('now'), ?,?);\n"
    )
    try:
        existing_user = query_excecuter_postgres(
            check_user_query, db_cred, params=(emp_id,), insert=False
        )
        if existing_user:
            return False

        hashed_password = bcrypt.hashpw(
            plain_text_password.encode("utf-8"), bcrypt.gensalt()
        )
        # bcrypt returns bytes; the column stores text.
        hashed_password_str = hashed_password.decode("utf-8")

        params_add = (emp_id, emp_name, emp_type, admin_id, hashed_password_str)
        query_excecuter_postgres(
            add_user_query, db_cred, params=params_add, insert=True
        )
        return True
    except Exception:
        # Keep the boolean contract for callers, but leave a trace of the
        # failure instead of discarding it.
        logging.getLogger(__name__).exception("Failed to add user %s", emp_id)
        return False
# def delete_users_by_names(emp_id):
# # Sanitize and format the list of names for the SQL query
# formatted_ids =tuple(emp_id)
# # Query to delete users based on the employee names
# delete_user_query = f"""
# DELETE FROM mmo_users
# WHERE emp_id IN ?;
# """
# try:
# # Execute the delete query
# query_excecuter_postgres(delete_user_query, db_cred,params=(formatted_ids,), insert=True)
# return f"{len(emp_id)} users deleted successfully."
# except Exception as e:
# st.write(e)
# print(e)
# return f"An error occurred: {e}"
def delete_users_by_names(emp_ids):
    """Delete users and the projects they own.

    Rows are removed in dependency order: ``mmo_project_meta_data`` for the
    users' projects, then ``mmo_projects``, then ``mmo_users``.

    Parameters:
        emp_ids (list): Employee IDs to delete. An empty collection is a
            no-op (no SQL is executed).

    Returns:
        str: A success message, or ``"An error occurred: ..."`` when any of
        the delete statements raised.
    """
    formatted_ids = tuple(emp_ids)
    if not formatted_ids:
        # Nothing selected: avoid issuing statements with an empty IN () list.
        return "0 users and their associated projects deleted successfully."

    # One "?" per ID, shared by all three statements.
    placeholders = ",".join(["?"] * len(formatted_ids))

    delete_projects_meta_query = (
        "\n"
        "DELETE FROM mmo_project_meta_data\n"
        "WHERE prj_id IN (\n"
        "SELECT prj_id FROM mmo_projects\n"
        f"WHERE prj_ownr_id IN ({placeholders})\n"
        ");\n"
    )
    delete_projects_query = (
        "\n"
        "DELETE FROM mmo_projects\n"
        f"WHERE prj_ownr_id IN ({placeholders});\n"
    )
    delete_user_query = (
        "\n"
        "DELETE FROM mmo_users\n"
        f"WHERE emp_id IN ({placeholders});\n"
    )

    try:
        for statement in (
            delete_projects_meta_query,
            delete_projects_query,
            delete_user_query,
        ):
            # db_cred is passed explicitly, like every other query call in
            # this file.
            query_excecuter_postgres(
                statement, db_cred, params=formatted_ids, insert=True
            )
    except Exception as e:
        return f"An error occurred: {e}"
    return (
        f"{len(formatted_ids)} users and their associated projects deleted successfully."
    )
def reset_user_name():
    """Blank the ``user_id`` and ``user_name`` entries in the session state.

    Used as the ``on_click`` callback of the "Reset Values" button; those two
    names are the widget keys of the "Add New User" text inputs.
    """
    for widget_key in ("user_id", "user_name"):
        setattr(st.session_state, widget_key, "")
def contains_sql_keywords_check(user_input):
    """Find the first SQL keyword that appears as its own word in ``user_input``.

    Matching is case-insensitive. A keyword only counts when it is not
    embedded in a longer alphabetic word, so ordinary names such as
    "showalter" or "fromm" are accepted while "drop table", "1;DROP" or
    "e_delete_1" are still flagged.

    Parameters:
        user_input (str): Text typed by the user.

    Returns:
        re.Match | None: The match for the first keyword found (truthy), or
        ``None`` when the input contains no stand-alone keyword.
    """
    sql_keywords = [
        "SELECT",
        "INSERT",
        "UPDATE",
        "DELETE",
        "DROP",
        "ALTER",
        "CREATE",
        "GRANT",
        "REVOKE",
        "UNION",
        "JOIN",
        "WHERE",
        "HAVING",
        "EXEC",
        "TRUNCATE",
        "REPLACE",
        "MERGE",
        "DECLARE",
        "SHOW",
        "FROM",
    ]
    alternation = "|".join(re.escape(keyword) for keyword in sql_keywords)
    # Letter-only lookarounds (rather than \b) so digits, underscores and
    # punctuation next to a keyword do not hide it.
    pattern = rf"(?<![A-Za-z])(?:{alternation})(?![A-Za-z])"
    return re.search(pattern, user_input, re.IGNORECASE)
def update_password_and_flag(user_id, plain_text_password, flag=False):
    """Write a fresh bcrypt hash of ``plain_text_password`` for ``user_id``.

    Parameters:
        user_id: ``emp_id`` of the ``mmo_users`` row to update.
        plain_text_password (str): Password to hash; only the hash, decoded
            to a UTF-8 string, is stored in ``pswrd_key``.
        flag (bool): When true, the same UPDATE also sets ``pswrd_flag`` to 0.
    """
    password_bytes = plain_text_password.encode("utf-8")
    stored_hash = bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode("utf-8")

    # The two statements differ only in their SET clause.
    set_clause = "pswrd_key = ?, pswrd_flag = 0" if flag else "pswrd_key = ?"
    statement = f"\nUPDATE mmo_users\nSET {set_clause}\nWHERE emp_id = ?;\n"

    query_excecuter_postgres(
        query=statement,
        db_cred=db_cred,
        params=(stored_hash, user_id),
        insert=True,
    )
def reset_pass_key():
    """Clear the ``pass_key`` session entry.

    Used as the ``on_change`` callback of the user selector in the
    "Manage User Passwords" section.
    """
    session = st.session_state
    session["pass_key"] = ""
st.title("User Management")

# Populate the cached user table the first time the page runs in a session.
if "df_users" not in st.session_state:
    fetch_users_with_access()

# Cache the admin accounts once per session as {emp_id: (emp_nam, emp_typ)}.
if "unique_ids_admin" not in st.session_state:
    admin_lookup_sql = (
        "\n"
        "SELECT DISTINCT emp_id, emp_nam, emp_typ from mmo_users\n"
        "Where emp_typ= 'admin';\n"
    )
    admin_rows = query_excecuter_postgres(admin_lookup_sql, db_cred, insert=False)
    st.session_state["unique_ids_admin"] = {
        row_id: (row_name, row_type) for row_id, row_name, row_type in admin_rows
    }

# With no admin in the table, explain how to bootstrap the first one.
if not st.session_state["unique_ids_admin"]:
    st.error("No admin found in the database!")
    st.markdown(
        "\n"
        "- You can add an admin to the database.\n"
        "- **Ensure** that you store the passkey securely, as losing it will prevent anyone from accessing the tool.\n"
        '- Once done, reset the password on the "**Home**" page.\n'
    )

emp_id = st.text_input("employee id", key="emp1111").lower()
password = st.text_input("Enter password to access the page", type="password")

# An ID longer than 4 characters that is not a known admin never gets past
# this point; which warning is shown depends on is_pswrd_flag_set.
if len(emp_id) > 4 and emp_id not in st.session_state["unique_ids_admin"]:
    if is_pswrd_flag_set(emp_id):
        st.warning(
            'Incorrect username or password. If you are using the default password, please reset it on the "Home" page.'
        )
    else:
        st.warning("Reset password in Home page to continue")
    st.stop()
def _generate_default_password(length=10):
    """Return a random password of ASCII letters and digits.

    Uses ``secrets`` rather than ``random`` because the value is a login
    credential.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


# Check password or if no admin is present (the latter lets the very first
# admin be created).
if verify_password(emp_id, password) or len(st.session_state["unique_ids_admin"]) == 0:
    st.success("Access Granted", icon="✅")
    st.header("All Users")
    st.dataframe(
        st.session_state["df_users"], use_container_width=True, hide_index=True
    )
    user_manage = st.radio(
        "Select a Option",
        ["Add New User", "Delete Users", "Manage User Passwords"],
        horizontal=True,
    )

    if user_manage == "Add New User":
        with st.expander("", expanded=True):
            add_col = st.columns(3)
            with add_col[0]:
                user_id = st.text_input("Enter User ID", key="user_id").lower()
            with add_col[1]:
                user_name = st.text_input("Enter User Name", key="user_name").lower()
            with add_col[2]:
                # Only an admin can be created while the table has no admin.
                if len(st.session_state["unique_ids_admin"]) == 0:
                    user_types_options = ["Admin"]
                else:
                    user_types_options = ["Data Scientist", "Media Planner", "Admin"]
                user_type = (
                    st.selectbox("Select User Type", user_types_options)
                    .replace(" ", "_")
                    .lower()
                )

            warning_box = st.empty()

            if "passkey" not in st.session_state:
                st.session_state["passkey"] = ""
            # Hide the previously generated password once the inputs are cleared.
            if len(user_id) < 3 or len(user_name) < 3:
                st.session_state["passkey"] = ""

            pass_key_col = st.columns(3)
            with pass_key_col[0]:
                st.markdown(
                    '<div style="display: flex; position:relative; justify-content: flex-end; align-items: center; height: 100%;">'
                    '<p style="font-weight: bold; margin: 0; position:absolute; top:4px;">Default Password</p>'
                    "</div>",
                    unsafe_allow_html=True,
                )
            with pass_key_col[1]:
                passkey_box = st.empty()
                with passkey_box:
                    st.code(f"{st.session_state['passkey']}", language="text")

            st.button(
                "Reset Values", on_click=reset_user_name, use_container_width=True
            )

            if st.button("Add User", use_container_width=True):
                # `x in series` tests the index labels, so compare against
                # the column's values instead.
                if user_id in st.session_state["df_users"]["User ID"].values:
                    with warning_box:
                        st.warning("User id already exists")
                    st.stop()

                if (
                    len(user_id) < 6
                    or len(user_name) == 0
                    or not user_id.startswith("e")
                ):
                    with warning_box:
                        st.warning("Enter a Valid User ID and User Name!")
                    st.stop()

                if contains_sql_keywords_check(user_id):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                    st.stop()

                if not (2 <= len(user_name) <= 50):
                    with warning_box:
                        st.warning(
                            "Please provide a valid user name (2-50 characters, only A-Z, a-z, 0-9, and _)."
                        )
                    st.stop()

                if contains_sql_keywords_check(user_name):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                    st.stop()

                plain_text_password = _generate_default_password()
                # Shown in the "Default Password" box on the next rerun.
                st.session_state["passkey"] = plain_text_password

                if add_user(user_id, user_name, user_type, emp_id, plain_text_password):
                    with st.spinner("Adding New User"):
                        # add_user already stored a hash; this rewrites
                        # pswrd_key with a fresh hash of the same password
                        # (kept from the original flow).
                        update_password_and_flag(user_id, plain_text_password)
                        fetch_users_with_access()
                        st.success("User added successfully")
                        st.rerun()
                else:
                    st.warning(
                        f"User with emp_id {user_id} already exists in the database."
                    )
                    st.stop()

    if user_manage == "Delete Users":
        with st.expander("", expanded=True):
            user_names_to_delete = st.multiselect(
                "Select User IDS to Delete", st.session_state["df_users"]["User ID"]
            )
            if st.button("Delete", use_container_width=True):
                outcome = delete_users_by_names(user_names_to_delete)
                # delete_users_by_names reports failures through its return
                # string; do not claim success when it failed.
                if outcome.startswith("An error occurred"):
                    st.error(outcome)
                    st.stop()
                st.success("Users Deleted")
                fetch_users_with_access()
                st.rerun()

    if user_manage == "Manage User Passwords":
        with st.expander("**Manage User Passwords**", expanded=True):
            st.markdown(
                "\n"
                '1.Click on "**Reset Password**" to generate a new default password for the selected user.\n'
                "2. Share the pass key with the corresponding user\n",
                unsafe_allow_html=True,
            )
            user_id = st.selectbox(
                "Select A User ID",
                st.session_state["df_users"]["User ID"],
                on_change=reset_pass_key,
            )
            if "pass_key" not in st.session_state:
                st.session_state["pass_key"] = ""
            st.code(
                f"Default Password: {st.session_state['pass_key']}", language="text"
            )
            if st.button(
                "Reset Password", use_container_width=True, key="reset_pass_button_key"
            ):
                with st.spinner("Reseting Password"):
                    plain_text_password = _generate_default_password()
                    st.session_state["pass_key"] = plain_text_password
                    # flag=True also sets pswrd_flag to 0 for this user.
                    update_password_and_flag(user_id, plain_text_password, flag=True)
                    time.sleep(1)
                    # Drop the button's state before rerunning the page.
                    del st.session_state["reset_pass_button_key"]
                    st.rerun()
elif len(password):
    st.warning("Wrong user name or password!")