# User Management page.
#
# Lets an authenticated admin list users, add a user with a generated default
# password, delete users together with their projects, and reset a user's
# password. When the users table contains no admin at all, the page is
# reachable without a password so that a first admin can be created.

import streamlit as st

# Kept ahead of the remaining imports so that it stays the first Streamlit
# command executed on this page.
st.set_page_config(layout="wide")

import pandas as pd
from utilities import (
    load_local_css,
    set_header,
    query_excecuter_postgres,
    store_hashed_password,
    verify_password,
    is_pswrd_flag_set,
)
import psycopg2

# Restored: bcrypt.hashpw / bcrypt.gensalt are called below, and with this
# import commented out every call raised NameError.
import bcrypt
import re
import secrets
import time
import random
import string
from log_application import log_message

# Page chrome.
load_local_css("styles.css")
set_header()

# Passed through unchanged to query_excecuter_postgres on every call.
db_cred = None

# Display names for the columns returned by the user listing query.
USER_COLUMNS = ["User ID", "User Name", "User Type"]

# Length and alphabet of generated default passwords.
DEFAULT_PASSWORD_LENGTH = 10
_PASSWORD_ALPHABET = string.ascii_letters + string.digits

# Compiled once; matches any listed keyword anywhere in the text,
# case-insensitively (substring match, no word boundaries).
_SQL_KEYWORDS = [
    "SELECT",
    "INSERT",
    "UPDATE",
    "DELETE",
    "DROP",
    "ALTER",
    "CREATE",
    "GRANT",
    "REVOKE",
    "UNION",
    "JOIN",
    "WHERE",
    "HAVING",
    "EXEC",
    "TRUNCATE",
    "REPLACE",
    "MERGE",
    "DECLARE",
    "SHOW",
    "FROM",
]
_SQL_KEYWORD_PATTERN = re.compile(
    "|".join(re.escape(keyword) for keyword in _SQL_KEYWORDS),
    re.IGNORECASE,
)


def _hash_password(plain_text_password):
    """Return the bcrypt hash of *plain_text_password* as a UTF-8 string."""
    hashed = bcrypt.hashpw(plain_text_password.encode("utf-8"), bcrypt.gensalt())
    return hashed.decode("utf-8")


def _generate_default_password(length=DEFAULT_PASSWORD_LENGTH):
    """Return a random alphanumeric password of *length* characters.

    Uses ``secrets`` rather than ``random`` because the value is a credential.
    """
    return "".join(secrets.choice(_PASSWORD_ALPHABET) for _ in range(length))


def fetch_password_hash_key():
    """Return the query result holding ``emp_id`` for rows named 'admin'."""
    query = """
        SELECT emp_id
        FROM mmo_users
        WHERE emp_nam ='admin';
    """
    return query_excecuter_postgres(query, db_cred, insert=False)


def fetch_users_with_access():
    """Load all users into ``st.session_state["df_users"]``.

    The frame always carries the columns in ``USER_COLUMNS``. If the query
    fails or returns no rows, an empty frame with those columns is stored, so
    later ``["User ID"]`` lookups do not raise ``KeyError``.
    """
    unique_users_query = """
        SELECT DISTINCT emp_id, emp_nam, emp_typ
        FROM mmo_users;
    """
    try:
        rows = query_excecuter_postgres(unique_users_query, db_cred, insert=False)
        users_df = pd.DataFrame(rows)
        # Raises ValueError when the result is empty (zero columns).
        users_df.columns = USER_COLUMNS
    except Exception:
        # Best effort: the page still renders with an empty user table.
        users_df = pd.DataFrame(columns=USER_COLUMNS)
    st.session_state["df_users"] = users_df


def add_user(emp_id, emp_name, emp_type, admin_id, plain_text_password):
    """Insert a new row into ``mmo_users`` unless ``emp_id`` already exists.

    Parameters:
        emp_id (str): Employee ID of the new user.
        emp_name (str): Name of the new user.
        emp_type (str): Type of the new user.
        admin_id (str): ID stored in ``crte_by_uid`` as the creator.
        plain_text_password (str): Password to hash and store in ``pswrd_key``.

    Returns:
        bool: True when the row was inserted. False when a row with this
        ``emp_id`` already exists or when any step raised an exception.
    """
    check_user_query = """
        SELECT emp_id
        FROM mmo_users
        WHERE emp_id = ?;
    """
    add_user_query = """
        INSERT INTO mmo_users
            (emp_id, emp_nam, emp_typ, crte_dt_tm, crte_by_uid, pswrd_key)
        VALUES (?, ?, ?, datetime('now'), ?, ?);
    """
    try:
        existing_user = query_excecuter_postgres(
            check_user_query, db_cred, params=(emp_id,), insert=False
        )
        if existing_user:
            return False

        params_add = (
            emp_id,
            emp_name,
            emp_type,
            admin_id,
            _hash_password(plain_text_password),
        )
        query_excecuter_postgres(
            add_user_query, db_cred, params=params_add, insert=True
        )
        return True
    except Exception:
        # Callers only distinguish success from failure.
        return False


