safraeli commited on
Commit
9d09bc8
Β·
verified Β·
1 Parent(s): 0143c6f

Update src/data/data_providers.py: circuit breaker, weather fix, TB health fix

Browse files
Files changed (1) hide show
  1. src/data/data_providers.py +56 -3
src/data/data_providers.py CHANGED
@@ -50,6 +50,46 @@ import numpy as np
50
  import pandas as pd
51
 
52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
53
  # ═══════════════════════════════════════════════════════════════════════
54
  # TTL Cache helper
55
  # ═══════════════════════════════════════════════════════════════════════
@@ -104,9 +144,9 @@ class TTLCache:
104
  return None
105
 
106
  def set(self, key: str, value: Any) -> None:
107
- # Write to Redis if available
108
  redis = self._get_redis()
109
- if redis:
110
  redis.set_json(self._rkey(key), value, ttl=int(self.ttl))
111
  # Always write in-memory too (local fast path)
112
  self._store[key] = _CacheEntry(value=value, expires_at=time.monotonic() + self.ttl)
@@ -204,7 +244,10 @@ class WeatherService(BaseService):
204
  def _load_df(self) -> pd.DataFrame:
205
  cached = self._df_cache.get("ims")
206
  if cached is not None:
207
- return cached
 
 
 
208
  df = self._client().load_cached()
209
  if not df.empty:
210
  self._df_cache.set("ims", df)
@@ -292,6 +335,7 @@ class VineSensorService(BaseService):
292
  def __init__(self, tb_client: Any = None, snapshot_ttl: float = 300):
293
  self._tb = tb_client # lazy
294
  self._snap_cache = TTLCache(ttl_seconds=snapshot_ttl, redis_prefix="vine:")
 
295
 
296
  def _client(self):
297
  if self._tb is None:
@@ -316,12 +360,16 @@ class VineSensorService(BaseService):
316
  cached = self._snap_cache.get(cache_key)
317
  if cached is not None:
318
  return cached
 
 
319
  try:
320
  snapshot = self._client().get_vine_snapshot(light=light, mode=mode)
321
  result = snapshot.to_dict()
322
  self._snap_cache.set(cache_key, result)
 
323
  return result
324
  except Exception as exc:
 
