Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| st.set_page_config(layout="wide") | |
| import pandas as pd | |
| from utilities import ( | |
| load_local_css, | |
| set_header, | |
| query_excecuter_postgres, | |
| store_hashed_password, | |
| verify_password, | |
| is_pswrd_flag_set, | |
| ) | |
| import psycopg2 | |
| # | |
| import bcrypt | |
| import re | |
| import time | |
| import random | |
| import string | |
| from log_application import log_message | |
| # setting page config | |
| load_local_css("styles.css") | |
| set_header() | |
| # schema=db_cred['schema'] | |
| db_cred = None | |
def fetch_password_hash_key():
    """Query ``mmo_users`` for the row whose ``emp_nam`` is 'admin'.

    Note that, despite the function name, the statement selects ``emp_id``.

    Returns:
        Whatever ``query_excecuter_postgres`` returns for the SELECT.
    """
    admin_lookup_sql = """
    SELECT emp_id
    FROM mmo_users
    WHERE emp_nam ='admin';
    """
    return query_excecuter_postgres(admin_lookup_sql, db_cred, insert=False)
def fetch_users_with_access():
    """Cache every distinct user of ``mmo_users`` in Streamlit session state.

    The result is stored under ``st.session_state["df_users"]`` as a DataFrame
    with the columns "User ID", "User Name" and "User Type".

    If the query or the frame construction fails, an empty frame carrying the
    same three columns is stored instead, so code that indexes
    ``df_users["User ID"]`` keeps working.
    """
    user_columns = ["User ID", "User Name", "User Type"]
    # Query to get every known employee
    unique_users_query = """
    SELECT DISTINCT emp_id, emp_nam, emp_typ
    FROM mmo_users;
    """
    try:
        unique_users_result = query_excecuter_postgres(
            unique_users_query, db_cred, insert=False
        )
        users_df = pd.DataFrame(unique_users_result)
        # Raises ValueError when the frame does not have exactly three
        # columns (e.g. an empty result set); handled below.
        users_df.columns = user_columns
    except Exception:
        # Best effort: fall back to an empty table, but keep the expected
        # columns so later column lookups do not raise KeyError.
        users_df = pd.DataFrame(columns=user_columns)
    st.session_state["df_users"] = users_df
def add_user(emp_id, emp_name, emp_type, admin_id, plain_text_password):
    """
    Add a new user to the mmo_users table unless the employee ID already exists.

    The password is hashed with bcrypt and stored as a UTF-8 string in
    ``pswrd_key``; the plain text is never written to the database.

    Parameters:
    emp_id (str): Employee ID of the user.
    emp_name (str): Name of the user.
    emp_type (str): Type of the user.
    admin_id (str): ID stored in ``crte_by_uid`` as the creator of the row.
    plain_text_password (str): Initial password for the new user.

    Returns:
    bool: True when the row was inserted. False when a row with this
    ``emp_id`` already exists or when any step raised an exception (the
    exception is logged, not propagated).
    """
    import logging

    # Query to check if the user already exists
    check_user_query = """
    SELECT emp_id FROM mmo_users
    WHERE emp_id = ?;
    """
    # Query to add a new user to the mmo_users table
    add_user_query = """
    INSERT INTO mmo_users
    (emp_id, emp_nam, emp_typ, crte_dt_tm, crte_by_uid,pswrd_key)
    VALUES (?, ?, ?,datetime('now'), ?,?);
    """
    try:
        existing_user = query_excecuter_postgres(
            check_user_query, db_cred, params=(emp_id,), insert=False
        )
        if existing_user:
            return False

        hashed_password = bcrypt.hashpw(
            plain_text_password.encode("utf-8"), bcrypt.gensalt()
        )
        # Convert the byte string to a regular string for storage
        hashed_password_str = hashed_password.decode("utf-8")
        params_add = (emp_id, emp_name, emp_type, admin_id, hashed_password_str)
        query_excecuter_postgres(
            add_user_query, db_cred, params=params_add, insert=True
        )
        return True
    except Exception:
        # Callers only receive a bool, so record the cause here instead of
        # discarding it.
        logging.getLogger(__name__).exception("Failed to add user %s", emp_id)
        return False
