Spaces:
Paused
Paused
| """ | |
| Database Performance Optimization Script | |
| Adds indexes and optimizations for improved query performance | |
| """ | |
| import logging | |
| from sqlalchemy import text | |
| from sqlalchemy.engine import Engine | |
| logger = logging.getLogger(__name__) | |
class DatabaseOptimizer:
    """Database performance optimization utilities.

    Wraps a SQLAlchemy :class:`Engine` and applies PostgreSQL-oriented
    optimizations: index creation, ANALYZE/VACUUM, materialized views,
    and size-based partitioning recommendations. Individual statement
    failures are logged and skipped so one bad statement (e.g. a missing
    table or extension) does not abort the whole run.
    """

    def __init__(self, engine: Engine):
        # Engine is shared, not owned: connections are opened per operation.
        self.engine = engine

    def create_performance_indexes(self) -> None:
        """Create performance indexes for common query patterns.

        Uses an AUTOCOMMIT connection because PostgreSQL refuses to run
        ``CREATE INDEX CONCURRENTLY`` inside a transaction block, and
        SQLAlchemy otherwise opens an implicit transaction on execute —
        which made every one of these statements fail in the original
        implementation.
        """
        logger.info("Creating performance indexes...")
        with self.engine.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        ) as conn:
            # Case Management Indexes
            self._create_case_indexes(conn)
            # User Management Indexes
            self._create_user_indexes(conn)
            # Transaction Indexes
            self._create_transaction_indexes(conn)
            # Audit and Activity Indexes
            self._create_audit_indexes(conn)
        logger.info("Performance indexes created successfully")

    def _execute_index_batch(self, conn, indexes, label: str) -> None:
        """Run each DDL statement in *indexes*, logging per-statement outcome.

        Args:
            conn: An AUTOCOMMIT SQLAlchemy connection (no commit/rollback
                needed per statement).
            indexes: Iterable of ``CREATE INDEX`` SQL strings.
            label: Noun used in log messages ("index", "user index", ...),
                preserving the original per-category wording.
        """
        for index_sql in indexes:
            try:
                conn.execute(text(index_sql))
                # Log only the "CREATE INDEX ... idx_name" prefix, not the
                # full column list.
                logger.info(
                    "Created %s: %s", label, index_sql.split("ON")[0].strip()
                )
            except Exception as e:
                # Best-effort: e.g. pg_trgm not installed, or a table that
                # does not exist yet — keep going with the rest.
                logger.warning("Failed to create %s: %s", label, e)

    def _create_case_indexes(self, conn) -> None:
        """Create indexes for case-related queries."""
        indexes = [
            # Composite indexes for common filters
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_status_priority_created ON cases(status, priority, created_at DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_assignee_status ON cases(assignee_id, status) WHERE assignee_id IS NOT NULL",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_priority_risk_score ON cases(priority, risk_score DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_created_at_status ON cases(created_at DESC, status)",
            # Partial indexes for active cases
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_active_only ON cases(created_at DESC) WHERE status != 'closed'",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_high_priority_open ON cases(created_at DESC) WHERE priority IN ('high', 'critical') AND status = 'open'",
            # Text search indexes (requires the pg_trgm extension)
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_title_trgm ON cases USING gin (title gin_trgm_ops)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_description_trgm ON cases USING gin ((case_metadata->>'description') gin_trgm_ops)",
            # JSON indexes for metadata queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_type ON cases((case_metadata->>'type'))",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_metadata_amount ON cases(CAST(case_metadata->>'amount' AS numeric)) WHERE case_metadata->>'amount' IS NOT NULL",
            # Date-based indexes for reporting
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_cases_month_year ON cases(EXTRACT(year FROM created_at), EXTRACT(month FROM created_at))",
        ]
        self._execute_index_batch(conn, indexes, "index")

    def _create_user_indexes(self, conn) -> None:
        """Create indexes for user-related queries."""
        indexes = [
            # User authentication and lookup
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email_active ON users(email, is_active) WHERE is_active = true",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_role_active ON users(role, is_active) WHERE is_active = true",
            # User activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_last_login ON users(last_login DESC) WHERE last_login IS NOT NULL",
            # MFA status for security reporting
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_mfa_enabled ON users(mfa_enabled) WHERE mfa_enabled = true",
        ]
        self._execute_index_batch(conn, indexes, "user index")

    def _create_transaction_indexes(self, conn) -> None:
        """Create indexes for transaction-related queries."""
        indexes = [
            # Transaction lookup by case
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_case_amount ON transactions(case_id, amount DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_date_amount ON transactions(transaction_date DESC, amount)",
            # Fraud pattern analysis
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_amount_range ON transactions(amount) WHERE amount > 1000",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_merchant_pattern ON transactions(merchant_name, amount)",
            # Time-based analysis
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_transactions_hourly ON transactions(EXTRACT(hour FROM transaction_date), EXTRACT(day FROM transaction_date))",
        ]
        self._execute_index_batch(conn, indexes, "transaction index")

    def _create_audit_indexes(self, conn) -> None:
        """Create indexes for audit and activity queries."""
        indexes = [
            # Case activity tracking
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_case_timestamp ON case_activities(case_id, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_user_timestamp ON case_activities(user_id, timestamp DESC)",
            # Audit trail queries
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_activities_action_timestamp ON case_activities(action, timestamp DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_case_notes_case_created ON case_notes(case_id, created_at DESC)",
        ]
        self._execute_index_batch(conn, indexes, "audit index")

    def optimize_table_settings(self) -> None:
        """Refresh planner statistics and reclaim dead tuples.

        Runs on an AUTOCOMMIT connection: ``VACUUM`` cannot execute inside
        a transaction block in PostgreSQL, so the original implicit
        transaction made the vacuum pass fail. Each table gets its own
        try/except so one missing table does not stop the rest.
        """
        logger.info("Applying table optimizations...")
        tables_to_analyze = [
            'cases', 'users', 'transactions', 'case_activities',
            'case_notes', 'evidence', 'fraud_alerts',
        ]
        with self.engine.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        ) as conn:
            # Analyze tables for query planner optimization
            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"ANALYZE {table}"))
                    logger.info("Analyzed table: %s", table)
                except Exception as e:
                    logger.warning("Failed to analyze table %s: %s", table, e)
            # Vacuum large tables (PostgreSQL specific)
            for table in tables_to_analyze:
                try:
                    conn.execute(text(f"VACUUM ANALYZE {table}"))
                    logger.info("Vacuum analyzed table: %s", table)
                except Exception as e:
                    logger.warning(
                        "Vacuum analyze not supported or failed: %s", e
                    )

    def create_partitioning_strategy(self) -> None:
        """Check table sizes and log whether partitioning is recommended.

        Read-only: actual partitioning requires schema changes and is only
        recommended here, not performed.
        """
        logger.info("Setting up partitioning strategy...")
        with self.engine.connect() as conn:
            try:
                # Partitioning is considered worthwhile above ~1M rows.
                case_count = conn.execute(
                    text("SELECT COUNT(*) FROM cases")
                ).scalar()
                if case_count > 1000000:
                    logger.info(
                        "Large cases table detected (%s records), "
                        "partitioning recommended",
                        case_count,
                    )
                else:
                    logger.info(
                        "Cases table size OK (%s records), "
                        "no partitioning needed",
                        case_count,
                    )
            except Exception as e:
                logger.warning("Could not check table sizes: %s", e)

    def create_materialized_views(self) -> None:
        """Create materialized views that pre-aggregate expensive queries."""
        logger.info("Creating materialized views for performance...")
        with self.engine.connect() as conn:
            # Materialized view for monthly case statistics
            try:
                conn.execute(text("""
                    CREATE MATERIALIZED VIEW IF NOT EXISTS mv_case_stats AS
                    SELECT
                        DATE_TRUNC('month', created_at) as month,
                        status,
                        priority,
                        COUNT(*) as case_count,
                        AVG(risk_score) as avg_risk_score,
                        SUM(fraud_amount) as total_fraud_amount
                    FROM cases
                    GROUP BY DATE_TRUNC('month', created_at), status, priority
                    ORDER BY month DESC, case_count DESC
                """))
                # Index on the view's leading reporting dimension.
                conn.execute(text(
                    "CREATE INDEX IF NOT EXISTS idx_mv_case_stats_month "
                    "ON mv_case_stats(month DESC)"
                ))
                # The original never committed, so the view was rolled back
                # when the connection closed; commit explicitly.
                conn.commit()
                logger.info("Created materialized view: mv_case_stats")
            except Exception as e:
                logger.warning("Failed to create materialized view: %s", e)
                conn.rollback()

    def run_performance_audit(self) -> dict:
        """Collect table sizes and derive tuning recommendations.

        Returns:
            dict with keys ``table_sizes`` (row count per table),
            ``index_usage`` and ``slow_queries`` (placeholders, currently
            unpopulated), and ``recommendations`` (list of strings).
        """
        logger.info("Running performance audit...")
        audit_results = {
            'table_sizes': {},
            'index_usage': {},
            'slow_queries': [],
            'recommendations': [],
        }
        with self.engine.connect() as conn:
            # Check table sizes
            for table in ['cases', 'users', 'transactions', 'case_activities']:
                try:
                    count = conn.execute(
                        text(f"SELECT COUNT(*) FROM {table}")
                    ).scalar()
                    audit_results['table_sizes'][table] = count
                except Exception as e:
                    logger.warning("Could not get size for %s: %s", table, e)
        # Generate recommendations based on data; thresholds are heuristics.
        total_cases = audit_results['table_sizes'].get('cases', 0)
        if total_cases > 500000:
            audit_results['recommendations'].append(
                "Consider partitioning the cases table by month")
        if total_cases > 100000:
            audit_results['recommendations'].append(
                "Implement query result caching for case listings")
        total_activities = audit_results['table_sizes'].get('case_activities', 0)
        if total_activities > 1000000:
            audit_results['recommendations'].append(
                "Archive old case activities to separate table")
        logger.info(
            "Performance audit complete. Found %d recommendations",
            len(audit_results['recommendations']),
        )
        return audit_results
def optimize_database(engine: Engine) -> None:
    """Run the full database optimization pipeline against *engine*.

    Audits first (results are logged for operator review), then applies
    index creation, table-level tuning, materialized views, and the
    partitioning size check, in that order. Any failure is logged and
    re-raised to the caller.
    """
    optimizer = DatabaseOptimizer(engine)
    try:
        # Audit before touching anything so the log captures the baseline.
        audit_results = optimizer.run_performance_audit()
        logger.info(f"Audit results: {audit_results}")
        optimization_steps = (
            optimizer.create_performance_indexes,
            optimizer.optimize_table_settings,
            optimizer.create_materialized_views,
            optimizer.create_partitioning_strategy,
        )
        for step in optimization_steps:
            step()
        logger.info("Database optimization completed successfully")
    except Exception as e:
        logger.error(f"Database optimization failed: {e}")
        raise
if __name__ == "__main__":
    # For standalone execution: borrow the application's configured engine.
    # NOTE(review): assumes `core.database.engine` is importable without
    # side effects beyond engine construction — confirm before running ad hoc.
    from core.database import engine
    optimize_database(engine)