# Source path: api/backend/services/data_flow_monitor.py
# Latest commit 063a7cd (verified) — safraeli:
#   "Fix HEAD support for UptimeRobot, email alerter bug, update docs"
"""
Data flow monitor — checks IMS and ThingsBoard data freshness.
Returns per-source status (green / yellow / red) with age and messages.
Used by the /api/health/data-sources endpoint and the email alerter.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
from config.settings import (
IMS_STALE_YELLOW_MIN,
IMS_STALE_RED_MIN,
TB_STALE_YELLOW_MIN,
TB_STALE_RED_MIN,
ENERGY_STALE_YELLOW_MIN,
ENERGY_STALE_RED_MIN,
)
from src.data.data_providers import DataHub
log = logging.getLogger("solarwine.monitor")
def _classify(age_minutes: float | None, yellow: float, red: float) -> str:
"""Return green / yellow / red based on age thresholds."""
if age_minutes is None:
return "red"
if age_minutes < yellow:
return "green"
if age_minutes < red:
return "yellow"
return "red"
def _status_message(source: str, status: str, age: float | None) -> str:
if status == "green":
return f"{source} data is fresh"
if age is not None:
return f"{source} data is {age:.0f} min old"
return f"{source} data is unavailable"
class DataFlowMonitor:
    """Computes per-source data flow status.

    Each ``_check_*`` method queries one provider on the hub and returns a
    dict holding ``status`` ("green" / "yellow" / "red"), ``age_minutes``
    and a human-readable ``message``. Exceptions raised while checking a
    source are caught, logged as warnings and reported as a "red" result.
    """

    def check_all(self, hub: DataHub) -> dict[str, Any]:
        """Check every data source and roll the results up.

        Args:
            hub: Data hub exposing ``weather``, ``vine_sensors`` and
                ``energy`` providers.

        Returns:
            A dict with ``overall`` (the worst status among the sources),
            ``checked_at`` (UTC ISO-8601 timestamp taken before the checks
            run) and ``sources`` (per-source results keyed by
            ``ims_weather``, ``tb_sensors`` and ``tb_energy``).
        """
        now_iso = datetime.now(timezone.utc).isoformat()
        sources: dict[str, dict] = {
            "ims_weather": self._check_ims(hub),
            "tb_sensors": self._check_tb_sensors(hub),
            "tb_energy": self._check_tb_energy(hub),
        }

        # Overall status: worst of all sources.
        statuses = {result["status"] for result in sources.values()}
        if "red" in statuses:
            overall = "red"
        elif "yellow" in statuses:
            overall = "yellow"
        else:
            overall = "green"

        return {
            "overall": overall,
            "checked_at": now_iso,
            "sources": sources,
        }

    @staticmethod
    def _error_reason(payload: Any) -> str:
        """Return the payload's ``error`` text, or "unavailable" without one.

        Providers may hand back ``None`` or an empty payload; calling
        ``.get`` on ``None`` would raise, so falsy payloads are handled here.
        """
        if not payload:
            return "unavailable"
        return payload.get("error", "unavailable")

    def _check_ims(self, hub: DataHub) -> dict:
        """Report freshness of the current IMS weather reading.

        Returns a dict with ``status``, ``age_minutes``, ``last_reading``
        and ``message``. A missing or negative ``age_minutes`` is treated
        as an unknown age, which classifies as "red".
        """
        try:
            wx = hub.weather.get_current()
            if not wx or "error" in wx:
                return {
                    "status": "red",
                    "age_minutes": None,
                    "last_reading": None,
                    "message": f"IMS weather error: {self._error_reason(wx)}",
                }

            raw_age = wx.get("age_minutes")
            age = float(raw_age) if raw_age is not None else None
            if age is None:
                log.warning("IMS weather returned no age_minutes — data provider issue")
            elif age < 0:
                log.warning("IMS weather returned negative age_minutes — data provider issue")
                age = None

            status = _classify(age, IMS_STALE_YELLOW_MIN, IMS_STALE_RED_MIN)
            return {
                "status": status,
                "age_minutes": round(age, 1) if age is not None else None,
                "last_reading": wx.get("timestamp_local") or wx.get("timestamp_utc"),
                "message": _status_message("IMS weather", status, age),
            }
        except Exception as exc:
            log.warning("IMS health check failed: %s", exc)
            return {
                "status": "red",
                "age_minutes": None,
                "last_reading": None,
                "message": f"IMS weather unreachable: {exc}",
            }

    def _check_tb_sensors(self, hub: DataHub) -> dict:
        """Report freshness of the ThingsBoard vine-sensor snapshot.

        Returns a dict with ``status``, ``age_minutes``, ``last_reading``
        and ``message``. The age is read from the snapshot's
        ``staleness_minutes``; when that is absent the age is unknown and
        the source classifies as "red".
        """
        try:
            snap = hub.vine_sensors.get_snapshot(light=True)
            if not snap or "error" in snap:
                return {
                    "status": "red",
                    "age_minutes": None,
                    "last_reading": None,
                    "message": f"TB sensors error: {self._error_reason(snap)}",
                }

            stale = snap.get("staleness_minutes")
            age = float(stale) if stale is not None else None
            status = _classify(age, TB_STALE_YELLOW_MIN, TB_STALE_RED_MIN)
            return {
                "status": status,
                "age_minutes": round(age, 1) if age is not None else None,
                "last_reading": snap.get("timestamp"),
                "message": _status_message("ThingsBoard sensors", status, age),
            }
        except Exception as exc:
            log.warning("TB sensors health check failed: %s", exc)
            return {
                "status": "red",
                "age_minutes": None,
                "last_reading": None,
                "message": f"TB sensors unreachable: {exc}",
            }

    def _check_tb_energy(self, hub: DataHub) -> dict:
        """Report freshness of the current energy telemetry.

        Returns a dict with ``status``, ``age_minutes``, ``power_kw`` and
        ``message``. Unlike the other sources, a payload without an age is
        reported as "green".
        """
        try:
            en = hub.energy.get_current()
            if not en or "error" in en:
                return {
                    "status": "red",
                    "age_minutes": None,
                    "power_kw": None,
                    "message": f"Energy error: {self._error_reason(en)}",
                }

            power = en.get("power_kw")
            # Energy doesn't always expose age_minutes. A missing (or None)
            # age alongside valid data is assumed to mean "just fetched".
            raw_age = en.get("age_minutes")
            if raw_age is None:
                age = None
                status = "green"
            else:
                age = float(raw_age)
                status = _classify(age, ENERGY_STALE_YELLOW_MIN, ENERGY_STALE_RED_MIN)

            return {
                "status": status,
                "age_minutes": round(age, 1) if age is not None else None,
                "power_kw": round(float(power), 2) if power is not None else None,
                "message": _status_message("Energy telemetry", status, age),
            }
        except Exception as exc:
            log.warning("Energy health check failed: %s", exc)
            return {
                "status": "red",
                "age_minutes": None,
                "power_kw": None,
                "message": f"Energy unreachable: {exc}",
            }