#!/usr/bin/env python3
"""
Advanced Performance Profiler
Real-time performance analysis and bottleneck detection
"""
import asyncio
import logging
import statistics
import time
import tracemalloc
from contextlib import asynccontextmanager
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import psutil

# Module-level logger; handlers and levels are left to the application.
logger = logging.getLogger(__name__)


@dataclass
class PerformanceMetric:
    """Performance metric data structure.

    One measurement taken at a point in time. Instances are created with
    keyword arguments and serialized through :meth:`to_dict`.
    """

    name: str
    category: str  # cpu, memory, io, network, api, database
    # Usually a number; the network and process collectors in this file
    # store a dict snapshot here instead.
    value: float
    unit: str
    threshold: Optional[float]  # None for purely informational metrics
    status: str  # optimal, warning, critical
    timestamp: datetime
    details: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization.

        Returns:
            A dict of all fields (nested containers copied by ``asdict``)
            with ``timestamp`` rendered as an ISO-8601 string.
        """
        result = asdict(self)
        # datetime objects are not JSON serializable; emit ISO-8601 text.
        result["timestamp"] = self.timestamp.isoformat()
        return result


class AdvancedPerformanceProfiler:
    """Advanced performance monitoring and profiling.

    Records timing metrics for profiled code blocks, database queries and
    API requests, samples host/process statistics through ``psutil``, and
    turns the accumulated ``metrics_history`` into a report containing
    statistics, bottlenecks, recommendations and trends.
    """

    # Maximum number of metrics retained by ``start_continuous_monitoring``.
    _MAX_HISTORY = 1000
    _BYTES_PER_MB = 1024 * 1024
    _BYTES_PER_GB = 1024 ** 3

    def __init__(self):
        """Initialise empty history, default thresholds and memory tracing."""
        self.start_time = datetime.now()
        self.metrics_history = []
        self.profilers = {}
        self.memory_snapshots = []
        # Performance thresholds
        self.thresholds = {
            "api_response_time": 500.0,  # ms
            "database_query_time": 100.0,  # ms
            "memory_usage": 512.0,  # MB
            "cpu_usage": 70.0,  # %
            "disk_io": 50.0,  # MB/s
            "network_latency": 100.0,  # ms
        }
        # (monotonic time, read_bytes, write_bytes) of the previous disk
        # sample; needed to turn cumulative counters into a rate.
        self._last_disk_sample = None
        # Enable memory tracing
        tracemalloc.start()

    @staticmethod
    def _status_for(value: float, threshold: float) -> str:
        """Return "optimal" below *threshold*, "warning" below twice it, else "critical"."""
        if value < threshold:
            return "optimal"
        if value < threshold * 2:
            return "warning"
        return "critical"

    @asynccontextmanager
    async def profile_function(self, function_name: str, category: str = "general"):
        """Async context manager for profiling individual functions.

        Measures elapsed time and the change in tracemalloc-traced memory
        across the ``async with`` body and appends one metric to
        ``metrics_history``, even when the body raises.

        Args:
            function_name: Used to build the metric name.
            category: "api" and "database" are classified against their
                thresholds; any other category is always "optimal".
        """
        start_time = time.perf_counter()
        start_memory = (
            tracemalloc.get_traced_memory()[0] if tracemalloc.is_tracing() else 0
        )
        try:
            yield
        finally:
            # perf_counter is monotonic, so clock adjustments cannot skew it.
            execution_time_ms = (time.perf_counter() - start_time) * 1000
            end_memory = (
                tracemalloc.get_traced_memory()[0] if tracemalloc.is_tracing() else 0
            )
            memory_diff_mb = (end_memory - start_memory) / self._BYTES_PER_MB

            # Database timings live under "database_query_time"; every other
            # category looks up "<category>_response_time" and falls back to
            # the API threshold.
            if category == "database":
                threshold_key = "database_query_time"
            else:
                threshold_key = f"{category}_response_time"
            threshold = self.thresholds.get(
                threshold_key, self.thresholds["api_response_time"]
            )

            if category in ("api", "database"):
                status = self._status_for(execution_time_ms, threshold)
            else:
                status = "optimal"  # Default for general functions

            self.metrics_history.append(
                PerformanceMetric(
                    name=f"{function_name}_execution_time",
                    category=category,
                    value=execution_time_ms,
                    unit="milliseconds",
                    threshold=threshold,
                    status=status,
                    timestamp=datetime.now(),
                    details={
                        "memory_diff_mb": memory_diff_mb,
                        "start_memory": start_memory,
                        "end_memory": end_memory,
                    },
                )
            )

    @asynccontextmanager
    async def profile_database_query(self, query_type: str, query: str):
        """Async context manager that profiles database query performance.

        Args:
            query_type: Label used in the metric name and details.
            query: Query text; only the first 100 characters are stored.
        """
        start_time = time.perf_counter()
        try:
            yield
        finally:
            execution_time_ms = (time.perf_counter() - start_time) * 1000
            threshold = self.thresholds["database_query_time"]
            preview = query[:100] + "..." if len(query) > 100 else query
            self.metrics_history.append(
                PerformanceMetric(
                    name=f"database_query_{query_type}",
                    category="database",
                    value=execution_time_ms,
                    unit="milliseconds",
                    threshold=threshold,
                    status=self._status_for(execution_time_ms, threshold),
                    timestamp=datetime.now(),
                    details={
                        "query_type": query_type,
                        "query_preview": preview,
                        "query_length": len(query),
                    },
                )
            )

    async def collect_system_performance(self) -> List["PerformanceMetric"]:
        """Collect real-time system performance metrics.

        Returns:
            CPU, memory and process metrics, plus disk and network metrics
            when those counters can be read. The metrics are returned, not
            appended to ``metrics_history``.
        """
        metrics = []
        current_time = datetime.now()

        # CPU Usage with breakdown
        # NOTE: each call blocks for its 0.1 s sampling interval.
        cpu_percent = psutil.cpu_percent(interval=0.1)
        cpu_per_core = psutil.cpu_percent(interval=0.1, percpu=True)
        cpu_threshold = self.thresholds["cpu_usage"]
        if cpu_percent < cpu_threshold:
            cpu_status = "optimal"
        elif cpu_percent < 90:
            cpu_status = "warning"
        else:
            cpu_status = "critical"
        metrics.append(
            PerformanceMetric(
                name="cpu_usage",
                category="cpu",
                value=cpu_percent,
                unit="percent",
                threshold=cpu_threshold,
                status=cpu_status,
                timestamp=current_time,
                details={
                    "cores": psutil.cpu_count(),
                    "usage_per_core": cpu_per_core,
                    "load_avg": (
                        psutil.getloadavg() if hasattr(psutil, "getloadavg") else None
                    ),
                },
            )
        )

        # Memory Usage with breakdown
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()
        # The configured budget is in MB, so compare it with total memory in
        # MB to express it as a percentage of the machine's RAM.
        total_mb = memory.total / self._BYTES_PER_MB
        memory_threshold = (
            self.thresholds["memory_usage"] * 100 / total_mb if total_mb else None
        )
        if memory.percent < 80:
            memory_status = "optimal"
        elif memory.percent < 90:
            memory_status = "warning"
        else:
            memory_status = "critical"
        metrics.append(
            PerformanceMetric(
                name="memory_usage",
                category="memory",
                value=memory.percent,
                unit="percent",
                threshold=memory_threshold,
                status=memory_status,
                timestamp=current_time,
                details={
                    "total_gb": round(memory.total / self._BYTES_PER_GB, 2),
                    "available_gb": round(memory.available / self._BYTES_PER_GB, 2),
                    "used_gb": round(memory.used / self._BYTES_PER_GB, 2),
                    "swap_total_gb": round(swap.total / self._BYTES_PER_GB, 2),
                    "swap_used_gb": round(swap.used / self._BYTES_PER_GB, 2),
                },
            )
        )

        # Disk I/O
        try:
            disk_io = psutil.disk_io_counters()
            read_bytes = getattr(disk_io, "read_bytes", 0)
            write_bytes = getattr(disk_io, "write_bytes", 0)

            # The counters are cumulative; a rate needs the delta against the
            # previous sample. The first sample therefore reports 0 MB/s.
            sampled_at = time.monotonic()
            previous = getattr(self, "_last_disk_sample", None)
            self._last_disk_sample = (sampled_at, read_bytes, write_bytes)
            read_mb_s = 0.0
            write_mb_s = 0.0
            if previous is not None:
                elapsed = sampled_at - previous[0]
                if elapsed > 0:
                    # Clamp at zero in case the counters were reset.
                    read_mb_s = (
                        max(read_bytes - previous[1], 0) / self._BYTES_PER_MB / elapsed
                    )
                    write_mb_s = (
                        max(write_bytes - previous[2], 0) / self._BYTES_PER_MB / elapsed
                    )
            total_io_mb_s = read_mb_s + write_mb_s

            metrics.append(
                PerformanceMetric(
                    name="disk_io",
                    category="io",
                    value=total_io_mb_s,
                    unit="mb/s",
                    threshold=self.thresholds["disk_io"],
                    status=self._status_for(total_io_mb_s, self.thresholds["disk_io"]),
                    timestamp=current_time,
                    details={
                        "read_mb_s": read_mb_s,
                        "write_mb_s": write_mb_s,
                        "read_total_mb": read_bytes / self._BYTES_PER_MB,
                        "write_total_mb": write_bytes / self._BYTES_PER_MB,
                        "read_count": getattr(disk_io, "read_count", 0),
                        "write_count": getattr(disk_io, "write_count", 0),
                        "read_time_ms": getattr(disk_io, "read_time", 0),
                        "write_time_ms": getattr(disk_io, "write_time", 0),
                    },
                )
            )
        except Exception:
            # Best effort: skip the metric, but leave a trace for debugging.
            logger.debug("Disk I/O counters unavailable", exc_info=True)

        # Network I/O
        try:
            network_io = psutil.net_io_counters()
            metrics.append(
                PerformanceMetric(
                    name="network_io",
                    category="network",
                    value={
                        "bytes_sent": network_io.bytes_sent,
                        "bytes_recv": network_io.bytes_recv,
                        "packets_sent": network_io.packets_sent,
                        "packets_recv": network_io.packets_recv,
                        "errin": network_io.errin,
                        "errout": network_io.errout,
                        "dropin": network_io.dropin,
                        "dropout": network_io.dropout,
                    },
                    unit="bytes",
                    threshold=None,  # Network I/O is informational
                    status="optimal",
                    timestamp=current_time,
                    details={},
                )
            )
        except Exception:
            # Best effort: skip the metric, but leave a trace for debugging.
            logger.debug("Network I/O counters unavailable", exc_info=True)

        # Process Information
        process = psutil.Process()
        metrics.append(
            PerformanceMetric(
                name="process_performance",
                category="system",
                value={
                    "cpu_percent": process.cpu_percent(),
                    "memory_percent": process.memory_percent(),
                    "num_threads": process.num_threads(),
                    "file_descriptors": (
                        process.num_fds() if hasattr(process, "num_fds") else None
                    ),
                    "context_switches": (
                        process.num_ctx_switches()
                        if hasattr(process, "num_ctx_switches")
                        else None
                    ),
                },
                unit="info",
                threshold=None,
                status="optimal",
                timestamp=current_time,
                details={
                    "pid": process.pid,
                    "create_time": process.create_time(),
                    "status": process.status(),
                    "cmdline": process.cmdline(),
                },
            )
        )
        return metrics

    async def collect_api_performance(
        self, request_data: Dict[str, Any]
    ) -> "PerformanceMetric":
        """Collect API request performance metrics.

        Args:
            request_data: Request description. Reads the keys ``endpoint``,
                ``method``, ``response_time``, ``status_code``,
                ``request_size``, ``response_size``, ``user_agent`` and
                ``ip_address``; missing keys fall back to defaults.

        Returns:
            The metric, which is also appended to ``metrics_history``.
        """
        endpoint = request_data.get("endpoint", "unknown")
        method = request_data.get("method", "GET")
        response_time = request_data.get("response_time", 0)
        threshold = self.thresholds["api_response_time"]

        metric = PerformanceMetric(
            name=f"api_request_{endpoint}_{method}",
            category="api",
            value=response_time,
            unit="milliseconds",
            threshold=threshold,
            status=self._status_for(response_time, threshold),
            timestamp=datetime.now(),
            details={
                "endpoint": endpoint,
                "method": method,
                "status_code": request_data.get("status_code", 200),
                "request_size": request_data.get("request_size", 0),
                "response_size": request_data.get("response_size", 0),
                "user_agent": request_data.get("user_agent", ""),
                "ip_address": request_data.get("ip_address", ""),
            },
        )
        self.metrics_history.append(metric)
        return metric

    @staticmethod
    def _calculate_stats(
        metrics: List["PerformanceMetric"], key: str = "value"
    ) -> Dict[str, float]:
        """Summarise one attribute of *metrics*; return ``{}`` when not applicable."""
        if not metrics:
            return {}
        values = [getattr(m, key) for m in metrics]
        if isinstance(values[0], dict):
            # Handle complex values (like network I/O)
            return {}
        ordered = sorted(values)  # sort once, reuse for every order statistic
        count = len(ordered)
        largest = ordered[-1]
        # Percentiles are only meaningful with enough samples; otherwise
        # report the maximum.
        enough_samples = count > 20
        return {
            "count": count,
            "avg": sum(values) / count,
            "min": ordered[0],
            "max": largest,
            "median": statistics.median(ordered),
            "p95": ordered[int(count * 0.95)] if enough_samples else largest,
            "p99": ordered[int(count * 0.99)] if enough_samples else largest,
        }

    def _detect_bottlenecks(
        self, api_metrics, database_metrics, cpu_metrics, memory_metrics
    ) -> List[Dict[str, Any]]:
        """Build bottleneck entries from already-categorised metrics."""
        bottlenecks = []
        degraded = ("warning", "critical")

        # API bottlenecks
        slow_requests = [m for m in api_metrics if m.status in degraded]
        if slow_requests:
            bottlenecks.append(
                {
                    "type": "api_performance",
                    "severity": (
                        "high"
                        if any(m.status == "critical" for m in slow_requests)
                        else "medium"
                    ),
                    "description": f"{len(slow_requests)} slow API requests detected",
                    # dict.fromkeys de-duplicates while keeping first-seen order.
                    "affected_endpoints": list(
                        dict.fromkeys(
                            m.details.get("endpoint", "unknown") for m in slow_requests
                        )
                    ),
                    "recommendation": "Optimize slow endpoints and add caching",
                }
            )

        # Database bottlenecks
        slow_queries = [m for m in database_metrics if m.status in degraded]
        if slow_queries:
            bottlenecks.append(
                {
                    "type": "database_performance",
                    "severity": (
                        "high"
                        if any(m.status == "critical" for m in slow_queries)
                        else "medium"
                    ),
                    "description": f"{len(slow_queries)} slow database queries detected",
                    "affected_queries": list(
                        dict.fromkeys(
                            m.details.get("query_type", "unknown") for m in slow_queries
                        )
                    ),
                    "recommendation": "Add database indexes and optimize queries",
                }
            )

        # Resource bottlenecks
        high_cpu = [m for m in cpu_metrics if m.value > self.thresholds["cpu_usage"]]
        if high_cpu:
            bottlenecks.append(
                {
                    "type": "cpu_usage",
                    "severity": "high",
                    "description": f"CPU usage exceeds {self.thresholds['cpu_usage']}% threshold",
                    "max_cpu": max(m.value for m in high_cpu),
                    "recommendation": "Scale horizontally or optimize CPU-intensive operations",
                }
            )

        high_memory = [m for m in memory_metrics if m.value > 80]  # 80% threshold
        if high_memory:
            bottlenecks.append(
                {
                    "type": "memory_usage",
                    "severity": "high",
                    "description": "Memory usage exceeds 80%",
                    "max_memory": max(m.value for m in high_memory),
                    "recommendation": "Optimize memory usage or increase available memory",
                }
            )
        return bottlenecks

    def generate_performance_report(self) -> Dict[str, Any]:
        """Generate comprehensive performance analysis report.

        Returns:
            A dict with the overall score, summary counts, per-category
            statistics, detected bottlenecks, de-duplicated recommendations,
            optimization opportunities and trends, all derived from
            ``metrics_history``.
        """
        now = datetime.now()

        # Categorize metrics
        api_metrics = [m for m in self.metrics_history if m.category == "api"]
        database_metrics = [m for m in self.metrics_history if m.category == "database"]
        cpu_metrics = [m for m in self.metrics_history if m.category == "cpu"]
        memory_metrics = [m for m in self.metrics_history if m.category == "memory"]

        api_stats = self._calculate_stats(api_metrics)
        db_stats = self._calculate_stats(database_metrics)
        cpu_stats = self._calculate_stats(cpu_metrics)
        memory_stats = self._calculate_stats(memory_metrics)

        bottlenecks = self._detect_bottlenecks(
            api_metrics, database_metrics, cpu_metrics, memory_metrics
        )

        # Generate optimization recommendations
        recommendations = [b.get("recommendation", "") for b in bottlenecks]
        # Memory efficiency recommendations
        if memory_metrics:
            avg_memory = sum(m.value for m in memory_metrics) / len(memory_metrics)
            if avg_memory > 60:
                recommendations.append(
                    "Implement memory pooling and optimize data structures"
                )
        # API caching recommendations
        if api_metrics:
            avg_response = sum(m.value for m in api_metrics) / len(api_metrics)
            if avg_response > self.thresholds["api_response_time"]:
                recommendations.append(
                    "Implement API response caching and query optimization"
                )

        degraded = ("warning", "critical")
        return {
            "overall_performance_score": self._calculate_performance_score(),
            "timestamp": now.isoformat(),
            "analysis_period_hours": (now - self.start_time).total_seconds() / 3600,
            "summary": {
                "total_metrics_collected": len(self.metrics_history),
                "api_requests": len(api_metrics),
                "database_queries": len(database_metrics),
                "bottlenecks_detected": len(bottlenecks),
                "critical_issues": sum(
                    1 for b in bottlenecks if b.get("severity") == "high"
                ),
            },
            "performance_by_category": {
                "api": {
                    "statistics": api_stats,
                    "slow_requests": sum(1 for m in api_metrics if m.status in degraded),
                    "avg_response_time": api_stats.get("avg", 0),
                },
                "database": {
                    "statistics": db_stats,
                    "slow_queries": sum(
                        1 for m in database_metrics if m.status in degraded
                    ),
                    "avg_query_time": db_stats.get("avg", 0),
                },
                "system": {
                    "cpu": cpu_stats,
                    "memory": memory_stats,
                    "current_cpu": cpu_metrics[-1].to_dict() if cpu_metrics else None,
                    "current_memory": (
                        memory_metrics[-1].to_dict() if memory_metrics else None
                    ),
                },
            },
            "bottlenecks": bottlenecks,
            # De-duplicate while keeping a stable, first-seen order.
            "recommendations": list(dict.fromkeys(recommendations)),
            "optimization_opportunities": self._identify_optimization_opportunities(),
            "historical_trends": self._analyze_trends(),
        }

    def _calculate_performance_score(self) -> float:
        """Calculate overall performance score (0-100).

        Scores the last 100 metrics: optimal counts 100, warning 50 and
        critical 0. An empty history scores 100.0.
        """
        if not self.metrics_history:
            return 100.0
        recent_metrics = self.metrics_history[-100:]  # Last 100 metrics
        weights = {"optimal": 100, "warning": 50, "critical": 0}
        # Metrics with any other status add nothing but still count in the total.
        weighted_sum = sum(weights.get(m.status, 0) for m in recent_metrics)
        return round(weighted_sum / len(recent_metrics), 2)

    def _identify_optimization_opportunities(self) -> List[Dict[str, Any]]:
        """Identify specific optimization opportunities.

        Flags endpoints whose average response time exceeds the API
        threshold, and memory usage that grew by more than 10 points over
        the last 20 memory samples.
        """
        opportunities = []

        # Analyze API patterns
        times_by_endpoint = {}
        for metric in self.metrics_history:
            if metric.category != "api":
                continue
            endpoint = metric.details.get("endpoint", "unknown")
            times_by_endpoint.setdefault(endpoint, []).append(metric.value)

        # Find slow endpoints
        slow_endpoints = []
        for endpoint, times in times_by_endpoint.items():
            avg_time = sum(times) / len(times)
            if avg_time > self.thresholds["api_response_time"]:
                slow_endpoints.append(
                    {
                        "endpoint": endpoint,
                        "avg_response_time": avg_time,
                        "request_count": len(times),
                        "optimization": (
                            "add_caching" if avg_time > 1000 else "optimize_query"
                        ),
                    }
                )
        if slow_endpoints:
            opportunities.append(
                {
                    "category": "api_optimization",
                    "description": f"{len(slow_endpoints)} endpoints need optimization",
                    "details": slow_endpoints,
                    "potential_impact": "high",
                }
            )

        # Analyze memory patterns
        memory_trend = [
            m.value for m in self.metrics_history if m.category == "memory"
        ][-20:]  # Last 20 memory metrics
        if len(memory_trend) > 1:
            memory_growth = memory_trend[-1] - memory_trend[0]
            if memory_growth > 10:  # 10% growth
                opportunities.append(
                    {
                        "category": "memory_optimization",
                        "description": f"Memory usage increased by {memory_growth:.1f}%",
                        "details": {
                            "growth_percent": memory_growth,
                            "period": "last 20 samples",
                        },
                        "potential_impact": "medium",
                    }
                )
        return opportunities

    @staticmethod
    def _trend_direction(values: List[float]) -> str:
        """Return "increasing", "decreasing" or "stable" from a least-squares slope."""
        n = len(values)
        if n < 2:
            return "stable"
        # Simple linear regression of value against sample index.
        sum_x = sum(range(n))
        sum_y = sum(values)
        sum_xy = sum(i * v for i, v in enumerate(values))
        sum_x2 = sum(i * i for i in range(n))
        denominator = n * sum_x2 - sum_x * sum_x
        if denominator == 0:
            return "stable"
        slope = (n * sum_xy - sum_x * sum_y) / denominator
        if abs(slope) < 0.01:
            return "stable"
        return "increasing" if slope > 0 else "decreasing"

    def _analyze_trends(self) -> Dict[str, Any]:
        """Analyze performance trends over time.

        Returns:
            Trend directions for API, database and CPU metrics recorded in
            the last hour, or a dict with only a ``message`` key when fewer
            than 10 metrics exist or none are recent.
        """
        if len(self.metrics_history) < 10:
            return {"message": "Insufficient data for trend analysis"}

        # Analyze last hour of data
        one_hour_ago = datetime.now() - timedelta(hours=1)
        recent_metrics = [m for m in self.metrics_history if m.timestamp > one_hour_ago]
        if not recent_metrics:
            return {"message": "No recent data available"}

        def trend_for(category: str) -> str:
            values = [m.value for m in recent_metrics if m.category == category]
            return self._trend_direction(values) if values else "no_data"

        return {
            "analysis_period": "last_hour",
            "metrics_analyzed": len(recent_metrics),
            "api_response_trend": trend_for("api"),
            "database_query_trend": trend_for("database"),
            "cpu_usage_trend": trend_for("cpu"),
        }

    def _log_critical_bottlenecks(self) -> None:
        """Generate a report and log every high-severity bottleneck."""
        report = self.generate_performance_report()
        critical_bottlenecks = [
            b for b in report["bottlenecks"] if b.get("severity") == "high"
        ]
        if not critical_bottlenecks:
            return
        logger.warning(
            "CRITICAL PERFORMANCE ISSUES DETECTED: %s", len(critical_bottlenecks)
        )
        for bottleneck in critical_bottlenecks:
            logger.warning(" - %s: %s", bottleneck["type"], bottleneck["description"])

    async def start_continuous_monitoring(self, interval: int = 60):
        """Start continuous performance monitoring.

        Loops until cancelled: samples system metrics every *interval*
        seconds, caps ``metrics_history`` at the most recent 1000 entries
        and logs high-severity bottlenecks roughly once per hour.

        Args:
            interval: Seconds to sleep between samples.
        """
        logger.info(
            "Starting continuous performance monitoring (interval: %ss)", interval
        )
        # Count iterations instead of inspecting the history length: the
        # length changes by several metrics per sample and stops changing
        # once the cap is reached, so it cannot mark the passage of an hour.
        samples_per_report = max(1, round(3600 / interval)) if interval > 0 else 1
        samples_taken = 0
        while True:
            try:
                # Collect system metrics
                system_metrics = await self.collect_system_performance()
                self.metrics_history.extend(system_metrics)
                # Keep only the most recent metrics to prevent memory bloat
                if len(self.metrics_history) > self._MAX_HISTORY:
                    self.metrics_history = self.metrics_history[-self._MAX_HISTORY:]
                samples_taken += 1
                if samples_taken % samples_per_report == 0:  # Every hour
                    self._log_critical_bottlenecks()
            except Exception as e:
                # Keep the loop alive; a failed sample must not stop monitoring.
                logger.error("Error in performance monitoring: %s", e)
            await asyncio.sleep(interval)


# Global profiler instance shared by importers of this module.
# NOTE: constructing it runs AdvancedPerformanceProfiler.__init__ at import
# time, which calls tracemalloc.start().
performance_profiler = AdvancedPerformanceProfiler()