| # def delete_users_by_names(emp_id): | |
| # # Sanitize and format the list of names for the SQL query | |
| # formatted_ids =tuple(emp_id) | |
| # # Query to delete users based on the employee names | |
| # delete_user_query = f""" | |
| # DELETE FROM mmo_users | |
| # WHERE emp_id IN ?; | |
| # """ | |
| # try: | |
| # # Execute the delete query | |
| # query_excecuter_postgres(delete_user_query, db_cred,params=(formatted_ids,), insert=True) | |
| # return f"{len(emp_id)} users deleted successfully." | |
| # except Exception as e: | |
| # st.write(e) | |
| # print(e) | |
| # return f"An error occurred: {e}" | |
def delete_users_by_names(emp_ids):
    """
    Delete users from mmo_users together with the projects they own.

    Rows are removed in dependency order: first the mmo_project_meta_data rows
    of the users' projects, then the mmo_projects rows, then the users.

    Parameters:
    emp_ids (list): Employee IDs to delete.

    Returns:
    str: A success message, a "nothing selected" message when ``emp_ids`` is
    empty, or "An error occurred: ..." when any statement raised.
    """
    # Convert the employee IDs into a tuple to bind as query parameters
    formatted_ids = tuple(emp_ids)
    if not formatted_ids:
        # Avoid issuing statements with an empty ``IN ()`` list.
        return "No users selected for deletion."

    # One "?" placeholder per ID, shared by all three statements
    placeholders = ",".join("?" * len(formatted_ids))
    delete_projects_meta_query = """
    DELETE FROM mmo_project_meta_data
    WHERE prj_id IN (
    SELECT prj_id FROM mmo_projects
    WHERE prj_ownr_id IN ({})
    );
    """.format(placeholders)
    delete_projects_query = """
    DELETE FROM mmo_projects
    WHERE prj_ownr_id IN ({});
    """.format(placeholders)
    delete_user_query = """
    DELETE FROM mmo_users
    WHERE emp_id IN ({});
    """.format(placeholders)

    try:
        # Child rows first, users last. ``db_cred`` is passed explicitly,
        # as every other call to query_excecuter_postgres in this file does.
        for statement in (
            delete_projects_meta_query,
            delete_projects_query,
            delete_user_query,
        ):
            query_excecuter_postgres(
                query=statement, db_cred=db_cred, params=formatted_ids, insert=True
            )
        return (
            f"{len(formatted_ids)} users and their associated projects deleted successfully."
        )
    except Exception as e:
        return f"An error occurred: {e}"
def reset_user_name():
    """Blank the ``user_id`` and ``user_name`` entries in session state."""
    for state_key in ("user_id", "user_name"):
        st.session_state[state_key] = ""
_SQL_KEYWORDS = (
    "SELECT",
    "INSERT",
    "UPDATE",
    "DELETE",
    "DROP",
    "ALTER",
    "CREATE",
    "GRANT",
    "REVOKE",
    "UNION",
    "JOIN",
    "WHERE",
    "HAVING",
    "EXEC",
    "TRUNCATE",
    "REPLACE",
    "MERGE",
    "DECLARE",
    "SHOW",
    "FROM",
)
# Match keywords only as whole words so that ordinary names which merely
# contain one (e.g. "fromm", "showalter", "updated") are not rejected.
_SQL_KEYWORD_PATTERN = re.compile(
    r"\b(?:" + "|".join(re.escape(keyword) for keyword in _SQL_KEYWORDS) + r")\b",
    re.IGNORECASE,
)


def contains_sql_keywords_check(user_input):
    """Search ``user_input`` for a SQL keyword appearing as a whole word.

    Matching is case-insensitive. This is an input-hygiene check for form
    fields; the insert/update statements in this file bind values through
    ``?`` placeholders rather than relying on this filter.

    Parameters:
    user_input (str): Text entered by the user.

    Returns:
    re.Match | None: The first keyword match (truthy), or None when the input
    is empty or contains no keyword.
    """
    if not user_input:
        return None
    return _SQL_KEYWORD_PATTERN.search(user_input)
