| | |
| | """ |
| | Mnemo v4: SLM-Inspired Architecture |
| | ==================================== |
| | |
| | Implements key SLM architecture features with parameter adjustments |
| | based on Mnemo benchmark findings. |
| | |
| | SLM Features Implemented: |
| | 1. Three-Tiered Memory (Working → Token → Semantic) |
| | 2. Promotion/Demotion Algorithms |
| | 3. Neural Link Types (8 types with decay) |
| | 4. Self-Tuning Parameters |
| | 5. Memory Utility Predictor (NEW - from benchmarks) |
| | |
| | Key Parameter Adjustments (from benchmarks): |
| | - Semantic threshold: 0.65 → 0.50 (SLM was too high) |
| | - Quality acceptance: 0.30 → 0.50 (SLM too permissive) |
| | - Promotion threshold: 0.65 → 0.55 (faster promotion) |
| | - Link pruning: 60 days → 30 days (faster cleanup) |
| | """ |
| |
|
| | import hashlib |
| | import time |
| | import re |
| | import threading |
| | import numpy as np |
| | from typing import Dict, List, Optional, Tuple, Any, Set |
| | from dataclasses import dataclass, field |
| | from collections import defaultdict |
| | from enum import Enum |
| | import json |
| |
|
| | |
# Optional dependencies.  Each probe sets a module-level HAS_* flag so the
# rest of the module can check the flag before touching the library.

# FAISS: used for the inner-product vector index when available.
try:
    import faiss
    HAS_FAISS = True
except ImportError:
    HAS_FAISS = False

# networkx: used for the directed graph held by Mnemo when available.
try:
    import networkx as nx
    HAS_NETWORKX = True
except ImportError:
    HAS_NETWORKX = False

# rank_bm25: used for lexical (BM25) scoring when available.
try:
    from rank_bm25 import BM25Okapi
    HAS_BM25 = True
except ImportError:
    HAS_BM25 = False
| |
|
| |
|
| | |
| | |
| | |
| |
|
class MemoryTier(Enum):
    """Three-tiered memory hierarchy from SLM"""
    # Capacity-limited tier; see TieredMemoryManager.working_memory.
    WORKING = "working"
    # Per-namespace loop of memory ids; see TieredMemoryManager.token_loops.
    TOKEN = "token"
    # Default tier for newly created Memory objects.
    SEMANTIC = "semantic"
| |
|
| |
|
class LinkType(Enum):
    """Eight link types from SLM Neural Link system"""
    # Each member is a key of LINK_PROPERTIES, which supplies its creation
    # threshold, initial strength, decay rate and usage boost.
    DIRECT_REFERENCE = "direct_reference"
    SEMANTIC_SIMILARITY = "semantic_similarity"
    CO_OCCURRENCE = "co_occurrence"
    HIERARCHICAL = "hierarchical"
    TEMPORAL = "temporal"
    CAUSAL = "causal"
    CROSS_DOMAIN = "cross_domain"
    ASSOCIATIVE = "associative"
| |
|
| |
|
| | |
# Tuning table for every LinkType.
#
# Each row below is:
#   (link type, creation_threshold, initial_strength, decay_rate, usage_boost)
#
#   creation_threshold - minimum similarity required to create the link
#   initial_strength   - strength assigned to a freshly created link
#   decay_rate         - fraction of strength removed per decay pass
#   usage_boost        - strength added when the link is reused/traversed
LINK_PROPERTIES = {
    link_type: {
        "creation_threshold": creation_threshold,
        "initial_strength": initial_strength,
        "decay_rate": decay_rate,
        "usage_boost": usage_boost,
    }
    for link_type, creation_threshold, initial_strength, decay_rate, usage_boost in (
        (LinkType.DIRECT_REFERENCE,    0.85, 0.90, 0.005, 0.05),
        (LinkType.SEMANTIC_SIMILARITY, 0.50, 0.75, 0.01,  0.03),
        (LinkType.CO_OCCURRENCE,       0.60, 0.70, 0.015, 0.04),
        (LinkType.HIERARCHICAL,        0.80, 0.85, 0.003, 0.02),
        (LinkType.TEMPORAL,            0.55, 0.65, 0.02,  0.05),
        (LinkType.CAUSAL,              0.75, 0.80, 0.005, 0.03),
        (LinkType.CROSS_DOMAIN,        0.70, 0.65, 0.008, 0.04),
        (LinkType.ASSOCIATIVE,         0.45, 0.60, 0.025, 0.06),
    )
}
| |
|
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class Memory:
    """Memory unit with SLM-style metadata"""
    # Identity and payload.
    id: str
    content: str
    embedding: np.ndarray
    tier: MemoryTier = MemoryTier.SEMANTIC
    namespace: str = "default"

    # Scoring fields; all default to the neutral value 0.5.
    quality_score: float = 0.5
    relevance_score: float = 0.5
    confidence: float = 0.5

    # Access bookkeeping; both timestamps are time.time() values taken when
    # the instance is created.
    access_count: int = 0
    last_accessed: float = field(default_factory=time.time)
    created_at: float = field(default_factory=time.time)

    # Working-memory priority (decayed and compared against an eviction
    # threshold by TieredMemoryManager).
    priority: float = 1.0

    # Free-form caller-supplied data.
    metadata: Dict = field(default_factory=dict)
| |
|
| |
|
@dataclass
class NeuralLink:
    """SLM Neural Link between memories"""
    # Directed edge: source memory id -> target memory id.
    source_id: str
    target_id: str
    link_type: LinkType
    # Current link strength; NeuralLinkManager boosts it (capped at 1.0) and
    # decays it multiplicatively.
    strength: float
    # time.time() values; last_traversed starts at creation time.
    created_at: float = field(default_factory=time.time)
    last_traversed: float = field(default_factory=time.time)
    traversal_count: int = 0
