mnemo-memory / mnemo.py
AthelaPerk's picture
v4.1: Add gentle memory decay (1%/day) and pruning (30 days stale)
8633a28 verified
#!/usr/bin/env python3
"""
Mnemo v4: SLM-Inspired Architecture
====================================
Implements key SLM architecture features with parameter adjustments
based on Mnemo benchmark findings.
SLM Features Implemented:
1. Three-Tiered Memory (Working → Token → Semantic)
2. Promotion/Demotion Algorithms
3. Neural Link Types (8 types with decay)
4. Self-Tuning Parameters
5. Memory Utility Predictor (NEW - from benchmarks)
Key Parameter Adjustments (from benchmarks):
- Semantic threshold: 0.65 → 0.50 (SLM was too high)
- Quality acceptance: 0.30 → 0.50 (SLM too permissive)
- Promotion threshold: 0.65 → 0.55 (faster promotion)
- Link pruning: 60 days → 30 days (faster cleanup)
"""
import hashlib
import time
import re
import threading
import numpy as np
from typing import Dict, List, Optional, Tuple, Any, Set
from dataclasses import dataclass, field
from collections import defaultdict
from enum import Enum
import json
# Optional imports
try:
import faiss
HAS_FAISS = True
except ImportError:
HAS_FAISS = False
try:
import networkx as nx
HAS_NETWORKX = True
except ImportError:
HAS_NETWORKX = False
try:
from rank_bm25 import BM25Okapi
HAS_BM25 = True
except ImportError:
HAS_BM25 = False
# =============================================================================
# ENUMS AND CONSTANTS (from SLM spec)
# =============================================================================
class MemoryTier(Enum):
    """Three-tiered memory hierarchy from SLM.

    Members are listed from the smallest/fastest tier to the largest/slowest
    one. The sizes and latencies in the trailing comments are the figures
    quoted from the SLM description; nothing in this enum enforces them.
    """
    WORKING = "working" # 32MB, <1ms, current context
    TOKEN = "token" # 100-250 items, 1-10ms, compressed
    SEMANTIC = "semantic" # Persistent, 10-100ms, full knowledge
class LinkType(Enum):
    """Eight link types from SLM Neural Link system.

    Each member's value is the string used when a link id is built and when
    link statistics are reported per type. Per-type thresholds, initial
    strengths, decay rates and usage boosts live in ``LINK_PROPERTIES``.
    """
    DIRECT_REFERENCE = "direct_reference" # Explicit reference
    SEMANTIC_SIMILARITY = "semantic_similarity" # Vector similarity
    CO_OCCURRENCE = "co_occurrence" # Appear together
    HIERARCHICAL = "hierarchical" # Parent-child
    TEMPORAL = "temporal" # Time-based
    CAUSAL = "causal" # Cause-effect
    CROSS_DOMAIN = "cross_domain" # Different domains
    ASSOCIATIVE = "associative" # General association
# Per-link-type tuning table (SLM values, some adjusted based on benchmarks).
#
# Each row is: (link type, creation_threshold, initial_strength,
#               decay_rate per day, usage_boost).
# The rows are expanded into the ``{LinkType: {property: value}}`` mapping
# that the link manager reads.
LINK_PROPERTIES = {
    link_type: {
        "creation_threshold": creation_threshold,
        "initial_strength": initial_strength,
        "decay_rate": decay_rate,
        "usage_boost": usage_boost,
    }
    for (
        link_type,
        creation_threshold,
        initial_strength,
        decay_rate,
        usage_boost,
    ) in (
        # creation threshold lowered from SLM's 0.90
        (LinkType.DIRECT_REFERENCE, 0.85, 0.90, 0.005, 0.05),
        # creation threshold lowered from SLM's 0.65 (benchmark adjustment)
        (LinkType.SEMANTIC_SIMILARITY, 0.50, 0.75, 0.01, 0.03),
        (LinkType.CO_OCCURRENCE, 0.60, 0.70, 0.015, 0.04),
        # creation threshold lowered from SLM's 0.85
        (LinkType.HIERARCHICAL, 0.80, 0.85, 0.003, 0.02),
        (LinkType.TEMPORAL, 0.55, 0.65, 0.02, 0.05),
        (LinkType.CAUSAL, 0.75, 0.80, 0.005, 0.03),
        # creation threshold: SLM 0.80; initial strength: SLM 0.70
        (LinkType.CROSS_DOMAIN, 0.70, 0.65, 0.008, 0.04),
        # deliberately permissive, for exploration
        (LinkType.ASSOCIATIVE, 0.45, 0.60, 0.025, 0.06),
    )
}
# =============================================================================
# DATA CLASSES
# =============================================================================
@dataclass
class Memory:
    """Memory unit with SLM-style metadata.

    One stored piece of content together with its embedding vector, the tier
    it currently belongs to, and the bookkeeping used for quality gating,
    promotion/demotion and decay.
    """
    id: str
    content: str
    embedding: np.ndarray
    tier: MemoryTier = MemoryTier.SEMANTIC
    namespace: str = "default"
    # Quality and relevance (SLM quality gates)
    quality_score: float = 0.5
    relevance_score: float = 0.5
    confidence: float = 0.5
    # Access tracking (for promotion/demotion)
    access_count: int = 0
    # Both timestamps are time.time() values taken when the instance is built.
    last_accessed: float = field(default_factory=time.time)
    created_at: float = field(default_factory=time.time)
    # SLM priority decay
    priority: float = 1.0
    # default_factory gives every instance its own dict.
    metadata: Dict = field(default_factory=dict)
@dataclass
class NeuralLink:
    """SLM Neural Link between memories.

    A directed, typed edge from ``source_id`` to ``target_id`` carrying a
    strength value plus traversal bookkeeping.
    """
    source_id: str
    target_id: str
    link_type: LinkType
    strength: float
    # Both timestamps are time.time() values taken when the link is built.
    created_at: float = field(default_factory=time.time)
    last_traversed: float = field(default_factory=time.time)
    traversal_count: int = 0
@dataclass
class SearchResult:
    """Search result with multi-strategy scores.

    ``score`` is the combined ranking score; ``strategy_scores`` keeps the
    individual per-strategy contributions that were combined into it.
    """
    id: str
    content: str
    score: float
    tier: MemoryTier = MemoryTier.SEMANTIC
    # default_factory gives every result its own list/dict instances.
    link_path: List[str] = field(default_factory=list)
    strategy_scores: Dict[str, float] = field(default_factory=dict)
    metadata: Dict = field(default_factory=dict)
