Alibrown's picture
Update fundaments/user_handler.py
5724eea verified
# PyFundaments: A Secure Python Architecture
# Copyright 2008-2025 - Volkan Kücükbudak
# Apache License V. 2
# Repo: https://github.com/VolkanSah/PyFundaments
# root/fundaments/user_handler.py
# A Python module for handling user authentication and session management.
import sqlite3
import uuid
from datetime import datetime, timedelta
from passlib.hash import pbkdf2_sha256
import os
class Database:
"""
Handles the SQLite database connection and initialization.
Supports dynamic path selection via environment variables with a
fallback to the local application directory.
"""
def __init__(self, db_name="cms_database.db"):
# 1. Attempt to load the database path from an environment variable
# This allows for flexible configuration in production/Docker environments
env_path = os.getenv("SQLITE_PATH")
if env_path:
# Use the absolute path provided by the environment variable
full_db_path = os.path.abspath(env_path)
else:
# Fallback logic: Locate the 'app' directory relative to this script
# Expected structure: root/fun/user_handler.py -> root/app/
base_path = os.path.dirname(os.path.abspath(__file__))
app_dir = os.path.join(base_path, "..", "app")
full_db_path = os.path.join(app_dir, db_name)
# 2. Ensure the target directory exists before attempting to connect
# SQLite can create the file, but not the parent folders.
db_dir = os.path.dirname(full_db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir)
# Initialize the connection and cursor
self.conn = sqlite3.connect(full_db_path)
self.cursor = self.conn.cursor()
# Log the active database path for debugging purposes
print(f"Database connected to: {full_db_path}")
def execute(self, query, params=None):
if params is None:
params = []
self.cursor.execute(query, params)
self.conn.commit()
def fetchone(self, query, params=None):
if params is None:
params = []
self.cursor.execute(query, params)
return self.cursor.fetchone()
def fetchall(self, query, params=None):
if params is None:
params = []
self.cursor.execute(query, params)
return self.cursor.fetchall()
def close(self):
self.conn.close()
def setup_tables(self):
"""
Creates the necessary tables for users and sessions.
"""
self.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
is_admin INTEGER NOT NULL DEFAULT 0,
account_locked INTEGER NOT NULL DEFAULT 0,
failed_login_attempts INTEGER NOT NULL DEFAULT 0
)
""")
self.execute("""
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
ip_address TEXT,
user_agent TEXT,
last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
""")
class Security:
"""
Handles secure password hashing and session regeneration.
Using passlib for robust and secure password management.
"""
@staticmethod
def hash_password(password: str) -> str:
"""Hashes a password using PBKDF2 with SHA256."""
return pbkdf2_sha256.hash(password)
@staticmethod
def verify_password(password: str, hashed_password: str) -> bool:
"""Verifies a password against a stored hash."""
return pbkdf2_sha256.verify(password, hashed_password)
@staticmethod
def regenerate_session(session_id: str):
"""
Simulates regenerating a session ID to prevent session fixation.
In a real web framework, this would be a framework-specific function.
"""
print(f"Session regenerated. Old ID: {session_id}")
new_session_id = str(uuid.uuid4())
print(f"New ID: {new_session_id}")
return new_session_id
class UserHandler:
"""
Handles user login, logout, and session validation.
This class mirrors the logic from the user's PHP User class.
"""
def __init__(self, db: Database):
self.db = db
# A simple in-memory session store for this example
self._session = {}
def login(self, username: str, password: str, request_data: dict) -> bool:
"""
Logs in the user by verifying credentials and storing a new session.
:param username: The user's username.
:param password: The user's plain-text password.
:param request_data: A dictionary containing 'ip_address' and 'user_agent'.
:return: True if login is successful, False otherwise.
"""
try:
# Step 1: Find the user in the database
user_data = self.db.fetchone("SELECT id, username, password, is_admin, account_locked, failed_login_attempts FROM users WHERE username = ?", (username,))
if user_data is None:
print(f"Login failed: Username '{username}' not found.")
return False
user = {
'id': user_data[0],
'username': user_data[1],
'password': user_data[2],
'is_admin': user_data[3],
'account_locked': user_data[4],
'failed_login_attempts': user_data[5]
}
# Check if account is locked
if user['account_locked'] == 1:
print(f"Login failed: Account for '{username}' is locked.")
return False
# Step 2: Verify the password
if Security.verify_password(password, user['password']):
print(f"Login successful for user: '{username}'")
# Reset failed login attempts on success
self.reset_failed_attempts(username)
# Step 3: Create a new session record in the database
session_id = str(uuid.uuid4())
ip_address = request_data.get('ip_address', 'unknown')
user_agent = request_data.get('user_agent', 'unknown')
self.db.execute(
"INSERT INTO sessions (id, user_id, ip_address, user_agent) VALUES (?, ?, ?, ?)",
(session_id, user['id'], ip_address, user_agent)
)
# Step 4: Store session data in the in-memory session (or a session store)
self._session = {
'session_id': session_id,
'user_id': user['id'],
'username': user['username'],
'is_admin': user['is_admin']
}
# Security: Regenerate session ID
self._session['session_id'] = Security.regenerate_session(session_id)
return True
else:
print(f"Login failed: Incorrect password for user '{username}'.")
# Increment failed login attempts
self.increment_failed_attempts(username)
return False
except sqlite3.Error as e:
print(f"Database error during login: {e}")
return False
def logout(self) -> bool:
"""
Logs out the current user by deleting the session from the database.
:return: True if logout is successful, False otherwise.
"""
if 'user_id' not in self._session:
print("No active session to log out.")
return False
try:
# Step 1: Delete the session from the database
self.db.execute("DELETE FROM sessions WHERE user_id = ?", (self._session['user_id'],))
# Step 2: Clear the in-memory session data
self._session.clear()
print("User logged out successfully.")
return True
except sqlite3.Error as e:
print(f"Database error during logout: {e}")
return False
def is_logged_in(self) -> bool:
"""
Checks if the current user is logged in.
:return: True if a valid session exists, False otherwise.
"""
if 'user_id' not in self._session:
return False
try:
# Check for the session in the database
session_data = self.db.fetchone(
"SELECT * FROM sessions WHERE id = ? AND user_id = ?",
(self._session['session_id'], self._session['user_id'])
)
return session_data is not None
except sqlite3.Error as e:
print(f"Database error during is_logged_in check: {e}")
return False
def is_admin(self) -> bool:
"""
Checks if the logged-in user is an admin.
:return: True if the user is an admin, False otherwise.
"""
return self._session.get('is_admin', 0) == 1
def validate_session(self, request_data: dict) -> bool:
"""
Validates the current session against IP address and user agent.
:param request_data: A dictionary containing 'ip_address' and 'user_agent'.
:return: True if the session is valid, False otherwise.
"""
if not self.is_logged_in():
return False
try:
ip_address = request_data.get('ip_address', 'unknown')
user_agent = request_data.get('user_agent', 'unknown')
session_data = self.db.fetchone(
"SELECT * FROM sessions WHERE id = ? AND user_id = ? AND ip_address = ? AND user_agent = ?",
(self._session['session_id'], self._session['user_id'], ip_address, user_agent)
)
return session_data is not None
except sqlite3.Error as e:
print(f"Database error during session validation: {e}")
return False
def lock_account(self, username: str):
"""
Locks a user account.
:param username: The username of the account to lock.
"""
try:
self.db.execute("UPDATE users SET account_locked = 1 WHERE username = ?", (username,))
print(f"Account for '{username}' has been locked.")
except sqlite3.Error as e:
print(f"Database error while locking account: {e}")
def reset_failed_attempts(self, username: str):
"""
Resets failed login attempts for a user.
:param username: The username of the account.
"""
try:
self.db.execute("UPDATE users SET failed_login_attempts = 0 WHERE username = ?", (username,))
except sqlite3.Error as e:
print(f"Database error while resetting failed attempts: {e}")
def increment_failed_attempts(self, username: str):
"""
Increments failed login attempts and locks the account if a threshold is met.
:param username: The username of the account.
"""
try:
# Get the current failed attempts
user_data = self.db.fetchone("SELECT failed_login_attempts FROM users WHERE username = ?", (username,))
if user_data:
attempts = user_data[0] + 1
self.db.execute(
"UPDATE users SET failed_login_attempts = ? WHERE username = ?",
(attempts, username)
)
print(f"Failed login attempts for '{username}': {attempts}")
# Check for threshold (e.g., 5 attempts)
if attempts >= 5:
self.lock_account(username)
except sqlite3.Error as e:
print(f"Database error while incrementing failed attempts: {e}")
# --- Example Usage ---
if __name__ == "__main__":
db = Database()
db.setup_tables()
user_handler = UserHandler(db)
# Clean up old test data if it exists
db.execute("DELETE FROM users WHERE username IN (?, ?)", ("testuser", "adminuser"))
db.execute("DELETE FROM sessions")
# 1. Register a new user and an admin user
hashed_password = Security.hash_password("secure_password_123")
db.execute("INSERT INTO users (username, password) VALUES (?, ?)", ("testuser", hashed_password))
db.execute("INSERT INTO users (username, password, is_admin) VALUES (?, ?, 1)", ("adminuser", hashed_password))
print("--- Test 1: Successful Login ---")
# Simulate a web request
request_data = {
'ip_address': '192.168.1.100',
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
login_success = user_handler.login("testuser", "secure_password_123", request_data)
print(f"Login attempt status: {login_success}")
print(f"Is user logged in? {user_handler.is_logged_in()}")
print(f"Is user an admin? {user_handler.is_admin()}")
print(f"Is session valid? {user_handler.validate_session(request_data)}")
print("-" * 20)
# 2. Simulate a logout
print("--- Test 2: Logout ---")
user_handler.logout()
print(f"Is user logged in after logout? {user_handler.is_logged_in()}")
print("-" * 20)
# 3. Simulate a failed login and account lock
print("--- Test 3: Failed Login and Account Lock ---")
# Log in with the wrong password multiple times
for i in range(6):
user_handler.login("testuser", "wrong_password", request_data)
# Now, try to log in with the correct password. It should fail because the account is locked.
print("\nAttempting to log in with correct password after lock:")
login_attempt_after_lock = user_handler.login("testuser", "secure_password_123", request_data)
print(f"Login attempt status: {login_attempt_after_lock}")
print("-" * 20)
# 4. Reset failed attempts for a new login
print("--- Test 4: Resetting failed attempts ---")
user_handler.reset_failed_attempts("testuser")
login_attempt_after_reset = user_handler.login("testuser", "secure_password_123", request_data)
print(f"Login attempt status after reset: {login_attempt_after_reset}")
db.close()
# Optional: Clean up the database file after the run
# OLD
# os.remove("cms_database.db")
# NEW : Clean up the database file after the run:
# os.remove(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "app", "cms_database.db"))