def delete_users_by_names(emp_ids):
    """Delete users and the projects they own.

    Rows are removed from ``mmo_project_meta_data`` first, then from
    ``mmo_projects``, then from ``mmo_users``.

    Parameters:
        emp_ids (list): Employee IDs to delete.

    Returns:
        str: A success message, or a message starting with
        ``"An error occurred"`` when a query raised. An empty ``emp_ids``
        runs no query and returns a "nothing was deleted" message.
    """
    formatted_ids = tuple(emp_ids)
    if not formatted_ids:
        # "IN ()" is not valid SQL, so do not build the statements at all.
        return "No users selected; nothing was deleted."

    placeholders = ",".join("?" * len(formatted_ids))
    delete_projects_meta_query = """
        DELETE FROM mmo_project_meta_data
        WHERE prj_id IN (
            SELECT prj_id FROM mmo_projects WHERE prj_ownr_id IN ({})
        );
    """.format(placeholders)
    delete_projects_query = """
        DELETE FROM mmo_projects
        WHERE prj_ownr_id IN ({});
    """.format(placeholders)
    delete_user_query = """
        DELETE FROM mmo_users
        WHERE emp_id IN ({});
    """.format(placeholders)

    try:
        # db_cred is passed explicitly, matching every other call in this file.
        for statement in (
            delete_projects_meta_query,
            delete_projects_query,
            delete_user_query,
        ):
            query_excecuter_postgres(
                statement, db_cred=db_cred, params=formatted_ids, insert=True
            )
    except Exception as e:
        return f"An error occurred: {e}"
    return f"{len(emp_ids)} users and their associated projects deleted successfully."


def reset_user_name():
    """Clear the "Enter User ID" and "Enter User Name" inputs."""
    st.session_state.user_id = ""
    st.session_state.user_name = ""


def contains_sql_keywords_check(user_input):
    """Return a regex match if *user_input* contains a listed SQL keyword.

    The match is case-insensitive and is a substring match, so a keyword
    embedded in a longer word also counts. Returns None when nothing matches.
    """
    return _SQL_KEYWORD_PATTERN.search(user_input)


def update_password_and_flag(user_id, plain_text_password, flag=False):
    """Hash a password with bcrypt and store it for ``user_id``.

    Parameters:
        user_id (str): ``emp_id`` of the row to update.
        plain_text_password (str): The plain text password to be hashed.
        flag (bool): When True, ``pswrd_flag`` is also set to 0.
    """
    hashed_password_str = _hash_password(plain_text_password)

    if flag:
        query = """
            UPDATE mmo_users
            SET pswrd_key = ?, pswrd_flag = 0
            WHERE emp_id = ?;
        """
    else:
        query = """
            UPDATE mmo_users
            SET pswrd_key = ?
            WHERE emp_id = ?;
        """

    query_excecuter_postgres(
        query=query,
        db_cred=db_cred,
        params=(hashed_password_str, user_id),
        insert=True,
    )


def reset_pass_key():
    """Clear the password shown in the "Manage User Passwords" section."""
    st.session_state["pass_key"] = ""


# ---------------------------------------------------------------------------
# Page body
# ---------------------------------------------------------------------------

st.title("User Management")

if "df_users" not in st.session_state:
    fetch_users_with_access()

if "unique_ids_admin" not in st.session_state:
    admin_users_query = """
        SELECT DISTINCT emp_id, emp_nam, emp_typ
        from mmo_users
        Where emp_typ= 'admin';
    """
    admin_rows = query_excecuter_postgres(admin_users_query, db_cred, insert=False)
    if admin_rows is None:
        admin_rows = []
    # Maps admin emp_id -> (name, type).
    st.session_state["unique_ids_admin"] = {
        admin_id: (admin_name, admin_type)
        for admin_id, admin_name, admin_type in admin_rows
    }

# NOTE: this mapping is loaded once per session and deliberately not refreshed
# after adding the first admin, so the generated default password stays
# visible on the rerun that follows.
no_admin_configured = len(st.session_state["unique_ids_admin"]) == 0

if no_admin_configured:
    st.error("No admin found in the database!")
    st.markdown(
        """
        - You can add an admin to the database.
        - **Ensure** that you store the passkey securely, as losing it will prevent anyone from accessing the tool.
        - Once done, reset the password on the "**Home**" page.
        """
    )

emp_id = st.text_input("employee id", key="emp1111").lower()
password = st.text_input("Enter password to access the page", type="password")

if emp_id not in st.session_state["unique_ids_admin"] and len(emp_id) > 4:
    if not is_pswrd_flag_set(emp_id):
        st.warning("Reset password in Home page to continue")
        st.stop()
    st.warning(
        "Incorrect username or password. "
        'If you are using the default password, please reset it on the "Home" page.'
    )
    st.stop()