# =============================================================================
# MEMORY UTILITY PREDICTOR (NEW - from Mnemo benchmarks)
# =============================================================================
class MemoryUtilityPredictor:
    """
    Predicts whether memory injection will help or hurt.

    Key finding from benchmarks:
    - Within-conversation: Memory often HURTS (-3 to -12 pts)
    - Cross-session: Memory HELPS (+2 pts on dependent questions)

    The predictor is a keyword heuristic: skip phrases veto injection,
    injection phrases request it (unless the conversation history already
    covers the query), and everything else defaults to "do not inject".
    Counters for each outcome are kept in ``self.stats``.
    """

    # Signals that indicate memory should be used.  Checked in order; the
    # first phrase found in the text is the one reported in the reason.
    INJECTION_SIGNALS = [
        "previous", "earlier", "before", "you said", "you mentioned",
        "as you", "based on", "using your", "your analysis", "your framework",
        "we discussed", "we analyzed", "refer to", "from your",
        "compare", "contrast", "synthesize", "combine", "integrate",
        "apply your", "based on your",
        "you previously", "your earlier", "you have analyzed",
    ]

    # Signals that indicate memory should NOT be used
    SKIP_SIGNALS = [
        "this is a new", "new topic", "different subject",
        "what is", "define", "explain what",
    ]

    # Words ignored when comparing a query against the conversation history.
    _QUERY_STOPWORDS = frozenset({
        "the", "a", "is", "are", "to", "of", "in", "for", "what", "how",
    })

    # History shorter than this many words is never considered sufficient.
    _MIN_HISTORY_WORDS = 200

    # Fraction of query keywords that must appear in the history.
    _HISTORY_OVERLAP_RATIO = 0.6

    # Openers of simple factual questions (matched on the lower-cased query).
    _SIMPLE_QUERY_PATTERNS = tuple(
        re.compile(pattern) for pattern in (
            r"^what is\b", r"^who is\b", r"^when did\b",
            r"^where is\b", r"^how many\b", r"^define\b",
        )
    )

    def __init__(self):
        self.stats = {
            "predictions": 0,
            "inject_recommended": 0,
            "skip_recommended": 0,
            "skip_context_window": 0,
        }

    def should_inject(self,
                      query: str,
                      context: str = "",
                      conversation_history: str = "",
                      model_confidence: float = 0.5) -> Tuple[bool, str, float]:
        """
        Predict if memory injection will help.

        Args:
            query: The user query.
            context: Extra text scanned for signals together with the query.
            conversation_history: Text of the current conversation; used to
                decide whether the needed information is already in context.
            model_confidence: Caller-supplied confidence; values above 0.85
                are reported as "model_confident".

        Returns:
            (should_inject, reason, confidence)
        """
        self.stats["predictions"] += 1
        combined = (query + " " + context).lower()

        # Skip signals take precedence over injection signals.
        for signal in self.SKIP_SIGNALS:
            if signal in combined:
                self.stats["skip_recommended"] += 1
                return False, f"skip_signal:{signal}", 0.8

        for signal in self.INJECTION_SIGNALS:
            if signal in combined:
                # Injection is pointless if the context window already
                # contains the information being asked about.
                if self._context_has_info(query, conversation_history):
                    self.stats["skip_context_window"] += 1
                    return False, "context_window_sufficient", 0.7
                self.stats["inject_recommended"] += 1
                return True, f"inject_signal:{signal}", 0.85

        # No clear signal - default to skip for simple queries
        if self._is_simple_query(query):
            self.stats["skip_recommended"] += 1
            return False, "simple_query", 0.6

        # Model is very confident - skip memory
        if model_confidence > 0.85:
            self.stats["skip_recommended"] += 1
            return False, "model_confident", 0.7

        # Default: don't inject (memory often hurts)
        self.stats["skip_recommended"] += 1
        return False, "no_signal", 0.5

    def _context_has_info(self, query: str, history: str) -> bool:
        """Check if conversation history already has needed context.

        Returns True only when the history has at least 200 words and at
        least 60% of the query's non-stop-word tokens occur (as substrings)
        in it.
        """
        if not history or len(history.split()) < self._MIN_HISTORY_WORDS:
            return False
        query_keywords = set(query.lower().split()) - self._QUERY_STOPWORDS
        if not query_keywords:
            # Nothing to look for: an empty keyword set must not count as
            # "everything was found" (0 >= 0 would otherwise be True).
            return False
        history_lower = history.lower()
        overlap = sum(1 for kw in query_keywords if kw in history_lower)
        return overlap >= len(query_keywords) * self._HISTORY_OVERLAP_RATIO

    def _is_simple_query(self, query: str) -> bool:
        """Detect simple factual queries that don't need memory"""
        query_lower = query.lower()
        return any(p.search(query_lower) for p in self._SIMPLE_QUERY_PATTERNS)
# =============================================================================
# SELF-TUNING SYSTEM (from SLM)
# =============================================================================
class SelfTuner:
    """
    SLM Self-Tuning Parameter System.

    Tracks success/failure outcomes per parameter and nudges the parameter
    value after every batch of ``ADJUST_EVERY`` new outcomes.

    Batches are counted with a running per-parameter sample counter rather
    than with the length of the (capped) history list. Once the history is
    full its length never changes, so a length-based check would fire on
    every call and re-apply the same adjustment without any new evidence.
    """

    ADJUST_EVERY = 10      # new samples required between adjustments
    HISTORY_LIMIT = 100    # outcomes retained per parameter
    MIN_VALUE = 0.1        # lower clamp applied to tuned parameters
    MAX_VALUE = 0.9        # upper clamp applied to tuned parameters

    def __init__(self):
        self.parameters = {
            "similarity_threshold": 0.10,  # ADJUSTED from SLM 0.65
            "quality_threshold": 0.35,     # ADJUSTED from SLM 0.30
            "promotion_threshold": 0.55,   # ADJUSTED from SLM 0.65
            "demotion_threshold": 0.70,    # ADJUSTED from SLM 0.75
        }
        self.performance_history = defaultdict(list)
        self.adjustment_count = 0
        # SLM learning rates (parameters not listed fall back to 0.01)
        self.learning_rates = {
            "similarity_threshold": 0.01,
            "quality_threshold": 0.02,
            "promotion_threshold": 0.05,
        }
        # Total outcomes ever recorded per parameter, and the value of that
        # counter when the parameter was last considered by auto_tune().
        self._sample_counts = defaultdict(int)
        self._last_tuned_at = {}

    def record_outcome(self, param_name: str, value: float, success: bool):
        """Record outcome for a parameter setting.

        Args:
            param_name: Name of the parameter the outcome belongs to.
            value: Parameter value that was in effect.
            success: Whether the operation using that value succeeded.
        """
        history = self.performance_history[param_name]
        history.append({
            "value": value,
            "success": success,
            "timestamp": time.time(),
        })
        self._sample_counts[param_name] += 1
        # Keep only the most recent outcomes.
        if len(history) > self.HISTORY_LIMIT:
            del history[:-self.HISTORY_LIMIT]

    def should_adjust(self, param_name: str) -> bool:
        """Check if parameter should be adjusted (every 10 new samples).

        Returns True when at least ``ADJUST_EVERY`` outcomes have been
        recorded since the parameter was last processed by ``auto_tune``.
        """
        recorded = self._sample_counts.get(param_name, 0)
        consumed = self._last_tuned_at.get(param_name, 0)
        return recorded - consumed >= self.ADJUST_EVERY

    def get_adjustment(self, param_name: str) -> float:
        """Calculate parameter adjustment based on recent performance.

        Looks at the last 10 outcomes: a success rate below 0.5 yields a
        negative step of one learning rate, above 0.8 a positive step of
        half a learning rate, anything else 0.0.
        """
        history = self.performance_history.get(param_name, [])
        if len(history) < self.ADJUST_EVERY:
            return 0.0
        recent = history[-self.ADJUST_EVERY:]
        success_rate = sum(1 for h in recent if h["success"]) / len(recent)
        lr = self.learning_rates.get(param_name, 0.01)
        if success_rate < 0.5:
            # Performance poor - try lower threshold
            return -lr
        if success_rate > 0.8:
            # Performance good - can be more selective
            return lr * 0.5
        return 0.0

    def auto_tune(self):
        """Run auto-tuning cycle.

        Returns:
            List of ``(param_name, old_value, new_value)`` tuples for every
            parameter whose value was changed in this cycle.
        """
        adjusted = []
        for param_name in self.parameters:
            if not self.should_adjust(param_name):
                continue
            # Mark the batch as consumed even when no change results, so the
            # next decision waits for fresh outcomes.
            self._last_tuned_at[param_name] = self._sample_counts[param_name]
            adjustment = self.get_adjustment(param_name)
            if adjustment != 0:
                old_val = self.parameters[param_name]
                new_val = max(self.MIN_VALUE,
                              min(self.MAX_VALUE, old_val + adjustment))
                self.parameters[param_name] = new_val
                adjusted.append((param_name, old_val, new_val))
                self.adjustment_count += 1
        return adjusted