325
  return {
326
  "error": f"ThingsBoard unavailable: {exc}",
327
  "hint": "Check THINGSBOARD_USERNAME/PASSWORD in .env",
@@ -823,6 +871,7 @@ class EnergyService(BaseService):
823
 
824
  def __init__(self, tb_client: Any = None):
825
  self._tb = tb_client
 
826
 
827
  def _client(self):
828
  if self._tb is None:
@@ -836,14 +885,18 @@ class EnergyService(BaseService):
836
 
837
  def get_current(self) -> Dict[str, Any]:
838
  """Latest power reading from the Plant asset."""
 
 
839
  try:
840
  vals = self._client().get_asset_latest("Plant", ["power", "production"])
841
  power_w = vals.get("power")
 
842
  return {
843
  "power_kw": round(power_w / 1000, 1) if power_w else None,
844
  "source": "ThingsBoard Plant asset",
845
  }
846
  except Exception as exc:
 
847
  return {"error": f"Energy current failed: {exc}"}
848
 
849
  def get_daily_production(self, target_date: Optional[str] = None) -> Dict[str, Any]:
 
50
  import pandas as pd
51
 
52
 
53
+ # ═══════════════════════════════════════════════════════════════════════
54
+ # Circuit breaker β€” fail-open to cached data when external services down
55
+ # ═══════════════════════════════════════════════════════════════════════
56
+
57
+ class CircuitBreaker:
58
+ """Simple circuit breaker: after `threshold` consecutive failures within
59
+ `window_sec`, the circuit opens and calls are short-circuited for
60
+ `cooldown_sec` before retrying.
61
+ """
62
+
63
+ def __init__(self, threshold: int = 3, cooldown_sec: float = 300, window_sec: float = 60):
64
+ self.threshold = threshold
65
+ self.cooldown_sec = cooldown_sec
66
+ self.window_sec = window_sec
67
+ self._failures: list[float] = []
68
+ self._opened_at: float | None = None
69
+
70
+ @property
71
+ def is_open(self) -> bool:
72
+ if self._opened_at is None:
73
+ return False
74
+ if time.monotonic() - self._opened_at > self.cooldown_sec:
75
+ # Cooldown expired β€” allow retry (half-open)
76
+ self._opened_at = None
77
+ self._failures.clear()
78
+ return False
79
+ return True
80
+
81
+ def record_success(self) -> None:
82
+ self._failures.clear()
83
+ self._opened_at = None
84
+
85
+ def record_failure(self) -> None:
86
+ now = time.monotonic()
87
+ self._failures = [t for t in self._failures if now - t < self.window_sec]
88
+ self._failures.append(now)
89
+ if len(self._failures) >= self.threshold:
90
+ self._opened_at = now
91
+
92
+
93
  # ═══════════════════════════════════════════════════════════════════════
94
  # TTL Cache helper
95
  # ═══════════════════════════════════════════════════════════════════════
 
144
  return None
145
 
146
  def set(self, key: str, value: Any) -> None:
147
+ # Write to Redis if available (skip DataFrames β€” too large for JSON serialisation)
148
  redis = self._get_redis()
149
+ if redis and not isinstance(value, pd.DataFrame):
150
  redis.set_json(self._rkey(key), value, ttl=int(self.ttl))
151
  # Always write in-memory too (local fast path)
152
  self._store[key] = _CacheEntry(value=value, expires_at=time.monotonic() + self.ttl)
 
244
  def _load_df(self) -> pd.DataFrame:
245
  cached = self._df_cache.get("ims")
246
  if cached is not None:
247
+ # Redis may deserialise as dict/list β€” only accept DataFrames
248
+ if isinstance(cached, pd.DataFrame):
249
+ return cached
250
+ # Discard stale non-DataFrame from Redis and reload from CSV
251
  df = self._client().load_cached()
252
  if not df.empty:
253
  self._df_cache.set("ims", df)
 
335
  def __init__(self, tb_client: Any = None, snapshot_ttl: float = 300):
336
  self._tb = tb_client # lazy
337
  self._snap_cache = TTLCache(ttl_seconds=snapshot_ttl, redis_prefix="vine:")
338
+ self._breaker = CircuitBreaker(threshold=3, cooldown_sec=300)
339
 
340
  def _client(self):
341
  if self._tb is None:
 
360
  cached = self._snap_cache.get(cache_key)
361
  if cached is not None:
362
  return cached
363
+ if self._breaker.is_open:
364
+ return {"error": "ThingsBoard circuit breaker open β€” retrying in 5 min", "cached": True}
365
  try:
366
  snapshot = self._client().get_vine_snapshot(light=light, mode=mode)
367
  result = snapshot.to_dict()
368
  self._snap_cache.set(cache_key, result)
369
+ self._breaker.record_success()
370
  return result
371
  except Exception as exc:
372
+ self._breaker.record_failure()
373
  return {
374
  "error": f"ThingsBoard unavailable: {exc}",
375
  "hint": "Check THINGSBOARD_USERNAME/PASSWORD in .env",
 
871
 
872
  def __init__(self, tb_client: Any = None):
873
  self._tb = tb_client
874
+ self._breaker = CircuitBreaker(threshold=3, cooldown_sec=300)
875
 
876
  def _client(self):
877
  if self._tb is None:
 
885
 
886
  def get_current(self) -> Dict[str, Any]:
887
  """Latest power reading from the Plant asset."""
888
+ if self._breaker.is_open:
889
+ return {"error": "ThingsBoard circuit breaker open β€” retrying in 5 min"}
890
  try:
891
  vals = self._client().get_asset_latest("Plant", ["power", "production"])
892
  power_w = vals.get("power")
893
+ self._breaker.record_success()
894
  return {
895
  "power_kw": round(power_w / 1000, 1) if power_w else None,
896
  "source": "ThingsBoard Plant asset",
897
  }
898
  except Exception as exc:
899
+ self._breaker.record_failure()
900
  return {"error": f"Energy current failed: {exc}"}
901
 
902
  def get_daily_production(self, target_date: Optional[str] = None) -> Dict[str, Any]: