"""
Progress Tracking Module

Handles progress monitoring, ETA calculations, and performance statistics.
"""

import time
import logging
from typing import Optional, Callable, Dict, Any, List
from dataclasses import dataclass, field
from collections import deque

logger = logging.getLogger(__name__)


@dataclass
class ProgressSnapshot:
    """Snapshot of progress at a specific point in time."""

    timestamp: float  # time.time() value captured by ProgressTracker.update()
    frame_number: int  # frame number passed to ProgressTracker.update()
    stage: str  # tracker's current stage when the snapshot was taken
    fps: float  # short-window FPS estimate when the snapshot was taken
    memory_usage_mb: Optional[float] = None
    custom_metrics: Dict[str, Any] = field(default_factory=dict)


class ProgressTracker:
    """
    Enhanced progress tracking with detailed statistics and ETA calculations.

    Call :meth:`update` as frames are processed. Stage durations are recorded
    whenever the stage changes and whenever a summary is requested; asking for
    a summary several times refreshes the entry of the running stage instead
    of appending a duplicate.
    """

    def __init__(self, total_frames: int,
                 callback: Optional[Callable[[float, str], Any]] = None,
                 track_performance: bool = True):
        """
        Create a tracker and start its clock.

        Args:
            total_frames: Number of frames expected in total.
            callback: Optional callable invoked by ``update`` as
                ``callback(progress_fraction, message)``.
            track_performance: When True, ``update`` stores one
                ``ProgressSnapshot`` per call in ``self.snapshots``.
        """
        self.total_frames = total_frames
        self.callback = callback
        self.track_performance = track_performance

        # Timing data
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0

        # Performance tracking
        self.frame_times = deque(maxlen=100)  # Keep last 100 frame times
        self.fps_history = deque(maxlen=50)  # Keep last 50 FPS measurements
        # Unbounded: grows by one entry per update() when tracking is enabled.
        self.snapshots: List[ProgressSnapshot] = []

        # Stage tracking
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed: List[Dict[str, Any]] = []
        # True once the running activation of the current stage already owns
        # the last entry of ``stages_completed`` (lets us refresh, not append).
        self._stage_recorded = False
        # Time accumulated by earlier activations of the current stage.
        self._stage_time_base = 0.0

        # Statistics
        self.stats = self._new_stats()

        # ETA calculation
        self.eta_smoothing_factor = 0.2  # For exponential smoothing
        self.smoothed_fps = 0.0

        logger.debug("ProgressTracker initialized for %s frames", total_frames)

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh statistics dictionary with its initial values."""
        return {
            'total_processing_time': 0.0,
            'average_fps': 0.0,
            'peak_fps': 0.0,
            'slowest_fps': float('inf'),
            'frames_per_second_variance': 0.0,
            # Not updated anywhere in this module.
            'estimated_completion_accuracy': 0.0,
            'stage_times': {},
            'memory_peak_mb': 0.0
        }

    def update(self, frame_number: int, stage: str = "",
               custom_metrics: Optional[Dict[str, Any]] = None,
               memory_usage_mb: Optional[float] = None):
        """
        Update progress with comprehensive tracking.

        Args:
            frame_number: Current frame being processed
            stage: Current processing stage description
            custom_metrics: Additional metrics to track
            memory_usage_mb: Current memory usage in MB
        """
        current_time = time.time()

        # Handle stage changes
        if stage and stage != self.current_stage:
            self._enter_stage(stage, current_time)

        # Calculate frame timing. The interval since the previous update is
        # spread over the frames advanced, so callers that report every N
        # frames still get a per-frame time. Updates that do not advance the
        # counter are timed as a single step.
        if self.processed_frames > 0:
            interval = current_time - self.last_update_time
            frames_advanced = max(frame_number - self.processed_frames, 1)
            self.frame_times.append(interval / frames_advanced)

        self.processed_frames = frame_number
        self.last_update_time = current_time

        # Calculate performance metrics
        elapsed_time = current_time - self.start_time
        current_fps = self._calculate_current_fps()

        # Update FPS history and smoothing (only positive values are kept)
        if current_fps > 0:
            self.fps_history.append(current_fps)
            self._update_smoothed_fps(current_fps)

        # Calculate ETA
        eta_seconds = self._calculate_eta()
        progress_pct = (
            self.processed_frames / self.total_frames
            if self.total_frames > 0 else 0
        )

        # Update statistics
        self._update_statistics(current_fps, memory_usage_mb)

        # Create snapshot if performance tracking is enabled
        if self.track_performance:
            self.snapshots.append(ProgressSnapshot(
                timestamp=current_time,
                frame_number=frame_number,
                stage=self.current_stage,
                fps=current_fps,
                memory_usage_mb=memory_usage_mb,
                # Copy so add_custom_metric() never mutates the caller's dict.
                custom_metrics=dict(custom_metrics) if custom_metrics else {}
            ))

        # Generate progress message
        message = self._generate_progress_message(
            elapsed_time, current_fps, eta_seconds, stage
        )

        # Call progress callback; a failing callback must not stop processing
        if self.callback:
            try:
                self.callback(progress_pct, message)
            except Exception as e:
                logger.warning("Progress callback failed: %s", e)

        # Log detailed progress periodically
        if frame_number % 50 == 0 or frame_number == self.total_frames:
            self._log_detailed_progress(progress_pct, current_fps, eta_seconds)

    def _calculate_current_fps(self) -> float:
        """Calculate current FPS based on recent frame times."""
        if not self.frame_times:
            return 0.0

        # Use average of recent frame times for stability
        recent_frame_times = list(self.frame_times)[-10:]  # Last 10 frames
        avg_frame_time = sum(recent_frame_times) / len(recent_frame_times)

        return 1.0 / avg_frame_time if avg_frame_time > 0 else 0.0

    def _update_smoothed_fps(self, current_fps: float):
        """Update smoothed FPS using exponential smoothing."""
        if self.smoothed_fps == 0.0:
            # First measurement seeds the average.
            self.smoothed_fps = current_fps
        else:
            self.smoothed_fps = (
                self.eta_smoothing_factor * current_fps +
                (1 - self.eta_smoothing_factor) * self.smoothed_fps
            )

    def _calculate_eta(self) -> float:
        """
        Calculate estimated time to completion in seconds.

        Returns 0.0 when no frames were processed, when no FPS estimate is
        available yet, or when more frames than ``total_frames`` were reported.
        """
        if self.processed_frames <= 0 or self.smoothed_fps <= 0:
            return 0.0

        # Clamp so overshooting total_frames never yields a negative ETA.
        remaining_frames = max(self.total_frames - self.processed_frames, 0)
        return remaining_frames / self.smoothed_fps

    def _update_statistics(self, current_fps: float,
                           memory_usage_mb: Optional[float]):
        """Update the aggregate statistics in ``self.stats``."""
        self.stats['total_processing_time'] = time.time() - self.start_time

        # FPS statistics
        if self.fps_history:
            fps_list = list(self.fps_history)
            avg_fps = sum(fps_list) / len(fps_list)
            self.stats['average_fps'] = avg_fps
            self.stats['peak_fps'] = max(fps_list)
            self.stats['slowest_fps'] = min(fps_list)

            # Population variance of the retained FPS measurements
            self.stats['frames_per_second_variance'] = (
                sum((fps - avg_fps) ** 2 for fps in fps_list) / len(fps_list)
            )

        # Memory tracking
        if (memory_usage_mb is not None
                and memory_usage_mb > self.stats['memory_peak_mb']):
            self.stats['memory_peak_mb'] = memory_usage_mb

    def _enter_stage(self, stage: str, now: float):
        """Close the running stage and start timing ``stage`` from ``now``."""
        self._complete_stage()
        self.current_stage = stage
        self.stage_start_time = now
        self._stage_recorded = False
        # Re-entering a stage continues from the time it already accumulated.
        self._stage_time_base = self.stats['stage_times'].get(stage, 0.0)

    def _complete_stage(self):
        """
        Record the duration of the current stage.

        Safe to call repeatedly: the first call for a stage activation appends
        an entry to ``stages_completed``; later calls replace that entry with
        refreshed values instead of appending duplicates.
        """
        if not self.current_stage:
            return

        stage_duration = time.time() - self.stage_start_time
        self.stats['stage_times'][self.current_stage] = (
            self._stage_time_base + stage_duration
        )
        entry = {
            'stage': self.current_stage,
            'duration': stage_duration,
            'frames_processed': self.processed_frames
        }
        if self._stage_recorded and self.stages_completed:
            # Replace (not mutate) so previously returned copies stay intact.
            self.stages_completed[-1] = entry
        else:
            self.stages_completed.append(entry)
            self._stage_recorded = True

        logger.debug("Completed stage '%s' in %.2fs",
                     self.current_stage, stage_duration)

    def _generate_progress_message(self, elapsed_time: float,
                                   current_fps: float, eta_seconds: float,
                                   stage: str) -> str:
        """Generate comprehensive progress message."""
        # Base progress info
        message = (
            f"Frame {self.processed_frames}/{self.total_frames} | "
            f"Elapsed: {self._format_time(elapsed_time)} | "
            f"Speed: {current_fps:.1f} fps"
        )

        # Add ETA if meaningful
        if eta_seconds > 0:
            message += f" | ETA: {self._format_time(eta_seconds)}"

        # Add stage information
        if stage:
            message = f"{stage} | {message}"

        # Add performance indicators (fps_history only holds positive values,
        # so the average below is never zero)
        if self.fps_history and len(self.fps_history) >= 10:
            recent_avg = sum(list(self.fps_history)[-10:]) / 10
            if abs(current_fps - recent_avg) / recent_avg > 0.2:  # 20% difference
                trend = "↗" if current_fps > recent_avg else "↘"
                message += f" {trend}"

        return message

    def _format_time(self, seconds: float) -> str:
        """Format time duration in human-readable format."""
        if seconds < 60:
            return f"{int(seconds)}s"
        elif seconds < 3600:
            minutes = int(seconds // 60)
            secs = int(seconds % 60)
            return f"{minutes}m {secs}s"
        else:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            return f"{hours}h {minutes}m"

    def _log_detailed_progress(self, progress_pct: float, current_fps: float,
                               eta_seconds: float):
        """Log detailed progress information."""
        logger.info(
            "Progress: %.1f%% | FPS: %.1f (avg: %.1f) | ETA: %s | Stage: %s",
            progress_pct * 100,
            current_fps,
            self.stats['average_fps'],
            self._format_time(eta_seconds),
            self.current_stage,
        )

    def set_stage(self, stage: str):
        """Manually set the current processing stage."""
        if stage != self.current_stage:
            self._enter_stage(stage, time.time())
            logger.debug("Stage changed to: %s", stage)

    def add_custom_metric(self, key: str, value: Any):
        """Add a custom metric to the most recent snapshot, if there is one."""
        if self.snapshots:
            self.snapshots[-1].custom_metrics[key] = value

    def _stats_snapshot(self) -> Dict[str, Any]:
        """Return a copy of ``self.stats`` that shares no mutable state."""
        return {**self.stats, 'stage_times': dict(self.stats['stage_times'])}

    def get_performance_summary(self) -> Dict[str, Any]:
        """
        Get comprehensive performance summary.

        Records the running stage first, so the summary includes it. The
        returned dictionaries are copies and can be modified freely.
        """
        self._complete_stage()  # Record the running stage (idempotent)

        total_time = time.time() - self.start_time

        summary = {
            'total_frames': self.total_frames,
            'processed_frames': self.processed_frames,
            'completion_percentage': (
                (self.processed_frames / self.total_frames * 100)
                if self.total_frames > 0 else 0
            ),
            'total_processing_time': total_time,
            'overall_fps': (
                self.processed_frames / total_time if total_time > 0 else 0
            ),
            'stages_completed': len(self.stages_completed),
            'current_stage': self.current_stage,
            'statistics': self._stats_snapshot(),
            'stage_breakdown': [dict(entry) for entry in self.stages_completed]
        }

        # Calculate stage percentages
        stage_times = self.stats['stage_times']
        if stage_times:
            total_stage_time = sum(stage_times.values())
            # All durations can be 0.0 on a coarse clock; avoid dividing by 0.
            summary['stage_percentages'] = {
                stage: (duration / total_stage_time * 100)
                if total_stage_time > 0 else 0.0
                for stage, duration in stage_times.items()
            }

        # Performance analysis
        if self.fps_history:
            summary['performance_analysis'] = {
                'fps_stability': self._calculate_fps_stability(),
                'performance_trend': self._analyze_performance_trend(),
                'bottleneck_detection': self._detect_bottlenecks()
            }

        return summary

    def _calculate_fps_stability(self) -> str:
        """Classify FPS stability from the coefficient of variation."""
        if not self.fps_history or len(self.fps_history) < 10:
            return "insufficient_data"

        variance = self.stats['frames_per_second_variance']
        avg_fps = self.stats['average_fps']

        if avg_fps == 0:
            return "unstable"

        coefficient_of_variation = (variance ** 0.5) / avg_fps

        if coefficient_of_variation < 0.1:
            return "very_stable"
        elif coefficient_of_variation < 0.2:
            return "stable"
        elif coefficient_of_variation < 0.4:
            return "moderate"
        else:
            return "unstable"

    def _analyze_performance_trend(self) -> str:
        """Analyze performance trend over the retained FPS history."""
        if len(self.fps_history) < 20:
            return "insufficient_data"

        # Compare first and last quartiles
        fps_list = list(self.fps_history)
        quartile_size = len(fps_list) // 4

        first_quartile_avg = sum(fps_list[:quartile_size]) / quartile_size
        last_quartile_avg = sum(fps_list[-quartile_size:]) / quartile_size

        change_percent = (
            (last_quartile_avg - first_quartile_avg) / first_quartile_avg
        ) * 100

        if change_percent > 10:
            return "improving"
        elif change_percent < -10:
            return "degrading"
        else:
            return "stable"

    def _detect_bottlenecks(self) -> List[str]:
        """Detect potential performance bottlenecks."""
        bottlenecks = []

        # Check for consistently low FPS
        if self.stats['average_fps'] < 0.5:
            bottlenecks.append("very_low_fps")

        # Check for high variance (standard deviation above half the mean)
        if (self.stats['frames_per_second_variance']
                > (self.stats['average_fps'] * 0.5) ** 2):
            bottlenecks.append("inconsistent_performance")

        # Check for memory pressure (if tracked)
        if self.stats['memory_peak_mb'] > 8000:  # 8GB
            bottlenecks.append("high_memory_usage")

        # Check stage timing imbalances
        if self.stats['stage_times']:
            stage_times = list(self.stats['stage_times'].values())
            max_time = max(stage_times)
            avg_time = sum(stage_times) / len(stage_times)

            if max_time > avg_time * 3:
                bottlenecks.append("stage_imbalance")

        return bottlenecks

    def export_performance_data(self) -> Dict[str, Any]:
        """
        Export detailed performance data for analysis.

        The running stage is recorded first (through the summary), and the
        returned structure holds copies rather than the tracker's live state.
        """
        summary = self.get_performance_summary()
        return {
            'metadata': {
                'total_frames': self.total_frames,
                'tracking_enabled': self.track_performance,
                'start_time': self.start_time,
                'export_time': time.time()
            },
            'snapshots': [
                {
                    'timestamp': snap.timestamp,
                    'frame_number': snap.frame_number,
                    'stage': snap.stage,
                    'fps': snap.fps,
                    'memory_usage_mb': snap.memory_usage_mb,
                    'custom_metrics': dict(snap.custom_metrics)
                }
                for snap in self.snapshots
            ],
            'statistics': self._stats_snapshot(),
            'stages': [dict(entry) for entry in self.stages_completed],
            'performance_summary': summary
        }

    def reset(self, new_total_frames: Optional[int] = None):
        """
        Reset tracker for new processing session.

        Args:
            new_total_frames: New expected frame count; the current value is
                kept when omitted.
        """
        if new_total_frames is not None:
            self.total_frames = new_total_frames

        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0

        self.frame_times.clear()
        self.fps_history.clear()
        self.snapshots.clear()

        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed.clear()
        self._stage_recorded = False
        self._stage_time_base = 0.0

        self.smoothed_fps = 0.0

        # Reset statistics
        self.stats = self._new_stats()

        logger.debug("ProgressTracker reset")

    def finalize(self) -> Dict[str, Any]:
        """Finalize tracking and return comprehensive results."""
        # The summary records the running stage itself, exactly once.
        final_summary = self.get_performance_summary()

        logger.info(
            "Processing completed: %s/%s frames in %s (avg: %.1f fps)",
            self.processed_frames,
            self.total_frames,
            self._format_time(final_summary['total_processing_time']),
            final_summary['overall_fps'],
        )

        return final_summary