# =============================================================================
# THREE-TIERED MEMORY MANAGER (from SLM)
# =============================================================================
class TieredMemoryManager:
    """
    SLM Three-Tiered Memory Hierarchy.

    Working Memory (32MB, <1ms):
    - Currently active info
    - Priority decay: 0.95/minute
    - Eviction threshold: 0.2
    Token Memory (100-250 items, 1-10ms):
    - Compressed representations
    - Loop-based organization
    - Merging at 0.8 similarity
    Semantic Memory (persistent, 10-100ms):
    - Full knowledge representations
    - Partition-based organization

    Storage model used by this class:
    - ``working_memory`` maps id -> Memory for the working tier.
    - ``token_loops`` maps namespace -> ordered list of ids (oldest first);
      the Memory objects of token-tier entries live in ``semantic_memory``.
    - ``semantic_memory`` maps id -> Memory and backs both the token and
      the semantic tier.
    A memory's ``tier`` attribute is kept in sync with the container that
    holds it, and an id appears at most once in a token loop.
    """

    # SLM spec values (some adjusted based on benchmarks)
    WORKING_MEMORY_SIZE = 50      # items (simplified from 32MB)
    TOKEN_LOOP_CAPACITY = 100     # default
    TOKEN_LOOP_MAX = 250          # expandable (not referenced by this class)
    PRIORITY_DECAY = 0.95         # per access cycle
    EVICTION_THRESHOLD = 0.2
    LOOP_MERGE_THRESHOLD = 0.8    # (not referenced by this class)

    # Memory decay settings (gentle)
    MEMORY_DECAY_RATE = 0.01        # 1% quality decay per day for unused memories
    MEMORY_PRUNE_THRESHOLD = 0.15   # Prune memories below this quality
    MEMORY_STALE_DAYS = 30          # Consider memory stale after this many days unused

    def __init__(self, tuner: SelfTuner):
        self.tuner = tuner
        # Three tiers
        self.working_memory: Dict[str, Memory] = {}
        self.token_loops: Dict[str, List[str]] = defaultdict(list)  # namespace -> ids
        self.semantic_memory: Dict[str, Memory] = {}
        self.stats = {
            "promotions": 0,
            "demotions": 0,
            "evictions": 0,
            "memories_decayed": 0,
            "memories_pruned": 0,
        }

    def add_to_tier(self, memory: Memory, tier: MemoryTier):
        """Add memory to specific tier"""
        memory.tier = tier
        if tier == MemoryTier.WORKING:
            self._add_to_working(memory)
        elif tier == MemoryTier.TOKEN:
            self._add_to_token(memory)
        else:
            self.semantic_memory[memory.id] = memory

    def _add_to_working(self, memory: Memory):
        """Add to working memory with eviction if needed.

        Re-adding a memory that is already in working memory only refreshes
        its priority; it never evicts another entry.
        """
        is_new = memory.id not in self.working_memory
        if is_new and len(self.working_memory) >= self.WORKING_MEMORY_SIZE:
            self._evict_from_working()
        # A memory lives in exactly one tier: leaving the token tier means
        # leaving its token loop as well.
        self._remove_from_token_loop(memory.id, memory.namespace)
        memory.priority = 1.0
        memory.tier = MemoryTier.WORKING
        self.working_memory[memory.id] = memory

    def _add_to_token(self, memory: Memory):
        """Add to token memory loop.

        The id is placed at the newest end of its namespace loop. If the
        loop is full, the oldest id is dropped from the loop and its memory
        is marked as semantic tier.
        """
        loop = self.token_loops[memory.namespace]
        if memory.id in loop:
            # Already tracked: move to the newest position, no duplicates.
            loop.remove(memory.id)
        elif len(loop) >= self.TOKEN_LOOP_CAPACITY:
            # Demote oldest to semantic
            oldest_id = loop.pop(0)
            oldest = self.semantic_memory.get(oldest_id)
            if oldest is not None:
                oldest.tier = MemoryTier.SEMANTIC
        loop.append(memory.id)
        self.semantic_memory[memory.id] = memory  # Store actual data in semantic
        memory.tier = MemoryTier.TOKEN

    def _remove_from_token_loop(self, memory_id: str, namespace: str):
        """Drop every occurrence of ``memory_id`` from its namespace loop."""
        loop = self.token_loops.get(namespace)
        if loop and memory_id in loop:
            loop[:] = [mid for mid in loop if mid != memory_id]

    def _move_working_to_token(self, memory_id: str):
        """Take one entry out of working memory and file it in token memory."""
        memory = self.working_memory.pop(memory_id)
        self._add_to_token(memory)

    def _evict_from_working(self):
        """Evict lowest priority items from working memory"""
        if not self.working_memory:
            return
        # Find lowest priority
        min_id = min(self.working_memory,
                     key=lambda k: self.working_memory[k].priority)
        # Demote to token memory
        self._move_working_to_token(min_id)
        self.stats["evictions"] += 1

    def decay_priorities(self):
        """Apply SLM priority decay (0.95 per cycle).

        Every working-memory priority is multiplied by ``PRIORITY_DECAY``;
        entries that fall below ``EVICTION_THRESHOLD`` are then evicted to
        token memory.
        """
        for memory in self.working_memory.values():
            memory.priority *= self.PRIORITY_DECAY
        # Collect first, evict afterwards: removing entries while iterating
        # the dict raises RuntimeError.
        expired = [
            mem_id for mem_id, memory in self.working_memory.items()
            if memory.priority < self.EVICTION_THRESHOLD
        ]
        for mem_id in expired:
            self._move_working_to_token(mem_id)
            self.stats["evictions"] += 1

    def calculate_promotion_score(self, memory: Memory, query_relevance: float) -> float:
        """
        SLM Promotion Score:
        PromotionScore = (QueryRelevance * 0.6) + (AccessFrequency * 0.3) + (RecencyScore * 0.1)
        """
        # Normalize access frequency (0-1)
        access_freq = min(memory.access_count / 10, 1.0)
        # Recency score (higher = more recent), decaying to 0 over 24 hours.
        # Clamped to [0, 1] so a last_accessed timestamp in the future
        # cannot push the score above its intended weight.
        age_hours = (time.time() - memory.last_accessed) / 3600
        recency = min(1.0, max(0, 1 - (age_hours / 24)))
        return (query_relevance * 0.6) + (access_freq * 0.3) + (recency * 0.1)

    def calculate_demotion_score(self, memory: Memory, query_relevance: float) -> float:
        """
        SLM Demotion Score:
        DemotionScore = (1-QueryRelevance)*0.5 + (1-AccessFrequency)*0.3 + (Age/MAX_AGE)*0.2
        """
        access_freq = min(memory.access_count / 10, 1.0)
        age_hours = (time.time() - memory.created_at) / 3600
        age_score = min(age_hours / 168, 1.0)  # MAX_AGE = 1 week
        return ((1 - query_relevance) * 0.5) + ((1 - access_freq) * 0.3) + (age_score * 0.2)

    def try_promote(self, memory_id: str, query_relevance: float) -> bool:
        """Try to promote memory to higher tier.

        Returns True only when the memory actually moved up one tier
        (semantic -> token or token -> working). A memory that is already
        in the working tier is not promoted again; a qualifying score just
        resets its priority.
        """
        memory = self.semantic_memory.get(memory_id)
        if memory is None:
            return False
        score = self.calculate_promotion_score(memory, query_relevance)
        if score <= self.tuner.parameters["promotion_threshold"]:
            return False
        if memory.tier == MemoryTier.SEMANTIC:
            self._add_to_token(memory)
            self.stats["promotions"] += 1
            return True
        if memory.tier == MemoryTier.TOKEN:
            self._add_to_working(memory)
            self.stats["promotions"] += 1
            return True
        if memory.tier == MemoryTier.WORKING and memory_id in self.working_memory:
            # Already at the top: keep it fresh instead of counting another
            # promotion.
            memory.priority = 1.0
        return False

    def try_demote(self, memory_id: str, query_relevance: float) -> bool:
        """Try to demote memory to lower tier.

        Only working-memory entries are considered, and only while working
        memory is above 80% of its capacity.
        """
        memory = self.working_memory.get(memory_id)
        if memory is None:
            return False
        score = self.calculate_demotion_score(memory, query_relevance)
        threshold = self.tuner.parameters["demotion_threshold"]
        # Also check capacity (SLM: demote if >80% capacity)
        capacity_pressure = len(self.working_memory) / self.WORKING_MEMORY_SIZE
        if score > threshold and capacity_pressure > 0.8:
            self._move_working_to_token(memory_id)
            self.stats["demotions"] += 1
            return True
        return False

    def get_all_memories(self) -> Dict[str, Memory]:
        """Get all memories across tiers"""
        return {**self.semantic_memory, **self.working_memory}

    def decay_memories(self) -> int:
        """
        Apply gentle quality decay to unused semantic memories.

        Memories that are accessed stay fresh; unused ones gradually decay.
        Each call multiplies the quality of every memory unused for more
        than one day by ``1 - min(days_unused * MEMORY_DECAY_RATE, 0.1)``,
        so the effective decay depends on how often this method is called.

        Returns number of memories affected.
        """
        now = time.time()
        affected = 0
        for memory in self.semantic_memory.values():
            # Calculate days since last access
            days_unused = (now - memory.last_accessed) / 86400  # seconds per day
            if days_unused > 1:  # Only decay if unused for >1 day
                # Capped at 10% per call to prevent instant destruction
                decay_factor = min(days_unused * self.MEMORY_DECAY_RATE, 0.1)
                memory.quality_score *= (1 - decay_factor)
                affected += 1
        return affected

    def prune_stale_memories(self) -> Tuple[int, List[str]]:
        """
        Remove memories that have decayed below threshold.

        A memory is pruned when its quality is below
        ``MEMORY_PRUNE_THRESHOLD`` AND it has been unused for more than
        ``MEMORY_STALE_DAYS`` days. Pruned ids are also removed from their
        token loop and from working memory so no tier keeps a dangling id.

        Returns (count_pruned, list_of_pruned_ids).
        """
        now = time.time()
        to_prune = [
            mem_id for mem_id, memory in self.semantic_memory.items()
            if memory.quality_score < self.MEMORY_PRUNE_THRESHOLD
            and (now - memory.last_accessed) / 86400 > self.MEMORY_STALE_DAYS
        ]
        for mem_id in to_prune:
            memory = self.semantic_memory.pop(mem_id)
            self.working_memory.pop(mem_id, None)
            self._remove_from_token_loop(mem_id, memory.namespace)
        return len(to_prune), to_prune

    def refresh_memory(self, memory_id: str):
        """Mark a memory as freshly accessed (resets decay)"""
        if memory_id in self.semantic_memory:
            self.semantic_memory[memory_id].last_accessed = time.time()
        elif memory_id in self.working_memory:
            self.working_memory[memory_id].last_accessed = time.time()

    def get_tier_stats(self) -> Dict:
        """Get tier statistics"""
        return {
            "working_memory_count": len(self.working_memory),
            "working_memory_capacity": self.WORKING_MEMORY_SIZE,
            "token_loops": {ns: len(ids) for ns, ids in self.token_loops.items()},
            "semantic_memory_count": len(self.semantic_memory),
            "promotions": self.stats["promotions"],
            "demotions": self.stats["demotions"],
            "evictions": self.stats["evictions"],
            "memories_decayed": self.stats["memories_decayed"],
            "memories_pruned": self.stats["memories_pruned"],
        }
