import hashlib
import hmac
import os
import sqlite3
from contextlib import contextmanager
from threading import Lock
from typing import Iterator, Optional

import pandas as pd


class DatabaseManager:
    """SQLite-backed storage for users and their transactions.

    All database access is serialized through a single ``threading.Lock``.

    For file-based databases a fresh connection is opened for each operation
    and always closed afterwards.  For the special path ``":memory:"`` a
    single connection is kept open for the lifetime of the manager, because
    SQLite creates a brand-new, empty in-memory database for every
    connection; reconnecting per call would discard the schema and all data.
    """

    _IN_MEMORY_PATH = ":memory:"

    _USERS_DDL = """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            email TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """

    _TRANSACTIONS_DDL = """
        CREATE TABLE IF NOT EXISTS transactions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            transaction_id TEXT,
            timestamp TEXT,
            amount REAL,
            description TEXT,
            merchant TEXT,
            category TEXT,
            is_flagged INTEGER DEFAULT 0,
            risk_score REAL DEFAULT 0.0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )
    """

    def __init__(self, db_path=":memory:"):
        """Open (or create) the database at *db_path* and ensure the schema.

        Args:
            db_path: Path of the SQLite file, or ``":memory:"`` (default)
                for a private in-memory database.
        """
        self.db_path = db_path
        self._lock = Lock()  # Serializes every database operation.
        self._in_memory = db_path == self._IN_MEMORY_PATH
        # Persistent connection, used only for in-memory databases.
        self._shared_conn: Optional[sqlite3.Connection] = None
        if self._in_memory:
            self._shared_conn = sqlite3.connect(
                self.db_path, check_same_thread=False
            )
        self.init_db()

    @contextmanager
    def _connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection while holding the lock; always clean up.

        File-based databases get a fresh connection that is closed on exit
        (which discards any uncommitted work).  The persistent in-memory
        connection is left open, but rolled back if the body raises so a
        failed operation cannot leave a half-finished transaction behind.
        """
        with self._lock:
            if self._in_memory:
                conn = self._shared_conn
                if conn is None:
                    raise sqlite3.ProgrammingError(
                        "in-memory database has been closed"
                    )
                try:
                    yield conn
                except Exception:
                    conn.rollback()
                    raise
            else:
                conn = sqlite3.connect(self.db_path, check_same_thread=False)
                try:
                    yield conn
                finally:
                    conn.close()

    @classmethod
    def _create_tables(cls, conn: sqlite3.Connection) -> None:
        """Create the ``users`` and ``transactions`` tables if missing.

        Does not take the lock and does not commit; callers do both.
        """
        conn.execute(cls._USERS_DDL)
        conn.execute(cls._TRANSACTIONS_DDL)

    def close(self) -> None:
        """Close the persistent in-memory connection, if there is one.

        Closing an in-memory database destroys its contents; the manager
        must not be used afterwards.  No-op for file-based databases.
        """
        with self._lock:
            if self._shared_conn is not None:
                self._shared_conn.close()
                self._shared_conn = None

    def init_db(self):
        """Initialize database tables."""
        if not self._in_memory:
            # A bare filename has no directory component, and
            # os.makedirs("") raises FileNotFoundError.
            parent = os.path.dirname(self.db_path)
            if parent:
                os.makedirs(parent, exist_ok=True)

        with self._connection() as conn:
            self._create_tables(conn)
            conn.commit()
        print(f"✅ Database initialized at: {self.db_path}")

    def _hash_password(self, password: str) -> str:
        """Return the hex SHA-256 digest of *password* (UTF-8 encoded).

        NOTE(review): unsalted SHA-256 is a weak password hash.  It is kept
        as-is because existing rows store this exact format; moving to a
        salted KDF (e.g. ``hashlib.scrypt``) needs a data migration.
        """
        return hashlib.sha256(password.encode('utf-8')).hexdigest()

    def create_user(
        self, username: str, password: str, email: Optional[str] = None
    ) -> bool:
        """Create a new user.

        Args:
            username: Unique login name; must be non-empty.
            password: Plain-text password; must be non-empty.
            email: Optional e-mail address.

        Returns:
            True if the user was inserted; False if the input is empty, the
            username is already taken, or a database error occurred.
        """
        if not username or not password:
            print("❌ Username and password required")
            return False

        try:
            with self._connection() as conn:
                cur = conn.cursor()
                cur.execute(
                    "SELECT id FROM users WHERE username = ?", (username,)
                )
                if cur.fetchone():
                    print(f"❌ User '{username}' already exists")
                    return False

                cur.execute(
                    "INSERT INTO users (username, password_hash, email) VALUES (?, ?, ?)",
                    (username, self._hash_password(password), email),
                )
                conn.commit()
                user_id = cur.lastrowid
            print(f"✅ User '{username}' created successfully with ID: {user_id}")
            return True
        except sqlite3.IntegrityError as e:
            print(f"❌ Database integrity error: {e}")
            return False
        except Exception as e:
            print(f"❌ Error creating user: {e}")
            return False

    def authenticate_user(self, username: str, password: str) -> Optional[int]:
        """Authenticate a user.

        Args:
            username: Login name to look up.
            password: Plain-text password to verify.

        Returns:
            The user's id on success; None if the input is empty, the user
            does not exist, the password is wrong, or a database error
            occurred.
        """
        if not username or not password:
            print("❌ Username and password required")
            return None

        try:
            print(f"🔍 Authenticating user: '{username}'")
            pwd_hash = self._hash_password(password)
            with self._connection() as conn:
                row = conn.execute(
                    "SELECT id, password_hash FROM users WHERE username = ?",
                    (username,),
                ).fetchone()

            if row is None:
                print(f"❌ User '{username}' not found")
                return None

            user_id, stored_hash = row
            # Constant-time comparison; hash material is never printed.
            if not hmac.compare_digest(
                str(stored_hash).encode('utf-8'), pwd_hash.encode('utf-8')
            ):
                print(f"❌ User '{username}' exists but password incorrect")
                return None

            print(f"✅ Authentication successful for user '{username}' (ID: {user_id})")
            return user_id
        except Exception as e:
            print(f"❌ Authentication error: {e}")
            return None

    def list_users(self):
        """Debug helper: print and return all users.

        Returns:
            A list of ``(id, username, email, created_at)`` tuples, or an
            empty list if a database error occurred.
        """
        try:
            with self._connection() as conn:
                rows = conn.execute(
                    "SELECT id, username, email, created_at FROM users"
                ).fetchall()

            print("📋 All users in database:")
            for row in rows:
                print(f" ID: {row[0]}, Username: {row[1]}, Email: {row[2]}, Created: {row[3]}")
            return rows
        except Exception as e:
            print(f"❌ Error listing users: {e}")
            return []

    def store_transactions(self, user_id: int, df: pd.DataFrame):
        """Append the rows of *df* to the ``transactions`` table.

        The caller's frame is not modified.  Missing text columns are filled
        with ``''``; a missing ``amount`` is stored as NULL so the REAL
        column never receives an empty string.  Errors are printed, not
        raised.

        Args:
            user_id: Owner recorded in the ``user_id`` column of every row.
            df: Transactions to store; its columns must be columns of the
                ``transactions`` table.
        """
        text_defaults = ('transaction_id', 'timestamp', 'description', 'merchant')
        try:
            frame = df.copy()
            frame['user_id'] = user_id
            for col in text_defaults:
                if col not in frame.columns:
                    frame[col] = ''
            if 'amount' not in frame.columns:
                frame['amount'] = None

            with self._connection() as conn:
                frame.to_sql('transactions', conn, if_exists='append', index=False)
                conn.commit()
            print(f"✅ Stored {len(frame)} transactions for user {user_id}")
        except Exception as e:
            print(f"❌ Error storing transactions: {e}")

    def get_transactions(self, user_id: int, days: int = 30):
        """Return all transactions of a user, newest ``timestamp`` first.

        Args:
            user_id: Owner whose rows are returned.
            days: NOTE(review): accepted for interface compatibility but not
                applied; rows of every age are returned.  Applying it would
                change what existing callers receive, so confirm before
                enabling a cutoff.

        Returns:
            A DataFrame with one column per ``transactions`` table column,
            or an empty DataFrame if a database error occurred.
        """
        try:
            with self._connection() as conn:
                cur = conn.execute(
                    "SELECT * FROM transactions WHERE user_id = ? ORDER BY timestamp DESC",
                    (user_id,),
                )
                rows = cur.fetchall()
                cols = [desc[0] for desc in cur.description]
            return pd.DataFrame(rows, columns=cols)
        except Exception as e:
            print(f"❌ Error getting transactions: {e}")
            return pd.DataFrame()

    def reset_database(self):
        """Drop and recreate both tables (for testing).

        The tables are recreated on the connection that is already held:
        the lock is not reentrant, so calling ``init_db`` from inside the
        locked section would deadlock.
        """
        try:
            with self._connection() as conn:
                conn.execute("DROP TABLE IF EXISTS users")
                conn.execute("DROP TABLE IF EXISTS transactions")
                self._create_tables(conn)
                conn.commit()
            print(f"✅ Database initialized at: {self.db_path}")
            print("🔄 Database reset successfully")
        except Exception as e:
            print(f"❌ Error resetting database: {e}")