Eli Safra commited on
Commit
271a242
Β·
1 Parent(s): 609c2d6

SSE live updates, route cleanup, advisor briefing

Browse files

- Add EventBus + SSE endpoint for push-based data updates
- Route all sensor/control/biology data through DataHub services
- Remove all direct ThingsBoard calls from route handlers
- Add live status briefing to chatbot advisor

backend/api/events.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ In-process event bus for SSE-based live updates.
3
+
4
+ Uses asyncio primitives only β€” no Redis, no external dependencies.
5
+ Works within a single uvicorn process (which is what HF Spaces runs).
6
+ """
7
+
8
+ from __future__ import annotations
9
+
10
+ import asyncio
11
+ import logging
12
+ from dataclasses import dataclass, field
13
+ from typing import Dict
14
+
15
+ log = logging.getLogger("solarwine.events")
16
+
17
+
18
+ @dataclass
19
+ class EventBus:
20
+ """Lightweight publish/subscribe within one async process.
21
+
22
+ Publishers call ``notify(topic)`` to bump a version counter.
23
+ SSE consumers call ``wait()`` to block until any topic changes.
24
+
25
+ Uses an asyncio.Condition to avoid the set/clear race on asyncio.Event.
26
+ """
27
+
28
+ _versions: Dict[str, int] = field(default_factory=dict)
29
+ _condition: asyncio.Condition = field(default_factory=asyncio.Condition)
30
+
31
+ async def notify(self, topic: str) -> None:
32
+ async with self._condition:
33
+ self._versions[topic] = self._versions.get(topic, 0) + 1
34
+ self._condition.notify_all()
35
+ log.debug("EventBus: %s β†’ v%d", topic, self._versions[topic])
36
+
37
+ async def wait(self, timeout: float = 30) -> Dict[str, int]:
38
+ """Block until a notification arrives or *timeout* seconds elapse."""
39
+ try:
40
+ async with self._condition:
41
+ await asyncio.wait_for(self._condition.wait(), timeout)
42
+ except asyncio.TimeoutError:
43
+ pass
44
+ return dict(self._versions)
45
+
46
+ @property
47
+ def versions(self) -> Dict[str, int]:
48
+ return dict(self._versions)
49
+
50
+
51
+ # Module-level singleton β€” imported by routes and data_providers
52
+ event_bus = EventBus()
backend/api/main.py CHANGED
@@ -19,7 +19,7 @@ from slowapi import Limiter, _rate_limit_exceeded_handler
19
  from slowapi.errors import RateLimitExceeded
20
  from slowapi.util import get_remote_address
21
 
22
- from backend.api.routes import health, weather, sensors, energy, photosynthesis, control, chatbot, biology, login
23
 
24
  # ---------------------------------------------------------------------------
25
  # Structured logging
@@ -82,6 +82,7 @@ async def _ims_refresh_loop(interval_sec: int = 6 * 3600):
82
  picks up fresh data.
83
  """
84
  import asyncio
 
85
  await asyncio.sleep(10) # let the server finish starting
86
  while True:
87
  try:
@@ -106,11 +107,45 @@ async def _ims_refresh_loop(interval_sec: int = 6 * 3600):
106
  hub.weather._df_cache._store.clear()
107
  except Exception:
108
  pass
 
109
  except Exception as exc:
110
  log.error("IMS refresh failed: %s", exc)
111
  await asyncio.sleep(interval_sec)
112
 
113
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
  @asynccontextmanager
115
  async def lifespan(app: FastAPI):
116
  global _start_time
@@ -124,12 +159,14 @@ async def lifespan(app: FastAPI):
124
  if not os.environ.get("ADMIN_PASSWORD"):
125
  log.warning("ADMIN_PASSWORD not set β€” login endpoint disabled")
126
 
127
- # Start background IMS data refresh (every 6 hours)
128
  import asyncio
129
  ims_task = asyncio.create_task(_ims_refresh_loop())
 
130
 
131
  yield
132
  ims_task.cancel()
 
133
  log.info("SolarWine API shutting down (uptime=%.0fs)", get_uptime())
134
 
135
 
@@ -202,3 +239,4 @@ app.include_router(control.router, prefix="/api/control", tags=["control"])
202
  app.include_router(chatbot.router, prefix="/api/chatbot", tags=["chatbot"])
203
  app.include_router(biology.router, prefix="/api/biology", tags=["biology"])
204
  app.include_router(login.router, prefix="/api/auth", tags=["auth"])
 
 
19
  from slowapi.errors import RateLimitExceeded
20
  from slowapi.util import get_remote_address
21
 
22
+ from backend.api.routes import health, weather, sensors, energy, photosynthesis, control, chatbot, biology, login, events
23
 
24
  # ---------------------------------------------------------------------------
25
  # Structured logging
 
82
  picks up fresh data.