def update_password_and_flag(user_id, plain_text_password, flag=False):
    """
    Store a fresh bcrypt hash of ``plain_text_password`` for one user.

    Parameters:
    user_id (str): ``emp_id`` of the mmo_users row to update.
    plain_text_password (str): The plain text password to be hashed.
    flag (bool): When True, the statement also sets ``pswrd_flag = 0``;
    when False only ``pswrd_key`` is written.
    """
    key_and_flag_sql = """
    UPDATE mmo_users
    SET pswrd_key = ?, pswrd_flag = 0
    WHERE emp_id = ?;
    """
    key_only_sql = """
    UPDATE mmo_users
    SET pswrd_key = ?
    WHERE emp_id = ?;
    """
    # bcrypt yields bytes; the column holds text, so decode before binding.
    salted_hash = bcrypt.hashpw(
        plain_text_password.encode("utf-8"), bcrypt.gensalt()
    )
    statement = key_and_flag_sql if flag else key_only_sql
    query_excecuter_postgres(
        query=statement,
        db_cred=db_cred,
        params=(salted_hash.decode("utf-8"), user_id),
        insert=True,
    )
def reset_pass_key():
    """Blank the ``pass_key`` entry in session state."""
    st.session_state.pass_key = ""
st.title("User Management")
# hashed_key=fetch_password_hash_key()[0][0]
# Fill the cached user table the first time the page runs in this session.
if "df_users" not in st.session_state:
    fetch_users_with_access()
# st.write(hashed_key.encode())
# Cache the admin accounts once per session as {emp_id: (emp_nam, emp_typ)}.
if "unique_ids_admin" not in st.session_state:
    admin_users_query = """
    SELECT DISTINCT emp_id, emp_nam, emp_typ from mmo_users
    Where emp_typ= 'admin';
    """
    admin_rows = query_excecuter_postgres(admin_users_query, db_cred, insert=False)
    st.session_state["unique_ids_admin"] = {
        row_id: (row_name, row_type) for row_id, row_name, row_type in admin_rows
    }

# With no admin on record, explain how to create the first one.
if not st.session_state["unique_ids_admin"]:
    st.error("No admin found in the database!")
    st.markdown(
        """
        - You can add an admin to the database.
        - **Ensure** that you store the passkey securely, as losing it will prevent anyone from accessing the tool.
        - Once done, reset the password on the "**Home**" page.
        """
    )

emp_id = st.text_input("employee id", key="emp1111").lower()
password = st.text_input("Enter password to access the page", type="password")

# Only IDs longer than four characters are checked against the admin list.
entered_non_admin_id = (
    len(emp_id) > 4 and emp_id not in st.session_state["unique_ids_admin"]
)
if entered_non_admin_id:
    if is_pswrd_flag_set(emp_id):
        st.warning(
            'Incorrect username or password. If you are using the default password, please reset it on the "Home" page.'
        )
    else:
        st.warning("Reset password in Home page to continue")
    st.stop()
| # st.write(st.session_state["unique_ids_admin"]) | |
# Check password or if no admin is present
def _generate_default_password(length=10):
    """Return a random alphanumeric password drawn from the OS CSPRNG."""
    import secrets

    alphabet = string.ascii_letters + string.digits  # Letters and digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


def _known_user_ids():
    """Return the cached user IDs as a list ([] if the cache has no "User ID" column)."""
    users_df = st.session_state["df_users"]
    if "User ID" not in users_df.columns:
        return []
    return users_df["User ID"].tolist()


