Spaces:
Configuration error
Configuration error
""" | |
core/monitoring.py - Monitoring system for LLMGuardian | |
""" | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Optional, Any | |
from dataclasses import dataclass | |
import threading | |
import time | |
import json | |
from collections import deque | |
import statistics | |
from .logger import SecurityLogger | |
class MonitoringMetric: | |
"""Representation of a monitoring metric""" | |
name: str | |
value: float | |
timestamp: datetime | |
labels: Dict[str, str] | |
class Alert: | |
"""Alert representation""" | |
severity: str | |
message: str | |
metric: str | |
threshold: float | |
current_value: float | |
timestamp: datetime | |
class MetricsCollector: | |
"""Collect and store monitoring metrics""" | |
def __init__(self, max_history: int = 1000): | |
self.metrics: Dict[str, deque] = {} | |
self.max_history = max_history | |
self._lock = threading.Lock() | |
def record_metric(self, name: str, value: float, | |
labels: Optional[Dict[str, str]] = None) -> None: | |
"""Record a new metric value""" | |
with self._lock: | |
if name not in self.metrics: | |
self.metrics[name] = deque(maxlen=self.max_history) | |
metric = MonitoringMetric( | |
name=name, | |
value=value, | |
timestamp=datetime.utcnow(), | |
labels=labels or {} | |
) | |
self.metrics[name].append(metric) | |
def get_metrics(self, name: str, | |
time_window: Optional[timedelta] = None) -> List[MonitoringMetric]: | |
"""Get metrics for a specific name within time window""" | |
with self._lock: | |
if name not in self.metrics: | |
return [] | |
if not time_window: | |
return list(self.metrics[name]) | |
cutoff = datetime.utcnow() - time_window | |
return [m for m in self.metrics[name] if m.timestamp >= cutoff] | |
def calculate_statistics(self, name: str, | |
time_window: Optional[timedelta] = None) -> Dict[str, float]: | |
"""Calculate statistics for a metric""" | |
metrics = self.get_metrics(name, time_window) | |
if not metrics: | |
return {} | |
values = [m.value for m in metrics] | |
return { | |
"min": min(values), | |
"max": max(values), | |
"avg": statistics.mean(values), | |
"median": statistics.median(values), | |
"std_dev": statistics.stdev(values) if len(values) > 1 else 0 | |
} | |
class AlertManager: | |
"""Manage monitoring alerts""" | |
def __init__(self, security_logger: SecurityLogger): | |
self.security_logger = security_logger | |
self.alerts: List[Alert] = [] | |
self.alert_handlers: Dict[str, List[callable]] = {} | |
self._lock = threading.Lock() | |
def add_alert_handler(self, severity: str, handler: callable) -> None: | |
"""Add an alert handler for a specific severity""" | |
with self._lock: | |
if severity not in self.alert_handlers: | |
self.alert_handlers[severity] = [] | |
self.alert_handlers[severity].append(handler) | |
def trigger_alert(self, alert: Alert) -> None: | |
"""Trigger an alert""" | |
with self._lock: | |
self.alerts.append(alert) | |
# Log alert | |
self.security_logger.log_security_event( | |
"monitoring_alert", | |
severity=alert.severity, | |
message=alert.message, | |
metric=alert.metric, | |
threshold=alert.threshold, | |
current_value=alert.current_value | |
) | |
# Call handlers | |
handlers = self.alert_handlers.get(alert.severity, []) | |
for handler in handlers: | |
try: | |
handler(alert) | |
except Exception as e: | |
self.security_logger.log_security_event( | |
"alert_handler_error", | |
error=str(e), | |
handler=handler.__name__ | |
) | |
def get_recent_alerts(self, time_window: timedelta) -> List[Alert]: | |
"""Get recent alerts within time window""" | |
cutoff = datetime.utcnow() - time_window | |
return [a for a in self.alerts if a.timestamp >= cutoff] | |
class MonitoringRule: | |
"""Rule for monitoring metrics""" | |
def __init__(self, metric_name: str, threshold: float, | |
comparison: str, severity: str, message: str): | |
self.metric_name = metric_name | |
self.threshold = threshold | |
self.comparison = comparison | |
self.severity = severity | |
self.message = message | |
def evaluate(self, value: float) -> Optional[Alert]: | |
"""Evaluate the rule against a value""" | |
triggered = False | |
if self.comparison == "gt" and value > self.threshold: | |
triggered = True | |
elif self.comparison == "lt" and value < self.threshold: | |
triggered = True | |
elif self.comparison == "eq" and value == self.threshold: | |
triggered = True | |
if triggered: | |
return Alert( | |
severity=self.severity, | |
message=self.message, | |
metric=self.metric_name, | |
threshold=self.threshold, | |
current_value=value, | |
timestamp=datetime.utcnow() | |
) | |
return None | |
class MonitoringService: | |
"""Main monitoring service""" | |
def __init__(self, security_logger: SecurityLogger): | |
self.collector = MetricsCollector() | |
self.alert_manager = AlertManager(security_logger) | |
self.rules: List[MonitoringRule] = [] | |
self.security_logger = security_logger | |
self._running = False | |
self._monitor_thread = None | |
def add_rule(self, rule: MonitoringRule) -> None: | |
"""Add a monitoring rule""" | |
self.rules.append(rule) | |
def start_monitoring(self, interval: int = 60) -> None: | |
"""Start the monitoring service""" | |
if self._running: | |
return | |
self._running = True | |
self._monitor_thread = threading.Thread( | |
target=self._monitoring_loop, | |
args=(interval,) | |
) | |
self._monitor_thread.daemon = True | |
self._monitor_thread.start() | |
def stop_monitoring(self) -> None: | |
"""Stop the monitoring service""" | |
self._running = False | |
if self._monitor_thread: | |
self._monitor_thread.join() | |
def _monitoring_loop(self, interval: int) -> None: | |
"""Main monitoring loop""" | |
while self._running: | |
try: | |
self._check_rules() | |
time.sleep(interval) | |
except Exception as e: | |
self.security_logger.log_security_event( | |
"monitoring_error", | |
error=str(e) | |
) | |
def _check_rules(self) -> None: | |
"""Check all monitoring rules""" | |
for rule in self.rules: | |
metrics = self.collector.get_metrics( | |
rule.metric_name, | |
timedelta(minutes=5) # Look at last 5 minutes | |
) | |
if not metrics: | |
continue | |
# Use the most recent metric | |
latest_metric = metrics[-1] | |
alert = rule.evaluate(latest_metric.value) | |
if alert: | |
self.alert_manager.trigger_alert(alert) | |
def record_metric(self, name: str, value: float, | |
labels: Optional[Dict[str, str]] = None) -> None: | |
"""Record a new metric""" | |
self.collector.record_metric(name, value, labels) | |
def create_monitoring_service(security_logger: SecurityLogger) -> MonitoringService: | |
"""Create and configure a monitoring service""" | |
service = MonitoringService(security_logger) | |
# Add default rules | |
rules = [ | |
MonitoringRule( | |
metric_name="request_rate", | |
threshold=100, | |
comparison="gt", | |
severity="warning", | |
message="High request rate detected" | |
), | |
MonitoringRule( | |
metric_name="error_rate", | |
threshold=0.1, | |
comparison="gt", | |
severity="error", | |
message="High error rate detected" | |
), | |
MonitoringRule( | |
metric_name="response_time", | |
threshold=1.0, | |
comparison="gt", | |
severity="warning", | |
message="Slow response time detected" | |
) | |
] | |
for rule in rules: | |
service.add_rule(rule) | |
return service | |
if __name__ == "__main__": | |
# Example usage | |
from .logger import setup_logging | |
security_logger, _ = setup_logging() | |
monitoring = create_monitoring_service(security_logger) | |
# Add custom alert handler | |
def alert_handler(alert: Alert): | |
print(f"Alert: {alert.message} (Severity: {alert.severity})") | |
monitoring.alert_manager.add_alert_handler("warning", alert_handler) | |
monitoring.alert_manager.add_alert_handler("error", alert_handler) | |
# Start monitoring | |
monitoring.start_monitoring(interval=10) | |
# Simulate some metrics | |
try: | |
while True: | |
monitoring.record_metric("request_rate", 150) # Should trigger alert | |
time.sleep(5) | |
except KeyboardInterrupt: | |
monitoring.stop_monitoring() |