83
  """
84
  import asyncio
85
+ from backend.api.events import event_bus
86
  await asyncio.sleep(10) # let the server finish starting
87
  while True:
88
  try:
 
107
  hub.weather._df_cache._store.clear()
108
  except Exception:
109
  pass
110
+ await event_bus.notify("weather")
111
  except Exception as exc:
112
  log.error("IMS refresh failed: %s", exc)
113
  await asyncio.sleep(interval_sec)
114
 
115
 
116
+ async def _sensor_refresh_loop(interval_sec: int = 120):
117
+ """Background loop: poll ThingsBoard sensors every `interval_sec` seconds.
118
+
119
+ Refreshes the VineSensorService and EnergyService caches, then
120
+ notifies SSE clients so they refetch immediately.
121
+ """
122
+ import asyncio
123
+ from backend.api.events import event_bus
124
+ await asyncio.sleep(15) # let startup finish
125
+ while True:
126
+ try:
127
+ from backend.api.deps import get_datahub
128
+ hub = get_datahub()
129
+ # Refresh sensor snapshot β€” invalidate all cached variants
130
+ for key in ("snap", "snap_light", "dashboard"):
131
+ hub.vine_sensors._snap_cache.invalidate(key)
132
+ snapshot = hub.vine_sensors.get_snapshot()
133
+ if snapshot and "error" not in snapshot:
134
+ await event_bus.notify("sensors")
135
+ # Photosynthesis depends on sensor data
136
+ await event_bus.notify("photosynthesis")
137
+ log.debug("Sensor snapshot refreshed")
138
+ # Refresh energy current
139
+ hub.energy._current_cache.invalidate("current")
140
+ current = hub.energy.get_current()
141
+ if current and "error" not in current:
142
+ await event_bus.notify("energy")
143
+ log.debug("Energy current refreshed")
144
+ except Exception as exc:
145
+ log.error("Sensor refresh failed: %s", exc)
146
+ await asyncio.sleep(interval_sec)
147
+
148
+
149
  @asynccontextmanager
150
  async def lifespan(app: FastAPI):
151
  global _start_time
 
159
  if not os.environ.get("ADMIN_PASSWORD"):
160
  log.warning("ADMIN_PASSWORD not set β€” login endpoint disabled")
161
 
162
+ # Start background refresh loops
163
  import asyncio
164
  ims_task = asyncio.create_task(_ims_refresh_loop())
165
+ sensor_task = asyncio.create_task(_sensor_refresh_loop())
166
 
167
  yield
168
  ims_task.cancel()
169
+ sensor_task.cancel()
170
  log.info("SolarWine API shutting down (uptime=%.0fs)", get_uptime())
171
 
172
 
 
239
  app.include_router(chatbot.router, prefix="/api/chatbot", tags=["chatbot"])
240
  app.include_router(biology.router, prefix="/api/biology", tags=["biology"])
241
  app.include_router(login.router, prefix="/api/auth", tags=["auth"])
242
+ app.include_router(events.router, prefix="/api/events", tags=["events"])
backend/api/routes/biology.py CHANGED
@@ -48,90 +48,5 @@ async def biology_chill_units(
48
  season_start: str = Query("2025-11-01", description="Season start (YYYY-MM-DD)"),
49
  hub: DataHub = Depends(get_datahub),
50
  ):
51
- """Accumulated chill units from ThingsBoard Air1 on-site temperature (Utah model).
52
-
53
- Model (Richardson et al. 1974):
54
- T <= 7Β°C β†’ +1.0 CU/hour
55
- 7 < T <= 10 β†’ +0.5
56
- 10 < T <= 18 β†’ 0.0
57
- T > 18 β†’ -1.0
58
- Daily totals clipped at 0 (no negative daily chill).
59
- Season cumulative = running sum of daily CU from season_start.
60
-
61
- Two series:
62
- - open_field: on-site Air1 temperature
63
- - under_panels: open_field Γ— 1.1 (panels buffer nighttime β†’ more chill)
64
- Per Research/chill_hours/ANALYSIS_EXPLAINED.md.
65
- """
66
- import numpy as np
67
- import pandas as pd
68
-
69
- PANEL_MULTIPLIER = 1.1
70
-
71
- try:
72
- from src.data.thingsboard_client import ThingsBoardClient
73
- from zoneinfo import ZoneInfo
74
-
75
- client = ThingsBoardClient()
76
- tz = ZoneInfo("Asia/Jerusalem")
77
- start = pd.Timestamp(season_start, tz="UTC")
78
- end = pd.Timestamp.now(tz="UTC")
79
-
80
- # Fetch Air1 temperature in 7-day chunks (TB rejects large ranges)
81
- chunks = []
82
- cursor = start
83
- while cursor < end:
84
- chunk_end = min(cursor + pd.Timedelta(days=7), end)
85
- try:
86
- df = client.get_timeseries(
87
- "Air1", ["airTemperature"],
88
- start=cursor.to_pydatetime(), end=chunk_end.to_pydatetime(),
89
- interval_ms=0, agg="NONE", limit=10000,
90
- )
91
- if not df.empty:
92
- chunks.append(df)
93
- except Exception:
94
- pass
95
- cursor = chunk_end
96
-
97
- if not chunks:
98
- return {"error": "No Air1 temperature data available from ThingsBoard"}
99
-
100
- full = pd.concat(chunks).sort_index()
101
- full = full[~full.index.duplicated(keep="first")]
102
- full = full.tz_convert(tz)
103
-
104
- hourly = full["airTemperature"].resample("1h").mean().dropna()
105
- if hourly.empty:
106
- return {"error": "No hourly temperature after resampling"}
107
-
108
- temps = hourly.values
109
- chill_hourly = np.select(
110
- [temps <= 7.0, (temps > 7.0) & (temps <= 10.0),
111
- (temps > 10.0) & (temps <= 18.0), temps > 18.0],
112
- [1.0, 0.5, 0.0, -1.0],
113
- )
114
-
115
- daily_chill = pd.Series(chill_hourly, index=hourly.index).resample("D").sum().clip(lower=0)
116
- cu_open = daily_chill.cumsum()
117
- cu_panels = (daily_chill * PANEL_MULTIPLIER).cumsum()
118
-
119
- daily = [
120
- {
121
- "date": ts.strftime("%Y-%m-%d"),
122
- "under_panels": round(float(cu_panels.loc[ts]), 1),
123
- "open_field": round(float(cu_open.loc[ts]), 1),
124
- }
125
- for ts in daily_chill.index
126
- ]
127
-
128
- return {
129
- "season_start": season_start,
130
- "latest_under_panels": round(float(cu_panels.iloc[-1]), 1) if len(cu_panels) else 0,
131
- "latest_open_field": round(float(cu_open.iloc[-1]), 1) if len(cu_open) else 0,
132
- "days_counted": len(daily_chill),
133
- "daily": daily,
134
- }
135
- except Exception as exc:
136
- log.error("Chill units failed: %s", exc)
137
- return {"error": f"Chill units failed: {exc}"}
 
48
  season_start: str = Query("2025-11-01", description="Season start (YYYY-MM-DD)"),
49
  hub: DataHub = Depends(get_datahub),
50
  ):
51
+ """Accumulated chill units (Utah model, 6h TTL cache)."""
52
+ return hub.biology.get_chill_units(season_start=season_start)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
backend/api/routes/chatbot.py CHANGED
@@ -83,6 +83,19 @@ async def chat_message(request: Request, req: ChatRequest, hub: DataHub = Depend
83
  raise HTTPException(status_code=500, detail="Chatbot processing failed")
84
 
85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  @router.post("/feedback")
87
  @limiter.limit("60/minute")
88
  async def chat_feedback(request: Request, req: FeedbackRequest):
 
83
  raise HTTPException(status_code=500, detail="Chatbot processing failed")
84
 
85
 
86
+ @router.get("/briefing")
87
+ async def chat_briefing(hub: DataHub = Depends(get_datahub)):
88
+ """Return a short live status briefing for the advisor panel."""
89
+ bot = _get_chatbot(hub)
90
+ if bot is None:
91
+ raise HTTPException(status_code=503, detail="Chatbot unavailable")
92
+ try:
93
+ return {"briefing": bot._build_status_briefing()}
94
+ except Exception as exc:
95
+ log.error("Briefing error: %s", exc)
96
+ return {"briefing": ""}
97
+
98
+
99
  @router.post("/feedback")
100
  @limiter.limit("60/minute")
101
  async def chat_feedback(request: Request, req: FeedbackRequest):
backend/api/routes/control.py CHANGED
@@ -1,12 +1,13 @@
1
- """Control system endpoints β€” reads state from Redis."""
2
 
3
  from __future__ import annotations
4
 
5
  import logging
6
 
7
- from fastapi import APIRouter, HTTPException
8
 
9
  from backend.api.deps import get_datahub, get_redis_client
 
10
 
11
  log = logging.getLogger(__name__)
12
  router = APIRouter()
@@ -56,48 +57,13 @@ async def control_budget():
56
 
57
 
58
  @router.get("/trackers/details")
59
- async def tracker_details():
60
- """Tracker entity table β€” name, mode, angle, last activity for all trackers."""
61
- try:
62
- from src.data.thingsboard_client import ThingsBoardClient, TRACKER_KEYS
63
- client = ThingsBoardClient()
64
- trackers = []
65
- for name in ["Tracker501", "Tracker502", "Tracker503", "Tracker509"]:
66
- try:
67
- vals = client.get_latest_telemetry(name, list(TRACKER_KEYS))
68
- trackers.append({
69
- "name": name,
70
- "label": name.replace("Tracker", "Row "),
71
- "angle": round(float(vals.get("angle", 0)), 1) if vals.get("angle") is not None else None,
72
- "manual_mode": vals.get("manualMode"),
73
- "set_angle": round(float(vals.get("setAngle", 0)), 1) if vals.get("setAngle") is not None else None,
74
- "set_mode": vals.get("setMode"),
75
- })
76
- except Exception as exc:
77
- trackers.append({"name": name, "label": name, "error": str(exc)})
78
- return {"trackers": trackers}
79
- except Exception as exc:
80
- log.error("Tracker details failed: %s", exc)
81
- return {"trackers": [], "error": str(exc)}
82
-
83
-
84
- _tracker_cache: dict = {"data": None, "expires": 0.0}
85
 
86
 
87
  @router.get("/trackers")
88
- async def control_trackers():
89
- """Live tracker angles from ThingsBoard (5-min TTL cache)."""
90
- import time as _time
91
- if _tracker_cache["data"] and _time.monotonic() < _tracker_cache["expires"]:
92
- return _tracker_cache["data"]
93
- try:
94
- from src.tracker_dispatcher import TrackerDispatcher
95
- dispatcher = TrackerDispatcher()
96
- angles = dispatcher.read_current_angles()
97
- result = {"trackers": angles, "source": "ThingsBoard"}
98
- _tracker_cache["data"] = result
99
- _tracker_cache["expires"] = _time.monotonic() + 300 # 5 min
100
- return result
101
- except Exception as exc:
102
- log.error("Tracker fetch failed: %s", exc)
103
- raise HTTPException(status_code=502, detail=f"Tracker fetch failed: {exc}")
 
1
+ """Control system endpoints β€” reads state from Redis + DataHub services."""
2
 
3
  from __future__ import annotations
4
 
5
  import logging
6
 
7
+ from fastapi import APIRouter, Depends, HTTPException
8
 
9
  from backend.api.deps import get_datahub, get_redis_client
10
+ from src.data.data_providers import DataHub
11
 
12
  log = logging.getLogger(__name__)
13
  router = APIRouter()
 
57
 
58
 
59
  @router.get("/trackers/details")
60
+ async def tracker_details(hub: DataHub = Depends(get_datahub)):
61
+ """Tracker entity table β€” name, mode, angle for all trackers (cached)."""
62
+ return hub.vine_sensors.get_tracker_details()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
 
64
 
65
  @router.get("/trackers")
66
+ async def control_trackers(hub: DataHub = Depends(get_datahub)):
67
+ """Live tracker angles (cached via VineSensorService)."""
68
+ result = hub.vine_sensors.get_tracker_details()
69
+ return {"trackers": result.get("trackers", []), "source": "ThingsBoard"}
 
 
 
 
 
 
 
 
 
 
 
 
backend/api/routes/events.py ADDED
@@ -0,0 +1,55 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ SSE endpoint β€” pushes lightweight version updates to connected browsers.
3
+
4
+ The client receives a JSON dict of topic versions whenever data changes.
5
+ It then invalidates the matching TanStack Query cache, triggering a
6
+ refetch via the existing REST endpoints.
7
+
8
+ No new Redis commands β€” everything is in-memory via EventBus.
9
+ """
10
+
11
+ from __future__ import annotations
12
+
13
+ import json
14
+ import logging
15
+
16
+ from fastapi import APIRouter, Request
17
+ from starlette.responses import StreamingResponse
18
+
19
+ from backend.api.events import event_bus
20
+
21
+ log = logging.getLogger("solarwine.sse")
22
+ router = APIRouter()
23
+
24
+
25
+ @router.get("/stream")
26
+ async def event_stream(request: Request):
27
+ """Server-Sent Events stream.
28
+
29
+ Sends a ``versions`` message whenever the EventBus is notified,
30
+ or a keep-alive comment every ~25 s to prevent proxy timeouts.
31
+ """
32
+
33
+ async def generate():
34
+ # Send current versions immediately on connect
35
+ yield f"data: {json.dumps(event_bus.versions)}\n\n"
36
+ last_versions = dict(event_bus.versions)
37
+ while True:
38
+ if await request.is_disconnected():
39
+ break
40
+ versions = await event_bus.wait(timeout=25)
41
+ if versions != last_versions:
42
+ yield f"data: {json.dumps(versions)}\n\n"
43
+ last_versions = dict(versions)
44
+ else:
45
+ # Keep-alive comment to prevent proxy idle timeout
46
+ yield ": heartbeat\n\n"
47
+
48
+ return StreamingResponse(
49
+ generate(),
50
+ media_type="text/event-stream",
51
+ headers={
52
+ "Cache-Control": "no-cache",
53
+ "X-Accel-Buffering": "no", # disable nginx/proxy buffering
54
+ },
55
+ )
backend/api/routes/sensors.py CHANGED
@@ -1,4 +1,4 @@
1
- """Vine sensor endpoints β€” wraps VineSensorService."""
2
 
