# Source: Hugging Face repo "sovereign-node" — in_memory_index.py
# (uploaded by Fabuilds, commit d68c0f8 "Upload 23 files", verified)
"""
IN-MEMORY PATTERN INDEX
Fast lookup without HDD writes - merge existing + conversation + Gemini chat patterns
"""
import sys
import os
import json
import time
import re

# Resolve SemanticEmbedder across the three layouts this file may run from:
# 1) as part of the System package, 2) as a sibling module, 3) from Shop/
# scripts, where this file's own directory must be added to sys.path first.
try:
    from System.semantic_embedder import SemanticEmbedder
except ImportError:
    try:
        from semantic_embedder import SemanticEmbedder
    except ImportError:
        # Final fallback for scripts in Shop/
        sys.path.append(os.path.dirname(os.path.abspath(__file__)))
        from semantic_embedder import SemanticEmbedder
# Seed lattice patterns (2 defined here; the old comment said 5 — presumably
# the rest live in the on-disk dual_anchor_index.json; TODO confirm)
LATTICE_PATTERNS = {
    "PATTERN_SINGLETON_DATABASE": {
        "lba": 8534859776,
        "domain": "SOFTWARE_ARCHITECTURE",
        "problem": "Need to ensure only one database connection exists",
        "solution": "Singleton pattern with thread-safe initialization",
        "reusability": 9,
        "confidence": 0.82
    },
    "PATTERN_REACT_HOOKS_DEPS": {
        "lba": 3371401216,
        "domain": "WEB_DEVELOPMENT",
        "problem": "React component not re-rendering when props change",
        "solution": "Add dependency array to useEffect",
        "reusability": 10,
        "confidence": 0.85
    }
}
# Patterns distilled from conversation 0938ac6c (1 defined here; the loader's
# old docstring mentioned 11 — TODO confirm where the others went)
CONVERSATION_PATTERNS = {
    "AGENT_IS_LATTICE": {
        "domain": "CONCEPTUAL",
        "problem": "Separation between agent and data structure",
        "solution": "Agent is non-orientable surface - no inside/outside separation",
        "confidence": 0.95
    }
}
class InMemoryIndex:
    """
    Adaptive Distillation Index.
    Tracks pattern hit counts to distinguish signal from noise:
    - Once-patterns (1 hit) = UNCONFIRMED (might be noise)
    - Twice-patterns (2 hits) = PLAUSIBLE
    - Multi-patterns (3+ hits) = CONFIRMED (logic)
    The lattice self-cleans through use. Signal persists, noise decays.
    """
    # Hit tracking file handled dynamically in __init__ (per-instance path
    # under Lattice_DB/pattern_hits.json).
    HIT_LOG_PATH = None
    # Magnitude layers: logic exists in layers
    # Layer 0: Surface (keyword substring match) = low magnitude
    # Layer 1: Structural (multi-word + domain match) = medium magnitude
    # Layer 2: Conceptual (phrase match in problem/solution) = high magnitude
    # Decay: magnitude halves every DECAY_HALF_LIFE seconds without a hit
    DECAY_HALF_LIFE = 86400  # 24 hours
    MAGNITUDE_LAYERS = {
        "surface": 0.3,  # keyword substring match (low relevance)
        "structural": 0.6,  # multi-word + domain match (medium)
        "conceptual": 1.0,  # full phrase match in problem/solution (high)
    }