| |
|
| |
|
@dataclass
class SearchResult:
    """Search result with multi-strategy scores"""
    # Id and content of the matched memory.
    id: str
    content: str
    # Combined score used for ranking.
    score: float
    tier: MemoryTier = MemoryTier.SEMANTIC
    # Defaults to an empty list; not populated by the search code visible here.
    link_path: List[str] = field(default_factory=list)
    # Per-strategy component scores (Mnemo.search fills "semantic", "bm25",
    # "links").
    strategy_scores: Dict[str, float] = field(default_factory=dict)
    metadata: Dict = field(default_factory=dict)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class MemoryUtilityPredictor:
    """
    Predicts whether memory injection will help or hurt.

    Key finding from benchmarks:
    - Within-conversation: Memory often HURTS (-3 to -12 pts)
    - Cross-session: Memory HELPS (+2 pts on dependent questions)

    The prediction is a keyword heuristic over the lower-cased query and
    context.  ``stats`` counts how many predictions ended in each outcome.
    """

    # Phrases suggesting the query depends on earlier material.  Matched as
    # plain substrings, in list order; the first hit is reported.
    INJECTION_SIGNALS = [
        "previous", "earlier", "before", "you said", "you mentioned",
        "as you", "based on", "using your", "your analysis", "your framework",
        "we discussed", "we analyzed", "refer to", "from your",
        "compare", "contrast", "synthesize", "combine", "integrate",
        "apply your", "using your", "based on your",
        "you previously", "your earlier", "you have analyzed"
    ]

    # Phrases suggesting a fresh, self-contained question.  Checked before
    # INJECTION_SIGNALS, so a skip signal always wins.
    SKIP_SIGNALS = [
        "this is a new", "new topic", "different subject",
        "what is", "define", "explain what"
    ]

    # Words ignored when comparing the query against conversation history.
    _QUERY_STOP_WORDS = frozenset({
        "the", "a", "is", "are", "to", "of", "in", "for", "what", "how"
    })

    # Punctuation trimmed from both ends of each query token so that
    # "pricing?" is compared as "pricing".
    _EDGE_PUNCTUATION = ".,;:!?\"'()[]{}<>"

    # Openers of simple factual questions, compiled once.
    _SIMPLE_QUERY_RE = re.compile(
        r"^what is\b|^who is\b|^when did\b|^where is\b|^how many\b|^define\b"
    )

    def __init__(self):
        self.stats = {
            "predictions": 0,
            "inject_recommended": 0,
            "skip_recommended": 0,
            "skip_context_window": 0
        }

    def should_inject(self,
                      query: str,
                      context: str = "",
                      conversation_history: str = "",
                      model_confidence: float = 0.5) -> Tuple[bool, str, float]:
        """
        Predict if memory injection will help.

        Args:
            query: The user query being answered.
            context: Extra text scanned for signals together with the query.
            conversation_history: Current conversation text; when it already
                covers the query keywords, injection is skipped.
            model_confidence: Caller-supplied confidence; above 0.85 the
                outcome is reported as "model_confident".

        Returns:
            (should_inject, reason, confidence)
        """
        self.stats["predictions"] += 1
        combined = (query + " " + context).lower()

        # 1. Explicit skip signals take precedence over everything else.
        for signal in self.SKIP_SIGNALS:
            if signal in combined:
                self.stats["skip_recommended"] += 1
                return False, f"skip_signal:{signal}", 0.8

        # 2. Injection signals, unless the live conversation already has
        #    the information the query is asking about.
        for signal in self.INJECTION_SIGNALS:
            if signal in combined:
                if self._context_has_info(query, conversation_history):
                    self.stats["skip_context_window"] += 1
                    return False, "context_window_sufficient", 0.7

                self.stats["inject_recommended"] += 1
                return True, f"inject_signal:{signal}", 0.85

        # 3. Simple factual questions do not need memory.
        if self._is_simple_query(query):
            self.stats["skip_recommended"] += 1
            return False, "simple_query", 0.6

        # 4. A confident model does not need memory.
        if model_confidence > 0.85:
            self.stats["skip_recommended"] += 1
            return False, "model_confident", 0.7

        # 5. Default: do not inject.
        self.stats["skip_recommended"] += 1
        return False, "no_signal", 0.5

    def _context_has_info(self, query: str, history: str) -> bool:
        """
        Check if conversation history already has needed context.

        History shorter than 200 whitespace-separated words is never
        considered sufficient.  Otherwise at least 60% of the query keywords
        (stop words removed, surrounding punctuation trimmed) must occur as
        substrings of the lower-cased history.
        """
        if not history or len(history.split()) < 200:
            return False

        keywords = {
            token.strip(self._EDGE_PUNCTUATION)
            for token in query.lower().split()
        }
        keywords.discard("")
        keywords -= self._QUERY_STOP_WORDS

        # With no keywords there is nothing to find in the history; without
        # this guard the 0 >= 0 comparison below would report "sufficient".
        if not keywords:
            return False

        history_lower = history.lower()
        overlap = sum(1 for kw in keywords if kw in history_lower)

        return overlap >= len(keywords) * 0.6

    def _is_simple_query(self, query: str) -> bool:
        """Detect simple factual queries that don't need memory"""
        return self._SIMPLE_QUERY_RE.search(query.lower()) is not None
| |
|
| |
|
| | |
| | |
| | |
| |
|
class SelfTuner:
    """
    SLM Self-Tuning Parameter System

    Tracks performance and auto-adjusts parameters.

    Outcomes are recorded per parameter with ``record_outcome``; every
    ``ADJUST_EVERY`` recorded samples ``auto_tune`` may nudge the parameter
    based on the success rate of the most recent ``ADJUST_EVERY`` outcomes.
    """

    # Maximum number of outcomes retained per parameter.
    HISTORY_LIMIT = 100
    # A parameter is re-evaluated once per this many recorded outcomes.
    ADJUST_EVERY = 10

    def __init__(self):
        self.parameters = {
            "similarity_threshold": 0.10,
            "quality_threshold": 0.35,
            "promotion_threshold": 0.55,
            "demotion_threshold": 0.70,
        }

        self.performance_history = defaultdict(list)
        self.adjustment_count = 0

        # Step size per parameter; parameters missing here fall back to 0.01
        # in get_adjustment.
        self.learning_rates = {
            "similarity_threshold": 0.01,
            "quality_threshold": 0.02,
            "promotion_threshold": 0.05,
        }

        # Total outcomes ever recorded per parameter.  The history list is
        # capped, so its length cannot be used to detect "every N samples"
        # once the cap is reached.
        self._samples_seen = defaultdict(int)
        # Sample count at which each parameter was last evaluated, so the
        # same batch is never applied twice.
        self._last_tuned_at = {}

    def record_outcome(self, param_name: str, value: float, success: bool):
        """Record outcome for a parameter setting"""
        history = self.performance_history[param_name]
        history.append({
            "value": value,
            "success": success,
            "timestamp": time.time()
        })
        self._samples_seen[param_name] += 1

        # Keep only the most recent HISTORY_LIMIT outcomes.
        if len(history) > self.HISTORY_LIMIT:
            self.performance_history[param_name] = history[-self.HISTORY_LIMIT:]

    def _total_samples(self, param_name: str) -> int:
        """Number of outcomes recorded for *param_name* (not capped)."""
        # Fall back to the history length for entries appended directly to
        # performance_history rather than through record_outcome.
        recorded = self._samples_seen.get(param_name, 0)
        return max(recorded, len(self.performance_history.get(param_name, [])))

    def should_adjust(self, param_name: str) -> bool:
        """Check if parameter should be adjusted (every 10 samples)"""
        total = self._total_samples(param_name)
        if total < self.ADJUST_EVERY or total % self.ADJUST_EVERY != 0:
            return False
        # Already evaluated at this sample count: wait for new samples.
        return self._last_tuned_at.get(param_name) != total

    def get_adjustment(self, param_name: str) -> float:
        """
        Calculate parameter adjustment based on recent performance.

        Returns ``-lr`` when fewer than half of the last 10 outcomes were
        successes, ``+lr / 2`` when more than 80% were, and 0.0 otherwise
        (including when fewer than 10 outcomes exist).
        """
        history = self.performance_history.get(param_name, [])
        if len(history) < self.ADJUST_EVERY:
            return 0.0

        recent = history[-self.ADJUST_EVERY:]
        success_rate = sum(1 for h in recent if h["success"]) / len(recent)

        lr = self.learning_rates.get(param_name, 0.01)

        if success_rate < 0.5:
            return -lr
        elif success_rate > 0.8:
            return lr * 0.5

        return 0.0

    def auto_tune(self):
        """
        Run auto-tuning cycle.

        Returns:
            List of ``(param_name, old_value, new_value)`` tuples for every
            parameter that received a non-zero adjustment.  New values are
            clamped to [0.1, 0.9].
        """
        adjusted = []

        for param_name in self.parameters:
            if not self.should_adjust(param_name):
                continue

            # Mark this batch as consumed even if the adjustment is zero.
            self._last_tuned_at[param_name] = self._total_samples(param_name)

            adjustment = self.get_adjustment(param_name)
            if adjustment != 0:
                old_val = self.parameters[param_name]
                new_val = max(0.1, min(0.9, old_val + adjustment))
                self.parameters[param_name] = new_val
                adjusted.append((param_name, old_val, new_val))
                self.adjustment_count += 1

        return adjusted
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TieredMemoryManager:
    """
    SLM Three-Tiered Memory Hierarchy

    Working Memory (32MB, <1ms):
    - Currently active info
    - Priority decay: 0.95/minute
    - Eviction threshold: 0.2

    Token Memory (100-250 items, 1-10ms):
    - Compressed representations
    - Loop-based organization
    - Merging at 0.8 similarity

    Semantic Memory (persistent, 10-100ms):
    - Full knowledge representations
    - Partition-based organization

    In this implementation ``working_memory`` is a dict capped at
    ``WORKING_MEMORY_SIZE`` items, ``token_loops`` maps a namespace to an
    ordered list of memory ids, and ``semantic_memory`` holds every memory
    that is in the token or semantic tier (token-tier memories are stored
    there too and distinguished by ``Memory.tier``).
    """

    # Capacity limits (item counts).
    WORKING_MEMORY_SIZE = 50
    TOKEN_LOOP_CAPACITY = 100
    TOKEN_LOOP_MAX = 250  # not referenced by the methods of this class

    # Working-memory priority handling.
    PRIORITY_DECAY = 0.95
    EVICTION_THRESHOLD = 0.2
    LOOP_MERGE_THRESHOLD = 0.8  # not referenced by the methods of this class

    # Long-term decay / pruning.
    MEMORY_DECAY_RATE = 0.01
    MEMORY_PRUNE_THRESHOLD = 0.15
    MEMORY_STALE_DAYS = 30

    def __init__(self, tuner: SelfTuner):
        """Create empty tiers; *tuner* supplies promotion/demotion thresholds."""
        self.tuner = tuner

        self.working_memory: Dict[str, Memory] = {}
        self.token_loops: Dict[str, List[str]] = defaultdict(list)
        self.semantic_memory: Dict[str, Memory] = {}

        self.stats = {
            "promotions": 0,
            "demotions": 0,
            "evictions": 0,
            "memories_decayed": 0,
            "memories_pruned": 0
        }

    def add_to_tier(self, memory: Memory, tier: MemoryTier):
        """Add memory to specific tier"""
        memory.tier = tier

        if tier == MemoryTier.WORKING:
            self._add_to_working(memory)
        elif tier == MemoryTier.TOKEN:
            self._add_to_token(memory)
        else:
            self.semantic_memory[memory.id] = memory

    def _drop_from_token_loop(self, memory: Memory):
        """Remove *memory*'s id from its namespace loop, if present."""
        # .get() avoids creating an empty loop in the defaultdict.
        loop = self.token_loops.get(memory.namespace)
        if loop and memory.id in loop:
            loop.remove(memory.id)

    def _add_to_working(self, memory: Memory):
        """Add to working memory with eviction if needed"""
        # Only make room when the memory is not already resident; re-adding
        # a resident memory must not push another one out.
        if (memory.id not in self.working_memory
                and len(self.working_memory) >= self.WORKING_MEMORY_SIZE):
            self._evict_from_working()

        # A memory lives in one fast tier at a time, so leaving the token
        # loop here prevents a duplicate loop entry on later eviction.
        self._drop_from_token_loop(memory)

        memory.priority = 1.0
        # Record the tier so try_promote does not keep "promoting" a memory
        # that is already in working memory.
        memory.tier = MemoryTier.WORKING
        self.working_memory[memory.id] = memory

    def _add_to_token(self, memory: Memory):
        """Add to token memory loop"""
        loop = self.token_loops[memory.namespace]

        # Re-adding moves the id to the end instead of duplicating it.
        if memory.id in loop:
            loop.remove(memory.id)

        if len(loop) >= self.TOKEN_LOOP_CAPACITY:
            # Loop is full: the oldest entry falls back to the semantic tier.
            oldest_id = loop.pop(0)
            if oldest_id in self.semantic_memory:
                self.semantic_memory[oldest_id].tier = MemoryTier.SEMANTIC

        loop.append(memory.id)
        self.semantic_memory[memory.id] = memory
        memory.tier = MemoryTier.TOKEN

    def _evict_from_working(self):
        """Evict the lowest priority item from working memory into token memory"""
        if not self.working_memory:
            return

        min_id = min(self.working_memory, key=lambda k: self.working_memory[k].priority)
        evicted = self.working_memory.pop(min_id)

        self._add_to_token(evicted)
        self.stats["evictions"] += 1

    def decay_priorities(self):
        """
        Apply SLM priority decay (0.95 per cycle).

        Every working-memory priority is multiplied by ``PRIORITY_DECAY``;
        memories whose priority falls below ``EVICTION_THRESHOLD`` are moved
        to token memory.
        """
        for memory in self.working_memory.values():
            memory.priority *= self.PRIORITY_DECAY

        # Collect first, evict afterwards: removing entries while iterating
        # the dict raises RuntimeError.
        expired_ids = [
            mem_id for mem_id, memory in self.working_memory.items()
            if memory.priority < self.EVICTION_THRESHOLD
        ]
        for mem_id in expired_ids:
            evicted = self.working_memory.pop(mem_id)
            self._add_to_token(evicted)
            self.stats["evictions"] += 1

    def calculate_promotion_score(self, memory: Memory, query_relevance: float) -> float:
        """
        SLM Promotion Score:
        PromotionScore = (QueryRelevance * 0.6) + (AccessFrequency * 0.3) + (RecencyScore * 0.1)

        AccessFrequency saturates at 10 accesses; RecencyScore falls linearly
        from 1 to 0 over the 24 hours following the last access.
        """
        access_freq = min(memory.access_count / 10, 1.0)

        age_hours = (time.time() - memory.last_accessed) / 3600
        recency = max(0, 1 - (age_hours / 24))

        return (query_relevance * 0.6) + (access_freq * 0.3) + (recency * 0.1)

    def calculate_demotion_score(self, memory: Memory, query_relevance: float) -> float:
        """
        SLM Demotion Score:
        DemotionScore = (1-QueryRelevance)*0.5 + (1-AccessFrequency)*0.3 + (Age/MAX_AGE)*0.2

        Age is measured from creation and saturates at 168 hours (7 days).
        """
        access_freq = min(memory.access_count / 10, 1.0)

        age_hours = (time.time() - memory.created_at) / 3600
        age_score = min(age_hours / 168, 1.0)

        return ((1 - query_relevance) * 0.5) + ((1 - access_freq) * 0.3) + (age_score * 0.2)

    def try_promote(self, memory_id: str, query_relevance: float) -> bool:
        """
        Try to promote memory to higher tier.

        SEMANTIC memories move to TOKEN and TOKEN memories move to WORKING
        when their promotion score exceeds the tuner's
        ``promotion_threshold``.  Returns True when a promotion happened.
        """
        if memory_id not in self.semantic_memory:
            return False

        memory = self.semantic_memory[memory_id]
        score = self.calculate_promotion_score(memory, query_relevance)
        threshold = self.tuner.parameters["promotion_threshold"]

        if score > threshold:
            if memory.tier == MemoryTier.SEMANTIC:
                self._add_to_token(memory)
                self.stats["promotions"] += 1
                return True
            elif memory.tier == MemoryTier.TOKEN:
                self._add_to_working(memory)
                self.stats["promotions"] += 1
                return True

        return False

    def try_demote(self, memory_id: str, query_relevance: float) -> bool:
        """
        Try to demote memory to lower tier.

        Only working-memory items are considered, and only while working
        memory is more than 80% full.  Returns True when a demotion happened.
        """
        if memory_id in self.working_memory:
            memory = self.working_memory[memory_id]
            score = self.calculate_demotion_score(memory, query_relevance)
            threshold = self.tuner.parameters["demotion_threshold"]

            capacity_pressure = len(self.working_memory) / self.WORKING_MEMORY_SIZE

            if score > threshold and capacity_pressure > 0.8:
                self.working_memory.pop(memory_id)
                self._add_to_token(memory)
                self.stats["demotions"] += 1
                return True

        return False

    def get_all_memories(self) -> Dict[str, Memory]:
        """Get all memories across tiers"""
        return {**self.semantic_memory, **self.working_memory}

    def decay_memories(self) -> int:
        """
        Apply gentle quality decay to unused semantic memories.
        Memories that are accessed stay fresh; unused ones gradually decay.
        Returns number of memories affected.

        A memory unused for more than one day loses
        ``min(days_unused * MEMORY_DECAY_RATE, 0.1)`` of its quality score
        per call.
        """
        now = time.time()
        affected = 0

        for memory in self.semantic_memory.values():
            days_unused = (now - memory.last_accessed) / 86400

            if days_unused > 1:
                decay_factor = min(days_unused * self.MEMORY_DECAY_RATE, 0.1)
                memory.quality_score *= (1 - decay_factor)
                affected += 1

        return affected

    def prune_stale_memories(self) -> Tuple[int, List[str]]:
        """
        Remove memories that have decayed below threshold.
        Returns (count_pruned, list_of_pruned_ids).

        A memory is pruned when its quality score is below
        ``MEMORY_PRUNE_THRESHOLD`` and it has not been accessed for more
        than ``MEMORY_STALE_DAYS`` days.
        """
        now = time.time()

        pruned_ids = [
            mem_id for mem_id, memory in self.semantic_memory.items()
            if memory.quality_score < self.MEMORY_PRUNE_THRESHOLD
            and (now - memory.last_accessed) / 86400 > self.MEMORY_STALE_DAYS
        ]

        for mem_id in pruned_ids:
            memory = self.semantic_memory.pop(mem_id)
            # Do not leave dangling ids behind in the faster tiers.
            self._drop_from_token_loop(memory)
            self.working_memory.pop(mem_id, None)

        return len(pruned_ids), pruned_ids

    def refresh_memory(self, memory_id: str):
        """Mark a memory as freshly accessed (resets decay)"""
        if memory_id in self.semantic_memory:
            self.semantic_memory[memory_id].last_accessed = time.time()
        elif memory_id in self.working_memory:
            self.working_memory[memory_id].last_accessed = time.time()

    def get_tier_stats(self) -> Dict:
        """Get tier statistics"""
        return {
            "working_memory_count": len(self.working_memory),
            "working_memory_capacity": self.WORKING_MEMORY_SIZE,
            "token_loops": {ns: len(ids) for ns, ids in self.token_loops.items()},
            "semantic_memory_count": len(self.semantic_memory),
            "promotions": self.stats["promotions"],
            "demotions": self.stats["demotions"],
            "evictions": self.stats["evictions"]
        }
