Spaces:
Sleeping
Sleeping
import streamlit as st | |
import mysql.connector | |
import bcrypt | |
import re | |
import datetime | |
import pytz | |
import pandas as pd | |
import plotly.express as px | |
# MySQL Connection | |
def get_database_connection(): | |
connection = mysql.connector.connect( | |
host="gateway01.ap-southeast-1.prod.aws.tidbcloud.com", | |
port=4000, | |
user="37QUb7dvTn3P6E8.root", | |
password="MBAg14V0HaMdxwX0" | |
) | |
return connection | |
# Initialize database and tables | |
def init_database(): | |
connection = get_database_connection() | |
cursor = connection.cursor(buffered=True) | |
cursor.execute("CREATE DATABASE IF NOT EXISTS EmployeePerformance") | |
cursor.execute('USE EmployeePerformance') | |
# Create User_data table | |
cursor.execute('''CREATE TABLE IF NOT EXISTS User_data ( | |
id INT AUTO_INCREMENT PRIMARY KEY, | |
username VARCHAR(50) UNIQUE NOT NULL, | |
password VARCHAR(255) NOT NULL, | |
email VARCHAR(255) UNIQUE NOT NULL, | |
registered_date TIMESTAMP, | |
last_login TIMESTAMP | |
)''') | |
# Create Employee_data table | |
cursor.execute('''CREATE TABLE IF NOT EXISTS Employee_data ( | |
id INT AUTO_INCREMENT PRIMARY KEY, | |
employee_id VARCHAR(50) UNIQUE NOT NULL, | |
name VARCHAR(100) NOT NULL, | |
age INT, | |
last_performance_score FLOAT, | |
current_performance_score FLOAT, | |
future_advancement TEXT, | |
rank INT | |
)''') | |
connection.commit() | |
cursor.close() | |
connection.close() | |
# User authentication functions | |
def username_exists(username): | |
connection = get_database_connection() | |
cursor = connection.cursor(buffered=True) | |
cursor.execute('USE EmployeePerformance') | |
cursor.execute("SELECT * FROM User_data WHERE username = %s", (username,)) | |
result = cursor.fetchone() is not None | |
cursor.close() | |
connection.close() | |
return result | |
def email_exists(email): | |
connection = get_database_connection() | |
cursor = connection.cursor(buffered=True) | |
cursor.execute('USE EmployeePerformance') | |
cursor.execute("SELECT * FROM User_data WHERE email = %s", (email,)) | |
result = cursor.fetchone() is not None | |
cursor.close() | |
connection.close() | |
return result | |
def is_valid_email(email): | |
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' | |
return re.match(pattern, email) is not None | |
def create_user(username, password, email): | |
if username_exists(username): | |
return 'username_exists' | |
if email_exists(email): | |
return 'email_exists' | |
connection = get_database_connection() | |
cursor = connection.cursor(buffered=True) | |
cursor.execute('USE EmployeePerformance') | |
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) | |
registered_date = datetime.datetime.now(pytz.timezone('Asia/Kolkata')) | |
cursor.execute( | |
"INSERT INTO User_data (username, password, email, registered_date) VALUES (%s, %s, %s, %s)", | |
(username, hashed_password, email, registered_date) | |
) | |
connection.commit() | |
cursor.close() | |
connection.close() | |
return 'success' | |
def verify_user(username, password): | |
connection = get_database_connection() | |
cursor = connection.cursor(buffered=True) | |
cursor.execute('USE EmployeePerformance') | |
cursor.execute("SELECT password FROM User_data WHERE username = %s", (username,)) | |
record = cursor.fetchone() | |
if record and bcrypt.checkpw(password.encode('utf-8'), record[0].encode('utf-8')): | |
cursor.execute("UPDATE User_data SET last_login = %s WHERE username = %s", | |
(datetime.datetime.now(pytz.timezone('Asia/Kolkata')), username)) | |
connection.commit() | |
cursor.close() | |
connection.close() | |
return True | |
cursor.close() | |
connection.close() | |
return False | |
# Employee data functions | |
def get_employee_data(): | |
connection = get_database_connection() | |
cursor = connection.cursor(buffered=True) | |
cursor.execute('USE EmployeePerformance') | |
cursor.execute("SELECT * FROM Employee_data") | |
columns = [col[0] for col in cursor.description] | |
employees = [dict(zip(columns, row)) for row in cursor.fetchall()] | |
cursor.close() | |
connection.close() | |
return employees | |
# Streamlit app | |
def main(): | |
st.set_page_config(page_title="Employee Performance Dashboard", layout="wide") | |
init_database() | |
if 'logged_in' not in st.session_state: | |
st.session_state.logged_in = False | |
if not st.session_state.logged_in: | |
login_page() | |
else: | |
home_page() | |
def login_page(): | |
st.title("Login") | |
username = st.text_input("Username") | |
password = st.text_input("Password", type="password") | |
if st.button("Login"): | |
if verify_user(username, password): | |
st.session_state.logged_in = True | |
st.session_state.username = username | |
st.experimental_rerun() | |
else: | |
st.error("Invalid username or password") | |
def home_page(): | |
st.title("Employee Performance Dashboard") | |
st.write(f"Welcome, {st.session_state.username}!") | |
employees = get_employee_data() | |
if not employees: | |
st.warning("No employee data available.") | |
return | |
# Display employee information | |
for employee in employees: | |
st.subheader(f"Employee: {employee['name']}") | |
col1, col2, col3 = st.columns(3) | |
col1.metric("Employee ID", employee['employee_id']) | |
col2.metric("Age", employee['age']) | |
col3.metric("Rank", employee['rank']) | |
col4, col5 = st.columns(2) | |
col4.metric("Last Performance Score", f"{employee['last_performance_score']:.2f}") | |
col5.metric("Current Performance Score", f"{employee['current_performance_score']:.2f}") | |
st.write("Future Advancement:", employee['future_advancement']) | |
st.markdown("---") | |
# Performance visualization | |
st.subheader("Performance Overview") | |
df = pd.DataFrame(employees) | |
fig = px.scatter(df, x="age", y="current_performance_score", size="rank", | |
hover_data=["name", "employee_id"], color="current_performance_score", | |
labels={"current_performance_score": "Current Performance Score"}) | |
st.plotly_chart(fig) | |
if st.button("Logout"): | |
st.session_state.logged_in = False | |
st.experimental_rerun() | |
if __name__ == "__main__": | |
main() |