| """ |
| ENGRAM Protocol β EGR Semantic Proof Script |
| Definitive KβK retrieval validation with diverse, non-repeated documents. |
| |
| Usage: |
| KMP_DUPLICATE_LIB_OK=TRUE OMP_NUM_THREADS=1 PYTHONPATH=. \ |
| .venv/bin/python scripts/egr_semantic_proof.py \ |
| --model /path/to/model.gguf \ |
| --ctx 16384 --n-trials 3 --layer-range 8 24 \ |
| --output results/egr_semantic_proof_8B_14K.json --verbose |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import gc |
| import json |
| import math |
| import os |
| import sys |
| import tempfile |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
|
|
| import torch |
|
|
| |
|
|
| DOC_A = """ |
| The transformer architecture introduced in "Attention Is All You Need" |
| replaced recurrent networks with self-attention as the core computational |
| primitive. Self-attention computes a weighted sum of value vectors, where |
| weights derive from the compatibility between query and key vectors. |
| For a sequence of length n, the attention matrix has shape nΓn, |
| making vanilla attention quadratic in both time and memory. |
| |
| Multi-head attention partitions the embedding dimension into h parallel |
| subspaces. Each head independently computes attention using its own |
| learned projections W_Q, W_K, W_V of dimension d_model/h. The outputs |
| are concatenated and projected back to d_model via W_O. This allows |
| different heads to specialize in different relational patterns: |
| some heads track syntactic dependencies, others semantic similarity, |
| others coreference chains across longer distances. |
| |
| Grouped-query attention generalizes multi-head and multi-query attention. |
| Rather than one KV pair per query head (MHA) or one KV pair for all |
| heads (MQA), GQA assigns one KV pair per group of g query heads. |
| Llama 3 uses GQA with 8 KV heads for 32 query heads, reducing |
| KV cache memory by 4Γ with minimal quality degradation. |
| |
| Rotary position embeddings encode absolute position by rotating |
| query and key vectors in 2D subspaces of the head dimension. |
| Unlike learned absolute embeddings or sinusoidal encodings, |
| RoPE naturally extrapolates to sequences longer than those seen |
| during training by preserving the inner product between positions |
| i and j as a function only of their relative offset i-j. |
| |
| The KV cache enables efficient autoregressive generation by storing |
| computed key and value matrices from all previous positions. |
| Without caching, generating a sequence of length L requires O(LΒ²) |
| attention operations. With caching, each new token requires only |
| O(L) operations β one attention pass over the cached KV pairs. |
| |
| Flash attention avoids materializing the full nΓn attention matrix |
| by tiling the computation into blocks that fit in SRAM. The forward |
| pass fuses the softmax and matrix multiply into a single kernel, |
| achieving O(n) memory complexity while maintaining exact numerical |
| equivalence to standard attention. |
| |
| Mixture-of-experts transformer variants route each token to a sparse |
| subset of feed-forward experts using a learned routing function. |
| Mistral's Mixtral 8Γ7B activates 2 of 8 experts per token, |
| achieving 7B-parameter inference cost with 47B total parameters. |
| Expert specialization emerges: some experts process syntactic |
| patterns, others domain-specific content, without explicit supervision. |
| |
| Layer normalization applied before the attention sublayer (Pre-LN) |
| stabilizes training compared to Post-LN by ensuring gradients flow |
| through the residual stream without vanishing through normalized paths. |
| Modern architectures including Llama, Mistral, and GPT-NeoX all |
| adopt Pre-LN with RMSNorm, dropping the learned bias parameters. |
| """ |
|
|
| DOC_B = """ |
| DNA replication in eukaryotic cells initiates at multiple origins |
| of replication simultaneously, enabling the duplication of genomes |
| containing billions of base pairs within hours. The origin recognition |
| complex marks these sites, recruiting CDC6 and CDT1 to load the |
| MCM helicase onto double-stranded DNA during G1 phase. |
| |
| The MCM complex unwinds the double helix at replication forks, |
| separating the complementary strands to serve as templates. |
| DNA polymerase delta and epsilon synthesize the lagging and leading |
| strands respectively, both requiring a short RNA primer synthesized |
| by primase to provide a free 3'-OH group for extension. |
| |
| Topoisomerase II resolves the positive supercoils that accumulate |
| ahead of the replication fork as the helix is unwound. Without |
| topoisomerase activity, the torsional stress would stall replication. |
| Type II topoisomerases cleave both strands simultaneously, pass |
| a second duplex through the break, and religate β changing |
| the linking number by two per catalytic cycle. |
| |
| Protein synthesis begins with mRNA recognition by the 43S |
| pre-initiation complex, comprising the 40S ribosomal subunit, |
| eIF2-GTP-Met-tRNA, and accessory factors. The complex scans |
| 5' to 3' until it encounters the AUG start codon in a favorable |
| Kozak context. The 60S subunit then joins to form the 80S ribosome. |
| |
| Elongation proceeds by aminoacyl-tRNA accommodation at the A-site, |
| peptide bond formation catalyzed by the peptidyl transferase center |
| of the 23S rRNA, and translocation driven by EF-G and GTP hydrolysis. |
| Each elongation cycle advances the ribosome by exactly one codon, |
| consuming one GTP equivalent and incorporating one amino acid. |
| |
| Cell signaling cascades amplify extracellular signals through |
| phosphorylation networks. The MAPK/ERK pathway converts growth |
| factor receptor activation into nuclear transcription factor |
| phosphorylation through RAF, MEK, and ERK kinases. Signal amplitude |
| and duration encode distinct transcriptional outcomes β transient |
| ERK activation drives proliferation while sustained activation |
| drives differentiation in PC12 cells. |
| |
| CRISPR-Cas9 genome editing exploits the bacterial adaptive immunity |
| system in which Cas9 endonuclease is guided by a 20-nucleotide |
| spacer sequence in the sgRNA to cleave complementary genomic DNA. |
| The PAM sequence NGG immediately 3' of the target site is required |
| for Cas9 binding and R-loop formation. Double-strand breaks are |
| repaired by NHEJ (causing indels) or HDR (enabling precise edits). |
| """ |
|
|
| QUERY = "How does the attention mechanism use keys and queries to compute weighted context representations in transformer models?" |
|
|
|
|
| def run_trial( |
| llm, |
| n_kv_heads: int, |
| head_dim: int, |
| spec: dict, |
| extractor, |
| doc_a: str, |
| doc_b: str, |
| query: str, |
| trial_id: int, |
| verbose: bool, |
| ) -> dict: |
| """Run a single EGR semantic proof trial.""" |
| from kvcos.core.blob_parser import parse_state_blob |
| from kvcos.core.manifold_index import IndexEntry, ManifoldIndex |
|
|
| dim = extractor.output_dim(spec) |
| index = ManifoldIndex(dim=dim) |
|
|
| |
| llm.reset() |
| t0 = time.perf_counter() |
| llm(doc_a, max_tokens=1, temperature=0.0) |
| cold_ms = (time.perf_counter() - t0) * 1000 |
| n_tok_a = llm.n_tokens |
|
|
| state_a = llm.save_state() |
| blob_a = bytes(state_a.llama_state) |
| blob_mb = len(blob_a) / 1024 / 1024 |
|
|
| |
| llm.reset() |
| gc.collect() |
| t0 = time.perf_counter() |
| llm.load_state(state_a) |
| llm(" ", max_tokens=1, temperature=0.0) |
| warm_ms = (time.perf_counter() - t0) * 1000 |
| speedup = cold_ms / warm_ms if warm_ms > 0 else float("inf") |
|
|
| |
| t0 = time.perf_counter() |
| parsed_a = parse_state_blob(blob_a, n_kv_heads=n_kv_heads, head_dim=head_dim) |
| parse_ms = (time.perf_counter() - t0) * 1000 |
|
|
| t0 = time.perf_counter() |
| ext_a = extractor.extract(parsed_a.keys, spec) |
| extract_ms = (time.perf_counter() - t0) * 1000 |
|
|
| entry_a = IndexEntry( |
| cache_id="session-a", |
| task_description="Transformer attention mechanisms", |
| model_id=spec["model_id"], |
| created_at=datetime.now(timezone.utc).isoformat(), |
| context_len=parsed_a.n_cells, |
| l2_norm=ext_a.l2_norm, |
| ) |
| index.add(ext_a.state_vec, entry_a) |
|
|
| |
| llm.reset() |
| llm(doc_b, max_tokens=1, temperature=0.0) |
| n_tok_b = llm.n_tokens |
| state_b = llm.save_state() |
| blob_b = bytes(state_b.llama_state) |
| parsed_b = parse_state_blob(blob_b, n_kv_heads=n_kv_heads, head_dim=head_dim) |
| ext_b = extractor.extract(parsed_b.keys, spec) |
|
|
| entry_b = IndexEntry( |
| cache_id="session-b", |
| task_description="DNA replication and molecular biology", |
| model_id=spec["model_id"], |
| created_at=datetime.now(timezone.utc).isoformat(), |
| context_len=parsed_b.n_cells, |
| l2_norm=ext_b.l2_norm, |
| ) |
| index.add(ext_b.state_vec, entry_b) |
|
|
| |
| llm.reset() |
| llm(query, max_tokens=1, temperature=0.0) |
| n_tok_q = llm.n_tokens |
| state_q = llm.save_state() |
| blob_q = bytes(state_q.llama_state) |
| parsed_q = parse_state_blob(blob_q, n_kv_heads=n_kv_heads, head_dim=head_dim) |
|
|
| t0 = time.perf_counter() |
| ext_q = extractor.extract(parsed_q.keys, spec) |
| t1 = time.perf_counter() |
| results = index.search(ext_q.state_vec, top_k=2) |
| t2 = time.perf_counter() |
|
|
| search_ms = (t2 - t1) * 1000 |
| egr_total_ms = (t2 - t0) * 1000 + extract_ms |
|
|
| |
| score_a = next((r["similarity"] for r in results if "attention" in r["task_description"].lower() or "transformer" in r["task_description"].lower()), None) |
| score_b = next((r["similarity"] for r in results if "dna" in r["task_description"].lower() or "molecular" in r["task_description"].lower()), None) |
|
|
| if score_a is None or score_b is None: |
| |
| score_a = results[0]["similarity"] if results else 0 |
| score_b = results[1]["similarity"] if len(results) > 1 else 0 |
|
|
| margin = score_a - score_b |
| correct = len(results) > 0 and ( |
| "attention" in results[0]["task_description"].lower() |
| or "transformer" in results[0]["task_description"].lower() |
| ) |
|
|
| layer_range_used = list(extractor.layer_range) if extractor.layer_range else "spec_default" |
|
|
| trial = { |
| "trial_id": trial_id, |
| "n_cells_a": parsed_a.n_cells, |
| "n_cells_b": parsed_b.n_cells, |
| "n_cells_q": parsed_q.n_cells, |
| "score_a": round(score_a, 6), |
| "score_b": round(score_b, 6), |
| "margin": round(margin, 6), |
| "correct": correct, |
| "cold_ms": round(cold_ms, 1), |
| "warm_ms": round(warm_ms, 1), |
| "speedup": round(speedup, 1), |
| "parse_ms": round(parse_ms, 1), |
| "extract_ms": round(extract_ms, 1), |
| "search_ms": round(search_ms, 1), |
| "egr_total_ms": round(egr_total_ms, 1), |
| "blob_size_mb": round(blob_mb, 1), |
| "layer_range_used": layer_range_used, |
| "n_layers_used": extractor.layer_range[1] - extractor.layer_range[0] if extractor.layer_range else len(spec.get("extraction_layers", ())), |
| "svd_rank": extractor.rank, |
| "output_dim": dim, |
| } |
|
|
| if verbose: |
| print(f" Trial {trial_id}: margin={margin:.4f} correct={correct} " |
| f"cold={cold_ms:.0f}ms warm={warm_ms:.0f}ms " |
| f"egr={egr_total_ms:.1f}ms cells_a={parsed_a.n_cells}") |
|
|
| return trial |
|
|
|
|
| def main() -> int: |
| parser = argparse.ArgumentParser(description="ENGRAM EGR Semantic Proof") |
| parser.add_argument("--model", "-m", required=True, help="Path to GGUF model") |
| parser.add_argument("--ctx", type=int, default=16384, help="Context window") |
| parser.add_argument("--n-trials", type=int, default=3, help="Number of trials") |
| parser.add_argument("--layer-range", type=int, nargs=2, default=[8, 24], help="Layer range start end") |
| parser.add_argument("--gate-start", type=int, default=0, help="Skip top N singular values (0=none)") |
| parser.add_argument("--compression", default="FP16", help="Compression method: FP16, INT8, Q8_0") |
| parser.add_argument("--output", "-o", default="results/egr_semantic_proof.json", help="Output JSON path") |
| parser.add_argument("--verbose", "-v", action="store_true") |
| args = parser.parse_args() |
|
|
| from llama_cpp import Llama |
| import llama_cpp as lc |
|
|
| from kvcos.core.cache_spec import make_spec_from_metadata |
| from kvcos.core.types import StateExtractionMode |
| from kvcos.core.state_extractor import MARStateExtractor |
|
|
| layer_range = tuple(args.layer_range) |
|
|
| print(f"ENGRAM EGR Semantic Proof β {args.n_trials} trials") |
| print(f"Model: {args.model}") |
| print(f"Context: {args.ctx}, Layer range: {layer_range}") |
| print() |
|
|
| trials: list[dict] = [] |
| for trial_id in range(args.n_trials): |
| print(f"Trial {trial_id + 1}/{args.n_trials}...") |
|
|
| llm = Llama(model_path=args.model, n_ctx=args.ctx, n_gpu_layers=-1, verbose=False) |
| meta = llm.metadata |
| n_layers = int(meta.get("llama.block_count", "32")) |
| n_heads = int(meta.get("llama.attention.head_count", "32")) |
| n_kv_heads = int(meta.get("llama.attention.head_count_kv", "8")) |
| head_dim = int(meta.get("llama.embedding_length", "4096")) // n_heads |
| model_name = meta.get("general.name", Path(args.model).stem) |
|
|
| spec = make_spec_from_metadata( |
| model_id=model_name, n_layers=n_layers, n_heads=n_heads, |
| n_kv_heads=n_kv_heads, head_dim=head_dim, |
| ) |
|
|
| extractor = MARStateExtractor( |
| mode=StateExtractionMode.SVD_PROJECT, |
| rank=min(160, head_dim), |
| layer_range=layer_range, |
| gate_start=args.gate_start, |
| ) |
|
|
| trial = run_trial( |
| llm=llm, n_kv_heads=n_kv_heads, head_dim=head_dim, |
| spec=spec, extractor=extractor, |
| doc_a=DOC_A.strip(), doc_b=DOC_B.strip(), query=QUERY.strip(), |
| trial_id=trial_id, verbose=args.verbose, |
| ) |
| trials.append(trial) |
|
|
| del llm |
| gc.collect() |
|
|
| |
| margins = [t["margin"] for t in trials] |
| speedups = [t["speedup"] for t in trials] |
| egr_times = [t["egr_total_ms"] for t in trials] |
| n_correct = sum(1 for t in trials if t["correct"]) |
|
|
| mean_margin = sum(margins) / len(margins) |
| std_margin = math.sqrt(sum((m - mean_margin) ** 2 for m in margins) / max(len(margins) - 1, 1)) if len(margins) > 1 else 0.0 |
| mean_speedup = sum(speedups) / len(speedups) |
| std_speedup = math.sqrt(sum((s - mean_speedup) ** 2 for s in speedups) / max(len(speedups) - 1, 1)) if len(speedups) > 1 else 0.0 |
| mean_egr = sum(egr_times) / len(egr_times) |
| std_egr = math.sqrt(sum((e - mean_egr) ** 2 for e in egr_times) / max(len(egr_times) - 1, 1)) if len(egr_times) > 1 else 0.0 |
|
|
| passed = ( |
| mean_margin > 0.05 |
| and n_correct == args.n_trials |
| and mean_egr < 200 |
| and mean_speedup > 10 |
| ) |
|
|
| summary = { |
| "mean_margin": round(mean_margin, 4), |
| "std_margin": round(std_margin, 4), |
| "mean_speedup": round(mean_speedup, 1), |
| "std_speedup": round(std_speedup, 1), |
| "mean_egr_ms": round(mean_egr, 1), |
| "std_egr_ms": round(std_egr, 1), |
| "n_correct": n_correct, |
| "n_trials": args.n_trials, |
| "min_margin": round(min(margins), 4), |
| "max_margin": round(max(margins), 4), |
| "pass": passed, |
| } |
|
|
| |
| doc_a_tokens = trials[0]["n_cells_a"] if trials else 0 |
| doc_b_tokens = trials[0]["n_cells_b"] if trials else 0 |
| query_tokens = trials[0]["n_cells_q"] if trials else 0 |
|
|
| output = { |
| "metadata": { |
| "model": model_name, |
| "ctx": args.ctx, |
| "layer_range": list(layer_range), |
| "n_trials": args.n_trials, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "platform": "Apple M3 / macOS", |
| "llama_cpp_version": lc.__version__, |
| }, |
| "documents": { |
| "doc_a": {"description": "Transformer attention mechanisms (ML)", "n_tokens": doc_a_tokens}, |
| "doc_b": {"description": "DNA replication and molecular biology", "n_tokens": doc_b_tokens}, |
| "query": {"text": QUERY, "n_tokens": query_tokens}, |
| }, |
| "trials": trials, |
| "summary": summary, |
| } |
|
|
| |
| output_path = Path(args.output) |
| output_path.parent.mkdir(parents=True, exist_ok=True) |
| output_path.write_text(json.dumps(output, indent=2)) |
| print(f"\nResults written to {output_path}") |
|
|
| |
| print() |
| sep = "=" * 55 |
| print(sep) |
| print("ENGRAM EGR Semantic Proof β Summary") |
| print(sep) |
| print(f"Model: {model_name}") |
| print(f"Context: {args.ctx}") |
| print(f"Layer range: {layer_range}") |
| print(f"Trials: {args.n_trials}") |
| print() |
| print(f"KβK margin: {mean_margin:.4f} Β± {std_margin:.4f} (min={min(margins):.4f}, max={max(margins):.4f})") |
| print(f"Correct: {n_correct}/{args.n_trials}") |
| print(f"Speedup: {mean_speedup:.1f}x Β± {std_speedup:.1f}x") |
| print(f"EGR ms: {mean_egr:.1f}ms Β± {std_egr:.1f}ms") |
| print() |
| verdict = "PASS" if passed else "FAIL" |
| reasons = [] |
| if mean_margin <= 0.05: |
| reasons.append(f"margin {mean_margin:.4f} <= 0.05") |
| if n_correct < args.n_trials: |
| reasons.append(f"correct {n_correct}/{args.n_trials}") |
| if mean_egr >= 200: |
| reasons.append(f"egr {mean_egr:.1f}ms >= 200ms") |
| if mean_speedup <= 10: |
| reasons.append(f"speedup {mean_speedup:.1f}x <= 10x") |
| reason_str = " | ".join(reasons) if reasons else "all criteria met" |
| print(f"Verdict: {verdict} ({reason_str})") |
| print(sep) |
|
|
| return 0 if passed else 1 |
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(main()) |
|
|