# mirage / metrics.py
# MacBook pro
# fix(metrics): correct indentation in Metrics.__init__ causing IndentationError
# 69bb7ad
from __future__ import annotations
import threading
import time
from collections import deque
from typing import Deque, Dict, Any
class Metrics:
    """Simple in-process metrics accumulator with lightweight locking.

    Tracks audio/video counts, sizes, EMAs, and rolling FPS. Every mutation
    and every snapshot runs while holding one internal ``threading.Lock``.

    Intervals are measured with ``time.monotonic()`` so that wall-clock
    adjustments cannot produce negative or inflated intervals.

    Args:
        fps_window: Maximum number of recent frame timestamps retained for
            the rolling FPS estimate.
        ema_alpha: Weight of the newest sample in every exponential moving
            average; the previous value is weighted ``1 - ema_alpha``.
    """

    def __init__(self, fps_window: int = 30, ema_alpha: float = 0.2) -> None:
        self._lock = threading.Lock()
        # Audio counters
        self.audio_chunks = 0
        self.audio_bytes = 0
        self.audio_avg_chunk_size = 0.0
        self.audio_loop_interval_ema = 0.0  # ms between loop ticks (if fed)
        self.audio_infer_time_ema = 0.0  # ms processing (placeholder for future inference)
        self._last_audio_ts: float | None = None  # monotonic ms of the previous chunk
        # Video counters
        self.video_frames = 0
        self.video_bytes = 0
        self.video_avg_frame_size = 0.0
        self._fps_window = fps_window
        self._frame_times: Deque[float] = deque(maxlen=fps_window)  # monotonic seconds
        self.video_frame_interval_ema = 0.0
        self._last_video_ts: float | None = None  # monotonic seconds of the previous frame
        self.ema_alpha = ema_alpha
        # Explicit "has received a sample" flags. The EMA value itself cannot
        # double as the flag because 0.0 is a legitimate measurement.
        self._audio_loop_interval_seeded = False
        self._audio_infer_time_seeded = False
        self._video_frame_interval_seeded = False

    def _next_ema(self, previous: float, sample: float, seeded: bool) -> float:
        """Return the EMA after folding in *sample*; the first sample seeds it."""
        if not seeded:
            return sample
        return (self.ema_alpha * sample) + (1 - self.ema_alpha) * previous

    # ---------------- Audio -----------------
    def record_audio_chunk(
        self,
        size_bytes: int,
        loop_interval_ms: float | None = None,
        infer_time_ms: float | None = None,
    ) -> None:
        """Record one audio chunk and update the audio statistics.

        Args:
            size_bytes: Size of the chunk, added to the running byte total.
            loop_interval_ms: Milliseconds since the previous loop tick. When
                omitted, the time elapsed since the previous call is used
                (nothing is recorded for the very first call).
            infer_time_ms: Processing time in milliseconds; the inference EMA
                is left unchanged when omitted.
        """
        with self._lock:
            self.audio_chunks += 1
            self.audio_bytes += size_bytes
            # Mean from the exact totals: avoids the rounding drift of an
            # incrementally updated running average.
            self.audio_avg_chunk_size = self.audio_bytes / self.audio_chunks

            now = time.monotonic() * 1000.0
            if loop_interval_ms is None and self._last_audio_ts is not None:
                loop_interval_ms = now - self._last_audio_ts
            self._last_audio_ts = now

            if loop_interval_ms is not None:
                self.audio_loop_interval_ema = self._next_ema(
                    self.audio_loop_interval_ema,
                    loop_interval_ms,
                    self._audio_loop_interval_seeded,
                )
                self._audio_loop_interval_seeded = True

            if infer_time_ms is not None:
                self.audio_infer_time_ema = self._next_ema(
                    self.audio_infer_time_ema,
                    infer_time_ms,
                    self._audio_infer_time_seeded,
                )
                self._audio_infer_time_seeded = True

    # ---------------- Video -----------------
    def record_video_frame(self, size_bytes: int) -> None:
        """Record one video frame and update the video statistics.

        Args:
            size_bytes: Size of the frame, added to the running byte total.
        """
        with self._lock:
            self.video_frames += 1
            self.video_bytes += size_bytes
            self.video_avg_frame_size = self.video_bytes / self.video_frames

            now = time.monotonic()
            self._frame_times.append(now)
            # Frame interval EMA (ms); needs a previous frame to measure from.
            if self._last_video_ts is not None:
                interval_ms = (now - self._last_video_ts) * 1000.0
                self.video_frame_interval_ema = self._next_ema(
                    self.video_frame_interval_ema,
                    interval_ms,
                    self._video_frame_interval_seeded,
                )
                self._video_frame_interval_seeded = True
            self._last_video_ts = now

    # --------------- Report ------------------
    def snapshot(self) -> Dict[str, Any]:
        """Return a dict of the current counters, averages, EMAs and rolling FPS.

        The rolling FPS is the number of intervals between retained frame
        timestamps divided by the time they span; it is 0.0 until at least
        two frames with a positive time span have been recorded.
        """
        with self._lock:
            fps = 0.0
            if len(self._frame_times) > 1:
                span = self._frame_times[-1] - self._frame_times[0]
                if span > 0:
                    fps = (len(self._frame_times) - 1) / span
            return {
                "audio_chunks": self.audio_chunks,
                "audio_bytes": self.audio_bytes,
                "audio_avg_chunk_size": self.audio_avg_chunk_size,
                "audio_loop_interval_ema_ms": self.audio_loop_interval_ema,
                "audio_infer_time_ema_ms": self.audio_infer_time_ema,
                "video_frames": self.video_frames,
                "video_bytes": self.video_bytes,
                "video_avg_frame_size": self.video_avg_frame_size,
                "video_fps_rolling": fps,
                "video_frame_interval_ema_ms": self.video_frame_interval_ema,
                "fps_window": self._fps_window,
            }
# Global singleton for simple use: a module-level Metrics instance created at
# import time with the constructor's default arguments.
metrics = Metrics()