fraud-detector-app / database.py
soupstick's picture
fix: update authenticator file
ff87b10
import sqlite3
import hashlib
import pandas as pd
from typing import Optional
import os
from threading import Lock
class DatabaseManager:
def __init__(self, db_path=":memory:"):
self.db_path = db_path
self._lock = Lock() # Thread safety
self.init_db()
def init_db(self):
"""Initialize database tables"""
with self._lock:
# Ensure directory exists if using file-based DB
if self.db_path != ":memory:":
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
conn = sqlite3.connect(self.db_path, check_same_thread=False)
cur = conn.cursor()
# Users table
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Transactions table
cur.execute("""
CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
transaction_id TEXT,
timestamp TEXT,
amount REAL,
description TEXT,
merchant TEXT,
category TEXT,
is_flagged INTEGER DEFAULT 0,
risk_score REAL DEFAULT 0.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
conn.commit()
conn.close()
print(f"βœ… Database initialized at: {self.db_path}")
def _hash_password(self, password: str) -> str:
"""Hash password using SHA256"""
return hashlib.sha256(password.encode('utf-8')).hexdigest()
def create_user(self, username: str, password: str, email: str = None) -> bool:
"""Create a new user"""
if not username or not password:
print("❌ Username and password required")
return False
try:
with self._lock:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
cur = conn.cursor()
# Check if user already exists
cur.execute("SELECT id FROM users WHERE username = ?", (username,))
if cur.fetchone():
print(f"❌ User '{username}' already exists")
conn.close()
return False
# Create new user
pwd_hash = self._hash_password(password)
cur.execute(
"INSERT INTO users (username, password_hash, email) VALUES (?, ?, ?)",
(username, pwd_hash, email)
)
conn.commit()
user_id = cur.lastrowid
conn.close()
print(f"βœ… User '{username}' created successfully with ID: {user_id}")
return True
except sqlite3.IntegrityError as e:
print(f"❌ Database integrity error: {e}")
return False
except Exception as e:
print(f"❌ Error creating user: {e}")
return False
def authenticate_user(self, username: str, password: str) -> Optional[int]:
"""Authenticate user and return user ID if successful"""
if not username or not password:
print("❌ Username and password required")
return None
try:
with self._lock:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
cur = conn.cursor()
pwd_hash = self._hash_password(password)
print(f"πŸ” Authenticating user: '{username}'")
print(f"πŸ”‘ Password hash: {pwd_hash[:10]}...")
cur.execute(
"SELECT id, username FROM users WHERE username = ? AND password_hash = ?",
(username, pwd_hash)
)
row = cur.fetchone()
conn.close()
if row:
user_id = row[0]
print(f"βœ… Authentication successful for user '{username}' (ID: {user_id})")
return user_id
else:
# Debug: Check if user exists with different password
conn = sqlite3.connect(self.db_path, check_same_thread=False)
cur = conn.cursor()
cur.execute("SELECT id, password_hash FROM users WHERE username = ?", (username,))
debug_row = cur.fetchone()
conn.close()
if debug_row:
print(f"❌ User '{username}' exists but password incorrect")
print(f" Expected hash: {debug_row[1][:10]}...")
print(f" Provided hash: {pwd_hash[:10]}...")
else:
print(f"❌ User '{username}' not found")
return None
except Exception as e:
print(f"❌ Authentication error: {e}")
return None
def list_users(self):
"""Debug method to list all users"""
try:
with self._lock:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
cur = conn.cursor()
cur.execute("SELECT id, username, email, created_at FROM users")
rows = cur.fetchall()
conn.close()
print("πŸ“‹ All users in database:")
for row in rows:
print(f" ID: {row[0]}, Username: {row[1]}, Email: {row[2]}, Created: {row[3]}")
return rows
except Exception as e:
print(f"❌ Error listing users: {e}")
return []
def store_transactions(self, user_id: int, df: pd.DataFrame):
"""Store transactions for a user"""
try:
with self._lock:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
df = df.copy()
df['user_id'] = user_id
# Ensure required columns exist
required_cols = ['transaction_id', 'timestamp', 'amount', 'description', 'merchant']
for col in required_cols:
if col not in df.columns:
df[col] = ''
df.to_sql('transactions', conn, if_exists='append', index=False)
conn.commit()
conn.close()
print(f"βœ… Stored {len(df)} transactions for user {user_id}")
except Exception as e:
print(f"❌ Error storing transactions: {e}")
def get_transactions(self, user_id: int, days: int = 30):
"""Get transactions for a user"""
try:
with self._lock:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
cur = conn.cursor()
cur.execute(
"SELECT * FROM transactions WHERE user_id = ? ORDER BY timestamp DESC",
(user_id,)
)
rows = cur.fetchall()
cols = [desc[0] for desc in cur.description]
conn.close()
return pd.DataFrame(rows, columns=cols)
except Exception as e:
print(f"❌ Error getting transactions: {e}")
return pd.DataFrame()
def reset_database(self):
"""Reset database (for testing)"""
try:
with self._lock:
conn = sqlite3.connect(self.db_path, check_same_thread=False)
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS users")
cur.execute("DROP TABLE IF EXISTS transactions")
conn.commit()
conn.close()
self.init_db()
print("πŸ”„ Database reset successfully")
except Exception as e:
print(f"❌ Error resetting database: {e}")