MogensR committed on
Commit
6b58990
·
1 Parent(s): 84a78ca

Create progress_tracker.py

Browse files
Files changed (1) hide show
  1. progress_tracker.py +437 -0
progress_tracker.py ADDED
@@ -0,0 +1,437 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Progress Tracking Module
3
+ Handles progress monitoring, ETA calculations, and performance statistics
4
+ """
5
+
6
+ import time
7
+ import logging
8
+ from typing import Optional, Callable, Dict, Any, List
9
+ from dataclasses import dataclass, field
10
+ from collections import deque
11
+
12
+ logger = logging.getLogger(__name__)
13
+
14
@dataclass
class ProgressSnapshot:
    """Immutable-by-convention record of tracker state captured at one update.

    One instance is created per ``ProgressTracker.update`` call when
    performance tracking is enabled.
    """

    # Wall-clock time (``time.time()``) at which the snapshot was taken.
    timestamp: float
    # Frame number reported by the caller for this update.
    frame_number: int
    # Name of the processing stage that was active.
    stage: str
    # Frames-per-second estimate at the time of the snapshot.
    fps: float
    # Memory usage in MB as supplied by the caller; ``None`` when not reported.
    memory_usage_mb: Optional[float] = None
    # Free-form extra metrics; every instance gets its own dict.
    custom_metrics: Dict[str, Any] = field(default_factory=dict)
23
+
24
class ProgressTracker:
    """
    Enhanced progress tracking with detailed statistics and ETA calculations.

    The tracker is driven by repeated calls to :meth:`update`. It keeps a
    sliding window of per-frame times and FPS measurements, records how long
    each processing stage took, and can produce a summary or a full export
    of everything it collected.

    NOTE: when ``track_performance`` is enabled one ``ProgressSnapshot`` is
    stored per ``update`` call and the list is never trimmed, so memory use
    grows with the number of updates.
    """

    def __init__(self, total_frames: int, callback: Optional[Callable] = None,
                 track_performance: bool = True):
        """
        Args:
            total_frames: Number of frames expected in this session.
            callback: Optional callable invoked as ``callback(progress, message)``
                on every update; ``progress`` is ``processed / total``.
            track_performance: When true, store a snapshot for every update.
        """
        self.total_frames = total_frames
        self.callback = callback
        self.track_performance = track_performance

        # Timing data
        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0

        # Performance tracking (bounded sliding windows)
        self.frame_times = deque(maxlen=100)  # Keep last 100 frame times
        self.fps_history = deque(maxlen=50)   # Keep last 50 FPS measurements
        self.snapshots: List[ProgressSnapshot] = []

        # Stage tracking
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed = []
        # Entry in ``stages_completed`` that belongs to the *current* run of
        # ``current_stage`` (None until that run has been recorded once), and
        # the time already accumulated for that stage name by earlier runs.
        self._open_stage_entry: Optional[Dict[str, Any]] = None
        self._open_stage_base = 0.0

        # Statistics
        self.stats = self._new_stats()

        # ETA calculation
        self.eta_smoothing_factor = 0.2  # For exponential smoothing
        self.smoothed_fps = 0.0

        logger.debug(f"ProgressTracker initialized for {total_frames} frames")

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh statistics dictionary with all counters at their initial values."""
        return {
            'total_processing_time': 0.0,
            'average_fps': 0.0,
            'peak_fps': 0.0,
            'slowest_fps': float('inf'),
            'frames_per_second_variance': 0.0,
            'estimated_completion_accuracy': 0.0,
            'stage_times': {},
            'memory_peak_mb': 0.0,
        }

    def update(self, frame_number: int, stage: str = "",
               custom_metrics: Optional[Dict[str, Any]] = None,
               memory_usage_mb: Optional[float] = None):
        """
        Update progress with comprehensive tracking.

        Args:
            frame_number: Current frame being processed
            stage: Current processing stage description
            custom_metrics: Additional metrics to track
            memory_usage_mb: Current memory usage in MB
        """
        current_time = time.time()

        # Handle stage changes
        if stage and stage != self.current_stage:
            self._begin_stage(stage, current_time)

        # Calculate frame timing. The elapsed time is divided by the number of
        # frames advanced since the previous update so that callers reporting
        # every N frames still get a per-frame time (and therefore a correct
        # FPS / ETA). A non-advancing update counts as one frame, matching
        # the behaviour for single-frame steps.
        if self.processed_frames > 0:
            frames_advanced = max(frame_number - self.processed_frames, 1)
            elapsed_since_last = current_time - self.last_update_time
            self.frame_times.append(elapsed_since_last / frames_advanced)

        self.processed_frames = frame_number
        self.last_update_time = current_time

        # Calculate performance metrics
        elapsed_time = current_time - self.start_time
        current_fps = self._calculate_current_fps()

        # Update FPS history and smoothing
        if current_fps > 0:
            self.fps_history.append(current_fps)
            self._update_smoothed_fps(current_fps)

        # Calculate ETA
        eta_seconds = self._calculate_eta()
        progress_pct = self.processed_frames / self.total_frames if self.total_frames > 0 else 0

        # Update statistics
        self._update_statistics(current_fps, memory_usage_mb)

        # Create snapshot if performance tracking is enabled
        if self.track_performance:
            self.snapshots.append(ProgressSnapshot(
                timestamp=current_time,
                frame_number=frame_number,
                stage=self.current_stage,
                fps=current_fps,
                memory_usage_mb=memory_usage_mb,
                # Copy so later add_custom_metric() calls never mutate the
                # caller's dictionary.
                custom_metrics=dict(custom_metrics) if custom_metrics else {},
            ))

        # Generate progress message
        message = self._generate_progress_message(
            elapsed_time, current_fps, eta_seconds, stage
        )

        # Call progress callback; a failing callback must not abort processing.
        if self.callback:
            try:
                self.callback(progress_pct, message)
            except Exception as e:
                logger.warning(f"Progress callback failed: {e}")

        # Log detailed progress periodically
        if frame_number % 50 == 0 or frame_number == self.total_frames:
            self._log_detailed_progress(progress_pct, current_fps, eta_seconds)

    def _calculate_current_fps(self) -> float:
        """Calculate current FPS based on recent frame times."""
        if not self.frame_times:
            return 0.0

        # Use average of recent frame times for stability
        recent_frame_times = list(self.frame_times)[-10:]  # Last 10 frames
        avg_frame_time = sum(recent_frame_times) / len(recent_frame_times)

        return 1.0 / avg_frame_time if avg_frame_time > 0 else 0.0

    def _update_smoothed_fps(self, current_fps: float):
        """Update smoothed FPS using exponential smoothing."""
        if self.smoothed_fps == 0.0:
            # First measurement seeds the average.
            self.smoothed_fps = current_fps
        else:
            self.smoothed_fps = (
                self.eta_smoothing_factor * current_fps +
                (1 - self.eta_smoothing_factor) * self.smoothed_fps
            )

    def _calculate_eta(self) -> float:
        """Calculate estimated time to completion in seconds.

        Returns 0.0 when no estimate is available or when more frames than
        ``total_frames`` have already been reported.
        """
        if self.processed_frames <= 0 or self.smoothed_fps <= 0:
            return 0.0

        remaining_frames = self.total_frames - self.processed_frames
        if remaining_frames <= 0:
            return 0.0
        return remaining_frames / self.smoothed_fps

    def _update_statistics(self, current_fps: float, memory_usage_mb: Optional[float]):
        """Update comprehensive statistics.

        ``average_fps`` and the variance describe the current FPS window;
        ``peak_fps`` / ``slowest_fps`` are running extremes over the whole
        session, so values that have left the window are not forgotten.
        """
        self.stats['total_processing_time'] = time.time() - self.start_time

        # FPS statistics
        if self.fps_history:
            fps_list = list(self.fps_history)
            avg_fps = sum(fps_list) / len(fps_list)
            self.stats['average_fps'] = avg_fps
            self.stats['peak_fps'] = max(self.stats['peak_fps'], max(fps_list))
            self.stats['slowest_fps'] = min(self.stats['slowest_fps'], min(fps_list))

            # Population variance of the window
            variance = sum((fps - avg_fps) ** 2 for fps in fps_list) / len(fps_list)
            self.stats['frames_per_second_variance'] = variance

        # Memory tracking
        if memory_usage_mb is not None and memory_usage_mb > self.stats['memory_peak_mb']:
            self.stats['memory_peak_mb'] = memory_usage_mb

    def _begin_stage(self, stage: str, now: float) -> None:
        """Record the current stage and switch to ``stage`` starting at ``now``."""
        self._complete_stage(now)
        self.current_stage = stage
        self.stage_start_time = now
        self._open_stage_entry = None
        self._open_stage_base = 0.0

    def _complete_stage(self, now: Optional[float] = None):
        """Record the duration of the current stage.

        Safe to call repeatedly: the first call for a stage run appends an
        entry to ``stages_completed``; later calls for the same run refresh
        that entry instead of appending duplicates. ``stats['stage_times']``
        holds the total time per stage name, accumulated across re-entries.

        Args:
            now: Timestamp to measure against; defaults to ``time.time()``.
        """
        if not self.current_stage:
            return

        if now is None:
            now = time.time()
        stage_duration = now - self.stage_start_time
        stage_times = self.stats['stage_times']

        entry = self._open_stage_entry
        if entry is None:
            # First time this run is recorded: remember what earlier runs of
            # the same stage already contributed.
            self._open_stage_base = stage_times.get(self.current_stage, 0.0)
            entry = {
                'stage': self.current_stage,
                'duration': stage_duration,
                'frames_processed': self.processed_frames
            }
            self.stages_completed.append(entry)
            self._open_stage_entry = entry
        else:
            entry['duration'] = stage_duration
            entry['frames_processed'] = self.processed_frames

        stage_times[self.current_stage] = self._open_stage_base + stage_duration
        logger.debug(f"Completed stage '{self.current_stage}' in {stage_duration:.2f}s")

    def _generate_progress_message(self, elapsed_time: float, current_fps: float,
                                   eta_seconds: float, stage: str) -> str:
        """Generate comprehensive progress message."""
        # Base progress info
        message = (
            f"Frame {self.processed_frames}/{self.total_frames} | "
            f"Elapsed: {self._format_time(elapsed_time)} | "
            f"Speed: {current_fps:.1f} fps"
        )

        # Add ETA if meaningful
        if eta_seconds > 0:
            message += f" | ETA: {self._format_time(eta_seconds)}"

        # Add stage information
        if stage:
            message = f"{stage} | {message}"

        # Add performance indicators. fps_history only ever receives positive
        # values, so recent_avg cannot be zero here.
        if self.fps_history and len(self.fps_history) >= 10:
            recent_avg = sum(list(self.fps_history)[-10:]) / 10
            if abs(current_fps - recent_avg) / recent_avg > 0.2:  # 20% difference
                trend = "↗" if current_fps > recent_avg else "↘"
                message += f" {trend}"

        return message

    def _format_time(self, seconds: float) -> str:
        """Format time duration in human-readable format."""
        if seconds < 60:
            return f"{int(seconds)}s"
        elif seconds < 3600:
            minutes = int(seconds // 60)
            secs = int(seconds % 60)
            return f"{minutes}m {secs}s"
        else:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            return f"{hours}h {minutes}m"

    def _log_detailed_progress(self, progress_pct: float, current_fps: float, eta_seconds: float):
        """Log detailed progress information."""
        logger.info(
            f"Progress: {progress_pct*100:.1f}% | "
            f"FPS: {current_fps:.1f} (avg: {self.stats['average_fps']:.1f}) | "
            f"ETA: {self._format_time(eta_seconds)} | "
            f"Stage: {self.current_stage}"
        )

    def set_stage(self, stage: str):
        """Manually set the current processing stage."""
        if stage != self.current_stage:
            self._begin_stage(stage, time.time())
            logger.debug(f"Stage changed to: {stage}")

    def add_custom_metric(self, key: str, value: Any):
        """Add a custom metric to the most recent snapshot (no-op if there is none)."""
        if self.snapshots:
            self.snapshots[-1].custom_metrics[key] = value

    def _stats_copy(self) -> Dict[str, Any]:
        """Return a copy of ``stats`` whose nested ``stage_times`` dict is also copied."""
        stats = dict(self.stats)
        stats['stage_times'] = dict(self.stats['stage_times'])
        return stats

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get comprehensive performance summary.

        The in-progress stage is included in the stage figures. Calling this
        repeatedly does not add duplicate stage entries, and the returned
        containers are copies that later tracker updates will not modify.
        """
        self._complete_stage()  # Record (or refresh) the current stage

        total_time = time.time() - self.start_time

        summary = {
            'total_frames': self.total_frames,
            'processed_frames': self.processed_frames,
            'completion_percentage': (self.processed_frames / self.total_frames * 100) if self.total_frames > 0 else 0,
            'total_processing_time': total_time,
            'overall_fps': self.processed_frames / total_time if total_time > 0 else 0,
            'stages_completed': len(self.stages_completed),
            'current_stage': self.current_stage,
            'statistics': self._stats_copy(),
            'stage_breakdown': [dict(entry) for entry in self.stages_completed]
        }

        # Calculate stage percentages
        stage_times = self.stats['stage_times']
        if stage_times:
            total_stage_time = sum(stage_times.values())
            # Durations can all be 0.0 on clocks with coarse resolution.
            summary['stage_percentages'] = {
                stage: (duration / total_stage_time * 100) if total_stage_time > 0 else 0.0
                for stage, duration in stage_times.items()
            }

        # Performance analysis
        if self.fps_history:
            summary['performance_analysis'] = {
                'fps_stability': self._calculate_fps_stability(),
                'performance_trend': self._analyze_performance_trend(),
                'bottleneck_detection': self._detect_bottlenecks()
            }

        return summary

    def _calculate_fps_stability(self) -> str:
        """Classify FPS stability from the coefficient of variation of the window."""
        if not self.fps_history or len(self.fps_history) < 10:
            return "insufficient_data"

        variance = self.stats['frames_per_second_variance']
        avg_fps = self.stats['average_fps']

        if avg_fps == 0:
            return "unstable"

        coefficient_of_variation = (variance ** 0.5) / avg_fps

        if coefficient_of_variation < 0.1:
            return "very_stable"
        elif coefficient_of_variation < 0.2:
            return "stable"
        elif coefficient_of_variation < 0.4:
            return "moderate"
        else:
            return "unstable"

    def _analyze_performance_trend(self) -> str:
        """Compare the first and last quarter of the FPS window."""
        if len(self.fps_history) < 20:
            return "insufficient_data"

        # Compare first and last quartiles
        fps_list = list(self.fps_history)
        quartile_size = len(fps_list) // 4

        first_quartile_avg = sum(fps_list[:quartile_size]) / quartile_size
        last_quartile_avg = sum(fps_list[-quartile_size:]) / quartile_size

        change_percent = ((last_quartile_avg - first_quartile_avg) / first_quartile_avg) * 100

        if change_percent > 10:
            return "improving"
        elif change_percent < -10:
            return "degrading"
        else:
            return "stable"

    def _detect_bottlenecks(self) -> List[str]:
        """Detect potential performance bottlenecks."""
        bottlenecks = []

        # Check for consistently low FPS
        if self.stats['average_fps'] < 0.5:
            bottlenecks.append("very_low_fps")

        # Check for high variance (standard deviation above half the mean)
        if self.stats['frames_per_second_variance'] > (self.stats['average_fps'] * 0.5) ** 2:
            bottlenecks.append("inconsistent_performance")

        # Check for memory pressure (if tracked)
        if self.stats['memory_peak_mb'] > 8000:  # 8GB
            bottlenecks.append("high_memory_usage")

        # Check stage timing imbalances
        if self.stats['stage_times']:
            stage_times = list(self.stats['stage_times'].values())
            max_time = max(stage_times)
            avg_time = sum(stage_times) / len(stage_times)

            if max_time > avg_time * 3:
                bottlenecks.append("stage_imbalance")

        return bottlenecks

    def export_performance_data(self) -> Dict[str, Any]:
        """Export detailed performance data for analysis.

        The summary is computed first so that every section of the export
        reflects the same state; the returned containers are copies.
        """
        summary = self.get_performance_summary()
        return {
            'metadata': {
                'total_frames': self.total_frames,
                'tracking_enabled': self.track_performance,
                'start_time': self.start_time,
                'export_time': time.time()
            },
            'snapshots': [
                {
                    'timestamp': snap.timestamp,
                    'frame_number': snap.frame_number,
                    'stage': snap.stage,
                    'fps': snap.fps,
                    'memory_usage_mb': snap.memory_usage_mb,
                    'custom_metrics': dict(snap.custom_metrics)
                }
                for snap in self.snapshots
            ],
            'statistics': self._stats_copy(),
            'stages': [dict(entry) for entry in self.stages_completed],
            'performance_summary': summary
        }

    def reset(self, new_total_frames: Optional[int] = None):
        """Reset tracker for new processing session.

        Args:
            new_total_frames: New frame count; the previous one is kept when omitted.
        """
        if new_total_frames is not None:
            self.total_frames = new_total_frames

        self.start_time = time.time()
        self.last_update_time = self.start_time
        self.processed_frames = 0
        self.frame_times.clear()
        self.fps_history.clear()
        self.snapshots.clear()
        self.current_stage = "initializing"
        self.stage_start_time = self.start_time
        self.stages_completed.clear()
        self._open_stage_entry = None
        self._open_stage_base = 0.0
        self.smoothed_fps = 0.0

        # Reset statistics
        self.stats = self._new_stats()

        logger.debug("ProgressTracker reset")

    def finalize(self) -> Dict[str, Any]:
        """Finalize tracking and return comprehensive results."""
        # get_performance_summary() records the final stage exactly once.
        final_summary = self.get_performance_summary()

        logger.info(
            f"Processing completed: {self.processed_frames}/{self.total_frames} frames "
            f"in {self._format_time(final_summary['total_processing_time'])} "
            f"(avg: {final_summary['overall_fps']:.1f} fps)"
        )

        return final_summary