# NOTE(review): the nesting of the "Reset Values" / "Add User" buttons relative
# to the password columns should be confirmed against the rendered layout.
if verify_password(emp_id, password) or len(st.session_state["unique_ids_admin"]) == 0:
    st.success("Access Granted", icon="✅")
    st.header("All Users")
    st.dataframe(
        st.session_state["df_users"], use_container_width=True, hide_index=True
    )
    user_manage = st.radio(
        "Select a Option",
        ["Add New User", "Delete Users", "Manage User Passwords"],
        horizontal=True,
    )

    if user_manage == "Add New User":
        with st.expander("", expanded=True):
            add_col = st.columns(3)
            with add_col[0]:
                user_id = st.text_input("Enter User ID", key="user_id").lower()
            with add_col[1]:
                user_name = st.text_input("Enter User Name", key="user_name").lower()
            with add_col[2]:
                # The very first account created must be an admin.
                if len(st.session_state["unique_ids_admin"]) == 0:
                    user_types_options = ["Admin"]
                else:
                    user_types_options = ["Data Scientist", "Media Planner", "Admin"]
                user_type = (
                    st.selectbox("Select User Type", user_types_options)
                    .replace(" ", "_")
                    .lower()
                )

            warning_box = st.empty()

            if "passkey" not in st.session_state:
                st.session_state["passkey"] = ""
            # Hide a previously generated password once the form is cleared.
            if len(user_id) < 3 or len(user_name) < 3:
                st.session_state["passkey"] = ""

            pass_key_col = st.columns(3)
            with pass_key_col[0]:
                st.markdown(
                    '<div style="display: flex; position:relative; justify-content: flex-end; align-items: center; height: 100%;">'
                    '<p style="font-weight: bold; margin: 0; position:absolute; top:4px;">Default Password</p>'
                    "</div>",
                    unsafe_allow_html=True,
                )
            with pass_key_col[1]:
                passkey_box = st.empty()
                with passkey_box:
                    st.code(f"{st.session_state['passkey']}", language="text")

            st.button(
                "Reset Values", on_click=reset_user_name, use_container_width=True
            )

            if st.button("Add User", use_container_width=True):
                # Compare against the column's values: ``in`` applied to a
                # Series tests its index labels, not the stored IDs.
                if user_id in _known_user_ids():
                    with warning_box:
                        st.warning("User id already exists")
                        st.stop()

                if (
                    not user_name
                    or not user_id.startswith("e")
                    or len(user_id) < 6
                ):
                    with warning_box:
                        st.warning("Enter a Valid User ID and User Name!")
                        st.stop()

                if contains_sql_keywords_check(user_id):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                        st.stop()

                if not (2 <= len(user_name) <= 50):
                    with warning_box:
                        st.warning(
                            "Please provide a valid user name (2-50 characters, only A-Z, a-z, 0-9, and _)."
                        )
                        st.stop()

                if contains_sql_keywords_check(user_name):
                    with warning_box:
                        st.warning(
                            "Input contains SQL keywords. Please avoid using SQL commands."
                        )
                        st.stop()

                plain_text_password = _generate_default_password()
                if add_user(user_id, user_name, user_type, emp_id, plain_text_password):
                    with st.spinner("Adding New User"):
                        update_password_and_flag(user_id, plain_text_password)
                        # Publish the password only after it has been stored.
                        st.session_state["passkey"] = plain_text_password
                        fetch_users_with_access()
                    st.success("User added successfully")
                    st.rerun()
                else:
                    st.warning(
                        f"User with emp_id {user_id} already exists in the database."
                    )
                    st.stop()

    if user_manage == "Delete Users":
        with st.expander("", expanded=True):
            user_names_to_delete = st.multiselect(
                "Select User IDS to Delete", _known_user_ids()
            )
            if st.button("Delete", use_container_width=True):
                if not user_names_to_delete:
                    st.warning("Select at least one user to delete.")
                    st.stop()
                delete_result = delete_users_by_names(user_names_to_delete)
                # delete_users_by_names reports failure through its message
                # rather than by raising, so inspect it before claiming success.
                if str(delete_result).startswith("An error occurred"):
                    st.error(delete_result)
                    st.stop()
                st.success("Users Deleted")
                fetch_users_with_access()
                st.rerun()

    if user_manage == "Manage User Passwords":
        with st.expander("**Manage User Passwords**", expanded=True):
            st.markdown(
                """
                1.Click on "**Reset Password**" to generate a new default password for the selected user.
                2. Share the pass key with the corresponding user
                """,
                unsafe_allow_html=True,
            )
            user_id = st.selectbox(
                "Select A User ID",
                _known_user_ids(),
                on_change=reset_pass_key,
            )
            if "pass_key" not in st.session_state:
                st.session_state["pass_key"] = ""
            st.code(
                f"Default Password: {st.session_state['pass_key']}", language="text"
            )
            if st.button("Reset Password", use_container_width=True, key="reset_pass_button_key"):
                if user_id is None:
                    # Nothing selectable, e.g. the user list is empty.
                    st.warning("Select a user first.")
                    st.stop()
                with st.spinner("Reseting Password"):
                    plain_text_password = _generate_default_password()
                    update_password_and_flag(user_id, plain_text_password, flag=True)
                    # Show the new password only once the update has gone through.
                    st.session_state["pass_key"] = plain_text_password
                    time.sleep(1)
                del st.session_state["reset_pass_button_key"]
                st.rerun()
elif len(password):
    st.warning("Wrong user name or password!")