# =============================================================================
# NEURAL LINK MANAGER (from SLM)
# =============================================================================
class NeuralLinkManager:
    """
    SLM Neural Link Pathway System.

    Creates and manages typed, directed connections between memories.
    Links are stored by id in ``links``; ``outgoing`` and ``incoming`` index
    the link ids by source and target memory id respectively.
    """

    # SLM path finding limits (adjusted based on benchmarks)
    MAX_PATH_DEPTH = 4          # SLM: 4 standard, 6 exhaustive
    MIN_PATH_STRENGTH = 0.40    # SLM: 0.45
    PATH_STRENGTH_DECAY = 0.9   # SLM: 0.9 per hop
    MAX_BRANCHING = 12          # SLM: 12

    # Pruning (adjusted based on benchmarks)
    PRUNE_STRENGTH_THRESHOLD = 0.25  # SLM: 0.30
    PRUNE_AGE_DAYS = 30              # SLM: 60, ADJUSTED

    def __init__(self):
        self.links: Dict[str, NeuralLink] = {}                    # link_id -> NeuralLink
        self.outgoing: Dict[str, Set[str]] = defaultdict(set)     # source -> link_ids
        self.incoming: Dict[str, Set[str]] = defaultdict(set)     # target -> link_ids
        self.stats = {
            "links_created": 0,
            "links_pruned": 0,
            "traversals": 0,
        }

    def _link_id(self, source: str, target: str, link_type: LinkType) -> str:
        """Generate link ID"""
        return f"{source}:{target}:{link_type.value}"

    def _ordered_outgoing(self, memory_id: str) -> List[NeuralLink]:
        """Return the outgoing links of ``memory_id``, strongest first.

        Ties are broken by target id and link type so the order does not
        depend on set iteration order.
        """
        found = [
            self.links[link_id]
            for link_id in self.outgoing.get(memory_id, ())
            if link_id in self.links
        ]
        found.sort(key=lambda lk: (-lk.strength, lk.target_id, lk.link_type.value))
        return found

    def _unindex(self, link_id: str, link: NeuralLink):
        """Remove ``link_id`` from the outgoing/incoming indexes."""
        # .get() avoids creating empty index entries on the defaultdicts.
        sources = self.outgoing.get(link.source_id)
        if sources is not None:
            sources.discard(link_id)
        targets = self.incoming.get(link.target_id)
        if targets is not None:
            targets.discard(link_id)

    def create_link(self, source_id: str, target_id: str,
                    link_type: LinkType, similarity: float) -> Optional[str]:
        """
        Create link if similarity exceeds type-specific threshold.

        SLM LinkScore = (VectorSimilarity * 0.6) + (CoOccurrence * 0.25) + (DomainRelatedness * 0.15)
        Simplified here to just similarity.

        If the link already exists it is strengthened by the type's usage
        boost (capped at 1.0) instead of being recreated.

        Returns:
            The link id, or None when ``similarity`` is below the creation
            threshold of ``link_type``.
        """
        props = LINK_PROPERTIES[link_type]
        if similarity < props["creation_threshold"]:
            return None
        link_id = self._link_id(source_id, target_id, link_type)
        existing = self.links.get(link_id)
        if existing is not None:
            # Strengthen existing link
            existing.strength = min(1.0, existing.strength + props["usage_boost"])
            return link_id
        # Create new link
        self.links[link_id] = NeuralLink(
            source_id=source_id,
            target_id=target_id,
            link_type=link_type,
            strength=props["initial_strength"],
        )
        self.outgoing[source_id].add(link_id)
        self.incoming[target_id].add(link_id)
        self.stats["links_created"] += 1
        return link_id

    def traverse_link(self, link_id: str) -> Optional[NeuralLink]:
        """Traverse a link, strengthening it.

        Returns the link, or None if ``link_id`` is unknown.
        """
        link = self.links.get(link_id)
        if link is None:
            return None
        link.traversal_count += 1
        link.last_traversed = time.time()
        # Strengthen on traversal, capped at 1.0
        props = LINK_PROPERTIES[link.link_type]
        link.strength = min(1.0, link.strength + props["usage_boost"])
        self.stats["traversals"] += 1
        return link

    def find_paths(self, source_id: str, target_id: str,
                   max_depth: Optional[int] = None) -> List[List[str]]:
        """Find paths between memories (SLM path finding).

        Depth-first search over outgoing links. A branch is abandoned when
        it exceeds ``max_depth`` hops or when the accumulated strength
        (product of link strengths, times ``PATH_STRENGTH_DECAY`` per hop)
        drops below ``MIN_PATH_STRENGTH``. At each node only the
        ``MAX_BRANCHING`` strongest links are followed.

        Args:
            source_id: Memory id to start from.
            target_id: Memory id to reach.
            max_depth: Maximum number of hops; None uses ``MAX_PATH_DEPTH``.
                An explicit 0 is honoured (only the trivial path can match).

        Returns:
            List of paths, each a list of memory ids from source to target.
        """
        if max_depth is None:
            max_depth = self.MAX_PATH_DEPTH
        paths: List[List[str]] = []

        def dfs(current: str, path: List[str], strength: float, depth: int):
            if depth > max_depth or strength < self.MIN_PATH_STRENGTH:
                return
            if current == target_id:
                paths.append(path.copy())
                return
            # Limit branching to the strongest links
            for link in self._ordered_outgoing(current)[:self.MAX_BRANCHING]:
                if link.target_id in path:
                    continue  # never revisit a node on the current path
                path.append(link.target_id)
                dfs(link.target_id, path,
                    strength * link.strength * self.PATH_STRENGTH_DECAY,
                    depth + 1)
                path.pop()

        dfs(source_id, [source_id], 1.0, 0)
        return paths

    def get_connected(self, memory_id: str,
                      link_types: Optional[List[LinkType]] = None) -> List[str]:
        """Get memories connected to this one.

        Returns the target ids of the outgoing links of ``memory_id``,
        strongest link first, optionally restricted to ``link_types``.
        A target appears once per matching link.
        """
        return [
            link.target_id
            for link in self._ordered_outgoing(memory_id)
            if link_types is None or link.link_type in link_types
        ]

    def decay_links(self):
        """Apply daily decay to all links.

        Each call multiplies every link strength by ``1 - decay_rate`` of
        its type, so it is meant to be called once per day.
        """
        for link in self.links.values():
            props = LINK_PROPERTIES[link.link_type]
            link.strength *= (1 - props["decay_rate"])

    def prune_weak_links(self) -> int:
        """Prune links below strength threshold and unused for too long"""
        now = time.time()
        age_threshold = self.PRUNE_AGE_DAYS * 24 * 3600
        to_prune = [
            link_id for link_id, link in self.links.items()
            if link.strength < self.PRUNE_STRENGTH_THRESHOLD
            and now - link.last_traversed > age_threshold
        ]
        for link_id in to_prune:
            self._unindex(link_id, self.links.pop(link_id))
            self.stats["links_pruned"] += 1
        return len(to_prune)

    def remove_links_for_memory(self, memory_id: str) -> int:
        """Remove all links connected to a memory (when memory is pruned).

        Uses the outgoing/incoming indexes instead of scanning every link.

        Returns the number of links removed.
        """
        affected = set(self.outgoing.get(memory_id, ()))
        affected.update(self.incoming.get(memory_id, ()))
        removed = 0
        for link_id in affected:
            link = self.links.pop(link_id, None)
            if link is None:
                continue
            self._unindex(link_id, link)
            removed += 1
        self.stats["links_pruned"] += removed
        # Clean up the memory's own index entries
        self.outgoing.pop(memory_id, None)
        self.incoming.pop(memory_id, None)
        return removed

    def get_stats(self) -> Dict:
        """Return link totals, a per-type breakdown and the event counters."""
        by_type = {lt.value: 0 for lt in LinkType}
        for link in self.links.values():
            by_type[link.link_type.value] += 1
        return {
            "total_links": len(self.links),
            "links_by_type": by_type,
            **self.stats,
        }
