| """ |
| ENGRAM Protocol β Demo Agent Session |
| |
| |
| End-to-end demonstration: |
| 1. Load model via llama-cpp-python (D1) |
| 2. Generate with a prompt β measure cold TTFT |
| 3. Extract KV cache β compress β serialize to .eng |
| 4. Index in EGR manifold index |
| 5. Reset model β restore from .eng β measure cached TTFT |
| 6. Print speedup ratio |
| |
| D6: Target >10x TTFT reduction at 16K context on Llama 3.1 8B. |
| Cold baseline: ~1,500-5,000ms. Cached target: <500ms. |
| Anything below 4x at 16K is a failure. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import sys |
| import time |
| from pathlib import Path |
|
|
|
|
| def _run_dry_run(args: argparse.Namespace) -> int: |
| """Run full pipeline with synthetic tensors β no model file needed.""" |
| import os |
| import tempfile |
|
|
| import torch |
|
|
| from kvcos.core.cache_spec import LLAMA_3_1_8B |
| from kvcos.core.serializer import EngramSerializer |
| from kvcos.core.types import CompressionMethod, StateExtractionMode |
| from kvcos.core.manifold_index import IndexEntry, ManifoldIndex |
| from kvcos.core.state_extractor import MARStateExtractor |
| from kvcos.storage.local import LocalStorageBackend |
|
|
| spec = LLAMA_3_1_8B |
| ctx_len = args.context |
| model_name = spec["model_id"] |
|
|
| |
| torch.manual_seed(42) |
| shape = (spec["n_layers"], spec["n_kv_heads"], ctx_len, spec["head_dim"]) |
| keys = torch.randn(shape, dtype=torch.float16) |
| values = torch.randn(shape, dtype=torch.float16) |
|
|
| tensor_mb = keys.numel() * keys.element_size() / 1024 / 1024 |
|
|
| with tempfile.TemporaryDirectory() as tmp: |
| tmp_dir = Path(tmp) |
|
|
| |
| serializer = EngramSerializer() |
| eng_path = tmp_dir / "dry_run.eng" |
|
|
| t0 = time.perf_counter() |
| result = serializer.serialize( |
| keys=keys, values=values, |
| agent_id="dry-run-agent", |
| task_description="dry run benchmark", |
| model_id=model_name, |
| output_path=eng_path, |
| compression=CompressionMethod.Q8_0, |
| ) |
| serialize_ms = (time.perf_counter() - t0) * 1000 |
|
|
| |
| t0 = time.perf_counter() |
| k_out, v_out, meta = serializer.deserialize(eng_path) |
| deserialize_ms = (time.perf_counter() - t0) * 1000 |
|
|
| assert k_out.shape == keys.shape, f"Shape mismatch: {k_out.shape} vs {keys.shape}" |
|
|
| |
| extractor = MARStateExtractor( |
| mode=StateExtractionMode.SVD_PROJECT, |
| rank=min(160, spec["head_dim"]), |
| ) |
| dim = extractor.output_dim(spec) |
| index = ManifoldIndex(dim=dim) |
| storage = LocalStorageBackend(data_dir=tmp_dir) |
|
|
| |
| t0 = time.perf_counter() |
| extraction = extractor.extract(keys, spec) |
| t_extract = time.perf_counter() |
|
|
| eng2 = tmp_dir / "indexed.eng" |
| serializer.serialize( |
| keys=keys, values=values, |
| agent_id="dry-run-agent", |
| task_description="dry run benchmark", |
| model_id=model_name, |
| output_path=eng2, |
| compression=CompressionMethod.Q8_0, |
| cache_id="dry-run-001", |
| ) |
| t_serialize = time.perf_counter() |
|
|
| idx_meta = serializer.read_metadata_only(eng2) |
| storage.store_file("dry-run-001", eng2, idx_meta) |
| t_store = time.perf_counter() |
|
|
| from datetime import datetime, timezone |
| entry = IndexEntry( |
| cache_id="dry-run-001", |
| task_description="dry run benchmark", |
| model_id=model_name, |
| created_at=datetime.now(timezone.utc).isoformat(), |
| context_len=ctx_len, |
| l2_norm=extraction.l2_norm, |
| ) |
| index.add(extraction.state_vec, entry) |
| t_add = time.perf_counter() |
|
|
| extract_ms = (t_extract - t0) * 1000 |
| ser_ms = (t_serialize - t_extract) * 1000 |
| store_ms = (t_store - t_serialize) * 1000 |
| add_ms = (t_add - t_store) * 1000 |
| index_ms = (t_add - t0) * 1000 |
|
|
| |
| torch.manual_seed(99) |
| query_keys = torch.randn(shape, dtype=torch.float16) |
|
|
| t0 = time.perf_counter() |
| q_ext = extractor.extract(query_keys, spec) |
| t_qext = time.perf_counter() |
|
|
| results = index.search(q_ext.state_vec, top_k=1) |
| t_search = time.perf_counter() |
|
|
| |
| stored_path = storage.get_path("dry-run-001") |
| k_loaded, v_loaded, _ = serializer.deserialize(stored_path) |
| t_load = time.perf_counter() |
|
|
| q_extract_ms = (t_qext - t0) * 1000 |
| search_ms = (t_search - t_qext) * 1000 |
| load_ms = (t_load - t_search) * 1000 |
| retrieve_ms = (t_load - t0) * 1000 |
|
|
| |
| cold_ms = ctx_len * 0.1 |
| cached_ms = deserialize_ms |
| egr_overhead = extract_ms + search_ms |
| speedup = cold_ms / cached_ms if cached_ms > 0 else float("inf") |
| eng_size_mb = os.path.getsize(eng_path) / 1024 / 1024 |
|
|
| |
| sep = "=" * 35 |
| print(sep) |
| print("ENGRAM Protocol \u2014 EGR Demo") |
| print(f"Model: {model_name}") |
| print(f"Context: {ctx_len} tokens") |
| print(sep) |
| print(f"Cold TTFT: {cold_ms:.1f}ms (simulated)") |
| print(f"Cached TTFT: {cached_ms:.1f}ms (deserialize)") |
| print(f"Speedup: {speedup:.1f}x") |
| print(f"D6 target: >10x at 16K tokens") |
| status = "PASS" if speedup > 10 else "FAIL" |
| print(f"Status: {status}") |
| print(f"EGR overhead: {egr_overhead:.1f}ms (extract+search)") |
| print(f".eng file: {eng_path.name} ({eng_size_mb:.1f}MB)") |
| print(f"Tensor shape: {list(shape)} ({tensor_mb:.0f}MB per K/V)") |
| print(sep) |
| print() |
| print("Index breakdown:") |
| print(f" SVD extract: {extract_ms:8.1f}ms") |
| print(f" Serialize .eng: {ser_ms:8.1f}ms") |
| print(f" Store backend: {store_ms:8.1f}ms") |
| print(f" FAISS add(): {add_ms:8.1f}ms") |
| print(f" TOTAL: {index_ms:8.1f}ms") |
| print() |
| print("Retrieve breakdown:") |
| print(f" SVD extract: {q_extract_ms:8.1f}ms") |
| print(f" FAISS search(): {search_ms:8.1f}ms") |
| print(f" Load+deser: {load_ms:8.1f}ms") |
| print(f" TOTAL: {retrieve_ms:8.1f}ms") |
| print() |
| print("Verification:") |
| print(f" Round-trip shape: {'OK' if k_out.shape == keys.shape else 'FAIL'}") |
| print(f" Retrieval result: {'OK' if len(results) >= 1 else 'FAIL'}") |
| print(f" .eng valid: {'OK' if eng_path.exists() else 'FAIL'}") |
|
|
| return 0 if speedup > 10 else 1 |
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser( |
| description="ENGRAM Protocol β Demo Agent Session", |
| epilog="D6: >10x TTFT reduction at 16K context on Llama 3.1 8B", |
| ) |
| parser.add_argument( |
| "--model", "-m", default=None, |
| help="Path to GGUF model file (required unless --dry-run)", |
| ) |
| parser.add_argument( |
| "--context", "-c", type=int, default=4096, |
| help="Context length to fill (tokens). Default: 4096", |
| ) |
| parser.add_argument( |
| "--n-ctx", type=int, default=16384, |
| help="Max context window for model. Default: 16384", |
| ) |
| parser.add_argument( |
| "--data-dir", type=str, default=None, |
| help="ENGRAM data directory. Default: ~/.engram/data", |
| ) |
| parser.add_argument( |
| "--dry-run", action="store_true", |
| help="Run full pipeline with synthetic tensors (no model needed)", |
| ) |
| parser.add_argument( |
| "--verbose", "-v", action="store_true", |
| help="Enable verbose output", |
| ) |
| args = parser.parse_args() |
|
|
| if args.dry_run: |
| return _run_dry_run(args) |
|
|
| if not args.model: |
| parser.error("--model is required unless --dry-run is specified") |
|
|
| print("=" * 70) |
| print("ENGRAM Protocol β Demo Agent Session") |
| print("KV cache fingerprinting for persistent semantic retrieval") |
| print("=" * 70) |
| print() |
|
|
| |
| from kvcos.core.config import get_config |
| from kvcos.core.serializer import EngramSerializer |
| from kvcos.core.types import CompressionMethod, StateExtractionMode |
| from kvcos.core.manifold_index import ManifoldIndex |
| from kvcos.core.retriever import EGRRetriever |
| from kvcos.core.state_extractor import MARStateExtractor |
| from kvcos.storage.local import LocalStorageBackend |
| from integrations.llama_cpp_bridge import LlamaCppBridge |
|
|
| config = get_config() |
| data_dir = Path(args.data_dir) if args.data_dir else config.data_dir |
|
|
| |
| print(f"[1/6] Loading model: {args.model}") |
| bridge = LlamaCppBridge( |
| model_path=args.model, |
| n_ctx=args.n_ctx, |
| n_gpu_layers=0, |
| verbose=args.verbose, |
| ) |
| spec = bridge.load_model() |
| print(f" Model: {spec['model_id']}") |
| print(f" Architecture: {spec['n_layers']}L / {spec['n_heads']}H / {spec['n_kv_heads']}KV / {spec['head_dim']}D") |
| print(f" Context window: {args.n_ctx}") |
| print() |
|
|
| |
| filler = "The quick brown fox jumps over the lazy dog. " * 100 |
| target_tokens = args.context |
| prompt = filler[:target_tokens * 4] |
|
|
| print(f"[2/6] Cold prefill ({target_tokens} target tokens)...") |
| t0 = time.perf_counter() |
| cold = bridge.measure_cold_ttft(prompt) |
| print(f" Cold TTFT: {cold.ttft_ms:.1f}ms ({cold.context_len} tokens)") |
| print() |
|
|
| |
| print("[3/6] Extracting KV cache...") |
| try: |
| parsed = bridge.extract_kv_cache() |
| print(f" Keys shape: {list(parsed.keys.shape)}") |
| print(f" Values shape: {list(parsed.values.shape)}") |
| print(f" Cells: {parsed.n_cells}") |
| except Exception as e: |
| print(f" KV extraction failed: {e}") |
| print(" This is expected if the blob format doesn't match.") |
| print(" Falling back to save_state/load_state raw blob path.") |
| parsed = None |
| print() |
|
|
| print("[3b/6] Saving raw state blob...") |
| raw_state = bridge.llm.save_state() |
| raw_blob = bytes(raw_state.llama_state) |
| print(f" Raw state size: {len(raw_blob) / 1024 / 1024:.1f} MB") |
|
|
| if parsed is not None: |
| print("[3c/6] Serializing to .eng format...") |
| serializer = EngramSerializer() |
| eng_path = data_dir / "demo" / "session_001.eng" |
| result = serializer.serialize( |
| keys=parsed.keys, |
| values=parsed.values, |
| agent_id="demo-agent", |
| task_description="demo session - cold prefill benchmark", |
| model_id=spec["model_id"], |
| output_path=eng_path, |
| compression=CompressionMethod.Q8_0, |
| ) |
| print(f" .eng file: {result['path']}") |
| print(f" Size: {result['size_bytes'] / 1024 / 1024:.1f} MB") |
| print(f" Compression ratio: {result['compression_ratio']:.2f}x") |
| print() |
|
|
| |
| if parsed is not None: |
| print("[4/6] Indexing in EGR manifold index...") |
| storage = LocalStorageBackend(data_dir=data_dir) |
| extractor = MARStateExtractor( |
| mode=StateExtractionMode.SVD_PROJECT, |
| rank=min(160, spec["head_dim"]), |
| ) |
| dim = extractor.output_dim(spec) |
| index = ManifoldIndex(dim=dim) |
| retriever = EGRRetriever(extractor, index, storage) |
|
|
| cache_id = retriever.index_engram( |
| keys=parsed.keys, |
| values=parsed.values, |
| spec=spec, |
| agent_id="demo-agent", |
| task_description="demo session - cold prefill benchmark", |
| model_id=spec["model_id"], |
| ) |
| print(f" Indexed: {cache_id}") |
| print(f" State vector dim: {dim}") |
| print(f" Index entries: {index.n_entries}") |
| else: |
| print("[4/6] Skipped (KV extraction failed)") |
| print() |
|
|
| |
| print("[5/6] Restoring from cached state...") |
| t0 = time.perf_counter() |
| cached = bridge.measure_cached_ttft(raw_blob) |
| print(f" Cached TTFT: {cached.ttft_ms:.1f}ms") |
| print() |
|
|
| |
| cold_ms = cold.ttft_ms |
| cached_ms = cached.ttft_ms |
| speedup = cold_ms / cached_ms if cached_ms > 0 else float("inf") |
|
|
| eng_path_str = result["path"] if parsed else "N/A" |
| eng_size_kb = result["size_bytes"] / 1024 if parsed else 0 |
|
|
| sep = "=" * 35 |
| print(sep) |
| print("ENGRAM Protocol β EGR Demo") |
| print(f"Model: {spec['model_id']}") |
| print(f"Context: {cold.context_len} tokens") |
| print(sep) |
| print(f"Cold TTFT: {cold_ms:.1f}ms") |
| print(f"Cached TTFT: {cached_ms:.1f}ms") |
| print(f"Speedup: {speedup:.1f}x") |
| print(f"D6 target: >10x at 16K tokens") |
| status = "PASS" if speedup > 10 else "FAIL" |
| print(f"Status: {status}") |
| print(f".eng file: {eng_path_str} ({eng_size_kb:.1f}KB)") |
| print(sep) |
|
|
| return 0 if speedup >= 4 else 1 |
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(main()) |
|
|