3
  from __future__ import annotations
4
 
@@ -69,164 +69,79 @@ async def sensors_history(
69
  @router.get("/soil-moisture")
70
  async def soil_moisture_history(
71
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
 
72
  ):
73
- """Hourly soil moisture time-series from ThingsBoard (treatment Soil1)."""
74
- try:
75
- from datetime import datetime, timezone, timedelta
76
- from src.data.thingsboard_client import ThingsBoardClient
77
-
78
- client = ThingsBoardClient()
79
- end = datetime.now(tz=timezone.utc)
80
- start = end - timedelta(hours=hours)
81
-
82
- df = client.get_timeseries(
83
- "Soil1", ["soilMoisture", "soilMoisture2"],
84
- start=start, end=end,
85
- interval_ms=3_600_000, agg="AVG", limit=2000,
86
- )
87
- if df.empty:
88
- return []
89
- rows = []
90
- for ts, row in df.iterrows():
91
- r = {"timestamp": ts.isoformat()}
92
- for col in df.columns:
93
- val = row[col]
94
- if val is not None and val == val: # not NaN
95
- r[col] = round(float(val), 1)
96
- rows.append(r)
97
- return rows
98
- except Exception as exc:
99
- log.error("Soil moisture history failed: %s", exc)
100
- return []
101
-
102
-
103
- def _tb_timeseries(device: str, keys: list, hours: int):
104
- """Fetch TB time-series with fallback: try AVG aggregation first, then NONE."""
105
- from datetime import datetime, timezone, timedelta
106
- from src.data.thingsboard_client import ThingsBoardClient
107
- client = ThingsBoardClient()
108
- end = datetime.now(tz=timezone.utc)
109
- start = end - timedelta(hours=hours)
110
-
111
- # Try aggregated first
112
- df = client.get_timeseries(device, keys, start=start, end=end,
113
- interval_ms=3_600_000, agg="AVG", limit=2000)
114
- if not df.empty:
115
- return df
116
-
117
- # Fallback: raw data, resample locally
118
- import pandas as pd
119
- df = client.get_timeseries(device, keys, start=start, end=end,
120
- interval_ms=0, agg="NONE", limit=10000)
121
- if df.empty:
122
- return df
123
- return df.resample("1h").mean(numeric_only=True).dropna(how="all")
124
 
125
 
126
  @router.get("/air-leaf-delta")
127
  async def air_leaf_delta(
128
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
 
129
  ):
130
- """Hourly air-leaf temperature delta from ThingsBoard (Air2 under panels)."""
131
- try:
132
- df = _tb_timeseries("Air1", ["airLeafDeltaT"], hours)
133
- if df.empty:
134
- return []
135
- return [{"timestamp": ts.isoformat(), "value": round(float(row["airLeafDeltaT"]), 2)}
136
- for ts, row in df.iterrows() if row.get("airLeafDeltaT") is not None and row["airLeafDeltaT"] == row["airLeafDeltaT"]]
137
- except Exception as exc:
138
- log.error("Air/leaf delta failed: %s", exc)
139
- return []
140
 
141
 
142
  @router.get("/temperature-humidity")
143
  async def temp_humidity(
144
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
 
145
  ):
146
- """Hourly temperature + humidity from ThingsBoard (Air2 under panels)."""
147
- try:
148
- df = _tb_timeseries("Air1", ["airTemperature", "airHumidity"], hours)
149
- if df.empty:
150
- return []
151
- rows = []
152
- for ts, row in df.iterrows():
153
- r = {"timestamp": ts.isoformat()}
154
- for col in ["airTemperature", "airHumidity"]:
155
- v = row.get(col)
156
- if v is not None and v == v: # NaN check (NaN != NaN)
157
- r[col] = round(float(v), 1)
158
- rows.append(r)
159
- return rows
160
- except Exception as exc:
161
- log.error("Temp/humidity failed: %s", exc)
162
- return []
163
 
164
 
165
  @router.get("/ndvi")
166
  async def ndvi_history(
167
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
 
168
  ):
169
- """Hourly NDVI from ThingsBoard β€” treatment (Crop3) and ambient (Air1)."""
170
- try:
171
- rows = []
172
- for device, label in [("Air1", "ambient"), ("Crop3", "treatment")]:
173
- try:
174
- df = _tb_timeseries(device, ["NDVI"], min(hours, 168))
175
- if not df.empty and "NDVI" in df.columns:
176
- for ts, row in df.iterrows():
177
- v = row.get("NDVI")
178
- if v is not None and v == v: # NaN check (NaN != NaN)
179
- rows.append({"timestamp": ts.isoformat(), "device": label, "ndvi": round(float(v), 4)})
180
- except Exception:
181
- pass
182
- return rows
183
- except Exception as exc:
184
- log.error("NDVI failed: %s", exc)
185
- return []
186
 
187
 
188
  @router.get("/vpd")
189
  async def vpd_history(
190
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
 
191
  ):
192
- """Hourly VPD from ThingsBoard (Air2 under panels)."""
193
- try:
194
- df = _tb_timeseries("Air1", ["VPD"], hours)
195
- if df.empty:
196
- return []
197
- return [{"timestamp": ts.isoformat(), "value": round(float(row["VPD"]), 2)}
198
- for ts, row in df.iterrows() if row.get("VPD") is not None and row["VPD"] == row["VPD"]]
199
- except Exception as exc:
200
- log.error("VPD failed: %s", exc)
201
- return []
202
 
203
 
204
  @router.get("/rain")
205
  async def rain_history(
206
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
 
207
  ):
208
- """Hourly rain time-series from ThingsBoard ambient station (Air1)."""
209
- try:
210
- from datetime import datetime, timezone, timedelta
211
- from src.data.thingsboard_client import ThingsBoardClient
212
-
213
- client = ThingsBoardClient()
214
- end = datetime.now(tz=timezone.utc)
215
- start = end - timedelta(hours=hours)
216
-
217
- df = client.get_timeseries(
218
- "Air1", ["rain"],
219
- start=start, end=end,
220
- interval_ms=3_600_000, agg="SUM", limit=2000,
221
- )
222
- if df.empty:
223
- return []
224
- rows = []
225
- for ts, row in df.iterrows():
226
- val = row.get("rain")
227
- if val is not None and val == val:
228
- rows.append({"timestamp": ts.isoformat(), "rain_mm": round(float(val), 2)})
229
- return rows
230
- except Exception as exc:
231
- log.error("Rain history failed: %s", exc)
232
- return []
 
1
+ """Vine sensor endpoints β€” all data routed through VineSensorService."""
2
 
3
  from __future__ import annotations
4
 
 
69
  @router.get("/soil-moisture")
70
  async def soil_moisture_history(
71
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
72
+ hub: DataHub = Depends(get_datahub),
73
  ):
74
+ """Hourly soil moisture time-series (treatment Soil1)."""
75
+ rows = hub.vine_sensors.get_device_timeseries(
76
+ "Soil1", ["soilMoisture", "soilMoisture2"], hours_back=hours,
77
+ )
78
+ return rows
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
79
 
80
 
81
  @router.get("/air-leaf-delta")
82
  async def air_leaf_delta(
83
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
84
+ hub: DataHub = Depends(get_datahub),
85
  ):
86
+ """Hourly air-leaf temperature delta (Air1)."""
87
+ rows = hub.vine_sensors.get_device_timeseries(
88
+ "Air1", ["airLeafDeltaT"], hours_back=hours,
89
+ )
90
+ # Reshape to {timestamp, value} for frontend compatibility
91
+ return [{"timestamp": r["timestamp"], "value": r.get("airLeafDeltaT")}
92
+ for r in rows if r.get("airLeafDeltaT") is not None]
 
 
 
93
 
94
 
95
  @router.get("/temperature-humidity")
96
  async def temp_humidity(
97
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
98
+ hub: DataHub = Depends(get_datahub),
99
  ):
100
+ """Hourly temperature + humidity (Air1)."""
101
+ return hub.vine_sensors.get_device_timeseries(
102
+ "Air1", ["airTemperature", "airHumidity"], hours_back=hours,
103
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
104
 
105
 
106
  @router.get("/ndvi")
107
  async def ndvi_history(
108
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
109
+ hub: DataHub = Depends(get_datahub),
110
  ):
111
+ """Hourly NDVI β€” treatment (Crop3) and ambient (Air1)."""
112
+ rows = []
113
+ for device, label in [("Air1", "ambient"), ("Crop3", "treatment")]:
114
+ ts_rows = hub.vine_sensors.get_device_timeseries(
115
+ device, ["NDVI"], hours_back=min(hours, 168),
116
+ )
117
+ for r in ts_rows:
118
+ v = r.get("NDVI")
119
+ if v is not None:
120
+ rows.append({"timestamp": r["timestamp"], "device": label, "ndvi": round(v, 4)})
121
+ return rows
 
 
 
 
 
 
122
 
123
 
124
  @router.get("/vpd")
125
  async def vpd_history(
126
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
127
+ hub: DataHub = Depends(get_datahub),
128
  ):
129
+ """Hourly VPD (Air1)."""
130
+ rows = hub.vine_sensors.get_device_timeseries(
131
+ "Air1", ["VPD"], hours_back=hours,
132
+ )
133
+ return [{"timestamp": r["timestamp"], "value": r.get("VPD")}
134
+ for r in rows if r.get("VPD") is not None]
 
 
 
 
135
 
136
 
137
  @router.get("/rain")
138
  async def rain_history(
139
  hours: int = Query(168, ge=1, le=8760, description="Hours of history"),
140
+ hub: DataHub = Depends(get_datahub),
141
  ):
142
+ """Hourly rain (Air1, SUM aggregation)."""
143
+ rows = hub.vine_sensors.get_device_timeseries(
144
+ "Air1", ["rain"], hours_back=hours, agg="SUM",
145
+ )
146
+ return [{"timestamp": r["timestamp"], "rain_mm": r.get("rain", 0)}
147
+ for r in rows if r.get("rain") is not None]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/chatbot/vineyard_chatbot.py CHANGED
@@ -557,11 +557,24 @@ class VineyardChatbot:
557
  """Build Gemini multi-turn message list with sliding context window.
558
 
559
  Strategy:
 
560
  - Keep the most recent 6 messages verbatim (for conversational flow)
561
  - Summarize older messages into a single context message
562
- - Always include pinned context (current date, season)
563
  """
564
  messages = []
 
 
 
 
 
 
 
 
 
 
 
 
 
565
  n = len(history)
566
 
567
  if n > self._RECENT_MESSAGES:
@@ -685,6 +698,108 @@ class VineyardChatbot:
685
  pass
686
  return ctx
687
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
688
  # ------------------------------------------------------------------
689
  # Main chat method
690
  # ------------------------------------------------------------------
 
557
  """Build Gemini multi-turn message list with sliding context window.
558
 
559
  Strategy:
560
+ - Inject live status briefing as pinned context (from cached data)
561
  - Keep the most recent 6 messages verbatim (for conversational flow)
562
  - Summarize older messages into a single context message
 
563
  """
564
  messages = []
565
+
566
+ # Inject live status briefing so the LLM has immediate context
567
+ briefing = self._build_status_briefing()
568
+ if briefing:
569
+ messages.append({
570
+ "role": "user",
571
+ "parts": [{"text": f"[System status β€” do not repeat verbatim, use as context]\n{briefing}"}],
572
+ })
573
+ messages.append({
574
+ "role": "model",
575
+ "parts": [{"text": "Got it, I have the current status."}],
576
+ })
577
+
578
  n = len(history)
579
 
580
  if n > self._RECENT_MESSAGES:
 
698
  pass
699
  return ctx
700
 
701
+ # ------------------------------------------------------------------
702
+ # Live status briefing β€” injected at conversation start
703
+ # ------------------------------------------------------------------
704
+
705
+ def _build_status_briefing(self) -> str:
706
+ """Assemble a short system status from cached DataHub data.
707
+
708
+ Uses only already-cached values (no new API calls), so it adds
709
+ zero latency. Returns an empty string if nothing is available.
710
+ """
711
+ from datetime import datetime
712
+ import zoneinfo
713
+
714
+ lines: list[str] = []
715
+ tz = zoneinfo.ZoneInfo("Asia/Jerusalem")
716
+ now = datetime.now(tz=tz)
717
+ lines.append(f"CURRENT STATUS ({now.strftime('%Y-%m-%d %H:%M')} IST):")
718
+
719
+ # Weather
720
+ try:
721
+ wx = self.hub.weather.get_current()
722
+ if wx and "error" not in wx:
723
+ t = wx.get("air_temperature_c")
724
+ ghi = wx.get("ghi_w_m2")
725
+ rh = wx.get("rh_percent")
726
+ wind = wx.get("wind_speed_ms")
727
+ parts = []
728
+ if t is not None:
729
+ parts.append(f"T={float(t):.1f}Β°C")
730
+ if ghi is not None:
731
+ parts.append(f"GHI={float(ghi):.0f} W/mΒ²")
732
+ if rh is not None:
733
+ parts.append(f"RH={float(rh):.0f}%")
734
+ if wind is not None:
735
+ parts.append(f"wind={float(wind):.1f} m/s")
736
+ if parts:
737
+ lines.append(f" Weather: {', '.join(parts)}")
738
+ age = wx.get("age_minutes")
739
+ if age is not None and float(age) > 30:
740
+ lines.append(f" (weather data is {int(float(age))} min old)")
741
+ except Exception:
742
+ pass
743
+
744
+ # Sensors
745
+ try:
746
+ snap = self.hub.vine_sensors.get_snapshot(light=True)
747
+ if snap and "error" not in snap:
748
+ parts = []
749
+ for key, label in [
750
+ ("treatment_air_temp_c", "air"),
751
+ ("treatment_crop_par_umol", "PAR"),
752
+ ("treatment_soil_moisture_pct", "soil"),
753
+ ]:
754
+ v = snap.get(key)
755
+ if v is not None:
756
+ if "temp" in key:
757
+ parts.append(f"{label}={float(v):.1f}Β°C")
758
+ elif "par" in key:
759
+ parts.append(f"{label}={float(v):.0f} Β΅mol")
760
+ else:
761
+ parts.append(f"{label}={float(v):.0f}%")
762
+ if parts:
763
+ lines.append(f" Sensors (treatment): {', '.join(parts)}")
764
+ stale = snap.get("staleness_minutes")
765
+ if stale is not None and float(stale) > 15:
766
+ lines.append(f" (sensors {int(float(stale))} min old)")
767
+ except Exception:
768
+ pass
769
+
770
+ # Energy
771
+ try:
772
+ en = self.hub.energy.get_current()
773
+ if en and "error" not in en:
774
+ pw = en.get("power_kw")
775
+ if pw is not None:
776
+ lines.append(f" Energy: {float(pw):.1f} kW now")
777
+ except Exception:
778
+ pass
779
+
780
+ # Phenology
781
+ try:
782
+ from src.phenology import estimate_stage_for_date
783
+ from datetime import date
784
+ stage = estimate_stage_for_date(date.today())
785
+ lines.append(f" Phenology: {stage.name} ({stage.id})")
786
+ except Exception:
787
+ pass
788
+
789
+ # Control status (from Redis via hub β€” no direct Redis import)
790
+ try:
791
+ ctrl = self.hub.advisory.get_status()
792
+ if ctrl and "error" not in ctrl:
793
+ mode = ctrl.get("mode") or ctrl.get("action")
794
+ if mode:
795
+ lines.append(f" Control: {mode}")
796
+ except Exception:
797
+ pass
798
+
799
+ if len(lines) <= 1:
800
+ return ""
801
+ return "\n".join(lines)
802
+
803
  # ------------------------------------------------------------------
804
  # Main chat method
805
  # ------------------------------------------------------------------
src/data/data_providers.py CHANGED
@@ -38,9 +38,12 @@ Loose coupling guarantees:
38
 
39
  from __future__ import annotations
40
 
 
41
  import math
42
  import time
43
  import traceback
 
 
44
  from abc import ABC, abstractmethod
45
  from dataclasses import dataclass, field
46
  from datetime import date, datetime, timedelta, timezone
@@ -335,6 +338,8 @@ class VineSensorService(BaseService):
335
  def __init__(self, tb_client: Any = None, snapshot_ttl: float = 300):
336
  self._tb = tb_client # lazy
337
  self._snap_cache = TTLCache(ttl_seconds=snapshot_ttl, redis_prefix="vine:")
 
 
338
  self._breaker = CircuitBreaker(threshold=3, cooldown_sec=300)
339
 
340
  def _client(self):
@@ -430,6 +435,99 @@ class VineSensorService(BaseService):
430
  log.error("Sensor history query failed: %s", exc)
431
  return {"error": f"Sensor history failed: {exc}"}
432
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
433
 
434
  # ═══════════════════════════════════════════════════════════════════════
435
  # 3. PhotosynthesisService (FvCB + ML + forecast)
@@ -1157,15 +1255,23 @@ class AdvisoryService(BaseService):
1157
  # ═══════════════════════════════════════════════════════════════════════
1158
 
1159
  class BiologyService(BaseService):
1160
- """Biology rules lookup β€” pure in-memory, no API calls."""
1161
 
1162
  service_name = "biology"
1163
 
1164
- def __init__(self, rules: Optional[Dict[str, str]] = None):
1165
  if rules is None:
1166
  from src.vineyard_chatbot import BIOLOGY_RULES
1167
  rules = BIOLOGY_RULES
1168
  self._rules = rules
 
 
 
 
 
 
 
 
1169
 
1170
  def explain_rule(self, rule_name: str) -> Dict[str, Any]:
1171
  key = rule_name.lower().strip()
@@ -1176,6 +1282,91 @@ class BiologyService(BaseService):
1176
  def list_rules(self) -> Dict[str, Any]:
1177
  return {"rules": list(self._rules.keys())}
1178
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1179
 
1180
  # ═══════════════════════════════════════════════════════════════════════
1181
  # DataHub (service registry)
 
38
 
39
  from __future__ import annotations
40
 
41
+ import logging
42
  import math
43
  import time
44
  import traceback
45
+
46
+ log = logging.getLogger("solarwine.data_providers")
47
  from abc import ABC, abstractmethod
48
  from dataclasses import dataclass, field
49
  from datetime import date, datetime, timedelta, timezone
 
338
  def __init__(self, tb_client: Any = None, snapshot_ttl: float = 300):
339
  self._tb = tb_client # lazy
340
  self._snap_cache = TTLCache(ttl_seconds=snapshot_ttl, redis_prefix="vine:")
341
+ self._ts_cache = TTLCache(ttl_seconds=900, redis_prefix="vine_ts:") # 15 min for time-series
342
+ self._tracker_cache = TTLCache(ttl_seconds=300, redis_prefix="tracker:") # 5 min for trackers
343
  self._breaker = CircuitBreaker(threshold=3, cooldown_sec=300)
344
 
345
  def _client(self):
 
435
  log.error("Sensor history query failed: %s", exc)
436
  return {"error": f"Sensor history failed: {exc}"}
437
 
438
+ def get_device_timeseries(
439
+ self,
440
+ device: str,
441
+ keys: List[str],
442
+ hours_back: int = 168,
443
+ agg: str = "AVG",
444
+ ) -> List[Dict[str, Any]]:
445
+ """Hourly time-series for a specific device + keys (15-min TTL cache).
446
+
447
+ Returns a list of ``{timestamp, key1, key2, ...}`` dicts.
448
+ Used by sensor history endpoints (soil moisture, VPD, NDVI, etc.).
449
+ """
450
+ cache_key = f"{device}:{','.join(sorted(keys))}:{hours_back}:{agg}"
451
+ cached = self._ts_cache.get(cache_key)
452
+ if cached is not None:
453
+ return cached
454
+ if self._breaker.is_open:
455
+ return []
456
+ try:
457
+ end = datetime.now(tz=timezone.utc)
458
+ start = end - timedelta(hours=hours_back)
459
+ client = self._client()
460
+
461
+ # Try server-side aggregation first
462
+ df = client.get_timeseries(
463
+ device, keys, start=start, end=end,
464
+ interval_ms=3_600_000, agg=agg, limit=2000,
465
+ )
466
+ if df.empty:
467
+ # Fallback: raw data, resample locally
468
+ df = client.get_timeseries(
469
+ device, keys, start=start, end=end,
470
+ interval_ms=0, agg="NONE", limit=10000,
471
+ )
472
+ if not df.empty:
473
+ df = df.resample("1h").mean(numeric_only=True).dropna(how="all")
474
+
475
+ if df.empty:
476
+ self._breaker.record_success()
477
+ result: List[Dict[str, Any]] = []
478
+ self._ts_cache.set(cache_key, result)
479
+ return result
480
+
481
+ rows: List[Dict[str, Any]] = []
482
+ for ts, row in df.iterrows():
483
+ r: Dict[str, Any] = {"timestamp": ts.isoformat()}
484
+ for col in df.columns:
485
+ val = row[col]
486
+ if val is not None and val == val: # NaN check
487
+ r[col] = round(float(val), 2)
488
+ rows.append(r)
489
+
490
+ self._breaker.record_success()
491
+ self._ts_cache.set(cache_key, rows)
492
+ return rows
493
+ except Exception as exc:
494
+ self._breaker.record_failure()
495
+ log.error("Device timeseries failed (%s): %s", device, exc)
496
+ return []
497
+
498
+ def get_tracker_details(self) -> Dict[str, Any]:
499
+ """Latest tracker angles/modes for all 4 trackers (5-min TTL cache)."""
500
+ cached = self._tracker_cache.get("details")
501
+ if cached is not None:
502
+ return cached
503
+ if self._breaker.is_open:
504
+ return {"trackers": [], "error": "ThingsBoard circuit breaker open"}
505
+ try:
506
+ client = self._client()
507
+ tracker_keys = ["angle", "manualMode", "setAngle", "setMode"]
508
+ trackers = []
509
+ for name in ["Tracker501", "Tracker502", "Tracker503", "Tracker509"]:
510
+ try:
511
+ vals = client.get_latest_telemetry(name, tracker_keys)
512
+ trackers.append({
513
+ "name": name,
514
+ "label": name.replace("Tracker", "Row "),
515
+ "angle": round(float(vals.get("angle", 0)), 1) if vals.get("angle") is not None else None,
516
+ "manual_mode": vals.get("manualMode"),
517
+ "set_angle": round(float(vals.get("setAngle", 0)), 1) if vals.get("setAngle") is not None else None,
518
+ "set_mode": vals.get("setMode"),
519
+ })
520
+ except Exception as exc:
521
+ trackers.append({"name": name, "label": name, "error": str(exc)})
522
+ result = {"trackers": trackers}
523
+ self._breaker.record_success()
524
+ self._tracker_cache.set("details", result)
525
+ return result
526
+ except Exception as exc:
527
+ self._breaker.record_failure()
528
+ log.error("Tracker details failed: %s", exc)
529
+ return {"trackers": [], "error": str(exc)}
530
+
531
 
532
  # ═══════════════════════════════════════════════════════════════════════
533
  # 3. PhotosynthesisService (FvCB + ML + forecast)
 
1255
  # ═══════════════════════════════════════════════════════════════════════
1256
 
1257
  class BiologyService(BaseService):
1258
+ """Biology rules lookup + chill unit computation."""
1259
 
1260
  service_name = "biology"
1261
 
1262
+ def __init__(self, rules: Optional[Dict[str, str]] = None, tb_client: Any = None):
1263
  if rules is None:
1264
  from src.vineyard_chatbot import BIOLOGY_RULES
1265
  rules = BIOLOGY_RULES
1266
  self._rules = rules
1267
+ self._tb = tb_client
1268
+ self._chill_cache = TTLCache(ttl_seconds=21600, redis_prefix="biology:") # 6h TTL
1269
+
1270
+ def _client(self):
1271
+ if self._tb is None:
1272
+ from src.data.thingsboard_client import ThingsBoardClient
1273
+ self._tb = ThingsBoardClient()
1274
+ return self._tb
1275
 
1276
  def explain_rule(self, rule_name: str) -> Dict[str, Any]:
1277
  key = rule_name.lower().strip()
 
1282
  def list_rules(self) -> Dict[str, Any]:
1283
  return {"rules": list(self._rules.keys())}
1284
 
1285
+ def get_chill_units(self, season_start: str = "2025-11-01") -> Dict[str, Any]:
1286
+ """Accumulated chill units from ThingsBoard Air1 temperature (Utah model, 6h TTL).
1287
+
1288
+ Richardson et al. 1974:
1289
+ T <= 7Β°C β†’ +1.0 CU/hour
1290
+ 7 < T <= 10 β†’ +0.5
1291
+ 10 < T <= 18 β†’ 0.0
1292
+ T > 18 β†’ -1.0
1293
+ """
1294
+ cache_key = f"chill:{season_start}"
1295
+ cached = self._chill_cache.get(cache_key)
1296
+ if cached is not None:
1297
+ return cached
1298
+
1299
+ try:
1300
+ import numpy as np
1301
+ from zoneinfo import ZoneInfo
1302
+
1303
+ tz = ZoneInfo("Asia/Jerusalem")
1304
+ client = self._client()
1305
+ start = pd.Timestamp(season_start, tz="UTC")
1306
+ end = pd.Timestamp.now(tz="UTC")
1307
+
1308
+ # Fetch Air1 temperature in 7-day chunks
1309
+ chunks = []
1310
+ cursor = start
1311
+ while cursor < end:
1312
+ chunk_end = min(cursor + pd.Timedelta(days=7), end)
1313
+ try:
1314
+ df = client.get_timeseries(
1315
+ "Air1", ["airTemperature"],
1316
+ start=cursor.to_pydatetime(), end=chunk_end.to_pydatetime(),
1317
+ interval_ms=0, agg="NONE", limit=10000,
1318
+ )
1319
+ if not df.empty:
1320
+ chunks.append(df)
1321
+ except Exception:
1322
+ pass
1323
+ cursor = chunk_end
1324
+
1325
+ if not chunks:
1326
+ return {"error": "No Air1 temperature data available from ThingsBoard"}
1327
+
1328
+ full = pd.concat(chunks).sort_index()
1329
+ full = full[~full.index.duplicated(keep="first")]
1330
+ full = full.tz_convert(tz)
1331
+
1332
+ hourly = full["airTemperature"].resample("1h").mean().dropna()
1333
+ if hourly.empty:
1334
+ return {"error": "No hourly temperature after resampling"}
1335
+
1336
+ PANEL_MULTIPLIER = 1.1
1337
+ temps = hourly.values
1338
+ chill_hourly = np.select(
1339
+ [temps <= 7.0, (temps > 7.0) & (temps <= 10.0),
1340
+ (temps > 10.0) & (temps <= 18.0), temps > 18.0],
1341
+ [1.0, 0.5, 0.0, -1.0],
1342
+ )
1343
+
1344
+ daily_chill = pd.Series(chill_hourly, index=hourly.index).resample("D").sum().clip(lower=0)
1345
+ cu_open = daily_chill.cumsum()
1346
+ cu_panels = (daily_chill * PANEL_MULTIPLIER).cumsum()
1347
+
1348
+ daily = [
1349
+ {
1350
+ "date": ts.strftime("%Y-%m-%d"),
1351
+ "under_panels": round(float(cu_panels.loc[ts]), 1),
1352
+ "open_field": round(float(cu_open.loc[ts]), 1),
1353
+ }
1354
+ for ts in daily_chill.index
1355
+ ]
1356
+
1357
+ result = {
1358
+ "season_start": season_start,
1359
+ "latest_under_panels": round(float(cu_panels.iloc[-1]), 1) if len(cu_panels) else 0,
1360
+ "latest_open_field": round(float(cu_open.iloc[-1]), 1) if len(cu_open) else 0,
1361
+ "days_counted": len(daily_chill),
1362
+ "daily": daily,
1363
+ }
1364
+ self._chill_cache.set(cache_key, result)
1365
+ return result
1366
+ except Exception as exc:
1367
+ log.error("Chill units failed: %s", exc)
1368
+ return {"error": f"Chill units failed: {exc}"}
1369
+
1370
 
1371
  # ═══════════════════════════════════════════════════════════════════════
1372
  # DataHub (service registry)