| """Shared-state store for brain runtime via Google Drive, local files, or Firebase.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import threading |
| import time |
| from pathlib import Path |
| from typing import Any |
|
|
| from shared.google_drive_state import GoogleDriveStateClient |
|
|
|
|
class FirebaseStore:
    """Shared-state document store with pluggable backends.

    Despite the name, four backends are supported, resolved at call time by
    :meth:`backend` from environment variables:

    * ``google_drive`` -- delegates to :class:`GoogleDriveStateClient`
    * ``file``         -- JSON files under :meth:`storage_root`
    * ``firebase``     -- a lazily-initialized Firestore client
    * ``disabled``     -- no-op

    Reads and listings are memoized with a short TTL, writes of identical
    payloads are deduplicated within ``min_interval_sec``, and Firestore
    quota errors open a temporary backoff window during which cached data
    is served instead of hitting the API.
    """

    def __init__(self, component: str, logger_name: str = "kapo.brain.firebase") -> None:
        self.component = component
        self.logger = logging.getLogger(logger_name)
        self._db = None  # lazily-created Firestore client (see _client)
        self._lock = threading.Lock()  # guards Firestore client creation
        # "<collection>:<doc>" -> (fetched_at, payload)
        self._read_cache: dict[str, tuple[float, Any]] = {}
        # "<collection>:list:<limit>" -> (fetched_at, documents)
        self._list_cache: dict[str, tuple[float, list[dict[str, Any]]]] = {}
        # "<collection>:<doc>" -> (written_at, payload_hash) for write dedupe
        self._write_cache: dict[str, tuple[float, str]] = {}
        self._quota_backoff_until: float = 0.0
        self._drive = GoogleDriveStateClient(self.logger)

    def backend(self) -> str:
        """Resolve the active backend name.

        An explicit ``KAPO_SHARED_STATE_BACKEND`` setting wins; otherwise
        Google Drive is preferred when available, then Firebase when
        ``FIREBASE_ENABLED`` is truthy, falling back to local files.
        """
        configured = str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower()
        if configured in {"google_drive", "drive", "gdrive"}:
            return "google_drive"
        if configured in {"file", "files"}:
            return "file"
        if configured in {"firebase", "firestore"}:
            return "firebase"
        if configured in {"disabled", "off", "none"}:
            return "disabled"
        if self._drive.enabled():
            return "google_drive"
        if str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"}:
            return "firebase"
        return "file"

    def enabled(self) -> bool:
        """Return True unless the shared-state backend is disabled."""
        return self.backend() != "disabled"

    def namespace(self) -> str:
        """Return the Firestore collection-name prefix (default ``kapo``)."""
        return str(os.getenv("FIREBASE_NAMESPACE", "kapo")).strip() or "kapo"

    def storage_root(self) -> Path:
        """Return (and create) the root directory used by the file backend."""
        configured = str(os.getenv("KAPO_SHARED_STATE_DIR", "")).strip()
        if configured:
            root = Path(configured).expanduser()
            if not root.is_absolute():
                root = Path.cwd().resolve() / root
        else:
            root = (Path.cwd().resolve() / "data" / "local" / "shared_state").resolve()
        root.mkdir(parents=True, exist_ok=True)
        return root

    def _service_payload(self) -> dict[str, Any] | None:
        """Parse ``FIREBASE_SERVICE_ACCOUNT_JSON``; None when absent or invalid."""
        raw = str(os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON", "")).strip()
        if not raw:
            return None
        try:
            return json.loads(raw)
        except Exception:
            self.logger.exception("Invalid Firebase service account JSON")
            return None

    @staticmethod
    def _service_path() -> str:
        """Return the configured service-account file path ("" when unset)."""
        return str(os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH", "")).strip()

    def _client(self):
        """Return a cached Firestore client, creating it on first use.

        Returns None when the firebase backend is not selected, credentials
        are unavailable, or initialization fails for any reason.
        """
        if self.backend() != "firebase":
            return None
        with self._lock:
            if self._db is not None:
                return self._db
            try:
                import firebase_admin
                from firebase_admin import credentials, firestore

                # initialize_app raises if called twice, so only initialize
                # when no default app exists yet.
                if not firebase_admin._apps:
                    payload = self._service_payload()
                    if payload:
                        cred = credentials.Certificate(payload)
                    else:
                        service_path = self._service_path()
                        if not service_path:
                            return None
                        path_obj = Path(service_path).expanduser()
                        if not path_obj.exists() or not path_obj.is_file():
                            self.logger.warning(
                                "Firebase service account path is unavailable on this runtime: %s",
                                service_path,
                            )
                            return None
                        cred = credentials.Certificate(str(path_obj.resolve()))
                    options = {}
                    project_id = str(os.getenv("FIREBASE_PROJECT_ID", "")).strip()
                    if project_id:
                        options["projectId"] = project_id
                    firebase_admin.initialize_app(cred, options or None)
                self._db = firestore.client()
                return self._db
            except Exception:
                self.logger.exception("Failed to initialize Firebase client")
                return None

    def _collection(self, name: str) -> str:
        """Return the namespaced Firestore collection name."""
        return f"{self.namespace()}_{name}"

    @staticmethod
    def _safe_id(value: str, default: str = "default") -> str:
        """Sanitize an id to filesystem/Firestore-safe chars, capped at 180."""
        text = str(value or "").strip() or default
        return "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text)[:180]

    @staticmethod
    def _payload_hash(payload: Any) -> str:
        """Return a canonical JSON fingerprint of *payload* for write dedupe."""
        return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str)

    @staticmethod
    def _is_quota_error(exc: Exception) -> bool:
        """Heuristically detect Firestore quota/rate-limit errors."""
        text = str(exc or "").lower()
        return "resourceexhausted" in text or "quota exceeded" in text or "429" in text

    def _quota_backoff_active(self) -> bool:
        """Return True while the Firestore quota backoff window is open."""
        return time.time() < self._quota_backoff_until

    def _activate_quota_backoff(self, seconds: float | None = None) -> None:
        """Open or extend the quota backoff window (it never shrinks)."""
        if not seconds:
            raw = os.getenv("FIREBASE_QUOTA_BACKOFF_SEC", "120") or "120"
            try:
                seconds = float(raw)
            except (TypeError, ValueError):
                # A malformed env override must not crash the error path.
                seconds = 120.0
        self._quota_backoff_until = max(self._quota_backoff_until, time.time() + max(5.0, float(seconds)))

    def _should_skip_write(self, key: str, payload: Any, min_interval_sec: float) -> bool:
        """Return True when an identical payload was written very recently.

        On a miss the (time, hash) pair is recorded, so each call both
        checks and arms the dedupe window for *key* -- callers must invoke
        this exactly once per logical write attempt.
        """
        now = time.time()
        payload_hash = self._payload_hash(payload)
        last = self._write_cache.get(key)
        if last and last[1] == payload_hash and (now - last[0]) < min_interval_sec:
            return True
        self._write_cache[key] = (now, payload_hash)
        return False

    def _invalidate_caches(self, collection: str, cache_key: str, drop_write: bool = False) -> None:
        """Drop cached reads/listings for *collection* after a mutation."""
        self._read_cache.pop(cache_key, None)
        if drop_write:
            self._write_cache.pop(cache_key, None)
        stale_prefix = f"{collection}:list:"
        for key in list(self._list_cache.keys()):
            if key.startswith(stale_prefix):
                self._list_cache.pop(key, None)

    def _file_collection_dir(self, collection: str) -> Path:
        """Return (and create) the directory backing *collection*."""
        path = self.storage_root() / self._safe_id(collection, "collection")
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _file_doc_path(self, collection: str, doc_id: str) -> Path:
        """Return the JSON file path for a document in the file backend."""
        return self._file_collection_dir(collection) / f"{self._safe_id(doc_id)}.json"

    def _write_json_atomic(self, path: Path, payload: dict[str, Any]) -> None:
        """Write JSON via temp file + rename so readers never see partials."""
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(f"{path.suffix}.tmp")
        tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
        tmp.replace(path)

    def get_document(self, collection: str, doc_id: str, ttl_sec: float = 12.0) -> dict[str, Any]:
        """Fetch one document, serving a cached copy younger than *ttl_sec*.

        Returns {} when the document does not exist or the backend is
        unavailable; during Firestore quota backoff the stale cached copy
        (if any) is served.
        """
        safe_doc = self._safe_id(doc_id)
        cache_key = f"{collection}:{safe_doc}"
        now = time.time()
        cached = self._read_cache.get(cache_key)
        if cached and (now - cached[0]) < ttl_sec:
            return dict(cached[1] or {})
        backend = self.backend()
        if backend == "google_drive":
            payload = self._drive.get_document(collection, safe_doc)
            self._read_cache[cache_key] = (now, payload)
            return dict(payload or {})
        if backend == "file":
            path = self._file_doc_path(collection, safe_doc)
            if not path.exists():
                return {}
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
            except Exception:
                self.logger.warning("Failed to read shared-state file %s", path, exc_info=True)
                return {}
            self._read_cache[cache_key] = (now, payload)
            return dict(payload or {})
        if self._quota_backoff_active():
            return dict(cached[1] or {}) if cached else {}
        db = self._client()
        if db is None:
            return dict(cached[1] or {}) if cached else {}
        try:
            snapshot = db.collection(self._collection(collection)).document(safe_doc).get()
            payload = snapshot.to_dict() if snapshot.exists else {}
            self._read_cache[cache_key] = (now, payload)
            return dict(payload or {})
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while reading %s/%s; using cache/backoff", collection, safe_doc)
            else:
                self.logger.exception("Failed to read Firebase document %s/%s", collection, safe_doc)
            return dict(cached[1] or {}) if cached else {}

    def set_document(self, collection: str, doc_id: str, payload: dict[str, Any], merge: bool = True, min_interval_sec: float = 5.0) -> bool:
        """Store one document, stamping ``component`` and ``updated_at``.

        Writes of an identical *payload* within *min_interval_sec* are
        skipped and reported as success.  Returns True on success or dedupe
        skip, False when the backend is unavailable or the write fails.
        """
        safe_doc = self._safe_id(doc_id)
        cache_key = f"{collection}:{safe_doc}"
        # Dedupe on the caller's raw payload BEFORE stamping metadata: the
        # ``updated_at`` timestamp changes on every call and would make each
        # fingerprint unique, which silently defeated the dedupe entirely.
        if self._should_skip_write(cache_key, dict(payload or {}), min_interval_sec):
            return True
        body = dict(payload or {})
        body["component"] = self.component
        body["updated_at"] = time.time()
        backend = self.backend()
        if backend == "google_drive":
            stored = self._drive.set_document(collection, safe_doc, body, merge=merge)
            if stored:
                self._invalidate_caches(collection, cache_key)
            return stored
        if backend == "file":
            try:
                path = self._file_doc_path(collection, safe_doc)
                existing = {}
                if merge and path.exists():
                    existing = json.loads(path.read_text(encoding="utf-8"))
                combined = {**existing, **body} if merge else body
                combined.setdefault("id", safe_doc)
                self._write_json_atomic(path, combined)
                self._invalidate_caches(collection, cache_key)
                return True
            except Exception:
                self.logger.warning("Failed to write shared-state file %s/%s", collection, safe_doc, exc_info=True)
                return False
        if self._quota_backoff_active():
            return False
        db = self._client()
        if db is None:
            return False
        try:
            # NOTE: no second _should_skip_write() here.  The check at the
            # top already recorded this payload's fingerprint, so a repeat
            # check would always report "duplicate" and skip every write.
            db.collection(self._collection(collection)).document(safe_doc).set(body, merge=merge)
            self._invalidate_caches(collection, cache_key)
            return True
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while writing %s/%s; write skipped", collection, safe_doc)
            else:
                self.logger.exception("Failed to write Firebase document %s/%s", collection, safe_doc)
            return False

    def list_documents(self, collection: str, limit: int = 200, ttl_sec: float = 30.0) -> list[dict[str, Any]]:
        """List up to *limit* documents, serving a cache younger than *ttl_sec*.

        During Firestore quota backoff or on errors the stale cached listing
        (possibly empty) is returned.
        """
        cache_key = f"{collection}:list:{max(1, int(limit))}"
        now = time.time()
        cached = self._list_cache.get(cache_key)
        if cached and (now - cached[0]) < ttl_sec:
            return [dict(item) for item in (cached[1] or [])]
        backend = self.backend()
        if backend == "google_drive":
            items = self._drive.list_documents(collection, limit=max(1, int(limit)))
            self._list_cache[cache_key] = (now, [dict(item) for item in items])
            return items
        if backend == "file":
            items: list[dict[str, Any]] = []
            try:
                # Newest-first by file mtime, approximating "recently updated".
                paths = sorted(
                    self._file_collection_dir(collection).glob("*.json"),
                    key=lambda path: path.stat().st_mtime,
                    reverse=True,
                )
                for path in paths[: max(1, int(limit))]:
                    payload = json.loads(path.read_text(encoding="utf-8"))
                    payload.setdefault("id", path.stem)
                    items.append(payload)
            except Exception:
                self.logger.warning("Failed to list shared-state files for %s", collection, exc_info=True)
            self._list_cache[cache_key] = (now, [dict(item) for item in items])
            return items
        if self._quota_backoff_active():
            return [dict(item) for item in ((cached[1] if cached else []) or [])]
        db = self._client()
        if db is None:
            return [dict(item) for item in ((cached[1] if cached else []) or [])]
        try:
            docs = db.collection(self._collection(collection)).limit(max(1, int(limit))).stream()
            items: list[dict[str, Any]] = []
            for doc in docs:
                payload = doc.to_dict() or {}
                payload.setdefault("id", doc.id)
                items.append(payload)
            self._list_cache[cache_key] = (now, [dict(item) for item in items])
            return items
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while listing %s; using cache/backoff", collection)
            else:
                self.logger.exception("Failed to list Firebase collection %s", collection)
            return [dict(item) for item in ((cached[1] if cached else []) or [])]

    def delete_document(self, collection: str, doc_id: str) -> bool:
        """Delete one document; True on success (or already absent, file mode)."""
        safe_doc = self._safe_id(doc_id)
        cache_key = f"{collection}:{safe_doc}"
        backend = self.backend()
        if backend == "google_drive":
            deleted = self._drive.delete_document(collection, safe_doc)
            if deleted:
                self._invalidate_caches(collection, cache_key, drop_write=True)
            return deleted
        if backend == "file":
            try:
                path = self._file_doc_path(collection, safe_doc)
                if path.exists():
                    path.unlink()
                self._invalidate_caches(collection, cache_key, drop_write=True)
                return True
            except Exception:
                self.logger.warning("Failed to delete shared-state file %s/%s", collection, safe_doc, exc_info=True)
                return False
        if self._quota_backoff_active():
            return False
        db = self._client()
        if db is None:
            return False
        try:
            db.collection(self._collection(collection)).document(safe_doc).delete()
            self._invalidate_caches(collection, cache_key, drop_write=True)
            return True
        except Exception as exc:
            if self._is_quota_error(exc):
                self._activate_quota_backoff()
                self.logger.warning("Firebase quota exceeded while deleting %s/%s; delete skipped", collection, safe_doc)
            else:
                self.logger.exception("Failed to delete Firebase document %s/%s", collection, safe_doc)
            return False
|
|