| |
|
| |
|
| | |
| | |
| | |
| |
|
class NeuralLinkManager:
    """
    SLM Neural Link Pathway System

    Creates and manages typed connections between memories.

    ``links`` maps a link id to its NeuralLink; ``outgoing`` and ``incoming``
    index link ids by source and target memory id respectively.
    """

    # Path-finding limits.
    MAX_PATH_DEPTH = 4
    MIN_PATH_STRENGTH = 0.40
    PATH_STRENGTH_DECAY = 0.9
    MAX_BRANCHING = 12

    # Pruning limits.
    PRUNE_STRENGTH_THRESHOLD = 0.25
    PRUNE_AGE_DAYS = 30

    def __init__(self):
        self.links: Dict[str, NeuralLink] = {}
        self.outgoing: Dict[str, Set[str]] = defaultdict(set)
        self.incoming: Dict[str, Set[str]] = defaultdict(set)

        self.stats = {
            "links_created": 0,
            "links_pruned": 0,
            "traversals": 0
        }

    def _link_id(self, source: str, target: str, link_type: LinkType) -> str:
        """Generate link ID"""
        return f"{source}:{target}:{link_type.value}"

    def create_link(self, source_id: str, target_id: str,
                    link_type: LinkType, similarity: float) -> Optional[str]:
        """
        Create link if similarity exceeds type-specific threshold.

        SLM LinkScore = (VectorSimilarity * 0.6) + (CoOccurrence * 0.25) + (DomainRelatedness * 0.15)
        Simplified here to just similarity.

        Returns the link id, or None when *similarity* is below the type's
        creation threshold.  If the link already exists its strength is
        boosted (capped at 1.0) instead of creating a second link.
        """
        props = LINK_PROPERTIES[link_type]

        if similarity < props["creation_threshold"]:
            return None

        link_id = self._link_id(source_id, target_id, link_type)

        existing = self.links.get(link_id)
        if existing is not None:
            existing.strength = min(1.0, existing.strength + props["usage_boost"])
            return link_id

        self.links[link_id] = NeuralLink(
            source_id=source_id,
            target_id=target_id,
            link_type=link_type,
            strength=props["initial_strength"]
        )
        self.outgoing[source_id].add(link_id)
        self.incoming[target_id].add(link_id)
        self.stats["links_created"] += 1

        return link_id

    def traverse_link(self, link_id: str) -> Optional[NeuralLink]:
        """Traverse a link, strengthening it"""
        link = self.links.get(link_id)
        if link is None:
            return None

        link.traversal_count += 1
        link.last_traversed = time.time()

        props = LINK_PROPERTIES[link.link_type]
        link.strength = min(1.0, link.strength + props["usage_boost"])

        self.stats["traversals"] += 1
        return link

    def _strongest_outgoing(self, memory_id: str) -> List[NeuralLink]:
        """Return up to MAX_BRANCHING outgoing links, strongest first."""
        candidates = [
            self.links[link_id]
            for link_id in self.outgoing.get(memory_id, ())
            if link_id in self.links
        ]
        # Sets have no stable order, so sort explicitly: the branching cap
        # then keeps the strongest links and results are reproducible.
        candidates.sort(key=lambda l: (-l.strength, l.target_id, l.link_type.value))
        return candidates[:self.MAX_BRANCHING]

    def find_paths(self, source_id: str, target_id: str,
                   max_depth: Optional[int] = None) -> List[List[str]]:
        """
        Find paths between memories (SLM path finding).

        Depth-first search over outgoing links.  A branch is abandoned when
        it is deeper than *max_depth* (default ``MAX_PATH_DEPTH``) or when
        the product of link strengths, damped by ``PATH_STRENGTH_DECAY`` per
        hop, drops below ``MIN_PATH_STRENGTH``.  Each returned path is a list
        of memory ids starting with *source_id* and visits no id twice.
        """
        # `is None` so that an explicit max_depth=0 is honoured.
        if max_depth is None:
            max_depth = self.MAX_PATH_DEPTH
        paths = []

        def dfs(current: str, path: List[str], strength: float, depth: int):
            if depth > max_depth or strength < self.MIN_PATH_STRENGTH:
                return

            if current == target_id:
                paths.append(path.copy())
                return

            for link in self._strongest_outgoing(current):
                if link.target_id in path:
                    continue  # avoid cycles
                hop_strength = strength * link.strength * self.PATH_STRENGTH_DECAY
                path.append(link.target_id)
                dfs(link.target_id, path, hop_strength, depth + 1)
                path.pop()

        dfs(source_id, [source_id], 1.0, 0)
        return paths

    def get_connected(self, memory_id: str,
                      link_types: Optional[List[LinkType]] = None) -> List[str]:
        """
        Get memories connected to this one.

        Returns the target ids of outgoing links, optionally restricted to
        the given *link_types*.
        """
        connected = []

        for link_id in self.outgoing.get(memory_id, set()):
            link = self.links.get(link_id)
            if link and (link_types is None or link.link_type in link_types):
                connected.append(link.target_id)

        return connected

    def decay_links(self):
        """Apply daily decay to all links"""
        for link in self.links.values():
            props = LINK_PROPERTIES[link.link_type]
            link.strength *= (1 - props["decay_rate"])

    def _drop_link(self, link_id: str):
        """Remove one link from the store and both indexes, counting it as pruned."""
        link = self.links.pop(link_id)
        # .get() keeps the defaultdicts from growing empty entries.
        for index, key in ((self.outgoing, link.source_id),
                           (self.incoming, link.target_id)):
            bucket = index.get(key)
            if bucket is not None:
                bucket.discard(link_id)
        self.stats["links_pruned"] += 1

    def prune_weak_links(self) -> int:
        """Prune links below strength threshold and unused for too long"""
        now = time.time()
        age_threshold = self.PRUNE_AGE_DAYS * 24 * 3600

        to_prune = [
            link_id for link_id, link in self.links.items()
            if link.strength < self.PRUNE_STRENGTH_THRESHOLD
            and now - link.last_traversed > age_threshold
        ]

        for link_id in to_prune:
            self._drop_link(link_id)

        return len(to_prune)

    def remove_links_for_memory(self, memory_id: str) -> int:
        """Remove all links connected to a memory (when memory is pruned)"""
        # The indexes already list every link touching this memory, so there
        # is no need to scan the whole link table.
        touching = set(self.outgoing.get(memory_id, ())) | set(self.incoming.get(memory_id, ()))
        to_remove = [link_id for link_id in touching if link_id in self.links]

        for link_id in to_remove:
            self._drop_link(link_id)

        self.outgoing.pop(memory_id, None)
        self.incoming.pop(memory_id, None)

        return len(to_remove)

    def get_stats(self) -> Dict:
        """Return total link count, per-type link counts and lifetime counters."""
        links_by_type = {lt.value: 0 for lt in LinkType}
        for link in self.links.values():
            links_by_type[link.link_type.value] += 1

        return {
            "total_links": len(self.links),
            "links_by_type": links_by_type,
            **self.stats
        }
| |
|
| |
|
| | |
| | |
| | |
| |
|
class Mnemo:
    """
    Mnemo v4: SLM-Inspired Memory System

    Implements:
    - Three-tiered memory hierarchy
    - Neural link pathways (8 types)
    - Self-tuning parameters
    - Memory utility prediction

    With parameter adjustments based on Mnemo benchmarks.
    """

    # Common English function words.  Not referenced by the methods visible
    # in this part of the file -- presumably used for token filtering
    # elsewhere; TODO confirm.
    STOP_WORDS = {"a", "an", "the", "is", "are", "was", "were", "be", "been",
                  "to", "of", "in", "for", "on", "with", "at", "by", "from",
                  "and", "but", "or", "not", "this", "that", "i", "me", "my"}
| | |
| | def __init__(self, embedding_dim: int = 384): |
| | self.embedding_dim = embedding_dim |
| | |
| | |
| | self.tuner = SelfTuner() |
| | self.memory_manager = TieredMemoryManager(self.tuner) |
| | self.link_manager = NeuralLinkManager() |
| | self.utility_predictor = MemoryUtilityPredictor() |
| | |
| | |
| | self._embeddings: List[np.ndarray] = [] |
| | self._ids: List[str] = [] |
| | |
| | if HAS_FAISS: |
| | self.index = faiss.IndexFlatIP(embedding_dim) |
| | else: |
| | self.index = None |
| | |
| | |
| | self.bm25 = None |
| | self._tokenized_docs: List[List[str]] = [] |
| | |
| | |
| | if HAS_NETWORKX: |
| | self.graph = nx.DiGraph() |
| | else: |
| | self.graph = None |
| | |
| | |
| | self._cache: Dict[str, Any] = {} |
| | self._cache_lock = threading.Lock() |
| | |
| | |
| | self.stats = { |
| | "adds": 0, |
| | "adds_rejected": 0, |
| | "searches": 0, |
| | "cache_hits": 0, |
| | "cache_misses": 0 |
| | } |
| | |
| | def _get_embedding(self, text: str) -> np.ndarray: |
| | """Generate embedding (hash-based for POC)""" |
| | cache_key = f"emb:{hashlib.md5(text.encode()).hexdigest()}" |
| | |
| | with self._cache_lock: |
| | if cache_key in self._cache: |
| | self.stats["cache_hits"] += 1 |
| | return self._cache[cache_key] |
| | self.stats["cache_misses"] += 1 |
| | |
| | |
| | embedding = np.zeros(self.embedding_dim, dtype=np.float32) |
| | words = text.lower().split() |
| | for i, word in enumerate(words): |
| | idx = hash(word) % self.embedding_dim |
| | embedding[idx] += 1.0 / (i + 1) |
| | |
| | norm = np.linalg.norm(embedding) |
| | if norm > 0: |
| | embedding = embedding / norm |
| | |
| | with self._cache_lock: |
| | self._cache[cache_key] = embedding |
| | |
| | return embedding |
| | |
| | def _estimate_quality(self, content: str) -> float: |
| | """Estimate content quality (SLM quality gates)""" |
| | score = 0.5 |
| | words = len(content.split()) |
| | |
| | if words < 5: |
| | score -= 0.3 |
| | elif words > 20: |
| | score += 0.1 |
| | |
| | if any(r in content.lower() for r in ["because", "therefore", "shows"]): |
| | score += 0.2 |
| | |
| | if re.search(r'\d+', content): |
| | score += 0.1 |
| | |
| | if any(v in content.lower() for v in ["something", "stuff", "maybe"]): |
| | score -= 0.2 |
| | |
| | return max(0.0, min(1.0, score)) |
| | |
| | def should_inject(self, query: str, context: str = "", |
| | conversation_history: str = "", |
| | model_confidence: float = 0.5) -> bool: |
| | """ |
| | Memory Utility Predictor - should we inject memory? |
| | |
| | Based on benchmark findings that memory often hurts performance. |
| | """ |
| | should, reason, confidence = self.utility_predictor.should_inject( |
| | query, context, conversation_history, model_confidence |
| | ) |
| | return should |
| | |
| | def add(self, content: str, namespace: str = "default", |
| | metadata: Dict = None, skip_quality_check: bool = False) -> Optional[str]: |
| | """Add memory with SLM quality gates""" |
| | quality = self._estimate_quality(content) |
| | threshold = self.tuner.parameters["quality_threshold"] |
| | |
| | if not skip_quality_check and quality < threshold: |
| | self.stats["adds_rejected"] += 1 |
| | self.tuner.record_outcome("quality_threshold", threshold, False) |
| | return None |
| | |
| | memory_id = f"mem_{hashlib.md5(content.encode()).hexdigest()[:8]}" |
| | embedding = self._get_embedding(content) |
| | |
| | memory = Memory( |
| | id=memory_id, |
| | content=content, |
| | embedding=embedding, |
| | namespace=namespace, |
| | quality_score=quality, |
| | metadata=metadata or {} |
| | ) |
| | |
| | |
| | self.memory_manager.add_to_tier(memory, MemoryTier.SEMANTIC) |
| | |
| | |
| | self._embeddings.append(embedding) |
| | self._ids.append(memory_id) |
| | |
| | if HAS_FAISS and self.index is not None: |
| | self.index.add(embedding.reshape(1, -1)) |
| | |
| | tokens = content.lower().split() |
| | self._tokenized_docs.append(tokens) |
| | if HAS_BM25: |
| | self.bm25 = BM25Okapi(self._tokenized_docs) |
| | |
| | |
| | self._create_links_for_new_memory(memory_id, embedding) |
| | |
| | self.stats["adds"] += 1 |
| | self.tuner.record_outcome("quality_threshold", threshold, True) |
| | |
| | return memory_id |
| | |
| | def _create_links_for_new_memory(self, memory_id: str, embedding: np.ndarray): |
| | """Create neural links to similar memories""" |
| | if len(self._ids) < 2: |
| | return |
| | |
| | |
| | similarities = [] |
| | for other_id, other_emb in zip(self._ids, self._embeddings): |
| | if other_id != memory_id: |
| | sim = float(np.dot(embedding, other_emb)) |
| | similarities.append((other_id, sim)) |
| | |
| | |
| | similarities.sort(key=lambda x: x[1], reverse=True) |
| | |
| | |
| | for other_id, sim in similarities[:5]: |
| | |
| | self.link_manager.create_link( |
| | memory_id, other_id, LinkType.SEMANTIC_SIMILARITY, sim |
| | ) |
| | self.link_manager.create_link( |
| | other_id, memory_id, LinkType.SEMANTIC_SIMILARITY, sim |
| | ) |
| | |
    def search(self, query: str, top_k: int = 5,
               namespace: Optional[str] = None,
               use_links: bool = True) -> List[SearchResult]:
        """
        Search with multi-strategy retrieval + neural links

        Combines three per-memory scores as
        ``0.5 * semantic + 0.3 * bm25 + 0.2 * links`` and returns the
        *top_k* memories whose combined score reaches the tuner's
        ``similarity_threshold``, best first.

        Args:
            query: Search text.
            top_k: Maximum number of results.
            namespace: When truthy, only memories in this namespace are kept.
            use_links: When True, memories linked from the top semantic hits
                receive a link score.

        Side effects: every returned memory has its access count and
        last-accessed time updated and is offered for promotion, and one
        outcome per candidate is recorded with the tuner.
        """
        if not self.memory_manager.semantic_memory:
            return []

        self.stats["searches"] += 1
        query_embedding = self._get_embedding(query)
        threshold = self.tuner.parameters["similarity_threshold"]

        # Strategy 1: embedding similarity.  With FAISS only the top
        # `top_k * 3` neighbours are scored; otherwise every stored
        # embedding is scored by dot product.
        semantic_scores = {}
        if HAS_FAISS and self.index is not None and self.index.ntotal > 0:
            k = min(top_k * 3, self.index.ntotal)
            scores, indices = self.index.search(query_embedding.reshape(1, -1), k)
            for score, idx in zip(scores[0], indices[0]):
                # Skip indices that do not map to a known id.
                if 0 <= idx < len(self._ids):
                    semantic_scores[self._ids[idx]] = float(score)
        else:
            for mem_id, emb in zip(self._ids, self._embeddings):
                semantic_scores[mem_id] = float(np.dot(query_embedding, emb))

        # Strategy 2: BM25, normalised by the best score; documents scoring
        # at most 10% of the best are dropped.
        bm25_scores = {}
        if HAS_BM25 and self.bm25 is not None:
            tokens = query.lower().split()
            scores = self.bm25.get_scores(tokens)
            max_score = max(scores) if len(scores) > 0 and max(scores) > 0 else 1
            for idx, score in enumerate(scores):
                if score > 0.1 * max_score:
                    bm25_scores[self._ids[idx]] = float(score / max_score)

        # Strategy 3: link expansion.  Each of the three best semantic hits
        # adds 0.3 to up to five of the memories it links to.
        link_scores = {}
        if use_links:
            top_semantic = sorted(semantic_scores.items(), key=lambda x: x[1], reverse=True)[:3]
            for mem_id, _ in top_semantic:
                connected = self.link_manager.get_connected(mem_id)
                for conn_id in connected[:5]:
                    link_scores[conn_id] = link_scores.get(conn_id, 0) + 0.3

        # Candidate set: anything scored by at least one strategy.
        all_ids = set(semantic_scores.keys()) | set(bm25_scores.keys()) | set(link_scores.keys())

        if namespace:
            # Keep only candidates stored under the requested namespace.
            all_ids = {mid for mid in all_ids
                       if mid in self.memory_manager.semantic_memory
                       and self.memory_manager.semantic_memory[mid].namespace == namespace}

        results = []
        for mem_id in all_ids:
            strat = {
                "semantic": semantic_scores.get(mem_id, 0),
                "bm25": bm25_scores.get(mem_id, 0),
                "links": link_scores.get(mem_id, 0)
            }

            combined = (
                strat["semantic"] * 0.5 +
                strat["bm25"] * 0.3 +
                strat["links"] * 0.2
            )

            memory = self.memory_manager.semantic_memory.get(mem_id)
            if memory and combined >= threshold:
                # Record the access before scoring promotion, which reads
                # access_count and last_accessed.
                memory.access_count += 1
                memory.last_accessed = time.time()

                # Promotion may change memory.tier, which is read below.
                self.memory_manager.try_promote(mem_id, combined)

                results.append(SearchResult(
                    id=mem_id,
                    content=memory.content,
                    score=combined,
                    tier=memory.tier,
                    strategy_scores=strat,
                    metadata=memory.metadata
                ))

                self.tuner.record_outcome("similarity_threshold", threshold, True)
            else:
                # Candidate missing from semantic memory or below threshold.
                self.tuner.record_outcome("similarity_threshold", threshold, False)

        results.sort(key=lambda x: x.score, reverse=True)
        return results[:top_k]
| | |
def get_context(self, query: str, top_k: int = 3,
                namespace: Optional[str] = None) -> str:
    """Build a prompt-ready context block from the best search matches.

    Args:
        query: Text handed to ``self.search``.
        top_k: Maximum number of results requested from the search.
        namespace: Optional namespace forwarded to the search unchanged.

    Returns:
        An empty string when the search returns nothing. Otherwise a header
        line, one bullet line per result and a footer line, joined with
        newlines. Results whose tier is not ``MemoryTier.SEMANTIC`` carry
        an upper-cased tier tag in front of their content.
    """
    results = self.search(query, top_k=top_k, namespace=namespace)
    if not results:
        return ""

    lines = ["[RELEVANT CONTEXT FROM MEMORY]"]
    for result in results:
        if result.tier != MemoryTier.SEMANTIC:
            lines.append(f"• [{result.tier.value.upper()}] {result.content}")
        else:
            # No tag for the semantic tier; emitting the bullet directly
            # avoids the doubled space an empty tag would leave behind.
            lines.append(f"• {result.content}")
    # The footer keeps its own trailing newline so the block ends cleanly.
    lines.append("[END CONTEXT]\n")

    return "\n".join(lines)
| | |
def feedback(self, query: str, memory_id: str, relevance: float):
    """Fold a relevance rating for one memory into its score and links.

    Args:
        query: Accepted for interface symmetry; not used by this method.
        memory_id: Key looked up in ``memory_manager.semantic_memory``.
            Unknown ids are ignored.
        relevance: Rating, clamped to the range [-1, 1] before use.
    """
    relevance = max(-1, min(1, relevance))

    store = self.memory_manager.semantic_memory
    if memory_id not in store:
        return

    target = store[memory_id]

    # Blend: 70% of the previous score plus 30% of the rating after it is
    # rescaled from [-1, 1] to [0, 1].
    rescaled = (relevance + 1) / 2
    target.relevance_score = 0.7 * target.relevance_score + 0.3 * rescaled

    # Nudge every outgoing link of this memory by 5% of the rating,
    # keeping the strength inside [0, 1].
    graph = self.link_manager
    for link_id in graph.outgoing.get(memory_id, set()):
        edge = graph.links.get(link_id)
        if not edge:
            continue
        edge.strength = max(0, min(1, edge.strength + relevance * 0.05))