def __init__(self):
# Handle relative pathing for portability
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.LATTICE_DB_DIR = os.path.join(BASE_DIR, "Lattice_DB")
self.HIT_LOG_PATH = os.path.join(self.LATTICE_DB_DIR, "pattern_hits.json")
index_path = os.path.join(self.LATTICE_DB_DIR, "dual_anchor_index.json")
if os.path.exists(index_path):
with open(index_path, 'r') as f:
data = json.load(f)
self.patterns = data.get('patterns', {})
sources = data.get('sources', {})
print(f"[INDEX] Loaded {len(self.patterns)} dual-anchor patterns")
else:
# Fallback to original patterns
self.patterns = {}
self.load_lattice_patterns()
self.load_conversation_patterns()
print("[INDEX] Dual-anchor index not found, using original 16 patterns")
# Load hit tracking (magnitude-weighted)
self.hits = self._load_hits()
# Calculate adaptive threshold based on pattern count
self.base_threshold = 0.3 + (0.4 * min(len(self.patterns) / 200, 1.0))
# Initialize Semantic Engine
print("[INDEX] Initializing Semantic Manifold...")
self.embedder = SemanticEmbedder()
self.pattern_vectors = {}
self._reindex_vectors()
confirmed = sum(1 for h in self.hits.values() if self._total_magnitude(h) >= 2.0)
unconfirmed = sum(1 for h in self.hits.values() if 0 < self._total_magnitude(h) < 1.0)
print(f"[DISTILLER] Confirmed: {confirmed} | Unconfirmed: {unconfirmed} | Threshold: {self.base_threshold:.2f}")
self.word_freq = self._calculate_word_freq()
def _reindex_vectors(self):
"""Pre-calculates semantic embeddings for all known patterns."""
print(f"[INDEX]: Generating embeddings for {len(self.patterns)} patterns...")
for label, p in self.patterns.items():
# Combine problem + solution for semantic context
context = f"{p.get('problem', '')} {p.get('solution', '')} {label}"
self.pattern_vectors[label] = self.embedder.embed_text(context)
print(f"[INDEX]: ✅ Semantic manifold mapped ({len(self.pattern_vectors)} vectors).")
def _calculate_word_freq(self):
"""Calculate inverse pattern frequency (IPF) for lean semantic weighting."""
freq = {}
for p in self.patterns.values():
text = (p.get('problem','') + " " + p.get('solution','')).lower()
words = set(re.findall(r'\w+', text))
for w in words:
freq[w] = freq.get(w, 0) + 1
return freq
def _get_word_weight(self, word, structural_weight):
"""Calculate semantic weight: rare words matter more."""
count = self.word_freq.get(word, 0)
if count == 0: return structural_weight
# Logarithmic scale for IPF: weight = 1 + log(total / count)
import math
ipf = 1.0 + math.log(len(self.patterns) / count)
return structural_weight * ipf
def _fuzzy_match(self, w1, w2):
"""Lightweight Jaccard similarity for fuzzy matching."""
if w1 == w2: return 1.0
if len(w1) < 4 or len(w2) < 4: return 0.0
s1, s2 = set(w1), set(w2)
intersection = len(s1 & s2)
union = len(s1 | s2)
score = intersection / union
return score if score > 0.7 else 0.0
def _load_hits(self):
"""Load magnitude-weighted hit data from disk."""
if os.path.exists(self.HIT_LOG_PATH):
with open(self.HIT_LOG_PATH, 'r') as f:
data = json.load(f)
# Backward compat: convert flat counts to magnitude format
for label, val in data.items():
if isinstance(val, (int, float)):
data[label] = {"count": int(val), "magnitude": float(val) * 0.5, "layers": []}
return data
return {}
def _save_hits(self):
"""Persist hit data to disk."""
with open(self.HIT_LOG_PATH, 'w') as f:
json.dump(self.hits, f, indent=2)
def _total_magnitude(self, hit_data):
"""Get current magnitude with decay applied."""
if isinstance(hit_data, dict):
raw_mag = hit_data.get('magnitude', 0)
last_hit = hit_data.get('last_hit', 0)
if last_hit > 0 and raw_mag > 0:
elapsed = time.time() - last_hit
# Halve every DECAY_HALF_LIFE seconds
decay_factor = 0.5 ** (elapsed / self.DECAY_HALF_LIFE)
return raw_mag * decay_factor
return raw_mag
return float(hit_data) * 0.5 # backward compat
def _classify_relevance(self, relevance):
"""Classify match into magnitude layer based on relevance score."""
if relevance >= 0.7:
return "conceptual", self.MAGNITUDE_LAYERS["conceptual"]
elif relevance >= 0.4:
return "structural", self.MAGNITUDE_LAYERS["structural"]
else:
return "surface", self.MAGNITUDE_LAYERS["surface"]
def _record_hit(self, label, relevance):
"""Record a hit. Re-mention restores magnitude to peak."""
layer_name, magnitude = self._classify_relevance(relevance)
if label not in self.hits:
self.hits[label] = {"count": 0, "magnitude": 0.0, "peak": 0.0, "layers": [], "last_hit": 0}
h = self.hits[label]
h["count"] += 1
h["last_hit"] = time.time()
# Restore to peak first (re-mention recovery), then add new magnitude
current_peak = h.get("peak", h["magnitude"])
h["magnitude"] = current_peak + magnitude
h["peak"] = h["magnitude"] # new peak
# Track which layers have been hit
if layer_name not in h["layers"]:
h["layers"].append(layer_name)
def get_status(self, label):
"""Get distillation status based on decayed magnitude."""
hit_data = self.hits.get(label, {})
mag = self._total_magnitude(hit_data) # applies decay
layers = hit_data.get('layers', []) if isinstance(hit_data, dict) else []
if mag == 0:
return "NEW"
elif mag < 1.0:
return "UNCONFIRMED" # surface-only = might be noise
elif mag < 2.0:
return "PLAUSIBLE"
elif len(layers) >= 2:
return "DEEP_LOGIC" # hit at multiple layers = real
else:
return "CONFIRMED" # high magnitude single layer
def add_note(self, text, domain="NOTE", forced_label=None):
"""Add a new pattern from freeform text. Self-organizing entry point."""
if forced_label:
label = forced_label
else:
# Auto-generate label from text
words = re.sub(r'[^a-zA-Z0-9\s]', '', text).upper().split()
# Take first 4 meaningful words for label
label_words = [w for w in words if len(w) > 2][:4]
label = "_".join(label_words) if label_words else "NOTE_" + str(int(time.time()))
# Don't overwrite existing patterns unless forced
if label in self.patterns and not forced_label:
label = label + "_" + str(int(time.time()) % 10000)
self.patterns[label] = {
"problem": text,
"solution": text,
"domain": domain,
"confidence": 0.5, # starts neutral
"source": "notepad",
"type": "NOTE",
"created": time.time(),
}
# Initial hit at conceptual layer (you wrote it = you meant it)
self._record_hit(label, 1.0)
self._save_hits()
# Update threshold for new pattern count
self.base_threshold = 0.3 + (0.4 * min(len(self.patterns) / 200, 1.0))
return label
    def load_lattice_patterns(self):
        """Load the LATTICE_PATTERNS seed dict into the index, tagged as code patterns.

        (The old docstring said "5 patterns"; only the entries actually present
        in LATTICE_PATTERNS are loaded.)
        """
        for label, data in LATTICE_PATTERNS.items():
            self.patterns[label] = {
                **data,
                "source": "lattice",
                "type": "CODE_PATTERN"
            }
    def load_conversation_patterns(self):
        """Load the CONVERSATION_PATTERNS seed dict into the index, tagged as insights.

        (The old docstring said "11 patterns"; only the entries actually present
        in CONVERSATION_PATTERNS are loaded.)
        """
        for label, data in CONVERSATION_PATTERNS.items():
            self.patterns[label] = {
                **data,
                "source": "conversation_0938ac6c",
                "type": "INSIGHT"
            }
