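# NOTE(review): the first three lines of this file read "Spaces:" / "Sleeping" / "Sleeping".
# They look like hosting-page residue captured with the source, not program text - confirm and remove.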
import streamlit as st

# set_page_config has to be the first Streamlit command issued by the page,
# so it stays directly after the streamlit import and ahead of everything else.
st.set_page_config(layout="wide")

import random
import re
import secrets
import string
import time

import bcrypt
import pandas as pd
import psycopg2

from log_application import log_message
from utilities import (
    load_local_css,
    set_header,
    query_excecuter_postgres,
    store_hashed_password,
    verify_password,
    is_pswrd_flag_set,
)
# Page chrome: both helpers are imported from the project's utilities module
# (presumably they inject styles.css and draw the shared header - confirm there).
load_local_css("styles.css")
set_header()
# Legacy, disabled: schema=db_cred['schema']
# No credentials object is built in this file; db_cred stays None and is handed
# as-is to the query_excecuter_postgres calls below that pass it.
db_cred = None
def fetch_password_hash_key():
    """Look up the ``emp_id`` of the ``mmo_users`` row whose ``emp_nam`` is 'admin'.

    NOTE(review): despite the function name, the query selects ``emp_id``,
    not a password hash column.

    Returns:
        Whatever ``query_excecuter_postgres`` returns for this SELECT when
        called with ``insert=False``.
    """
    admin_lookup_sql = """
    SELECT emp_id
    FROM mmo_users
    WHERE emp_nam ='admin';
    """
    return query_excecuter_postgres(admin_lookup_sql, db_cred, insert=False)
def fetch_users_with_access():
    """Load all users from ``mmo_users`` into ``st.session_state["df_users"]``.

    The stored frame always carries the columns "User ID", "User Name" and
    "User Type" - also when the query yields no rows or fails - so code that
    indexes ``df_users["User ID"]`` does not hit a ``KeyError`` on the
    fallback frame.
    """
    user_columns = ["User ID", "User Name", "User Type"]
    unique_users_query = """
    SELECT DISTINCT emp_id, emp_nam, emp_typ
    FROM mmo_users;
    """
    try:
        user_rows = query_excecuter_postgres(
            unique_users_query, db_cred, insert=False
        )
        users_df = pd.DataFrame(user_rows)
        if users_df.empty:
            # No rows: keep the schema instead of a column-less frame.
            users_df = pd.DataFrame(columns=user_columns)
        else:
            users_df.columns = user_columns
    except Exception:
        # Deliberate best-effort: the page still renders with an empty table.
        # `Exception` (not a bare except) so interpreter-level signals such as
        # KeyboardInterrupt/SystemExit are not swallowed here.
        users_df = pd.DataFrame(columns=user_columns)
    st.session_state["df_users"] = users_df
def add_user(emp_id, emp_name, emp_type, admin_id, plain_text_password):
    """
    Insert a row into mmo_users unless a row with the same emp_id is already there.

    Parameters:
        emp_id (str): Employee ID of the user; also the key used for the existence check.
        emp_name (str): Stored in emp_nam.
        emp_type (str): Stored in emp_typ.
        admin_id (str): Stored in crte_by_uid (the caller passes the logged-in admin's id).
        plain_text_password (str): Hashed with bcrypt; only the hash is stored, in pswrd_key.

    Returns:
        bool: True once the INSERT has been issued. False when the existence check
        finds a row, and also when any exception is raised along the way - callers
        cannot tell the two apart.
    """
    lookup_sql = """
    SELECT emp_id FROM mmo_users
    WHERE emp_id = ?;
    """
    insert_sql = """
    INSERT INTO mmo_users
    (emp_id, emp_nam, emp_typ, crte_dt_tm, crte_by_uid,pswrd_key)
    VALUES (?, ?, ?,datetime('now'), ?,?);
    """
    try:
        already_present = query_excecuter_postgres(
            lookup_sql, db_cred, params=(emp_id,), insert=False
        )
        if already_present:
            return False

        # bcrypt yields bytes; keep a UTF-8 string so it can be stored as text.
        salted_hash = bcrypt.hashpw(plain_text_password.encode("utf-8"), bcrypt.gensalt())
        password_hash_text = salted_hash.decode("utf-8")

        query_excecuter_postgres(
            insert_sql,
            db_cred,
            params=(emp_id, emp_name, emp_type, admin_id, password_hash_text),
            insert=True,
        )
        return True
    except Exception:
        return False
# def delete_users_by_names(emp_id): | |
# # Sanitize and format the list of names for the SQL query | |
# formatted_ids =tuple(emp_id) | |
# # Query to delete users based on the employee names | |
# delete_user_query = f""" | |
# DELETE FROM mmo_users | |
# WHERE emp_id IN ?; | |
# """ | |
# try: | |
# # Execute the delete query | |
# query_excecuter_postgres(delete_user_query, db_cred,params=(formatted_ids,), insert=True) | |
# return f"{len(emp_id)} users deleted successfully." | |
# except Exception as e: | |
# st.write(e) | |
# print(e) | |
# return f"An error occurred: {e}" | |
def delete_users_by_names(emp_ids):
    """
    Delete users from mmo_users together with the projects they own
    (rows in mmo_project_meta_data and mmo_projects).

    Parameters:
        emp_ids (iterable): Employee IDs to delete. An empty iterable is a no-op.

    Returns:
        str: "<n> users and their associated projects deleted successfully." on
        success, or "An error occurred: <exception>" when any statement fails.
        The three DELETEs are issued one after another; nothing in this function
        rolls back earlier statements if a later one fails.
    """
    formatted_ids = tuple(emp_ids)
    success_message = (
        f"{len(formatted_ids)} users and their associated projects deleted successfully."
    )
    if not formatted_ids:
        # Nothing selected: avoid building an `IN ()` clause and hitting the database.
        return success_message

    # One "?" per id, shared by all three statements.
    placeholders = ",".join("?" * len(formatted_ids))

    # Order matters: the metadata delete resolves project ids through
    # mmo_projects, so it has to run before the projects themselves are removed.
    delete_projects_meta_query = f"""
    DELETE FROM mmo_project_meta_data
    WHERE prj_id IN (
        SELECT prj_id FROM mmo_projects
        WHERE prj_ownr_id IN ({placeholders})
    );
    """
    delete_projects_query = f"""
    DELETE FROM mmo_projects
    WHERE prj_ownr_id IN ({placeholders});
    """
    delete_user_query = f"""
    DELETE FROM mmo_users
    WHERE emp_id IN ({placeholders});
    """
    try:
        # db_cred is passed explicitly, matching every other
        # query_excecuter_postgres call in this file.
        for statement in (
            delete_projects_meta_query,
            delete_projects_query,
            delete_user_query,
        ):
            query_excecuter_postgres(
                statement, db_cred, params=formatted_ids, insert=True
            )
        return success_message
    except Exception as e:
        return f"An error occurred: {e}"
def reset_user_name():
    """Blank the "user_id" and "user_name" session entries (the keys of the Add-User text inputs)."""
    for field_key in ("user_id", "user_name"):
        st.session_state[field_key] = ""
def contains_sql_keywords_check(user_input):
    """Search ``user_input`` for any of a fixed list of SQL keywords.

    Matching is case-insensitive and on plain substrings (no word boundaries),
    so e.g. "showalter" is reported because it contains "show".

    Returns:
        The ``re.Match`` for the leftmost hit, or ``None`` when no keyword
        occurs. Callers rely on its truthiness.
    """
    blocked_words = (
        "SELECT",
        "INSERT",
        "UPDATE",
        "DELETE",
        "DROP",
        "ALTER",
        "CREATE",
        "GRANT",
        "REVOKE",
        "UNION",
        "JOIN",
        "WHERE",
        "HAVING",
        "EXEC",
        "TRUNCATE",
        "REPLACE",
        "MERGE",
        "DECLARE",
        "SHOW",
        "FROM",
    )
    alternation = "|".join(map(re.escape, blocked_words))
    matcher = re.compile(alternation, flags=re.IGNORECASE)
    return matcher.search(user_input)
def update_password_and_flag(user_id, plain_text_password, flag=False):
    """
    Hash a plain-text password with bcrypt and store the hash, as UTF-8 text,
    in pswrd_key of the mmo_users row identified by user_id.

    Parameters:
        user_id (str): emp_id of the row to update.
        plain_text_password (str): The plain text password to be hashed.
        flag (bool): When truthy, the same UPDATE also sets pswrd_flag = 0.
    """
    salted_hash = bcrypt.hashpw(plain_text_password.encode("utf-8"), bcrypt.gensalt())
    # bcrypt yields bytes; decode so the value can be stored as text.
    password_hash_text = salted_hash.decode("utf-8")

    if not flag:
        update_sql = """
        UPDATE mmo_users
        SET pswrd_key = ?
        WHERE emp_id = ?;
        """
    else:
        update_sql = """
        UPDATE mmo_users
        SET pswrd_key = ?, pswrd_flag = 0
        WHERE emp_id = ?;
        """

    query_excecuter_postgres(
        query=update_sql,
        db_cred=db_cred,
        params=(password_hash_text, user_id),
        insert=True,
    )
def reset_pass_key():
    """Blank the "pass_key" session entry; used as the on_change callback of the user selector."""
    st.session_state.pass_key = ""
st.title("User Management")

# Load the user table once per session.
if "df_users" not in st.session_state:
    fetch_users_with_access()

# Cache the admin accounts once per session as {emp_id: (emp_nam, emp_typ)}.
if "unique_ids_admin" not in st.session_state:
    admin_query = """
    SELECT DISTINCT emp_id, emp_nam, emp_typ from mmo_users
    Where emp_typ= 'admin';
    """
    # retrieves all the users whose type is 'admin'
    admin_rows = query_excecuter_postgres(admin_query, db_cred, insert=False)
    st.session_state["unique_ids_admin"] = {
        row_id: (row_name, row_type) for row_id, row_name, row_type in admin_rows
    }

# Bootstrap hint shown while the cached admin mapping is empty.
if not st.session_state["unique_ids_admin"]:
    st.error("No admin found in the database!")
    st.markdown(
        """
- You can add an admin to the database.
- **Ensure** that you store the passkey securely, as losing it will prevent anyone from accessing the tool.
- Once done, reset the password on the "**Home**" page.
"""
    )

emp_id = st.text_input("employee id", key="emp1111").lower()
password = st.text_input("Enter password to access the page", type="password")

# An id longer than four characters that is not a cached admin ends the run here,
# with a message that depends on is_pswrd_flag_set for that id.
if len(emp_id) > 4 and emp_id not in st.session_state["unique_ids_admin"]:
    if is_pswrd_flag_set(emp_id):
        st.warning(
            'Incorrect username or password. If you are using the default password, please reset it on the "Home" page.'
        )
    else:
        st.warning("Reset password in Home page to continue")
    st.stop()
def _generate_default_password(length=10):
    """Return a random default password of ``length`` ASCII letters and digits.

    Uses ``secrets`` rather than ``random`` because the value is a credential.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


# Admin panel. It is shown when the entered credentials verify, or when the
# cached admin mapping is empty (so that a first admin can be created).
if verify_password(emp_id, password) or len(st.session_state["unique_ids_admin"]) == 0:
    st.success("Access Granted", icon="✅")
    st.header("All Users")
    st.dataframe(
        st.session_state["df_users"], use_container_width=True, hide_index=True
    )

    # Plain list of known ids. `x in Series` tests the index labels, not the
    # values, so membership checks must not be made against the column itself.
    # Also tolerates a frame that lacks the "User ID" column.
    df_users = st.session_state["df_users"]
    known_user_ids = list(df_users["User ID"]) if "User ID" in df_users else []

    user_manage = st.radio(
        "Select a Option",
        ["Add New User", "Delete Users", "Manage User Passwords"],
        horizontal=True,
    )

    if user_manage == "Add New User":
        with st.expander("", expanded=True):
            add_col = st.columns(3)
            with add_col[0]:
                user_id = st.text_input("Enter User ID", key="user_id").lower()
            with add_col[1]:
                user_name = st.text_input("Enter User Name", key="user_name").lower()
            with add_col[2]:
                # With no admin cached, only an admin may be created.
                if len(st.session_state["unique_ids_admin"]) == 0:
                    user_types_options = ["Admin"]
                else:
                    user_types_options = ["Data Scientist", "Media Planner", "Admin"]
                user_type = (
                    st.selectbox("Select User Type", user_types_options)
                    .replace(" ", "_")
                    .lower()
                )

            warning_box = st.empty()

            if "passkey" not in st.session_state:
                st.session_state["passkey"] = ""
            # Hide a previously shown password once the inputs are (nearly) cleared.
            if len(user_id) < 3 or len(user_name) < 3:
                st.session_state["passkey"] = ""

            pass_key_col = st.columns(3)
            with pass_key_col[0]:
                st.markdown(
                    '<div style="display: flex; position:relative; justify-content: flex-end; align-items: center; height: 100%;">'
                    '<p style="font-weight: bold; margin: 0; position:absolute; top:4px;">Default Password</p>'
                    "</div>",
                    unsafe_allow_html=True,
                )
            with pass_key_col[1]:
                passkey_box = st.empty()
                with passkey_box:
                    st.code(f"{st.session_state['passkey']}", language="text")

            st.button(
                "Reset Values", on_click=reset_user_name, use_container_width=True
            )

            if st.button("Add User", use_container_width=True):
                if user_id in known_user_ids:
                    with warning_box:
                        st.warning("User id already exists")
                    st.stop()

                if (
                    len(user_id) < 6
                    or not user_id.startswith("e")
                    or len(user_name) == 0
                ):
                    with warning_box:
                        st.warning("Enter a Valid User ID and User Name!")
                    st.stop()

                if contains_sql_keywords_check(user_id):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                    st.stop()

                if not (2 <= len(user_name) <= 50):
                    with warning_box:
                        st.warning(
                            "Please provide a valid user name (2-50 characters, only A-Z, a-z, 0-9, and _)."
                        )
                    st.stop()

                if contains_sql_keywords_check(user_name):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                    st.stop()

                plain_text_password = _generate_default_password()
                if add_user(user_id, user_name, user_type, emp_id, plain_text_password):
                    # Publish the password only after the user row exists, so a
                    # failed attempt never displays a password that was not stored.
                    st.session_state["passkey"] = plain_text_password
                    with st.spinner("Adding New User"):
                        # add_user has already stored a hash of this password;
                        # this call re-hashes and rewrites the same credential.
                        update_password_and_flag(user_id, plain_text_password)
                        fetch_users_with_access()
                    st.success("User added successfully")
                    st.rerun()
                else:
                    # add_user returns False for an existing id and for any
                    # exception it caught.
                    st.warning(
                        f"User with emp_id {user_id} already exists in the database."
                    )
                    st.stop()

    if user_manage == "Delete Users":
        with st.expander("", expanded=True):
            user_names_to_delete = st.multiselect(
                "Select User IDS to Delete", known_user_ids
            )
            if st.button("Delete", use_container_width=True):
                delete_result = delete_users_by_names(user_names_to_delete)
                # delete_users_by_names reports failures as a string with this
                # prefix instead of raising; surface it rather than claiming success.
                if str(delete_result).startswith("An error occurred"):
                    st.error(delete_result)
                    st.stop()
                st.success("Users Deleted")
                fetch_users_with_access()
                st.rerun()

    if user_manage == "Manage User Passwords":
        with st.expander("**Manage User Passwords**", expanded=True):
            # "1. " needs the space after the dot to render as a numbered list.
            st.markdown(
                """
1. Click on "**Reset Password**" to generate a new default password for the selected user.
2. Share the pass key with the corresponding user
""",
                unsafe_allow_html=True,
            )
            user_id = st.selectbox(
                "Select A User ID",
                known_user_ids,
                on_change=reset_pass_key,
            )
            if "pass_key" not in st.session_state:
                st.session_state["pass_key"] = ""
            st.code(
                f"Default Password: {st.session_state['pass_key']}", language="text"
            )
            if st.button(
                "Reset Password", use_container_width=True, key="reset_pass_button_key"
            ):
                with st.spinner("Reseting Password"):
                    plain_text_password = _generate_default_password()
                    # Store first, display second: if the update raises, no
                    # unsaved password is left in the session to be shown.
                    update_password_and_flag(user_id, plain_text_password, flag=True)
                    st.session_state["pass_key"] = plain_text_password
                    time.sleep(1)
                del st.session_state["reset_pass_button_key"]
                st.rerun()
elif len(password):
    st.warning("Wrong user name or password!")