| | |
def maintenance_cycle(self):
    """Run one maintenance pass over memories, links and tuned parameters.

    Steps run in this order: decay memory priorities, decay links, prune
    weak links, decay memories, prune stale memories, drop the links of
    every pruned memory, then let the tuner adjust its parameters. The
    decayed and pruned counts are also added to ``memory_manager.stats``.

    Returns:
        A dict with the keys ``links_pruned``, ``memories_decayed``,
        ``memories_pruned`` and ``parameter_adjustments``.
    """
    memories = self.memory_manager
    graph = self.link_manager

    memories.decay_priorities()
    graph.decay_links()

    report = {"links_pruned": graph.prune_weak_links()}

    decayed = memories.decay_memories()
    memories.stats["memories_decayed"] += decayed
    report["memories_decayed"] = decayed

    pruned_total, stale_ids = memories.prune_stale_memories()
    memories.stats["memories_pruned"] += pruned_total
    report["memories_pruned"] = pruned_total

    # Links that point at removed memories are cleaned up right away.
    for stale_id in stale_ids:
        graph.remove_links_for_memory(stale_id)

    report["parameter_adjustments"] = self.tuner.auto_tune()
    return report
| | |
def get_stats(self) -> Dict:
    """Collect a nested snapshot of memory, link, predictor and tuner stats.

    Returns:
        A dict with five sections: ``memories`` (the semantic-memory count
        under ``total`` merged with the tier stats), ``links``,
        ``utility_predictor``, ``tuner`` (current parameters and the
        adjustment count) and ``operations`` (this object's own ``stats``).
    """
    manager = self.memory_manager
    total = len(manager.semantic_memory)
    tier_stats = manager.get_tier_stats()

    snapshot = {"memories": {"total": total, **tier_stats}}
    snapshot["links"] = self.link_manager.get_stats()
    snapshot["utility_predictor"] = self.utility_predictor.stats

    tuner = self.tuner
    snapshot["tuner"] = {
        "parameters": tuner.parameters,
        "adjustments": tuner.adjustment_count,
    }
    snapshot["operations"] = self.stats
    return snapshot
