Spaces:
Running
Running
| """ | |
| Context engine for Janus — provides rich context injection into every LLM call. | |
| FIXES vs previous version: | |
| - pending_thoughts queue was growing unboundedly (one malformed thought per daemon cycle) | |
| - Topic extraction was cutting off queries at "what is the" instead of extracting meaning | |
| - Deduplication: identical thoughts no longer accumulate | |
| - Hard cap: max 20 pending thoughts, oldest dropped when full | |
| - Better topic extraction: skip stopwords, take the meaningful noun phrase | |
| """ | |
| import time | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Optional | |
| logger = logging.getLogger(__name__) | |
| try: | |
| from app.config import DATA_DIR | |
| except ImportError: | |
| DATA_DIR = Path(__file__).parent.parent / "data" | |
| CONTEXT_FILE = Path(DATA_DIR) / "daemon" / "context.json" | |
| # Stopwords to skip when extracting topic from a query | |
| _TOPIC_STOPWORDS = { | |
| "what", "is", "the", "a", "an", "are", "was", "were", "how", "why", | |
| "when", "where", "who", "which", "will", "can", "could", "should", | |
| "would", "do", "does", "did", "tell", "me", "about", "explain", | |
| "give", "show", "find", "get", "make", "help", "please", "i", "we", | |
| "my", "our", "your", "their", "its", "this", "that", "these", "those", | |
| "some", "any", "all", "more", "most", "much", "many", "few", "little", | |
| "of", "in", "on", "at", "to", "for", "by", "from", "with", "and", | |
| "or", "but", "not", "no", "if", "than", "then", "so", "yet", | |
| "hi", "hey", "hello", "howdy", "greetings", "yo", "sup", | |
| "use", "conversation", "below", "context", "answer", "latest", | |
| "message", "messages", "assistant", "system", "user", | |
| "simulate", "happens", "happening", "did", "does", "whats", "what's", "it's", | |
| } | |
| MAX_PENDING_THOUGHTS = 20 # hard cap — was unbounded | |
| MAX_THOUGHT_AGE_HOURS = 24 # drop thoughts older than 24h | |
| _META_TOPIC_WORDS = { | |
| "conversation", "context", "latest", "message", "messages", "assistant", "system", "user" | |
| } | |
| def _extract_topic(query: str) -> str: | |
| """ | |
| Extract the meaningful topic from a query, skipping leading stopwords. | |
| Examples: | |
| "what is the stock market" → "stock market" | |
| "how does inflation work" → "inflation" | |
| "tell me about AAPL earnings" → "AAPL earnings" | |
| "what is the" → "general query" (was causing the bug) | |
| """ | |
| import re | |
| # Clean and tokenize | |
| words = re.findall(r"[a-zA-Z0-9$€£%]+", query.lower()) | |
| # Skip leading stopwords | |
| meaningful = [] | |
| for w in words: | |
| if w not in _TOPIC_STOPWORDS or meaningful: | |
| if w not in _TOPIC_STOPWORDS: | |
| meaningful.append(w) | |
| # Take up to 4 meaningful words | |
| topic = " ".join(meaningful[:4]) | |
| return topic if topic and len(topic) > 2 else "general query" | |
| def _is_meta_topic(topic: str) -> bool: | |
| words = set((topic or "").lower().split()) | |
| return bool(words & _META_TOPIC_WORDS) | |
| class ContextEngine: | |
| """Manages system-wide context for LLM injection.""" | |
| def __init__(self): | |
| self._pending_thoughts: list = [] | |
| self._context_cache: dict = {} | |
| self._conversation_count: int = 0 | |
| self._last_topic: str = "" | |
| self._last_interaction: float = 0 | |
| self._recurring_interests: list = [] | |
| self._load() | |
| def _load(self): | |
| import json | |
| if CONTEXT_FILE.exists(): | |
| try: | |
| data = json.loads(CONTEXT_FILE.read_text()) | |
| raw_thoughts = data.get("pending_thoughts", []) | |
| # FIXED: deduplicate on load, enforce cap and age limit | |
| self._pending_thoughts = self._clean_thoughts(raw_thoughts) | |
| self._conversation_count = data.get("conversation_count", 0) | |
| self._last_topic = data.get("last_topic", "") | |
| self._last_interaction = data.get("last_interaction", 0) | |
| self._recurring_interests = data.get("recurring_interests", []) | |
| if _is_meta_topic(self._last_topic): | |
| self._last_topic = "" | |
| self._recurring_interests = [ | |
| topic for topic in self._recurring_interests if not _is_meta_topic(topic) | |
| ] | |
| except Exception as e: | |
| logger.warning(f"ContextEngine: load failed: {e}") | |
| def _save(self): | |
| import json | |
| CONTEXT_FILE.parent.mkdir(parents=True, exist_ok=True) | |
| try: | |
| CONTEXT_FILE.write_text(json.dumps({ | |
| "pending_thoughts": self._pending_thoughts, | |
| "conversation_count": self._conversation_count, | |
| "last_topic": self._last_topic, | |
| "last_interaction": self._last_interaction, | |
| "recurring_interests": self._recurring_interests, | |
| }, indent=2)) | |
| except Exception as e: | |
| logger.warning(f"ContextEngine: save failed: {e}") | |
| def _clean_thoughts(self, thoughts: list) -> list: | |
| """ | |
| Deduplicate, enforce age limit, enforce count cap. | |
| Returns list sorted by priority desc, newest first within same priority. | |
| """ | |
| now = time.time() | |
| seen_texts = set() | |
| clean = [] | |
| for t in thoughts: | |
| text = t.get("thought", "").strip() | |
| if not text: | |
| continue | |
| # Skip duplicates | |
| if text in seen_texts: | |
| continue | |
| # Skip ancient thoughts | |
| age_hours = (now - t.get("created_at", now)) / 3600 | |
| if age_hours > MAX_THOUGHT_AGE_HOURS: | |
| continue | |
| seen_texts.add(text) | |
| clean.append(t) | |
| # Sort by priority desc | |
| clean.sort(key=lambda x: (-x.get("priority", 0), -x.get("created_at", 0))) | |
| # Enforce cap | |
| return clean[:MAX_PENDING_THOUGHTS] | |
| def add_pending_thought(self, thought: str, priority: float = 0.5, source: str = "system", force: bool = False): | |
| """Add a thought to the pending queue — with dedup and cap enforcement.""" | |
| thought = thought.strip() | |
| if not thought or len(thought) < 15: | |
| return | |
| # NEW: rate-limit daemon sources to 1 thought per hour (unless forced) | |
| now = time.time() | |
| if source in {"dream", "curiosity", "daemon"} and not force: | |
| last = getattr(self, '_last_daemon_thought', 0) | |
| if now - last < 3600: | |
| return | |
| self._last_daemon_thought = now | |
| # NEW: fingerprint dedup (first 80 chars) | |
| import re | |
| def fp(t): return re.sub(r"[^a-z0-9 ]", "", t.lower())[:80] | |
| if any(fp(thought) == fp(t.get("thought","")) for t in self._pending_thoughts): | |
| return | |
| # Deduplicate by exact text (kept for backwards compat) | |
| existing_texts = {t.get("thought", "") for t in self._pending_thoughts} | |
| if thought in existing_texts: | |
| logger.debug(f"ContextEngine: duplicate thought skipped: {thought[:60]}") | |
| return | |
| self._pending_thoughts.append({ | |
| "thought": thought, | |
| "priority": priority, | |
| "created_at": time.time(), | |
| "source": source, | |
| }) | |
| # Apply cleaning after each add to enforce cap | |
| self._pending_thoughts = self._clean_thoughts(self._pending_thoughts) | |
| self._save() | |
| def get_pending_thoughts(self) -> list: | |
| """Return current pending thoughts (deduplicated, capped).""" | |
| self._pending_thoughts = self._clean_thoughts(self._pending_thoughts) | |
| return self._pending_thoughts | |
| def clear_delivered_thoughts(self, count: int = 3): | |
| """Mark the top N thoughts as delivered (remove them from queue).""" | |
| self._pending_thoughts = self._pending_thoughts[count:] | |
| self._save() | |
| def build_context(self, user_input: str) -> dict: | |
| """Build the full context dict for injection into LLM calls.""" | |
| now = time.time() | |
| hours_away = (now - self._last_interaction) / 3600 if self._last_interaction else None | |
| # Extract topic properly — FIXED | |
| topic = _extract_topic(user_input) if user_input else "" | |
| # Update recurring interests | |
| if topic and topic != "general query" and not _is_meta_topic(topic): | |
| if topic not in self._recurring_interests: | |
| self._recurring_interests.insert(0, topic) | |
| self._recurring_interests = self._recurring_interests[:10] | |
| return { | |
| "user": { | |
| "is_returning": self._conversation_count > 0, | |
| "conversation_count": self._conversation_count, | |
| "last_topic": self._last_topic, | |
| "time_away": f"{hours_away:.0f}h" if hours_away and hours_away > 1 else None, | |
| "recurring_interests": self._recurring_interests[:5], | |
| }, | |
| "system_self": { | |
| "pending_thoughts": self.get_pending_thoughts()[:3], | |
| "recent_discoveries": [], | |
| }, | |
| "self_reflection": {}, | |
| "current_topic": topic, | |
| } | |
| def update_after_interaction(self, user_input: str, response: str, context: dict): | |
| """Update state after each interaction.""" | |
| topic = _extract_topic(user_input) | |
| if topic and topic != "general query" and not _is_meta_topic(topic): | |
| self._last_topic = topic | |
| self._last_interaction = time.time() | |
| self._conversation_count += 1 | |
| self._save() | |
| def record_performance(self, success: bool, confidence: float, elapsed: float): | |
| pass # Telemetry hook — can be extended | |
| # Module-level singleton | |
| context_engine = ContextEngine() | |