| """ | |
| Performance Timing Utilities for Epic 2 Demo | |
| ============================================ | |
| Provides timing context managers and performance instrumentation for accurate | |
| measurement of component performance in the Epic 2 demo system. | |
| """ | |
| import time | |
| import logging | |
| from contextlib import contextmanager | |
| from typing import Dict, Any, Optional, List | |
| from dataclasses import dataclass, field | |
| from threading import Lock | |
| logger = logging.getLogger(__name__) | |


@dataclass
class TimingResult:
    """Represents a single timing measurement."""

    stage_name: str
    start_time: float
    end_time: float
    duration_ms: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def duration_seconds(self) -> float:
        return self.duration_ms / 1000.0


@dataclass
class PipelineTimings:
    """Aggregates timing results for a complete pipeline run."""

    total_start: float
    total_end: Optional[float] = None
    stages: List[TimingResult] = field(default_factory=list)

    @property
    def total_duration_ms(self) -> float:
        if self.total_end is None:
            return 0.0
        return (self.total_end - self.total_start) * 1000.0

    def get_stage_timings(self) -> Dict[str, Dict[str, Any]]:
        """Get stage timings in the format expected by the demo UI."""
        timings = {}
        for stage in self.stages:
            timings[stage.stage_name] = {
                "time_ms": stage.duration_ms,
                "results": stage.metadata.get("results", 0),
                "metadata": stage.metadata,
            }
        return timings

    def add_stage(self, stage_name: str, duration_ms: float,
                  metadata: Optional[Dict[str, Any]] = None) -> None:
        """Add a completed stage timing, back-dating its start from the duration."""
        current_time = time.time()
        stage = TimingResult(
            stage_name=stage_name,
            start_time=current_time - (duration_ms / 1000.0),
            end_time=current_time,
            duration_ms=duration_ms,
            metadata=metadata or {},
        )
        self.stages.append(stage)
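

# A minimal usage sketch for PipelineTimings (the stage name and numbers below
# are illustrative, not taken from the demo):
#
#     timings = PipelineTimings(total_start=time.time())
#     timings.add_stage("dense_retrieval", 12.5, {"results": 10})
#     timings.total_end = time.time()
#     timings.get_stage_timings()
#     # -> {"dense_retrieval": {"time_ms": 12.5, "results": 10,
#     #                         "metadata": {"results": 10}}}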


class PerformanceInstrumentation:
    """Main performance timing instrumentation for the Epic 2 demo."""

    def __init__(self) -> None:
        self._active_timings: Dict[str, PipelineTimings] = {}
        self._lock = Lock()  # guards _active_timings across threads

    def start_pipeline(self, pipeline_id: str) -> PipelineTimings:
        """Start timing a new pipeline."""
        with self._lock:
            timing = PipelineTimings(total_start=time.time())
            self._active_timings[pipeline_id] = timing
            return timing

    def finish_pipeline(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Finish timing a pipeline and return its results, if it was active."""
        with self._lock:
            if pipeline_id in self._active_timings:
                timing = self._active_timings[pipeline_id]
                timing.total_end = time.time()
                del self._active_timings[pipeline_id]
                return timing
            return None

    @contextmanager
    def time_stage(self, pipeline_id: str, stage_name: str,
                   metadata: Optional[Dict[str, Any]] = None):
        """Context manager for timing a single pipeline stage."""
        start_time = time.time()
        try:
            yield
        finally:
            end_time = time.time()
            duration_ms = (end_time - start_time) * 1000.0
            with self._lock:
                if pipeline_id in self._active_timings:
                    timing = self._active_timings[pipeline_id]
                    timing.add_stage(stage_name, duration_ms, metadata or {})
            logger.debug(f"Stage '{stage_name}' completed in {duration_ms:.2f}ms")

    def get_timing(self, pipeline_id: str) -> Optional[PipelineTimings]:
        """Get the current timing for an active pipeline."""
        with self._lock:
            return self._active_timings.get(pipeline_id)
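

# A minimal usage sketch for PerformanceInstrumentation (the "demo_query" id
# and the sleep are stand-ins for real pipeline work):
#
#     instrumentation = PerformanceInstrumentation()
#     instrumentation.start_pipeline("demo_query")
#     with instrumentation.time_stage("demo_query", "dense_retrieval", {"results": 5}):
#         time.sleep(0.01)
#     result = instrumentation.finish_pipeline("demo_query")
#     result.get_stage_timings()["dense_retrieval"]["time_ms"]  # ~10.0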


class ComponentPerformanceExtractor:
    """Extracts performance metrics from RAG system components."""

    @staticmethod
    def extract_retriever_metrics(retriever) -> Dict[str, Any]:
        """Extract detailed timing metrics from ModularUnifiedRetriever."""
        metrics: Dict[str, Any] = {}

        # Try to get performance metrics from the retriever
        if hasattr(retriever, "get_metrics"):
            component_metrics = retriever.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                retrieval_stats = component_metrics.get("retrieval_stats", {})

                # Get sub-component statistics
                sub_components = component_metrics.get("sub_components", {})
                reranker_stats = sub_components.get("reranker", {}).get("statistics", {})
                fusion_stats = sub_components.get("fusion_strategy", {}).get("statistics", {})

                # Map the raw statistics onto the demo's stage format;
                # retriever times are reported in seconds, so convert to ms.
                metrics["dense_retrieval"] = {
                    "time_ms": retrieval_stats.get("last_retrieval_time", 0) * 1000,
                    "results": component_metrics.get("indexed_documents", 0),
                }
                metrics["sparse_retrieval"] = {
                    "time_ms": retrieval_stats.get("avg_time", 0) * 1000,
                    "results": component_metrics.get("indexed_documents", 0),
                }
                metrics["fusion"] = {
                    "time_ms": fusion_stats.get("avg_graph_latency_ms", 0),
                    "results": fusion_stats.get("total_fusions", 0),
                }
                metrics["neural_reranking"] = {
                    "time_ms": reranker_stats.get("total_latency_ms", 0),
                    "results": reranker_stats.get("successful_queries", 0),
                }

                # Total retrieval time
                metrics["total_retrieval_time_ms"] = retrieval_stats.get("total_time", 0) * 1000

        return metrics

    @staticmethod
    def extract_generator_metrics(generator) -> Dict[str, Any]:
        """Extract detailed timing metrics from AnswerGenerator."""
        metrics: Dict[str, Any] = {}

        # Try to get performance metrics from the generator
        if hasattr(generator, "get_metrics"):
            component_metrics = generator.get_metrics()
            if component_metrics:
                # Extract stats from the actual format
                generation_count = component_metrics.get("generation_count", 0)
                total_time = component_metrics.get("total_time", 0)
                avg_time = component_metrics.get("avg_time", 0)

                # Sub-component information (currently unused by the estimates below)
                sub_components = component_metrics.get("sub_components", {})
                llm_client = sub_components.get("llm_client", {})

                # The generator only reports aggregate timings, so the per-stage
                # figures below are fixed-ratio estimates of the average time:
                # 10% prompt building, 80% LLM generation, 5% parsing, 5% scoring.
                metrics["prompt_building"] = {
                    "time_ms": avg_time * 1000 * 0.1,
                    "results": generation_count,
                }
                metrics["llm_generation"] = {
                    "time_ms": avg_time * 1000 * 0.8,
                    "results": generation_count,
                }
                metrics["response_parsing"] = {
                    "time_ms": avg_time * 1000 * 0.05,
                    "results": generation_count,
                }
                metrics["confidence_scoring"] = {
                    "time_ms": avg_time * 1000 * 0.05,
                    "results": generation_count,
                }

                # Total generation time
                metrics["total_generation_time_ms"] = total_time * 1000

        return metrics

    @staticmethod
    def create_demo_timing_format(retriever_metrics: Dict[str, Any],
                                  generator_metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Combine extracted metrics into the timing format expected by the demo UI."""
        return {
            # Retrieval stages
            "dense_retrieval": retriever_metrics.get("dense_retrieval", {"time_ms": 0, "results": 0}),
            "sparse_retrieval": retriever_metrics.get("sparse_retrieval", {"time_ms": 0, "results": 0}),
            "graph_enhancement": retriever_metrics.get("fusion", {"time_ms": 0, "results": 0}),
            "neural_reranking": retriever_metrics.get("neural_reranking", {"time_ms": 0, "results": 0}),
            # Generation stages
            "prompt_building": generator_metrics.get("prompt_building", {"time_ms": 0, "results": 0}),
            "llm_generation": generator_metrics.get("llm_generation", {"time_ms": 0, "results": 0}),
            "response_parsing": generator_metrics.get("response_parsing", {"time_ms": 0, "results": 0}),
            "confidence_scoring": generator_metrics.get("confidence_scoring", {"time_ms": 0, "results": 0}),
        }
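

# Typical wiring (a sketch; `retriever` and `generator` stand for component
# instances that expose the get_metrics() interface probed above):
#
#     r_metrics = ComponentPerformanceExtractor.extract_retriever_metrics(retriever)
#     g_metrics = ComponentPerformanceExtractor.extract_generator_metrics(generator)
#     stage_timings = ComponentPerformanceExtractor.create_demo_timing_format(
#         r_metrics, g_metrics)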


# Global performance instrumentation instance
performance_instrumentation = PerformanceInstrumentation()


@contextmanager
def time_query_pipeline(query: str):
    """Context manager for timing a complete query-processing pipeline."""
    pipeline_id = f"query_{int(time.time() * 1000)}"
    timing = performance_instrumentation.start_pipeline(pipeline_id)
    try:
        yield timing, pipeline_id
    finally:
        final_timing = performance_instrumentation.finish_pipeline(pipeline_id)
        if final_timing:
            logger.info(f"Query pipeline completed in {final_timing.total_duration_ms:.2f}ms")