Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Shared in-memory state for Cain worker process. | |
| This module provides a thread-safe in-memory cache for worker state | |
| that replaces file-based heartbeat writes. The worker process (brain_minimal.py) | |
| updates this state via HTTP POST to app.py's internal endpoint. | |
| Key design: | |
| - No file I/O for heartbeat (eliminates race conditions) | |
| - In-memory state cache in app.py | |
| - Worker state reflects activity level, not just PID existence | |
| """ | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import Any, Optional | |
class WorkerState:
    """
    Thread-safe in-memory cache for worker state.

    This is the source of truth for worker activity status.
    Updated by brain_minimal.py via HTTP POST to /internal/heartbeat.

    All fields live in one dict guarded by a re-entrant lock. Every call to
    update() stamps a fresh heartbeat timestamp; the heartbeat age is derived
    from that timestamp each time the state is read, so health checks never
    rely on a stale cached age.
    """

    # Age reported when the stored heartbeat timestamp cannot be parsed.
    _UNPARSEABLE_AGE_SECONDS = 999

    def __init__(self):
        # Re-entrant on purpose: get_worker_state() calls is_healthy() while
        # already holding the lock, which would deadlock with a plain Lock.
        self._lock = threading.RLock()
        self._state = {
            # Worker activity state (decoupled from PID)
            "worker_state": "idle",  # idle, active, processing
            "worker_active": False,
            "current_state": "idle",  # backward compatibility
            # Process info (for logging/debugging)
            "worker_pid": None,
            "worker_mode": None,
            # Heartbeat timestamps
            "last_heartbeat": None,
            "heartbeat_age_seconds": 0,
            # Stage info
            "stage": "RUNNING_A2A_READY",
            # Health status
            "health": "HEALTHY",
            "error": None,
        }

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # Imported locally because the module only imports `datetime` itself.
        from datetime import timezone

        return datetime.now(timezone.utc)

    def _refresh_heartbeat_age(self):
        """
        Recompute, store and return the heartbeat age in seconds.

        The caller must already hold self._lock.

        Returns:
            Seconds elapsed since "last_heartbeat". If no heartbeat has been
            recorded yet the stored age is returned unchanged (0 after
            construction). If the timestamp cannot be parsed the age is set
            to _UNPARSEABLE_AGE_SECONDS.
        """
        raw = self._state["last_heartbeat"]
        if not raw:
            # NOTE(review): with no heartbeat the age stays at its stored
            # value, so a fresh instance reports healthy -- confirm this
            # start-up grace behaviour is intended.
            return self._state["heartbeat_age_seconds"]

        now = self._utc_now()
        try:
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on, so normalise it to an explicit UTC offset.
            text = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
            stamp = datetime.fromisoformat(text)
            if stamp.tzinfo is None:
                # Naive timestamps are taken to be UTC.
                stamp = stamp.replace(tzinfo=now.tzinfo)
            # Aware subtraction converts any non-UTC offset correctly.
            age = (now - stamp).total_seconds()
        except (AttributeError, TypeError, ValueError):
            # Non-string or malformed timestamp: report a clearly stale age.
            age = self._UNPARSEABLE_AGE_SECONDS

        self._state["heartbeat_age_seconds"] = age
        return age

    def update(self, **kwargs):
        """
        Record a heartbeat and merge the given fields into the state.

        The heartbeat timestamp is set to the current UTC time and the age
        is reset to 0 before the keyword arguments are applied, so a caller
        may override "last_heartbeat" explicitly.

        Args:
            **kwargs: State fields to set. Entries whose value is None are
                ignored, which means a field cannot be reset to None here.
        """
        with self._lock:
            # Update heartbeat timestamp (ISO 8601 with "+00:00" offset)
            self._state["last_heartbeat"] = self._utc_now().isoformat()
            self._state["heartbeat_age_seconds"] = 0
            # Update provided fields, skipping None values
            self._state.update(
                {key: value for key, value in kwargs.items() if value is not None}
            )

    def get(self) -> dict[str, Any]:
        """
        Get current state snapshot (thread-safe).

        Returns:
            A shallow copy of the state dict with "heartbeat_age_seconds"
            refreshed from the stored heartbeat timestamp.
        """
        with self._lock:
            self._refresh_heartbeat_age()
            return self._state.copy()

    def is_healthy(self, max_age_seconds: int = 15) -> bool:
        """
        Check if worker is healthy based on heartbeat age.

        The age is recomputed from the stored timestamp on every call.

        Args:
            max_age_seconds: Maximum acceptable heartbeat age (default 15s)

        Returns:
            True if the heartbeat age is below max_age_seconds, False
            otherwise.
        """
        with self._lock:
            return self._refresh_heartbeat_age() < max_age_seconds

    def get_worker_state(self) -> str:
        """
        Get worker activity state (decoupled from PID).

        Returns:
            "idle" when the heartbeat is stale according to is_healthy();
            otherwise the stored "worker_state" value (falling back to
            "idle" if the key is missing).
        """
        with self._lock:
            # If heartbeat is stale, worker is not active
            if not self.is_healthy():
                return "idle"
            # Return the worker activity state (not PID-based)
            return self._state.get("worker_state", "idle")
# Global singleton, plus the lock that guards its lazy creation
_worker_state: Optional[WorkerState] = None
_worker_state_lock = threading.Lock()


def get_worker_state() -> WorkerState:
    """
    Get or create the global worker state singleton.

    Creation is guarded by a module-level lock so that concurrent first
    calls cannot each build their own WorkerState (heartbeats written to
    the losing instance would otherwise be lost).

    Returns:
        The single process-wide WorkerState instance.
    """
    global _worker_state
    # Fast path: once created, the instance is returned without locking.
    if _worker_state is None:
        with _worker_state_lock:
            # Re-check inside the lock: another thread may have created the
            # instance while this one was waiting.
            if _worker_state is None:
                _worker_state = WorkerState()
    return _worker_state