| """ |
| ENGRAM Protocol — API Routes |
| |
| |
| FastAPI route handlers for the ENGRAM REST API. |
| All endpoints under /v1/ prefix. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from fastapi import APIRouter, HTTPException, UploadFile, File |
|
|
| from kvcos.api.schemas import ( |
| DeleteResponse, |
| HealthResponse, |
| SearchRequest, |
| SearchResponse, |
| SearchResultItem, |
| StatsResponse, |
| StoreResponse, |
| ) |
| from kvcos.core.types import ENGRAM_VERSION |
|
|
# Router for this module's handlers; routes registered on it are served
# under the /v1 prefix.
router = APIRouter(prefix="/v1")

# Module-level handles to the shared service components. Each starts as None,
# and the matching _get_* accessor answers HTTP 503 for as long as it stays None.
# NOTE(review): nothing in this view assigns these — presumably server startup
# code elsewhere sets them; confirm.
_retriever = None
_storage = None
_index = None
|
|
|
|
def _get_retriever():
    """Return the module-level retriever handle.

    Raises:
        HTTPException: 503 while the handle is still ``None``.
    """
    handle = _retriever
    if handle is not None:
        return handle
    raise HTTPException(503, "ENGRAM not initialized. Server starting up.")
|
|
|
|
def _get_storage():
    """Return the module-level storage handle.

    Raises:
        HTTPException: 503 while the handle is still ``None``.
    """
    handle = _storage
    if handle is not None:
        return handle
    raise HTTPException(503, "ENGRAM not initialized. Server starting up.")
|
|
|
|
def _get_index():
    """Return the module-level index handle.

    Raises:
        HTTPException: 503 while the handle is still ``None``.
    """
    handle = _index
    if handle is not None:
        return handle
    raise HTTPException(503, "ENGRAM not initialized. Server starting up.")
|
|
|
|
| |
|
|
|
|
@router.get("/health", response_model=HealthResponse)
async def health():
    """Report service health.

    Both the index and the storage handle must be initialized. If either is
    still unset, its accessor raises ``HTTPException(503)`` and that error
    propagates to the client instead of an "ok" response.

    Returns:
        HealthResponse with status ``"ok"``, the ENGRAM version, the current
        number of index entries, and the storage backend name.
    """
    index = _get_index()
    # Called purely as a readiness gate: it raises 503 while storage is not
    # initialized. The returned handle itself is not needed here, so it is
    # deliberately not bound to a name.
    _get_storage()
    return HealthResponse(
        status="ok",
        version=ENGRAM_VERSION,
        index_entries=index.n_entries,
        # NOTE(review): backend name is hard-coded rather than read from the
        # storage object — confirm this is intended.
        storage_backend="local",
    )
|
|
|
|
| |
|
|
|
|
@router.get("/cache/stats", response_model=StatsResponse)
async def cache_stats():
    """Return aggregate statistics reported by the storage backend.

    The values come straight from ``storage.stats()``; the only derived field
    is the total size in MiB (bytes / 1024 / 1024), rounded to two decimals.

    Raises:
        HTTPException: 503 if storage is not initialized.
    """
    summary = _get_storage().stats()
    entry_count = summary["total_entries"]
    size_bytes = summary["total_size_bytes"]
    size_mb = round(size_bytes / (1024 * 1024), 2)
    return StatsResponse(
        total_entries=entry_count,
        total_size_bytes=size_bytes,
        total_size_mb=size_mb,
        avg_compression_ratio=summary["avg_compression_ratio"],
        model_breakdown=summary["model_breakdown"],
    )
|
|
|
|
| |
|
|
|
|
@router.post("/cache", response_model=StoreResponse)
async def store_cache(
    agent_id: str,
    task_description: str,
    model_id: str,
    file: UploadFile = File(...),
    compression: str = "q8_0",
):
    """Persist an uploaded, pre-serialized .eng file under a fresh cache ID.

    The upload is read fully into memory, given a random UUID4 cache ID, and
    passed to ``storage.store`` together with a metadata record. Only the
    caller-supplied fields (agent, task, model, compression), the ENGRAM
    version, and a UTC creation timestamp carry real values; the
    model-geometry fields are written as placeholders ("" / "0").

    NOTE(review): whether the entry also becomes searchable depends on what
    ``storage.store`` does with the metadata — not visible from here.

    Raises:
        HTTPException: 503 if storage is not initialized; 400 if the upload
            contains no bytes.

    Returns:
        StoreResponse with the new cache ID, the upload size in bytes, a fixed
        compression ratio of 1.0, and the value returned by ``storage.store``
        as the path.
    """
    import uuid
    from datetime import datetime, timezone

    from kvcos.core.types import EngramMetadata

    backend = _get_storage()

    payload = await file.read()
    if not payload:
        raise HTTPException(400, "Empty file upload")

    new_id = str(uuid.uuid4())

    record: EngramMetadata = {
        "engram_version": ENGRAM_VERSION,
        "cache_id": new_id,
        "compression": compression,
        "model_id": model_id,
        # Geometry is not parsed from the upload; placeholders only.
        "model_family": "",
        "n_layers": "0",
        "n_heads": "0",
        "n_kv_heads": "0",
        "head_dim": "0",
        "context_len": "0",
        "agent_id": agent_id,
        "task_description": task_description,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    stored_at = backend.store(new_id, payload, record)

    return StoreResponse(
        cache_id=new_id,
        size_bytes=len(payload),
        compression_ratio=1.0,
        path=stored_at,
    )
|
|
|
|
| |
|
|
|
|
@router.get("/cache/{cache_id}")
async def get_cache(cache_id: str):
    """Download the raw .eng bytes stored under ``cache_id``.

    The payload is returned as ``application/octet-stream`` with a
    ``Content-Disposition`` header suggesting ``<cache_id>.eng`` as filename.

    Raises:
        HTTPException: 503 if storage is not initialized; 404 if
            ``storage.get`` returns ``None`` for this ID.
    """
    from fastapi.responses import Response

    blob = _get_storage().get(cache_id)
    if blob is None:
        raise HTTPException(404, f"Cache entry not found: {cache_id}")

    # NOTE(review): cache_id comes from the URL and is interpolated into the
    # header value without escaping.
    disposition = f'attachment; filename="{cache_id}.eng"'
    return Response(
        content=blob,
        media_type="application/octet-stream",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
| |
|
|
|
|
@router.post("/cache/search", response_model=SearchResponse)
async def search_cache(req: SearchRequest):
    """Search endpoint for engram states (EGR manifold search; D2: K→K only).

    As implemented here, no similarity scoring takes place: the handler lists
    up to ``req.top_k`` entries from storage, optionally keeps only those
    whose ``model_id`` equals ``req.model_id``, and reports every hit with
    ``similarity=0.0``. The index is consulted only for its entry count.

    Raises:
        HTTPException: 503 if the index or storage is not initialized.

    Returns:
        SearchResponse with at most ``req.top_k`` results and ``n_searched``
        set to the index's entry count.
    """
    index = _get_index()
    backend = _get_storage()

    # NOTE(review): the limit is applied before the model_id filter below, so
    # a filtered search can return fewer than top_k hits even when more
    # matching entries exist in storage.
    candidates = backend.list_entries(model_family=None, limit=req.top_k)

    hits = []
    for entry in candidates:
        if req.model_id is not None and entry.get("model_id") != req.model_id:
            continue
        hits.append(
            SearchResultItem(
                cache_id=entry.get("cache_id", ""),
                similarity=0.0,
                task_description=entry.get("task_description", ""),
                model_id=entry.get("model_id", ""),
                created_at=entry.get("created_at", ""),
                context_len=int(entry.get("context_len", "0")),
            )
        )

    top_hits = hits[:req.top_k]
    return SearchResponse(results=top_hits, n_searched=index.n_entries)
|
|
|
|
| |
|
|
|
|
@router.delete("/cache/{cache_id}", response_model=DeleteResponse)
async def delete_cache(cache_id: str):
    """Delete the engram identified by ``cache_id`` via the retriever.

    The outcome of ``retriever.delete_engram`` is passed through in the
    ``deleted`` field; no 404 is raised here when nothing was removed.

    Raises:
        HTTPException: 503 if the retriever is not initialized.
    """
    was_deleted = _get_retriever().delete_engram(cache_id)
    return DeleteResponse(cache_id=cache_id, deleted=was_deleted)
|
|
|
|
|
|