# =============================================================================
# MAIN MNEMO v4 CLASS
# =============================================================================
class Mnemo:
    """
    Mnemo v4: SLM-Inspired Memory System.

    Implements:
    - Three-tiered memory hierarchy
    - Neural link pathways (8 types)
    - Self-tuning parameters
    - Memory utility prediction
    With parameter adjustments based on Mnemo benchmarks.

    Internally three parallel lists (``_ids``, ``_embeddings``,
    ``_tokenized_docs``) hold one entry per stored memory, in insertion
    order; the optional FAISS index uses the same row order. The BM25 model
    is rebuilt lazily on the next search after the corpus changed.
    """

    STOP_WORDS = {"a", "an", "the", "is", "are", "was", "were", "be", "been",
                  "to", "of", "in", "for", "on", "with", "at", "by", "from",
                  "and", "but", "or", "not", "this", "that", "i", "me", "my"}

    def __init__(self, embedding_dim: int = 384):
        self.embedding_dim = embedding_dim
        # Core components
        self.tuner = SelfTuner()
        self.memory_manager = TieredMemoryManager(self.tuner)
        self.link_manager = NeuralLinkManager()
        self.utility_predictor = MemoryUtilityPredictor()
        # Vector index
        self._embeddings: List[np.ndarray] = []
        self._ids: List[str] = []
        if HAS_FAISS:
            self.index = faiss.IndexFlatIP(embedding_dim)
        else:
            self.index = None
        # BM25 (rebuilt lazily; see _ensure_bm25)
        self.bm25 = None
        self._bm25_dirty = False
        self._tokenized_docs: List[List[str]] = []
        # Knowledge Graph
        if HAS_NETWORKX:
            self.graph = nx.DiGraph()
        else:
            self.graph = None
        # Cache
        self._cache: Dict[str, Any] = {}
        self._cache_lock = threading.Lock()
        # Stats
        self.stats = {
            "adds": 0,
            "adds_rejected": 0,
            "searches": 0,
            "cache_hits": 0,
            "cache_misses": 0,
        }

    # ------------------------------------------------------------------
    # Embedding and quality helpers
    # ------------------------------------------------------------------

    def _word_bucket(self, word: str) -> int:
        """Map a word to an embedding dimension with a process-stable hash.

        The built-in ``hash()`` of a str is salted per interpreter process,
        so it would give different embeddings on every run.
        """
        digest = hashlib.md5(word.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.embedding_dim

    def _get_embedding(self, text: str) -> np.ndarray:
        """Generate embedding (hash-based for POC).

        Each lower-cased, whitespace-separated word adds ``1 / position``
        (1-based) to the dimension selected by its hash; the vector is then
        L2-normalised. Results are cached per text.
        """
        cache_key = f"emb:{hashlib.md5(text.encode()).hexdigest()}"
        with self._cache_lock:
            if cache_key in self._cache:
                self.stats["cache_hits"] += 1
                return self._cache[cache_key]
            self.stats["cache_misses"] += 1
        # Hash-based embedding
        embedding = np.zeros(self.embedding_dim, dtype=np.float32)
        for position, word in enumerate(text.lower().split()):
            embedding[self._word_bucket(word)] += 1.0 / (position + 1)
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        with self._cache_lock:
            self._cache[cache_key] = embedding
        return embedding

    def _estimate_quality(self, content: str) -> float:
        """Estimate content quality (SLM quality gates).

        Starts at 0.5 and applies fixed bonuses/penalties for length,
        reasoning words, digits and vague words; the result is clamped to
        [0, 1].
        """
        lowered = content.lower()
        score = 0.5
        word_count = len(content.split())
        if word_count < 5:
            score -= 0.3
        elif word_count > 20:
            score += 0.1
        if any(marker in lowered for marker in ("because", "therefore", "shows")):
            score += 0.2
        if re.search(r'\d+', content):
            score += 0.1
        if any(vague in lowered for vague in ("something", "stuff", "maybe")):
            score -= 0.2
        return max(0.0, min(1.0, score))

    def should_inject(self, query: str, context: str = "",
                      conversation_history: str = "",
                      model_confidence: float = 0.5) -> bool:
        """
        Memory Utility Predictor - should we inject memory?
        Based on benchmark findings that memory often hurts performance.

        Returns only the boolean decision of the underlying predictor.
        """
        decision, _reason, _confidence = self.utility_predictor.should_inject(
            query, context, conversation_history, model_confidence
        )
        return decision

    # ------------------------------------------------------------------
    # Adding memories
    # ------------------------------------------------------------------

    def add(self, content: str, namespace: str = "default",
            metadata: Optional[Dict] = None,
            skip_quality_check: bool = False) -> Optional[str]:
        """Add memory with SLM quality gates.

        Args:
            content: Text to store.
            namespace: Namespace recorded on the memory.
            metadata: Optional metadata dict stored with the memory.
            skip_quality_check: Store the content even if its estimated
                quality is below the tuner's ``quality_threshold``.

        Returns:
            The memory id, or None if the content was rejected by the
            quality gate. The id is derived from the content alone, so
            adding content that is already stored returns the existing id:
            the stored memory is marked as freshly accessed and any given
            metadata is merged into it, but nothing is indexed twice.
        """
        quality = self._estimate_quality(content)
        threshold = self.tuner.parameters["quality_threshold"]
        if not skip_quality_check and quality < threshold:
            self.stats["adds_rejected"] += 1
            self.tuner.record_outcome("quality_threshold", threshold, False)
            return None

        memory_id = f"mem_{hashlib.md5(content.encode()).hexdigest()[:8]}"
        existing = self.memory_manager.semantic_memory.get(memory_id)
        if existing is not None:
            # Re-indexing the same id would leave duplicate rows in the
            # vector/BM25 indexes and reset the memory's tier and counters.
            existing.last_accessed = time.time()
            if metadata:
                existing.metadata.update(metadata)
            return memory_id

        embedding = self._get_embedding(content)
        memory = Memory(
            id=memory_id,
            content=content,
            embedding=embedding,
            namespace=namespace,
            quality_score=quality,
            metadata=metadata or {},
        )
        # Add to semantic memory (lowest tier)
        self.memory_manager.add_to_tier(memory, MemoryTier.SEMANTIC)
        # Update indices
        self._embeddings.append(embedding)
        self._ids.append(memory_id)
        if HAS_FAISS and self.index is not None:
            self.index.add(embedding.reshape(1, -1))
        self._tokenized_docs.append(content.lower().split())
        # Rebuilding BM25 here would cost O(n) per add; defer to search.
        self._bm25_dirty = True
        # Create links to similar memories
        self._create_links_for_new_memory(memory_id, embedding)
        self.stats["adds"] += 1
        self.tuner.record_outcome("quality_threshold", threshold, True)
        return memory_id

    def _create_links_for_new_memory(self, memory_id: str, embedding: np.ndarray):
        """Create neural links to similar memories.

        Links the new memory in both directions to its five most similar
        stored memories (dot product of embeddings); the link manager
        rejects pairs below the SEMANTIC_SIMILARITY creation threshold.
        """
        if len(self._ids) < 2:
            return
        # Find similar memories
        similarities = [
            (other_id, float(np.dot(embedding, other_emb)))
            for other_id, other_emb in zip(self._ids, self._embeddings)
            if other_id != memory_id
        ]
        # Sort by similarity
        similarities.sort(key=lambda pair: pair[1], reverse=True)
        # Create links for top matches
        for other_id, sim in similarities[:5]:
            self.link_manager.create_link(
                memory_id, other_id, LinkType.SEMANTIC_SIMILARITY, sim
            )
            self.link_manager.create_link(
                other_id, memory_id, LinkType.SEMANTIC_SIMILARITY, sim
            )

    # ------------------------------------------------------------------
    # Searching
    # ------------------------------------------------------------------

    def _ensure_bm25(self):
        """Rebuild the BM25 model if the corpus changed since the last build."""
        if not HAS_BM25 or not self._bm25_dirty:
            return
        # BM25Okapi cannot be built from an empty corpus.
        self.bm25 = BM25Okapi(self._tokenized_docs) if self._tokenized_docs else None
        self._bm25_dirty = False

    def _semantic_scores(self, query_embedding: np.ndarray, top_k: int) -> Dict[str, float]:
        """Strategy 1: inner-product similarity, via FAISS when available."""
        if HAS_FAISS and self.index is not None and self.index.ntotal > 0:
            k = min(top_k * 3, self.index.ntotal)
            scores, indices = self.index.search(query_embedding.reshape(1, -1), k)
            return {
                self._ids[idx]: float(score)
                for score, idx in zip(scores[0], indices[0])
                if 0 <= idx < len(self._ids)
            }
        return {
            mem_id: float(np.dot(query_embedding, emb))
            for mem_id, emb in zip(self._ids, self._embeddings)
        }

    def _bm25_scores(self, query: str) -> Dict[str, float]:
        """Strategy 2: BM25 scores normalised by the best score.

        Only documents scoring above 10% of the best score are kept.
        """
        self._ensure_bm25()
        if not HAS_BM25 or self.bm25 is None:
            return {}
        raw = self.bm25.get_scores(query.lower().split())
        if len(raw) == 0:
            return {}
        best = max(raw)
        if best <= 0:
            return {}
        return {
            self._ids[idx]: float(score / best)
            for idx, score in enumerate(raw)
            if score > 0.1 * best
        }

    def _link_scores(self, semantic_scores: Dict[str, float]) -> Dict[str, float]:
        """Strategy 3: boost memories linked from the top semantic matches.

        Each of the first five connections of the three best semantic
        matches receives +0.3.
        """
        scores: Dict[str, float] = {}
        seeds = sorted(semantic_scores.items(), key=lambda x: x[1], reverse=True)[:3]
        for mem_id, _ in seeds:
            for conn_id in self.link_manager.get_connected(mem_id)[:5]:
                scores[conn_id] = scores.get(conn_id, 0) + 0.3
        return scores

    def search(self, query: str, top_k: int = 5,
               namespace: Optional[str] = None,
               use_links: bool = True) -> List[SearchResult]:
        """
        Search with multi-strategy retrieval + neural links.

        Combines vector similarity (weight 0.5), BM25 (0.3) and link
        traversal (0.2). Every candidate whose combined score reaches the
        tuner's ``similarity_threshold`` has its access statistics updated
        and is considered for promotion; each candidate's pass/fail is
        reported to the tuner.

        Args:
            query: Search text.
            top_k: Maximum number of results; values <= 0 return [].
            namespace: If given, restrict results to this namespace.
            use_links: Whether to include the link-traversal strategy.

        Returns:
            Up to ``top_k`` results sorted by descending combined score.
        """
        store = self.memory_manager.semantic_memory
        if not store or top_k <= 0:
            return []
        self.stats["searches"] += 1
        query_embedding = self._get_embedding(query)
        threshold = self.tuner.parameters["similarity_threshold"]

        semantic_scores = self._semantic_scores(query_embedding, top_k)
        bm25_scores = self._bm25_scores(query)
        link_scores = self._link_scores(semantic_scores) if use_links else {}

        # Combine scores (SLM-style weighting)
        all_ids = set(semantic_scores) | set(bm25_scores) | set(link_scores)
        if namespace:
            # Filter by namespace
            all_ids = {
                mid for mid in all_ids
                if mid in store and store[mid].namespace == namespace
            }

        results = []
        for mem_id in all_ids:
            memory = store.get(mem_id)
            if memory is None:
                # Id known to an index but no longer stored: not evidence
                # for or against the threshold, so do not report it.
                continue
            strat = {
                "semantic": semantic_scores.get(mem_id, 0),
                "bm25": bm25_scores.get(mem_id, 0),
                "links": link_scores.get(mem_id, 0),
            }
            combined = (
                strat["semantic"] * 0.5 +
                strat["bm25"] * 0.3 +
                strat["links"] * 0.2
            )
            if combined < threshold:
                self.tuner.record_outcome("similarity_threshold", threshold, False)
                continue
            # Update access tracking
            memory.access_count += 1
            memory.last_accessed = time.time()
            # Try promotion
            self.memory_manager.try_promote(mem_id, combined)
            results.append(SearchResult(
                id=mem_id,
                content=memory.content,
                score=combined,
                tier=memory.tier,
                strategy_scores=strat,
                metadata=memory.metadata,
            ))
            self.tuner.record_outcome("similarity_threshold", threshold, True)

        results.sort(key=lambda r: r.score, reverse=True)
        return results[:top_k]

    def get_context(self, query: str, top_k: int = 3,
                    namespace: Optional[str] = None) -> str:
        """Get formatted context for prompt injection.

        Returns an empty string when the search finds nothing; otherwise a
        bracketed block with one bullet line per result. Results outside
        the semantic tier are prefixed with their tier name.
        """
        results = self.search(query, top_k=top_k, namespace=namespace)
        if not results:
            return ""
        parts = ["[RELEVANT CONTEXT FROM MEMORY]"]
        for r in results:
            tier_marker = f"[{r.tier.value.upper()}]" if r.tier != MemoryTier.SEMANTIC else ""
            parts.append(f"• {tier_marker} {r.content}")
        parts.append("[END CONTEXT]\n")
        return "\n".join(parts)

    def feedback(self, query: str, memory_id: str, relevance: float):
        """Record feedback for learning.

        Args:
            query: The query the feedback refers to (currently unused).
            memory_id: Memory the feedback is about; unknown ids are ignored.
            relevance: Feedback in [-1, 1]; values outside are clamped.
        """
        relevance = max(-1, min(1, relevance))
        memory = self.memory_manager.semantic_memory.get(memory_id)
        if memory is None:
            return
        # Update relevance score (exponential moving average, feedback
        # rescaled from [-1, 1] to [0, 1])
        memory.relevance_score = 0.7 * memory.relevance_score + 0.3 * ((relevance + 1) / 2)
        # Strengthen/weaken links based on feedback
        for link_id in self.link_manager.outgoing.get(memory_id, set()):
            link = self.link_manager.links.get(link_id)
            if link:
                link.strength = max(0, min(1, link.strength + relevance * 0.05))

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def _drop_from_indices(self, removed_ids: Set[str]):
        """Remove pruned memories from the vector and BM25 indexes.

        Without this, pruned ids would keep occupying search candidates
        and be reported to the tuner as threshold failures.
        """
        keep = [row for row, mem_id in enumerate(self._ids) if mem_id not in removed_ids]
        if len(keep) == len(self._ids):
            return
        self._ids = [self._ids[row] for row in keep]
        self._embeddings = [self._embeddings[row] for row in keep]
        self._tokenized_docs = [self._tokenized_docs[row] for row in keep]
        if HAS_FAISS and self.index is not None:
            # A flat index is rebuilt from the surviving rows so that row
            # numbers keep matching positions in _ids.
            self.index = faiss.IndexFlatIP(self.embedding_dim)
            if self._embeddings:
                self.index.add(np.vstack(self._embeddings).astype(np.float32))
        self.bm25 = None
        self._bm25_dirty = bool(self._tokenized_docs)

    def maintenance_cycle(self):
        """Run SLM maintenance operations.

        Decays working-memory priorities and link strengths, prunes weak
        links, decays and prunes stale memories (including their links and
        index entries), then runs parameter auto-tuning.

        Returns:
            Dict with ``links_pruned``, ``memories_decayed``,
            ``memories_pruned`` and ``parameter_adjustments``.
        """
        # Decay priorities in working memory
        self.memory_manager.decay_priorities()
        # Decay link strengths
        self.link_manager.decay_links()
        # Prune weak links
        links_pruned = self.link_manager.prune_weak_links()
        # Decay memory quality (gentle)
        memories_decayed = self.memory_manager.decay_memories()
        self.memory_manager.stats["memories_decayed"] += memories_decayed
        # Prune stale memories
        memories_pruned, pruned_ids = self.memory_manager.prune_stale_memories()
        self.memory_manager.stats["memories_pruned"] += memories_pruned
        # Clean up links and index rows of pruned memories
        for mem_id in pruned_ids:
            self.link_manager.remove_links_for_memory(mem_id)
        if pruned_ids:
            self._drop_from_indices(set(pruned_ids))
        # Auto-tune parameters
        adjustments = self.tuner.auto_tune()
        return {
            "links_pruned": links_pruned,
            "memories_decayed": memories_decayed,
            "memories_pruned": memories_pruned,
            "parameter_adjustments": adjustments,
        }

    def get_stats(self) -> Dict:
        """Get comprehensive statistics"""
        return {
            "memories": {
                "total": len(self.memory_manager.semantic_memory),
                **self.memory_manager.get_tier_stats(),
            },
            "links": self.link_manager.get_stats(),
            "utility_predictor": self.utility_predictor.stats,
            "tuner": {
                "parameters": self.tuner.parameters,
                "adjustments": self.tuner.adjustment_count,
            },
            "operations": self.stats,
        }

    def clear(self):
        """Clear all memory.

        Resets the memory tiers, links, indexes and the embedding cache.
        Tuner state and operation counters are kept.
        """
        self.memory_manager = TieredMemoryManager(self.tuner)
        self.link_manager = NeuralLinkManager()
        self._embeddings.clear()
        self._ids.clear()
        self._tokenized_docs.clear()
        self.bm25 = None
        self._bm25_dirty = False
        with self._cache_lock:
            self._cache.clear()
        if HAS_FAISS:
            self.index = faiss.IndexFlatIP(self.embedding_dim)

    def __len__(self):
        return len(self.memory_manager.semantic_memory)

    def __repr__(self):
        return f"Mnemo(memories={len(self)}, links={len(self.link_manager.links)})"
# =============================================================================
# DEMO
# =============================================================================
def demo():
    """Run a console walkthrough of the main Mnemo features.

    Creates a fresh ``Mnemo`` instance, prints the tuner parameters, adds a
    handful of sample memories, checks the memory-utility predictor against
    expected decisions, runs one search and prints link and overall
    statistics. Output goes to stdout; nothing is returned.
    """
    print("="*70)
    print("MNEMO v4: SLM-INSPIRED ARCHITECTURE")
    print("="*70)
    m = Mnemo()
    print(f"\n✓ Initialized: {m}")

    # Show tuned parameters
    print("\n📊 Tuned Parameters (adjusted from SLM):")
    for param, value in m.tuner.parameters.items():
        print(f" {param}: {value}")

    # Add memories
    print("\n📝 Adding memories...")
    memories = [
        "User prefers Python because it has clean syntax and good libraries",
        "Previous analysis showed gender bias in Victorian psychiatry diagnoses",
        "Framework has 5 checkpoints for detecting historical medical bias",
        "The project deadline is March 15th for the API redesign",
        "User's coffee preference is cappuccino with oat milk",
    ]
    for mem in memories:
        # add() returns None when the quality gate rejects the content.
        result = m.add(mem)
        status = "✓" if result else "✗"
        print(f" {status} {mem[:50]}...")

    # Test memory utility predictor
    print("\n🧠 Memory Utility Predictions:")
    tests = [
        ("What is Python?", False),
        ("Based on your previous analysis...", True),
        ("Compare to your earlier findings", True),
        ("This is a NEW topic", False),
    ]
    for query, expected in tests:
        result = m.should_inject(query)
        status = "✓" if result == expected else "✗"
        action = "INJECT" if result else "SKIP"
        print(f" {status} {action}: {query}")

    # Search
    print("\n🔍 Search Results:")
    results = m.search("previous analysis framework", top_k=3)
    for r in results:
        print(f" [{r.tier.value}] score={r.score:.3f}: {r.content[:50]}...")

    # Show neural links
    print("\n🔗 Neural Links:")
    link_stats = m.link_manager.get_stats()
    print(f" Total links: {link_stats['total_links']}")
    for lt, count in link_stats['links_by_type'].items():
        if count > 0:
            print(f" {lt}: {count}")

    # Full stats
    print("\n📊 Full Statistics:")
    stats = m.get_stats()
    print(f" Memories: {stats['memories']['total']}")
    print(f" Working memory: {stats['memories']['working_memory_count']}")
    print(f" Links: {stats['links']['total_links']}")
    print(f" Utility predictions: {stats['utility_predictor']['predictions']}")
    print("\n" + "="*70)
    print("✅ Demo complete!")
    print("="*70)
# Run the console demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demo()