| """ |
| ENGRAM Protocol — ENGRAM Python Client |
| |
| |
| Synchronous HTTP client wrapping the ENGRAM REST API endpoints. |
| This is what agents import to interact with the engram store. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Any |
|
|
| import httpx |
|
|
|
|
class EngramClient:
    """Synchronous Python client for the ENGRAM REST API.

    Wraps a blocking ``httpx.Client`` rooted at ``{base_url}/v1``. Every
    request method calls ``raise_for_status()`` on the response, so error
    status codes surface as exceptions instead of being returned.

    Usage:
        with EngramClient("http://localhost:8080") as client:
            result = client.store_file(
                path,
                agent_id="worker",
                task_description="analyze code",
                model_id="llama-3.1-8b",
            )
            matches = client.search(task_description="debug auth error", top_k=3)
            data = client.get(matches[0]["cache_id"])
    """

    def __init__(self, base_url: str = "http://localhost:8080", timeout: float = 30.0) -> None:
        """Create a client bound to ``{base_url}/v1``.

        Args:
            base_url: Root URL of the ENGRAM server; trailing slashes are stripped.
            timeout: Timeout handed to the underlying ``httpx.Client``.
        """
        self.base_url = base_url.rstrip("/")
        self._client = httpx.Client(base_url=f"{self.base_url}/v1", timeout=timeout)

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self._client.close()

    def __enter__(self) -> "EngramClient":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the client when the ``with`` block exits."""
        self.close()

    def health(self) -> dict[str, Any]:
        """Check ENGRAM server health.

        Returns:
            The decoded JSON body of ``GET /health``.
        """
        resp = self._client.get("/health")
        resp.raise_for_status()
        return resp.json()

    def _upload(
        self,
        filename: str,
        content: Any,
        *,
        agent_id: str,
        task_description: str,
        model_id: str,
        compression: str,
    ) -> dict[str, Any]:
        """POST one multipart ``file`` part to ``/cache`` and return the JSON reply.

        Shared by ``store_file`` and ``store_bytes`` so both send exactly the
        same query parameters and content type. ``content`` may be raw bytes
        or an open binary file object.
        """
        resp = self._client.post(
            "/cache",
            params={
                "agent_id": agent_id,
                "task_description": task_description,
                "model_id": model_id,
                "compression": compression,
            },
            files={"file": (filename, content, "application/octet-stream")},
        )
        resp.raise_for_status()
        return resp.json()

    def store_file(
        self,
        file_path: Path | str,
        agent_id: str,
        task_description: str,
        model_id: str,
        compression: str = "q8_0",
    ) -> dict[str, Any]:
        """Upload a .eng file to the engram store.

        Args:
            file_path: Path to the .eng file (``Path`` or plain string)
            agent_id: Agent identifier
            task_description: Human-readable description
            model_id: Model identifier
            compression: Compression method used

        Returns:
            Dict with cache_id, size_bytes, compression_ratio, path
        """
        # Normalize first so a plain string path works too; ``.name`` below
        # only exists on Path objects.
        path = Path(file_path)
        # The upload happens inside the ``with`` so the handle is still open
        # while the request body is read.
        with path.open("rb") as f:
            return self._upload(
                path.name,
                f,
                agent_id=agent_id,
                task_description=task_description,
                model_id=model_id,
                compression=compression,
            )

    def store_bytes(
        self,
        data: bytes,
        agent_id: str,
        task_description: str,
        model_id: str,
        compression: str = "q8_0",
        filename: str = "cache.eng",
    ) -> dict[str, Any]:
        """Upload raw .eng bytes to the engram store.

        Args:
            data: Raw contents of the .eng file
            agent_id: Agent identifier
            task_description: Human-readable description
            model_id: Model identifier
            compression: Compression method used
            filename: File name reported in the multipart upload

        Returns:
            The decoded JSON body of the upload response.
        """
        return self._upload(
            filename,
            data,
            agent_id=agent_id,
            task_description=task_description,
            model_id=model_id,
            compression=compression,
        )

    def get(self, cache_id: str) -> bytes:
        """Retrieve a .eng file by cache ID.

        Returns raw bytes of the .eng file.
        """
        resp = self._client.get(f"/cache/{cache_id}")
        resp.raise_for_status()
        return resp.content

    def search(
        self,
        task_description: str,
        model_id: str | None = None,
        top_k: int = 5,
        min_similarity: float | None = None,
    ) -> list[dict[str, Any]]:
        """Search for similar engram states.

        Args:
            task_description: Text to search with
            model_id: Optional model filter; omitted from the request when
                empty or None
            top_k: Value sent as ``top_k`` in the request body
            min_similarity: Optional threshold; omitted from the request only
                when None, so 0.0 is still sent

        Returns list of search result dicts with cache_id, similarity, etc.
        """
        body: dict[str, Any] = {
            "task_description": task_description,
            "top_k": top_k,
        }
        if model_id:
            body["model_id"] = model_id
        if min_similarity is not None:
            body["min_similarity"] = min_similarity

        resp = self._client.post("/cache/search", json=body)
        resp.raise_for_status()
        return resp.json()["results"]

    def delete(self, cache_id: str) -> bool:
        """Delete an engram from storage and index.

        Returns:
            The ``deleted`` field of the server's JSON reply.
        """
        resp = self._client.delete(f"/cache/{cache_id}")
        resp.raise_for_status()
        return resp.json()["deleted"]

    def stats(self) -> dict[str, Any]:
        """Get aggregate engram store statistics.

        Returns:
            The decoded JSON body of ``GET /cache/stats``.
        """
        resp = self._client.get("/cache/stats")
        resp.raise_for_status()
        return resp.json()
|
|