def search(self, query, threshold=None, record=True):
"""
Adaptive distillation search.
- Matches patterns using phrase + word relevance
- Integrates 384-dim semantic similarity from manifolds
- Records hits for matched patterns
"""
if threshold is None:
threshold = self.base_threshold
results = []
query_lower = query.lower()
# 1. Generate Query Vector
query_vector = self.embedder.embed_text(query)
# 2. Hard matching patterns
STRUCTURAL_WORDS = { 'a', 'an', 'the', 'is', 'it', 'in', 'on', 'at', 'to', 'of', 'and', 'or', 'but' }
query_words = [(w, self._get_word_weight(w, 0.3 if w in STRUCTURAL_WORDS else 1.0)) for w in query_lower.split()]
links = re.findall(r'\[\[(\w+)\]\]', query_lower)
for label, pattern in self.patterns.items():
problem = pattern.get('problem', '').lower()
solution = pattern.get('solution', '').lower()
label_text = label.lower()
relevance = 0
# Semantic Boost (Manifold Pathfinding)
pattern_vector = self.pattern_vectors.get(label)
semantic_score = 0 # Initialize semantic_score
if pattern_vector:
semantic_score = self.embedder.cosine_similarity(query_vector, pattern_vector)
# Apply high weight to semantic resonance (The "LOVE" Anchor)
relevance += (semantic_score * 0.8)
# Exact phrase match (The 0x52 Anchor)
if query_lower in problem: relevance += 0.4
if query_lower in solution: relevance += 0.3
if query_lower in label_text: relevance += 0.5
# Link boost
if label.lower() in links: relevance += 2.0
# Combine logic
if relevance >= threshold:
status = self.get_status(label)
# Record magnitude-weighted hit
if record:
self._record_hit(label, relevance)
hit_data = self.hits.get(label, {})
results.append({
"label": label,
"relevance": relevance,
"confidence": pattern.get('confidence', 0.5),
"status": status,
"hits": hit_data.get('count', 0) if isinstance(hit_data, dict) else 0,
"magnitude": self._total_magnitude(hit_data),
"layers": hit_data.get('layers', []) if isinstance(hit_data, dict) else [],
**pattern
})
# Sort by: confirmed first, then relevance, then confidence
status_order = {"DEEP_LOGIC": 4, "CONFIRMED": 3, "PLAUSIBLE": 2, "UNCONFIRMED": 1, "NEW": 0}
results.sort(key=lambda x: (
status_order.get(x.get('status', 'NEW'), 0),
x['relevance'],
x['confidence']
), reverse=True)
# Save hits after search
if record:
self._save_hits()
return results
def distillation_report(self):
"""Report on pattern distillation with magnitude layers."""
deep_logic = []
confirmed = []
plausible = []
unconfirmed = []
new_patterns = []
for label in self.patterns:
status = self.get_status(label)
hit_data = self.hits.get(label, {})
mag = self._total_magnitude(hit_data)
layers = hit_data.get('layers', []) if isinstance(hit_data, dict) else []
entry = (label, mag, layers)
if status == "DEEP_LOGIC":
deep_logic.append(entry)
elif status == "CONFIRMED":
confirmed.append(entry)
elif status == "PLAUSIBLE":
plausible.append(entry)
elif status == "UNCONFIRMED":
unconfirmed.append(entry)
else:
new_patterns.append(entry)
print(f"\n{'='*60}")
print(f"DISTILLATION REPORT (Magnitude Layers)")
print(f"{'='*60}")
print(f"Total patterns: {len(self.patterns)}")
print(f" DEEP_LOGIC (multi-layer): {len(deep_logic)} = verified across layers")
print(f" CONFIRMED (mag >= 2.0): {len(confirmed)} = strong signal")
print(f" PLAUSIBLE (mag 1.0-2.0): {len(plausible)} = growing")
print(f" UNCONFIRMED (mag < 1.0): {len(unconfirmed)} = potential noise")
print(f" NEW (untested): {len(new_patterns)}")
print(f"\nAdaptive threshold: {self.base_threshold:.2f}")
if deep_logic:
print(f"\nDEEP LOGIC (multi-layer verified):")
for label, mag, layers in sorted(deep_logic, key=lambda x: x[1], reverse=True):
print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}")
if confirmed:
print(f"\nCONFIRMED (strong signal):")
for label, mag, layers in sorted(confirmed, key=lambda x: x[1], reverse=True):
print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}")
if unconfirmed:
print(f"\nUNCONFIRMED (potential noise):")
for label, mag, layers in unconfirmed:
print(f" [mag:{mag:.1f}] [{'+'.join(layers)}] {label}")
return {
"confirmed": len(confirmed),
"plausible": len(plausible),
"unconfirmed": len(unconfirmed),
"new": len(new_patterns),
"threshold": self.base_threshold
}
def save_to_json(self, path):
"""Persist to JSON for inspection."""
with open(path, 'w') as f:
json.dump({
"total_patterns": len(self.patterns),
"sources": {
"lattice": len(LATTICE_PATTERNS),
"conversation": len(CONVERSATION_PATTERNS)
},
"patterns": self.patterns
}, f, indent=2)
print(f"\n💾 Saved index to: {path}")
def stats(self):
"""Print statistics."""
print(f"\n{'='*60}")
print(f"IN-MEMORY PATTERN INDEX")
print(f"{'='*60}")
print(f"Total patterns: {len(self.patterns)}")
print(f" From lattice: {len(LATTICE_PATTERNS)}")
print(f" From conversation: {len(CONVERSATION_PATTERNS)}")
print(f"Average confidence: {sum(p.get('confidence', 0.5) for p in self.patterns.values()) / len(self.patterns):.0%}")
# Domain breakdown
domains = {}
for p in self.patterns.values():
d = p.get('domain', 'UNKNOWN')
domains[d] = domains.get(d, 0) + 1
print(f"\nDomains:")
for domain, count in sorted(domains.items(), key=lambda x: x[1], reverse=True):
print(f" {domain}: {count}")
if __name__ == "__main__":
    index = InMemoryIndex()
    index.stats()

    # Persist a JSON snapshot for inspection.
    save_path = os.path.join(index.LATTICE_DB_DIR, "in_memory_index.json")
    index.save_to_json(save_path)

    # Smoke-test a handful of searches.
    print(f"\n{'='*60}")
    print(f"TEST SEARCHES")
    print(f"{'='*60}\n")
    for query in ["singleton", "react", "lattice", "honest"]:
        matches = index.search(query)
        print(f"Query: '{query}' → {len(matches)} results")
        if matches:
            print(f" Top: {matches[0]['label']} ({matches[0]['confidence']:.0%})")
        print()