# zenith-backend/scripts/database_optimization.py
"""
Database Performance Optimization Script
Adds indexes and optimizations for improved query performance
"""
import logging
from sqlalchemy import text
from sqlalchemy.engine import Engine
logger = logging.getLogger(__name__)


class DatabaseOptimizer:
    """Database performance optimization utilities"""

    def __init__(self, engine: Engine):
        self.engine = engine

    def create_performance_indexes(self) -> None:
        """Create performance indexes for common query patterns"""
        logger.info("Creating performance indexes...")
        # CREATE INDEX CONCURRENTLY cannot run inside a transaction block,
        # so index creation uses an autocommit connection.
        with self.engine.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        ) as conn:
            # Case management indexes
            self._create_case_indexes(conn)
            # User management indexes
            self._create_user_indexes(conn)
            # Transaction indexes
            self._create_transaction_indexes(conn)
            # Audit and activity indexes
            self._create_audit_indexes(conn)
        logger.info("Performance indexes created successfully")

    def _create_case_indexes(self, conn) -> None:
        """Create indexes for case-related queries"""
        indexes = [
            # Composite indexes for common filters
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_status_priority_created ON cases(status, priority, created_at DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_assignee_status ON cases(assignee_id, status) WHERE assignee_id IS NOT NULL",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_priority_risk_score ON cases(priority, risk_score DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_created_at_status ON cases(created_at DESC, status)",
            # Partial indexes for active cases
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_active_only ON cases(created_at DESC) WHERE status != 'closed'",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_high_priority_open ON cases(created_at DESC) WHERE priority IN ('high', 'critical') AND status = 'open'",
            # Text search indexes (require the pg_trgm extension; see
            # ensure_trgm_extension above)
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_title_trgm ON cases USING gin (title gin_trgm_ops)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_description_trgm ON cases USING gin ((case_metadata->>'description') gin_trgm_ops)",
            # JSON indexes for metadata queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_type ON cases((case_metadata->>'type'))",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_amount ON cases(CAST(case_metadata->>'amount' AS numeric)) WHERE case_metadata->>'amount' IS NOT NULL",
            # Date-based index for reporting; expression index elements need
            # their own parentheses, and EXTRACT is only immutable for
            # timestamp WITHOUT time zone columns
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_month_year ON cases((EXTRACT(year FROM created_at)), (EXTRACT(month FROM created_at)))",
        ]
        for index_sql in indexes:
            try:
                # The connection is in autocommit mode, so each statement
                # takes effect immediately; no commit/rollback needed.
                conn.execute(text(index_sql))
                # Split on ' ON ' with surrounding spaces: a bare 'ON' also
                # matches inside the word CONCURRENTLY.
                logger.info(f"Created index: {index_sql.split(' ON ')[0].strip()}")
            except Exception as e:
                logger.warning(f"Failed to create index: {e}")

    def _create_user_indexes(self, conn) -> None:
        """Create indexes for user-related queries"""
        indexes = [
            # User authentication and lookup
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email_active ON users(email, is_active) WHERE is_active = true",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_role_active ON users(role, is_active) WHERE is_active = true",
            # User activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_last_login ON users(last_login DESC) WHERE last_login IS NOT NULL",
            # MFA status for security reporting
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_mfa_enabled ON users(mfa_enabled) WHERE mfa_enabled = true",
        ]
        for index_sql in indexes:
            try:
                conn.execute(text(index_sql))
                logger.info(f"Created user index: {index_sql.split(' ON ')[0].strip()}")
            except Exception as e:
                logger.warning(f"Failed to create user index: {e}")

    def _create_transaction_indexes(self, conn) -> None:
        """Create indexes for transaction-related queries"""
        indexes = [
            # Transaction lookup by case
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_case_amount ON transactions(case_id, amount DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_date_amount ON transactions(transaction_date DESC, amount)",
            # Fraud pattern analysis
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_amount_range ON transactions(amount) WHERE amount > 1000",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_merchant_pattern ON transactions(merchant_name, amount)",
            # Time-based analysis; as above, EXTRACT expressions need their
            # own parentheses and a non-timestamptz column to be immutable
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_hourly ON transactions((EXTRACT(hour FROM transaction_date)), (EXTRACT(day FROM transaction_date)))",
        ]
        for index_sql in indexes:
            try:
                conn.execute(text(index_sql))
                logger.info(f"Created transaction index: {index_sql.split(' ON ')[0].strip()}")
            except Exception as e:
                logger.warning(f"Failed to create transaction index: {e}")

    def _create_audit_indexes(self, conn) -> None:
        """Create indexes for audit and activity queries"""
        indexes = [
            # Case activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_case_timestamp ON case_activities(case_id, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_user_timestamp ON case_activities(user_id, timestamp DESC)",
            # Audit trail queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_action_timestamp ON case_activities(action, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_notes_case_created ON case_notes(case_id, created_at DESC)",
        ]
        for index_sql in indexes:
            try:
                conn.execute(text(index_sql))
                logger.info(f"Created audit index: {index_sql.split(' ON ')[0].strip()}")
            except Exception as e:
                logger.warning(f"Failed to create audit index: {e}")

    def optimize_table_settings(self) -> None:
        """Apply table-level optimizations"""
        logger.info("Applying table optimizations...")
        # VACUUM cannot run inside a transaction block either, so this also
        # uses an autocommit connection.
        with self.engine.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        ) as conn:
            # Refresh statistics for the query planner
            tables_to_analyze = [
                'cases', 'users', 'transactions', 'case_activities',
                'case_notes', 'evidence', 'fraud_alerts',
            ]
            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"ANALYZE {table}"))
                    logger.info(f"Analyzed table: {table}")
                except Exception as e:
                    logger.warning(f"Failed to analyze table {table}: {e}")
            # Vacuum large tables (PostgreSQL specific); the try sits inside
            # the loop so one failure does not skip the remaining tables
            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"VACUUM ANALYZE {table}"))
                    logger.info(f"Vacuum analyzed table: {table}")
                except Exception as e:
                    logger.warning(f"Vacuum analyze failed for {table}: {e}")
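
    # A minimal sketch, assuming PostgreSQL (this helper is an addition, not
    # part of the original script): for very hot tables, per-table autovacuum
    # settings can supplement the manual VACUUM above. The 0.05 scale factor
    # is an illustrative value, not a tuned recommendation.
    def tune_autovacuum(self, table: str) -> None:
        """Make autovacuum trigger earlier on a frequently updated table."""
        with self.engine.connect() as conn:
            try:
                conn.execute(text(
                    f"ALTER TABLE {table} SET "
                    "(autovacuum_vacuum_scale_factor = 0.05)"
                ))
                conn.commit()
                logger.info(f"Tuned autovacuum for table: {table}")
            except Exception as e:
                logger.warning(f"Failed to tune autovacuum for {table}: {e}")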

    def create_partitioning_strategy(self) -> None:
        """Check whether the cases table is large enough to warrant partitioning"""
        logger.info("Checking partitioning strategy...")
        with self.engine.connect() as conn:
            try:
                # Partitioning only pays off for large tables (> 1M rows)
                result = conn.execute(text("SELECT COUNT(*) FROM cases"))
                case_count = result.fetchone()[0]
                if case_count > 1000000:  # 1M+ cases
                    logger.info(
                        f"Large cases table detected ({case_count} records), "
                        "partitioning recommended"
                    )
                    # Actual partitioning requires schema changes, so this
                    # method only reports the recommendation; see the sketch
                    # below for what the target schema could look like.
                else:
                    logger.info(
                        f"Cases table size OK ({case_count} records), "
                        "no partitioning needed"
                    )
            except Exception as e:
                logger.warning(f"Could not check table sizes: {e}")

    def create_materialized_views(self) -> None:
        """Create materialized views for expensive queries"""
        logger.info("Creating materialized views for performance...")
        with self.engine.connect() as conn:
            # Materialized view for monthly case statistics
            try:
                conn.execute(text("""
                    CREATE MATERIALIZED VIEW IF NOT EXISTS mv_case_stats AS
                    SELECT
                        DATE_TRUNC('month', created_at) AS month,
                        status,
                        priority,
                        COUNT(*) AS case_count,
                        AVG(risk_score) AS avg_risk_score,
                        SUM(fraud_amount) AS total_fraud_amount
                    FROM cases
                    GROUP BY DATE_TRUNC('month', created_at), status, priority
                    ORDER BY month DESC, case_count DESC
                """))
                # Index the view for fast month-range lookups
                conn.execute(text(
                    "CREATE INDEX IF NOT EXISTS idx_mv_case_stats_month "
                    "ON mv_case_stats(month DESC)"
                ))
                # Without an explicit commit the DDL is rolled back when the
                # connection closes (SQLAlchemy 2.0 commit-as-you-go).
                conn.commit()
                logger.info("Created materialized view: mv_case_stats")
            except Exception as e:
                logger.warning(f"Failed to create materialized view: {e}")
                conn.rollback()
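
    # A minimal sketch of a refresh hook for the view above (this method is
    # an addition; run it from a scheduled job so the aggregates stay
    # current). The CONCURRENTLY form is not used because it requires a
    # unique index on the materialized view, which mv_case_stats lacks.
    def refresh_materialized_views(self) -> None:
        """Refresh mv_case_stats so its aggregates stay current."""
        with self.engine.connect() as conn:
            try:
                conn.execute(text("REFRESH MATERIALIZED VIEW mv_case_stats"))
                conn.commit()
                logger.info("Refreshed materialized view: mv_case_stats")
            except Exception as e:
                logger.warning(f"Failed to refresh mv_case_stats: {e}")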

    def run_performance_audit(self) -> dict:
        """Run a performance audit and return recommendations"""
        logger.info("Running performance audit...")
        audit_results = {
            'table_sizes': {},
            'index_usage': {},   # can be filled by the sketch below
            'slow_queries': [],  # placeholder; not populated by this script
            'recommendations': []
        }
        with self.engine.connect() as conn:
            # Check table sizes
            tables = ['cases', 'users', 'transactions', 'case_activities']
            for table in tables:
                try:
                    result = conn.execute(text(f"SELECT COUNT(*) FROM {table}"))
                    count = result.fetchone()[0]
                    audit_results['table_sizes'][table] = count
                except Exception as e:
                    logger.warning(f"Could not get size for {table}: {e}")
        # Generate recommendations based on row counts
        total_cases = audit_results['table_sizes'].get('cases', 0)
        if total_cases > 500000:
            audit_results['recommendations'].append(
                "Consider partitioning the cases table by month")
        if total_cases > 100000:
            audit_results['recommendations'].append(
                "Implement query result caching for case listings")
        total_activities = audit_results['table_sizes'].get('case_activities', 0)
        if total_activities > 1000000:
            audit_results['recommendations'].append(
                "Archive old case activities to a separate table")
        logger.info(
            f"Performance audit complete. Found "
            f"{len(audit_results['recommendations'])} recommendations"
        )
        return audit_results
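
    # A minimal sketch for filling audit_results['index_usage'], assuming a
    # PostgreSQL backend (this method is an addition): pg_stat_user_indexes
    # reports per-index scan counts, and indexes with idx_scan = 0 are
    # candidates for removal. Wire it into the audit with:
    #   audit_results['index_usage'] = self.collect_index_usage()
    def collect_index_usage(self) -> dict:
        """Return per-index scan statistics from pg_stat_user_indexes."""
        usage = {}
        with self.engine.connect() as conn:
            try:
                result = conn.execute(text(
                    "SELECT relname, indexrelname, idx_scan "
                    "FROM pg_stat_user_indexes ORDER BY idx_scan ASC"
                ))
                for table, index_name, scans in result:
                    usage[index_name] = {'table': table, 'scans': scans}
            except Exception as e:
                logger.warning(f"Could not read pg_stat_user_indexes: {e}")
        return usage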


def optimize_database(engine: Engine) -> None:
    """Main function to optimize database performance"""
    optimizer = DatabaseOptimizer(engine)
    try:
        # Run the audit first so the pre-optimization state is logged
        audit_results = optimizer.run_performance_audit()
        logger.info(f"Audit results: {audit_results}")
        # Apply optimizations
        optimizer.create_performance_indexes()
        optimizer.optimize_table_settings()
        optimizer.create_materialized_views()
        optimizer.create_partitioning_strategy()
        logger.info("Database optimization completed successfully")
    except Exception as e:
        logger.error(f"Database optimization failed: {e}")
        raise


if __name__ == "__main__":
    # For standalone execution; configure logging so progress is visible
    logging.basicConfig(level=logging.INFO)
    from core.database import engine

    optimize_database(engine)