# SmartHeal-Agentic-AI / src/auth_manager.py
# SmartHeal's picture
# Upload 33 files
# 185c377 verified
# raw
# history blame
# 5.64 kB
import hmac
import logging
from datetime import datetime

from src.database_manager import DatabaseManager
class AuthManager:
    """User-account operations on top of ``DatabaseManager``.

    Covers login, account creation (including the ``organizations`` row for
    users with role ``'organization'``), organization/practitioner listings
    and profile edits.

    Every public method catches ``Exception``, logs it with
    ``logging.error`` and returns a failure value (``None``, ``False`` or
    ``[]``) instead of raising.
    """

    def __init__(self):
        """Create the manager with its own ``DatabaseManager`` instance."""
        self.db_manager = DatabaseManager()

    @staticmethod
    def _passwords_match(stored, supplied):
        """Return True when the supplied password equals the stored one.

        A missing value on either side never matches, so an account whose
        password column is NULL cannot be opened by passing ``None``.
        Text and byte values are compared with ``hmac.compare_digest`` so
        the comparison time does not depend on where the values differ.
        """
        if stored is None or supplied is None:
            return False
        if isinstance(stored, str) and isinstance(supplied, str):
            # compare_digest only accepts ASCII-only str, so compare bytes.
            stored = stored.encode("utf-8")
            supplied = supplied.encode("utf-8")
        if isinstance(stored, (bytes, bytearray)) and isinstance(supplied, (bytes, bytearray)):
            return hmac.compare_digest(stored, supplied)
        # Any other value types keep the plain equality semantics.
        return stored == supplied

    def authenticate_user(self, username, password):
        """Authenticate a user and return their user row.

        Args:
            username: Username to look up in the ``users`` table.
            password: Password supplied by the caller.

        Returns:
            The row returned by ``execute_query_one`` for the columns
            ``id, username, email, name, role, org`` when the password
            matches; ``None`` when the user does not exist, the password
            does not match, or any exception occurs.
        """
        try:
            # Fetch the profile columns separately so the returned row
            # never contains the password.
            user = self.db_manager.execute_query_one(
                "SELECT id, username, email, name, role, org FROM users WHERE username = %s",
                (username,),
            )
            if not user:
                return None

            # NOTE(review): the stored value is compared as-is, and
            # create_user inserts the password unchanged, i.e. passwords
            # are kept in plaintext (stated as a requirement). Hashing
            # would need a data migration, so it is only flagged here.
            credentials = self.db_manager.execute_query_one(
                "SELECT password FROM users WHERE username = %s",
                (username,),
            )
            if not credentials or not self._passwords_match(credentials['password'], password):
                return None

            # Record the successful login time.
            self.db_manager.execute_query(
                "UPDATE users SET last_login = %s WHERE id = %s",
                (datetime.now(), user['id']),
            )
            return user
        except Exception as e:
            logging.error("Authentication error: %s", e)
            return None

    def create_user(self, username, email, password, name, role, org_name="", phone="",
                    country_code="", department="", location="", organization_id=None):
        """Create a new user account.

        For ``role == 'organization'`` an ``organizations`` row is inserted
        first (named ``org_name``, falling back to ``name``) and its id is
        stored in the user's ``org`` column, replacing any
        ``organization_id`` passed in. For every other role the given
        ``organization_id`` is stored unchanged; ``org_name``, ``phone``,
        ``country_code``, ``department`` and ``location`` are then unused.

        Returns:
            ``True`` when the user insert reports a truthy result;
            ``False`` when the username or email already exists, an insert
            reports a falsy result, or any exception occurs.
        """
        try:
            # Refuse duplicates on either username or email.
            existing_user = self.db_manager.execute_query_one(
                "SELECT id FROM users WHERE username = %s OR email = %s",
                (username, email),
            )
            if existing_user:
                return False

            if role == 'organization':
                # NOTE(review): the organization insert and the user insert
                # are separate execute_query calls; if DatabaseManager
                # commits each call on its own, a failed user insert leaves
                # the organization row behind - confirm.
                org_result = self.db_manager.execute_query(
                    """INSERT INTO organizations (name, email, phone, country_code, department, location, created_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                    (org_name or name, email, phone, country_code, department, location, datetime.now()),
                )
                if not org_result:
                    return False

                # Read back the id of the newest organization with this email.
                org_id = self.db_manager.execute_query_one(
                    "SELECT id FROM organizations WHERE email = %s ORDER BY created_at DESC LIMIT 1",
                    (email,),
                )
                organization_id = org_id['id'] if org_id else None

            user_result = self.db_manager.execute_query(
                """INSERT INTO users (username, email, password, name, role, org, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                (username, email, password, name, role, organization_id, datetime.now()),
            )
            return bool(user_result)
        except Exception as e:
            logging.error("User creation error: %s", e)
            return False

    def get_organizations(self):
        """Return the ``id`` and ``name`` of all organizations, ordered by name.

        Returns ``[]`` when the query yields a falsy result or raises.
        """
        try:
            organizations = self.db_manager.execute_query(
                "SELECT id, name FROM organizations ORDER BY name",
                fetch=True,
            )
            return organizations or []
        except Exception as e:
            logging.error("Error fetching organizations: %s", e)
            return []

    def get_organization_practitioners(self, organization_id):
        """Return the practitioners of one organization, newest first.

        Selects ``id, username, name, email, created_at, last_login`` for
        users whose ``org`` equals ``organization_id`` and whose role is
        ``'practitioner'``. Returns ``[]`` when the query yields a falsy
        result or raises.
        """
        try:
            practitioners = self.db_manager.execute_query(
                """SELECT id, username, name, email, created_at, last_login
                FROM users WHERE org = %s AND role = 'practitioner'
                ORDER BY created_at DESC""",
                (organization_id,),
                fetch=True,
            )
            return practitioners or []
        except Exception as e:
            logging.error("Error fetching practitioners: %s", e)
            return []

    def update_user_profile(self, user_id, name=None, email=None):
        """Update a user's name and/or email.

        Falsy ``name`` / ``email`` values (``None`` or ``""``) are skipped.
        The email uniqueness check runs before any write, so a rejected
        email leaves the name untouched as well.

        Returns:
            ``True`` when the requested updates were issued (or nothing was
            requested); ``False`` when the email belongs to another user or
            any exception occurs.
        """
        try:
            if email:
                # Validate first: a conflict must not leave a half-applied update.
                existing = self.db_manager.execute_query_one(
                    "SELECT id FROM users WHERE email = %s AND id != %s",
                    (email, user_id),
                )
                if existing:
                    return False

            if name:
                self.db_manager.execute_query(
                    "UPDATE users SET name = %s WHERE id = %s",
                    (name, user_id),
                )
            if email:
                self.db_manager.execute_query(
                    "UPDATE users SET email = %s WHERE id = %s",
                    (email, user_id),
                )
            return True
        except Exception as e:
            logging.error("Error updating user profile: %s", e)
            return False