# Access is granted on a valid password, or unconditionally while no admin
# exists yet.
if verify_password(emp_id, password) or no_admin_configured:
    st.success("Access Granted", icon="✅")

    st.header("All Users")
    st.dataframe(
        st.session_state["df_users"], use_container_width=True, hide_index=True
    )

    user_manage = st.radio(
        "Select a Option",
        ["Add New User", "Delete Users", "Manage User Passwords"],
        horizontal=True,
    )

    if user_manage == "Add New User":
        with st.expander("", expanded=True):
            add_col = st.columns(3)

            with add_col[0]:
                user_id = st.text_input("Enter User ID", key="user_id").lower()

            with add_col[1]:
                user_name = st.text_input("Enter User Name", key="user_name").lower()

            with add_col[2]:
                if no_admin_configured:
                    # The very first user has to be an admin.
                    user_types_options = ["Admin"]
                else:
                    user_types_options = ["Data Scientist", "Media Planner", "Admin"]
                user_type = (
                    st.selectbox("Select User Type", user_types_options)
                    .replace(" ", "_")
                    .lower()
                )

            warning_box = st.empty()

            if "passkey" not in st.session_state:
                st.session_state["passkey"] = ""
            # Hide a previously generated password once the inputs are cleared.
            if len(user_id) < 3 or len(user_name) < 3:
                st.session_state["passkey"] = ""

            pass_key_col = st.columns(3)
            with pass_key_col[0]:
                # NOTE(review): the HTML wrapper around this label was not
                # legible in the reviewed copy of this file; the markup below
                # is a reconstruction -- confirm against the deployed page.
                st.markdown(
                    '<div style="text-align: right;">'
                    "<strong>Default Password</strong>"
                    "</div>",
                    unsafe_allow_html=True,
                )
            with pass_key_col[1]:
                passkey_box = st.empty()
                with passkey_box:
                    st.code(f"{st.session_state['passkey']}", language="text")

            st.button(
                "Reset Values", on_click=reset_user_name, use_container_width=True
            )

            if st.button("Add User", use_container_width=True):
                # `in` on a Series checks index labels, so compare against the
                # underlying values instead.
                if user_id in st.session_state["df_users"]["User ID"].values:
                    with warning_box:
                        st.warning("User id already exists")
                    st.stop()

                if (
                    len(user_id) == 0
                    or len(user_name) == 0
                    or not user_id.startswith("e")
                    or len(user_id) < 6
                ):
                    with warning_box:
                        st.warning("Enter a Valid User ID and User Name!")
                    st.stop()

                if contains_sql_keywords_check(user_id):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                    st.stop()

                if not (2 <= len(user_name) <= 50):
                    with warning_box:
                        st.warning(
                            "Please provide a valid user name (2-50 characters, only A-Z, a-z, 0-9, and _)."
                        )
                    st.stop()

                if contains_sql_keywords_check(user_name):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                    st.stop()

                plain_text_password = _generate_default_password()

                if add_user(user_id, user_name, user_type, emp_id, plain_text_password):
                    with st.spinner("Adding New User"):
                        update_password_and_flag(user_id, plain_text_password)
                        fetch_users_with_access()
                        # Only expose the password once it is actually stored.
                        st.session_state["passkey"] = plain_text_password
                        st.success("User added successfully")
                        st.rerun()
                else:
                    # add_user returns False both for an existing ID and for a
                    # failed insert, so the message covers both.
                    st.warning(
                        f"User with emp_id {user_id} already exists in the database"
                        " or could not be added."
                    )
                    st.stop()

    if user_manage == "Delete Users":
        with st.expander("", expanded=True):
            user_names_to_delete = st.multiselect(
                "Select User IDS to Delete", st.session_state["df_users"]["User ID"]
            )

            if st.button("Delete", use_container_width=True):
                if not user_names_to_delete:
                    st.warning("Select at least one user to delete.")
                    st.stop()

                delete_result = delete_users_by_names(user_names_to_delete)
                if delete_result.startswith("An error occurred"):
                    # Do not report success when the delete failed.
                    st.error(delete_result)
                    st.stop()

                st.success("Users Deleted")
                fetch_users_with_access()
                st.rerun()

    if user_manage == "Manage User Passwords":
        with st.expander("**Manage User Passwords**", expanded=True):
            st.markdown(
                """
                1.Click on "**Reset Password**" to generate a new default password for the selected user.
                2. Share the pass key with the corresponding user
                """,
                unsafe_allow_html=True,
            )

            user_id = st.selectbox(
                "Select A User ID",
                st.session_state["df_users"]["User ID"],
                on_change=reset_pass_key,
            )

            if "pass_key" not in st.session_state:
                st.session_state["pass_key"] = ""

            st.code(
                f"Default Password: {st.session_state['pass_key']}", language="text"
            )

            if st.button(
                "Reset Password", use_container_width=True, key="reset_pass_button_key"
            ):
                with st.spinner("Reseting Password"):
                    plain_text_password = _generate_default_password()
                    update_password_and_flag(user_id, plain_text_password, flag=True)
                    # Shown only after the new hash has been written.
                    st.session_state["pass_key"] = plain_text_password
                    time.sleep(1)
                    del st.session_state["reset_pass_button_key"]
                    st.rerun()

elif len(password):
    st.warning("Wrong user name or password!")