| | |
def clear(self):
    """Drop every stored memory, link, embedding and cached entry.

    The tier manager is rebuilt around the current tuner and the link
    manager is replaced with a new one. The embedding, id, tokenized-doc
    and cache containers are emptied in place, and the BM25 model is
    discarded. When FAISS is available the vector index is recreated with
    the configured embedding dimension.
    """
    self.memory_manager = TieredMemoryManager(self.tuner)
    self.link_manager = NeuralLinkManager()

    # Same containers, emptied in place, in the original order.
    for attr in ("_embeddings", "_ids", "_tokenized_docs"):
        getattr(self, attr).clear()
    self.bm25 = None
    self._cache.clear()

    if not HAS_FAISS:
        return
    self.index = faiss.IndexFlatIP(self.embedding_dim)
| | |
def __len__(self):
    """Return how many entries ``memory_manager.semantic_memory`` holds."""
    stored = self.memory_manager.semantic_memory
    return len(stored)
| | |
def __repr__(self):
    """Return a short summary with the memory count and the link count."""
    memory_total = len(self)
    link_total = len(self.link_manager.links)
    return f"Mnemo(memories={memory_total}, links={link_total})"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def demo():
    """Run a console walkthrough of the main Mnemo features.

    Creates a fresh ``Mnemo`` instance, prints its tuned parameters, stores
    a handful of sample memories, compares ``should_inject`` against the
    expected answer for a few queries, runs one search, and prints link
    and overall statistics. Everything goes to stdout; nothing is returned.
    """
    # NOTE: the non-ASCII glyphs in this function were damaged by an
    # encoding round-trip. The check/cross marks and the final banner are
    # restored; the section icons are replacements for glyphs that could
    # not be recovered.
    banner = "=" * 70

    print(banner)
    print("MNEMO v4: SLM-INSPIRED ARCHITECTURE")
    print(banner)

    m = Mnemo()
    print(f"\n✓ Initialized: {m}")

    print("\n📊 Tuned Parameters (adjusted from SLM):")
    for param, value in m.tuner.parameters.items():
        print(f" {param}: {value}")

    print("\n📝 Adding memories...")
    memories = [
        "User prefers Python because it has clean syntax and good libraries",
        "Previous analysis showed gender bias in Victorian psychiatry diagnoses",
        "Framework has 5 checkpoints for detecting historical medical bias",
        "The project deadline is March 15th for the API redesign",
        "User's coffee preference is cappuccino with oat milk",
    ]

    for mem in memories:
        result = m.add(mem)
        # Distinct marks so an accepted and a rejected memory can be told apart.
        status = "✓" if result else "✗"
        print(f" {status} {mem[:50]}...")

    print("\n🧠 Memory Utility Predictions:")
    # Each entry pairs a query with whether injection is expected for it.
    tests = [
        ("What is Python?", False),
        ("Based on your previous analysis...", True),
        ("Compare to your earlier findings", True),
        ("This is a NEW topic", False),
    ]

    for query, expected in tests:
        result = m.should_inject(query)
        status = "✓" if result == expected else "✗"
        action = "INJECT" if result else "SKIP"
        print(f" {status} {action}: {query}")

    print("\n🔍 Search Results:")
    results = m.search("previous analysis framework", top_k=3)
    for r in results:
        print(f" [{r.tier.value}] score={r.score:.3f}: {r.content[:50]}...")

    print("\n🔗 Neural Links:")
    link_stats = m.link_manager.get_stats()
    print(f" Total links: {link_stats['total_links']}")
    for lt, count in link_stats['links_by_type'].items():
        if count > 0:
            print(f" {lt}: {count}")

    print("\n📈 Full Statistics:")
    stats = m.get_stats()
    print(f" Memories: {stats['memories']['total']}")
    print(f" Working memory: {stats['memories']['working_memory_count']}")
    print(f" Links: {stats['links']['total_links']}")
    print(f" Utility predictions: {stats['utility_predictor']['predictions']}")

    print("\n" + banner)
    # This literal was split across two lines, which is a syntax error.
    print("✅ Demo complete!")
    print(banner)
| |
|
| |
|
# Run the demo only when this file is executed directly, not on import.
if __name__ == "__main__":
    demo()
| |
|