| """ |
| Real-time evaluation module for streaming data and generating outputs on-the-fly. |
| |
| Supports: |
| - Streaming evaluation |
| - Real-time monitoring |
| - Progressive result aggregation |
| """ |

from __future__ import annotations

import json
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional

import numpy as np

from src.utils.performance_monitor import PerformanceMonitor, measure_performance


@dataclass
class RealtimeMetric:
    """Single metric update in real-time evaluation."""

    timestamp: float
    sample_id: str
    metric_name: str
    value: float
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class RealtimeResult:
    """Result from real-time evaluation."""

    sample_id: str
    timestamp: float
    scores: Dict[str, float]
    coherence: Dict[str, Any]
    performance: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)


class RealtimeEvaluator:
    """Evaluates samples from a stream in real time, with optional performance monitoring."""

    def __init__(
        self,
        evaluation_func: Callable[[Any], Dict[str, Any]],
        output_dir: Optional[str] = None,
        enable_monitoring: bool = True,
    ):
        """
        Args:
            evaluation_func: Callable mapping a sample to a result dict
                (expected keys: "scores", "coherence", optional "metadata")
            output_dir: Directory for per-sample result files; nothing is written if None
            enable_monitoring: Whether to collect performance statistics
        """
        self.evaluation_func = evaluation_func
        self.output_dir = Path(output_dir) if output_dir else None
        if self.output_dir:
            self.output_dir.mkdir(parents=True, exist_ok=True)

        self.enable_monitoring = enable_monitoring
        self.monitor = PerformanceMonitor() if enable_monitoring else None

        self.results: List[RealtimeResult] = []
        self.metrics: List[RealtimeMetric] = []
        self._start_time = time.time()

    def evaluate_stream(
        self,
        samples: Iterator[Any],
        sample_id_func: Optional[Callable[[Any], str]] = None,
    ) -> Iterator[RealtimeResult]:
        """
        Evaluate samples in a stream, yielding results as they become available.

        Args:
            samples: Iterator of samples to evaluate
            sample_id_func: Function to extract sample ID from sample

        Yields:
            RealtimeResult for each evaluated sample
        """
        for idx, sample in enumerate(samples):
            sample_id = sample_id_func(sample) if sample_id_func else f"sample_{idx}"

            start_time = time.time()

            if self.enable_monitoring and self.monitor:
                with measure_performance(
                    self.monitor,
                    operation_name="realtime_evaluation",
                    batch_size=1,
                    metadata={"sample_id": sample_id},
                ):
                    result_data = self.evaluation_func(sample)
            else:
                result_data = self.evaluation_func(sample)

            eval_time = time.time() - start_time

            scores = result_data.get("scores", {})
            coherence = result_data.get("coherence", {})

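            # Wall-clock time for this sample is always recorded; richer stats are
            # pulled from the PerformanceMonitor when enabled (get_stats is assumed
            # to return a mapping keyed by operation name).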
            performance: Dict[str, Any] = {"eval_time": eval_time}
            if self.enable_monitoring and self.monitor:
                stats = self.monitor.get_stats("realtime_evaluation")
                if stats:
                    perf_stats = stats.get("realtime_evaluation")
                    if perf_stats:
                        performance.update(
                            {
                                "inference_time": perf_stats.avg_time,
                                "throughput": perf_stats.avg_throughput,
                            }
                        )

            result = RealtimeResult(
                sample_id=sample_id,
                timestamp=time.time(),
                scores=scores,
                coherence=coherence,
                performance=performance,
                metadata=result_data.get("metadata", {}),
            )

            self.results.append(result)

            for metric_name, value in scores.items():
                metric = RealtimeMetric(
                    timestamp=time.time(),
                    sample_id=sample_id,
                    metric_name=metric_name,
                    value=value,
                )
                self.metrics.append(metric)

            if self.output_dir:
                self._save_result(result)

            yield result

    def evaluate_batch(
        self,
        samples: List[Any],
        sample_id_func: Optional[Callable[[Any], str]] = None,
    ) -> List[RealtimeResult]:
        """
        Evaluate a batch of samples, returning results.

        Args:
            samples: List of samples to evaluate
            sample_id_func: Function to extract sample ID from sample

        Returns:
            List of RealtimeResult
        """
        return list(self.evaluate_stream(iter(samples), sample_id_func=sample_id_func))

    def get_aggregate_stats(self) -> Dict[str, Any]:
        """Get aggregate statistics from evaluated results."""
        if not self.results:
            return {}

        all_scores: Dict[str, List[float]] = {}
        for result in self.results:
            for metric_name, value in result.scores.items():
                all_scores.setdefault(metric_name, []).append(value)

        stats: Dict[str, Any] = {}
        for metric_name, values in all_scores.items():
            stats[metric_name] = {
                "mean": float(np.mean(values)),
                "std": float(np.std(values)),
                "min": float(np.min(values)),
                "max": float(np.max(values)),
                "count": len(values),
            }

        stats["total_samples"] = len(self.results)
        stats["total_time"] = time.time() - self._start_time
        stats["avg_throughput"] = (
            len(self.results) / stats["total_time"] if stats["total_time"] > 0 else 0.0
        )

        return stats

    def get_metrics_history(self, metric_name: Optional[str] = None) -> List[RealtimeMetric]:
        """Get the history of recorded metrics, optionally filtered by metric name."""
        if metric_name:
            return [m for m in self.metrics if m.metric_name == metric_name]
        return self.metrics.copy()

    def _save_result(self, result: RealtimeResult) -> None:
        """Save individual result to disk."""
        if not self.output_dir:
            return

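        # The sample ID is used directly as the file name, so it is assumed to be
        # filesystem-safe (no path separators or other reserved characters).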
        result_file = self.output_dir / f"{result.sample_id}.json"
        with result_file.open("w") as f:
            json.dump(asdict(result), f, indent=2, default=str)

    def save_summary(self, output_path: Optional[str] = None) -> None:
        """Save an evaluation summary to disk; defaults to <output_dir>/summary.json."""
        if output_path is None and self.output_dir:
            output_path = str(self.output_dir / "summary.json")

        if output_path is None:
            return

        summary = {
            "aggregate_stats": self.get_aggregate_stats(),
            "total_results": len(self.results),
            "performance_summary": self.monitor.get_summary() if self.monitor else {},
        }

        with open(output_path, "w") as f:
            json.dump(summary, f, indent=2, default=str)

    def reset(self) -> None:
        """Reset evaluator state."""
        self.results.clear()
        self.metrics.clear()
        self._start_time = time.time()
        if self.monitor:
            self.monitor.reset()
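

if __name__ == "__main__":
    # Minimal usage sketch, assuming a toy evaluation function that returns the
    # result-dict shape expected by RealtimeEvaluator ("scores" / "coherence" keys).
    # The samples and metric name below are illustrative, not part of the module.
    def _dummy_eval(sample: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "scores": {"length": float(len(sample["text"]))},
            "coherence": {},
            "metadata": {"source": "demo"},
        }

    demo_samples = [{"text": "hello"}, {"text": "streaming evaluation"}]
    evaluator = RealtimeEvaluator(_dummy_eval, enable_monitoring=False)
    for res in evaluator.evaluate_stream(iter(demo_samples)):
        print(res.sample_id, res.scores)
    print(evaluator.get_aggregate_stats())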