| """ | |
| Cogni-Engine v1 — Cognitive Engine (Brain) | |
| The central coordinator that connects all components. | |
| Processes user requests through three stages: | |
| 1. UNDERSTAND — Parse intent, extract entities, build query | |
| 2. REASON — Search graph, traverse, build reasoning chains | |
| 3. RESPOND — Generate natural language from chains | |
| Also manages conversation sessions and knowledge extraction. | |
| """ | |
| import time | |
| import threading | |
| from typing import List, Dict, Optional, Tuple, Any | |
| import numpy as np | |
| import config | |
| import utils | |
| from knowledge import KnowledgeGraph, Node, Edge, ReasoningChain | |
| from language import LanguageGenerator | |
| from thinker import Thinker | |


# ═══════════════════════════════════════════════════════════
# CONVERSATION SESSION
# ═══════════════════════════════════════════════════════════

class Session:
    """
    Tracks a multi-turn conversation.
    Maintains a context window for coherent dialogue.
    """
    __slots__ = [
        'id', 'messages', 'context_entities', 'context_node_ids',
        'system_prompt', 'personality', 'last_active', 'turn_count'
    ]

    def __init__(self, session_id: str, system_prompt: str = ""):
        self.id = session_id
        self.messages: List[dict] = []  # [{role, content}]
        self.context_entities: List[str] = []
        self.context_node_ids: List[str] = []
        self.system_prompt = system_prompt
        self.personality = utils.parse_system_prompt(system_prompt)
        self.last_active = time.time()
        self.turn_count = 0

    def add_message(self, role: str, content: str):
        """Add a message to the conversation history."""
        self.messages.append({"role": role, "content": content})
        # Trim to the context window
        max_messages = config.CONTEXT_WINDOW_TURNS * 2  # user + assistant pairs
        if len(self.messages) > max_messages:
            # Keep system-prompt awareness but trim old messages
            self.messages = self.messages[-max_messages:]
        self.last_active = time.time()
        self.turn_count += 1

    def add_context_entities(self, entities: List[str]):
        """Add discovered entities to the session context."""
        for e in entities:
            if e not in self.context_entities:
                self.context_entities.append(e)
        # Keep only the last N entities
        self.context_entities = self.context_entities[-30:]

    def add_context_nodes(self, node_ids: List[str]):
        """Add discovered node IDs to the session context."""
        for nid in node_ids:
            if nid not in self.context_node_ids:
                self.context_node_ids.append(nid)
        self.context_node_ids = self.context_node_ids[-50:]

    def get_context_text(self) -> str:
        """Get combined context from recent user messages."""
        recent = self.messages[-config.CONTEXT_WINDOW_TURNS * 2:]
        parts = []
        for msg in recent:
            if msg["role"] == "user":
                parts.append(msg["content"])
        return " ".join(parts)

    def is_expired(self) -> bool:
        """Check whether the session has expired."""
        return (time.time() - self.last_active) > (config.SESSION_TIMEOUT_MINUTES * 60)
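
# Illustrative note (not from the original source): each call to add_message
# counts one turn, so a full user/assistant exchange advances turn_count by
# two, and history is trimmed to CONTEXT_WINDOW_TURNS * 2 messages.
#
#   s = Session("demo")
#   s.add_message("user", "hello")
#   s.add_message("assistant", "hi there")
#   # s.turn_count == 2; len(s.messages) <= config.CONTEXT_WINDOW_TURNS * 2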


# ═══════════════════════════════════════════════════════════
# SESSION MANAGER
# ═══════════════════════════════════════════════════════════

class SessionManager:
    """Manages active conversation sessions."""

    def __init__(self):
        self._sessions: Dict[str, Session] = {}
        self._lock = threading.Lock()

    def get_or_create(
        self,
        session_id: Optional[str] = None,
        system_prompt: str = ""
    ) -> Session:
        """Get an existing session or create a new one."""
        with self._lock:
            if session_id and session_id in self._sessions:
                session = self._sessions[session_id]
                session.last_active = time.time()
                # Update the system prompt if it changed
                if system_prompt and system_prompt != session.system_prompt:
                    session.system_prompt = system_prompt
                    session.personality = utils.parse_system_prompt(system_prompt)
                return session
            # Create a new session
            new_id = session_id or config.generate_session_id()
            session = Session(new_id, system_prompt)
            self._sessions[new_id] = session
            return session

    def remove(self, session_id: str):
        """Remove a session."""
        with self._lock:
            self._sessions.pop(session_id, None)

    def cleanup_expired(self):
        """Remove expired sessions."""
        with self._lock:
            expired = [
                sid for sid, s in self._sessions.items()
                if s.is_expired()
            ]
            for sid in expired:
                del self._sessions[sid]
            if expired:
                print(f"[SESSION] Cleaned up {len(expired)} expired sessions")

    def active_count(self) -> int:
        return len(self._sessions)
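
# Illustrative: repeated calls with the same id return the same Session object
# and refresh its last_active timestamp; passing a new system prompt re-parses
# the personality in place.
#
#   mgr = SessionManager()
#   a = mgr.get_or_create("s1")
#   b = mgr.get_or_create("s1", system_prompt="Be concise.")
#   # a is b; a.personality reflects the updated prompt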


# ═══════════════════════════════════════════════════════════
# BRAIN — MAIN COGNITIVE ENGINE
# ═══════════════════════════════════════════════════════════

class Brain:
    """
    Central cognitive engine.
    Coordinates understanding, reasoning, and response generation.

    Usage:
        brain = Brain(graph, thinker)
        response = brain.process_message(messages, session_id)
    """

    def __init__(self, graph: KnowledgeGraph, thinker: Thinker):
        self.graph = graph
        self.thinker = thinker
        self.language = LanguageGenerator()
        self.sessions = SessionManager()
        # Processing stats
        self._total_requests = 0
        self._total_response_time = 0.0
        self._avg_confidence = 0.0

    # ───────────────────────────────────────────────────
    # MAIN ENTRY POINT
    # ───────────────────────────────────────────────────

    def process_message(
        self,
        messages: List[dict],
        session_id: Optional[str] = None,
        temperature: Optional[float] = None
    ) -> dict:
        """
        Process a chat completion request.

        Args:
            messages: List of {role, content} messages (OpenAI format)
            session_id: Optional session ID for multi-turn dialogue
            temperature: Response variation (0-1)

        Returns:
            {
                "response": str,           # The generated response text
                "session_id": str,         # Session ID for continuity
                "confidence": float,       # Response confidence
                "reasoning_depth": int,    # How deep the reasoning went
                "nodes_traversed": int,    # How many nodes were visited
                "chains_used": int,        # How many reasoning chains
                "thinking_cycles": int,    # Total thinker cycles so far
                "processing_time_ms": int  # How long this request took
            }
        """
        start_time = time.time()
        if temperature is None:
            temperature = config.DEFAULT_TEMPERATURE

        # ── Extract system prompt and user message ──
        system_prompt = ""
        user_message = ""
        conversation_history = []
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            if role == "system":
                system_prompt = content
            elif role == "user":
                user_message = content
                conversation_history.append(msg)
            elif role == "assistant":
                conversation_history.append(msg)

        if not user_message:
            return self._empty_response(session_id, start_time)

        # ── Get or create session ──
        session = self.sessions.get_or_create(session_id, system_prompt)
        session.add_message("user", user_message)

        # ── STAGE 1: UNDERSTAND ──
        query_analysis = self._understand(
            user_message, session, temperature
        )

        # ── STAGE 2: REASON ──
        reasoning_result = self._reason(query_analysis, session)

        # ── STAGE 3: RESPOND ──
        response_text = self._respond(
            reasoning_result, query_analysis, session
        )

        # ── Post-processing ──
        session.add_message("assistant", response_text)
        # Extract knowledge from the user message (async-safe)
        self._extract_user_knowledge(user_message)
        # Reinforce the chains and edges that were used
        self._reinforce_used_knowledge(reasoning_result)

        # Update stats
        processing_time = time.time() - start_time
        self._total_requests += 1
        self._total_response_time += processing_time

        result = {
            "response": response_text,
            "session_id": session.id,
            "confidence": reasoning_result.get("confidence", 0.0),
            "reasoning_depth": reasoning_result.get("max_depth", 0),
            "nodes_traversed": reasoning_result.get("nodes_traversed", 0),
            "chains_used": len(reasoning_result.get("chains", [])),
            "thinking_cycles": self.thinker.total_cycles,
            "processing_time_ms": int(processing_time * 1000)
        }

        if config.LOG_API_REQUESTS:
            print(
                f"[BRAIN] Request processed: "
                f"confidence={result['confidence']:.2f}, "
                f"depth={result['reasoning_depth']}, "
                f"nodes={result['nodes_traversed']}, "
                f"chains={result['chains_used']}, "
                f"time={result['processing_time_ms']}ms"
            )
        return result
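
    # Illustrative call (not from the original source):
    #
    #   result = brain.process_message(
    #       [
    #           {"role": "system", "content": "You are concise."},
    #           {"role": "user", "content": "What causes rain?"},
    #       ],
    #       session_id=None,  # a new session id is generated when omitted
    #   )
    #   # result["response"] holds the text; pass result["session_id"] back
    #   # on the next call to keep multi-turn context.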

    # ═══════════════════════════════════════════════════
    # STAGE 1: UNDERSTAND
    # ═══════════════════════════════════════════════════

    def _understand(
        self,
        message: str,
        session: Session,
        temperature: float
    ) -> dict:
        """
        Parse the user message to understand what is being asked.

        Returns a query_analysis dict:
            {
                intent: str,
                intent_confidence: float,
                entities: [str],
                keywords: [str],
                query_vector: np.ndarray,
                temperature: float,
                is_followup: bool,
                context_entities: [str],
                query_text: str,
                confidence: float  (initial, from intent detection)
            }
        """
        # ── Detect intent ──
        intent, intent_confidence = utils.detect_intent(message)

        # ── Check whether this is a follow-up ──
        is_followup = self._is_followup(message, session)
        if is_followup and intent == "general":
            intent = "followup"

        # ── Extract entities ──
        entities = utils.extract_entities_simple(message)

        # ── Extract keywords ──
        keywords = utils.extract_keywords(message, max_keywords=15)

        # If no entities were found, use keywords as entities
        if not entities and keywords:
            entities = keywords[:5]

        # ── Build a context-enriched query ──
        query_parts = [message]
        if is_followup and session.context_entities:
            # Add recent context for follow-up questions
            query_parts.extend(session.context_entities[-5:])
        query_text = " ".join(query_parts)

        # ── Compute the query vector ──
        query_vector = utils.text_to_vector_tfidf(query_text)

        # ── If a follow-up, blend with the previous context vector ──
        if is_followup and session.context_node_ids:
            context_vectors = []
            for nid in session.context_node_ids[-5:]:
                node = self.graph.get_node(nid)
                if node:
                    context_vectors.append(node.vector)
            if context_vectors:
                context_mean = utils.vector_mean(context_vectors)
                # Blend: 70% current query + 30% context
                query_vector = utils.normalize(
                    query_vector * 0.7 + context_mean * 0.3
                )

        # ── Update session context ──
        session.add_context_entities(entities)

        return {
            "intent": intent,
            "intent_confidence": intent_confidence,
            "entities": entities,
            "keywords": keywords,
            "query_vector": query_vector,
            "temperature": temperature,
            "is_followup": is_followup,
            "context_entities": list(session.context_entities),
            "query_text": query_text,
            "confidence": intent_confidence
        }

    def _is_followup(self, message: str, session: Session) -> bool:
        """Detect whether a message is a follow-up to the previous conversation."""
        if session.turn_count == 0:
            return False
        message_lower = message.lower().strip()

        # Short messages after a conversation has started are likely follow-ups
        if len(message_lower.split()) <= 5:
            return True

        # Pronoun references and continuation phrases (Indonesian + English)
        followup_indicators = [
            "itu", "tersebut", "nya", "dia", "mereka",
            "lanjutkan", "jelaskan lagi", "maksudnya",
            "terus", "lalu", "bagaimana dengan",
            "it", "that", "they", "them", "those",
            "what about", "how about", "tell me more",
            "continue", "go on", "elaborate",
            "dan", "juga", "selain itu",
        ]
        for indicator in followup_indicators:
            if indicator in message_lower:
                return True
        return False
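
    # Illustrative behaviour of the heuristics above:
    #   turn 0, any message            -> False (first turn is never a follow-up)
    #   turn 2, "ok"                   -> True  (five words or fewer)
    #   turn 2, "could you elaborate on the second point please"
    #                                  -> True  (contains the "elaborate" indicator)
    #   turn 2, "describe quantum computing in formal detail"
    #                                  -> False (long, no indicator substring)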

    # ═══════════════════════════════════════════════════
    # STAGE 2: REASON
    # ═══════════════════════════════════════════════════

    def _reason(self, query_analysis: dict, session: Session) -> dict:
        """
        Search the knowledge graph and build reasoning chains.

        Returns a reasoning_result dict:
            {
                chains: [ReasoningChain],
                matched_nodes: [(Node, float)],
                confidence: float,
                max_depth: int,
                nodes_traversed: int,
                direct_nodes: [Node],
                direct_edges: [Edge]
            }
        """
        query_vector = query_analysis["query_vector"]
        entities = query_analysis["entities"]
        intent = query_analysis["intent"]
        temperature = query_analysis["temperature"]

        # ── Step 1: Find matching nodes ──
        matched_nodes = self._find_relevant_nodes(
            query_vector, entities, session
        )
        if not matched_nodes:
            return {
                "chains": [],
                "matched_nodes": [],
                "confidence": 0.0,
                "max_depth": 0,
                "nodes_traversed": 0,
                "direct_nodes": [],
                "direct_edges": []
            }

        # Track traversed nodes
        all_traversed_ids = set()

        # ── Step 2: Build reasoning chains ──
        start_node_ids = [node.id for node, _ in matched_nodes[:5]]
        all_traversed_ids.update(start_node_ids)
        chains = self.graph.build_reasoning_chains(
            start_nodes=start_node_ids,
            max_chains=config.MAX_CHAINS_PER_RESPONSE,
            max_depth=config.MAX_TRAVERSAL_DEPTH
        )
        # Track all nodes that appear in the chains
        for chain in chains:
            for item_id in chain.path:
                if item_id in self.graph.nodes:
                    all_traversed_ids.add(item_id)

        # ── Step 3: Intent-specific reasoning ──
        if intent == "relation" and len(entities) >= 2:
            relation_chains = self._reason_relation(entities, temperature)
            chains.extend(relation_chains)
        elif intent == "compare" and len(entities) >= 2:
            compare_chains = self._reason_comparison(entities, temperature)
            chains.extend(compare_chains)
        elif intent == "cause":
            cause_chains = self._reason_causation(start_node_ids, temperature)
            chains.extend(cause_chains)

        # ── Step 4: Deduplicate and sort chains ──
        seen_chain_ids = set()
        unique_chains = []
        for c in chains:
            if c.id not in seen_chain_ids:
                seen_chain_ids.add(c.id)
                unique_chains.append(c)
        unique_chains.sort(key=lambda c: c.confidence, reverse=True)
        unique_chains = unique_chains[:config.MAX_CHAINS_PER_RESPONSE]

        # ── Step 5: Calculate overall confidence ──
        if unique_chains:
            chain_confidences = [c.confidence for c in unique_chains]
            max_match_sim = matched_nodes[0][1] if matched_nodes else 0.0
            overall_confidence = (
                max(chain_confidences) * 0.4 +
                (sum(chain_confidences) / len(chain_confidences)) * 0.3 +
                max_match_sim * 0.3
            )
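            # Worked example: chain confidences [0.8, 0.6] and a best node
            # match of 0.7 give 0.8*0.4 + 0.7*0.3 + 0.7*0.3 = 0.74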
        else:
            overall_confidence = matched_nodes[0][1] * 0.4 if matched_nodes else 0.0
        overall_confidence = utils.clamp(overall_confidence, 0.0, 1.0)

        # ── Step 6: Calculate max reasoning depth ──
        max_depth = 0
        for chain in unique_chains:
            node_count = sum(1 for i in chain.path if i in self.graph.nodes)
            max_depth = max(max_depth, node_count)

        # ── Collect direct nodes and edges for fallback generation ──
        direct_nodes = [node for node, _ in matched_nodes[:10]]
        direct_edges = []
        for node in direct_nodes:
            direct_edges.extend(self.graph.get_all_edges_for(node.id)[:5])

        # Update session context with the discovered nodes
        session.add_context_nodes(list(all_traversed_ids)[:20])

        return {
            "chains": unique_chains,
            "matched_nodes": matched_nodes,
            "confidence": round(overall_confidence, 4),
            "max_depth": max_depth,
            "nodes_traversed": len(all_traversed_ids),
            "direct_nodes": direct_nodes,
            "direct_edges": direct_edges
        }

    def _find_relevant_nodes(
        self,
        query_vector: np.ndarray,
        entities: List[str],
        session: Session
    ) -> List[Tuple[Node, float]]:
        """
        Find nodes relevant to the query using multiple strategies.
        Combines vector similarity with entity matching.
        """
        all_matches: Dict[str, Tuple[Node, float]] = {}

        # ── Strategy 1: Vector similarity search ──
        vector_matches = self.graph.find_similar_nodes(
            query_vector,
            top_k=config.MAX_NODES_PER_SEARCH,
            min_similarity=0.2
        )
        for node, sim in vector_matches:
            if node.id not in all_matches or sim > all_matches[node.id][1]:
                all_matches[node.id] = (node, sim)

        # ── Strategy 2: Entity exact/fuzzy match ──
        for entity in entities:
            # Exact match
            exact_node = self.graph.get_node_by_content(entity)
            if exact_node:
                # Boost exact matches
                existing_sim = all_matches.get(exact_node.id, (None, 0))[1]
                all_matches[exact_node.id] = (exact_node, max(existing_sim, 0.95))
            # Fuzzy match via vector
            entity_vector = utils.text_to_vector_tfidf(entity)
            entity_matches = self.graph.find_similar_nodes(
                entity_vector,
                top_k=5,
                min_similarity=0.4
            )
            for node, sim in entity_matches:
                # Boost because it matched an entity directly
                boosted_sim = min(sim * 1.2, 1.0)
                if node.id not in all_matches or boosted_sim > all_matches[node.id][1]:
                    all_matches[node.id] = (node, boosted_sim)

        # ── Strategy 3: Context-based (for follow-ups) ──
        if session.context_node_ids:
            for ctx_nid in session.context_node_ids[-5:]:
                ctx_node = self.graph.get_node(ctx_nid)
                if ctx_node:
                    sim = utils.cosine_similarity(query_vector, ctx_node.vector)
                    if sim > 0.3:
                        # Context nodes get a moderate boost
                        boosted = min(sim * 1.1, 1.0)
                        if ctx_nid not in all_matches or boosted > all_matches[ctx_nid][1]:
                            all_matches[ctx_nid] = (ctx_node, boosted)

        # Sort by similarity, descending
        results = sorted(
            all_matches.values(),
            key=lambda x: x[1],
            reverse=True
        )
        return results[:config.MAX_NODES_PER_SEARCH]

    def _reason_relation(
        self, entities: List[str], temperature: float
    ) -> List[ReasoningChain]:
        """Find the relationship between two entities."""
        if len(entities) < 2:
            return []
        chains = []

        # Find nodes for both entities
        node_a = self.graph.get_node_by_content(entities[0])
        node_b = self.graph.get_node_by_content(entities[1])
        if not node_a:
            matches = self.graph.find_similar_to_text(entities[0], top_k=1, min_similarity=0.4)
            if matches:
                node_a = matches[0][0]
        if not node_b:
            matches = self.graph.find_similar_to_text(entities[1], top_k=1, min_similarity=0.4)
            if matches:
                node_b = matches[0][0]
        if not node_a or not node_b:
            return []

        # Find paths between them
        paths = self.graph.find_paths(
            node_a.id, node_b.id,
            max_depth=config.MAX_TRAVERSAL_DEPTH,
            max_paths=3
        )
        for path in paths:
            confidence = self._score_path(path)
            chain = ReasoningChain(
                chain_id=config.generate_chain_id(path),
                path=path,
                conclusion=f"{entities[0]} → {entities[1]}",
                confidence=confidence
            )
            chains.append(chain)

        # Also try the reverse direction
        reverse_paths = self.graph.find_paths(
            node_b.id, node_a.id,
            max_depth=config.MAX_TRAVERSAL_DEPTH,
            max_paths=2
        )
        for path in reverse_paths:
            confidence = self._score_path(path)
            chain = ReasoningChain(
                chain_id=config.generate_chain_id(path),
                path=path,
                conclusion=f"{entities[1]} → {entities[0]}",
                confidence=confidence
            )
            chains.append(chain)
        return chains

    def _reason_comparison(
        self, entities: List[str], temperature: float
    ) -> List[ReasoningChain]:
        """Build comparison reasoning between entities."""
        if len(entities) < 2:
            return []
        chains = []
        for entity in entities[:2]:
            matches = self.graph.find_similar_to_text(
                entity, top_k=1, min_similarity=0.3
            )
            if matches:
                node = matches[0][0]
                entity_chains = self.graph.build_reasoning_chains(
                    [node.id], max_chains=2, max_depth=4
                )
                chains.extend(entity_chains)
        return chains

    def _reason_causation(
        self, start_node_ids: List[str], temperature: float
    ) -> List[ReasoningChain]:
        """Follow causal chains from the starting nodes."""
        chains = []
        for nid in start_node_ids[:3]:
            # Follow "causes"-type edges specifically
            current = nid
            path = [current]
            visited = {current}
            for _ in range(config.MAX_TRAVERSAL_DEPTH):
                cause_edges = [
                    e for e in self.graph.get_edges_from(current)
                    if e.relation in ("causes", "leads_to", "results_in")
                    and e.to_node not in visited
                ]
                if not cause_edges:
                    break
                best = max(cause_edges, key=lambda e: e.confidence)
                path.append(best.id)
                path.append(best.to_node)
                visited.add(best.to_node)
                current = best.to_node
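            # The path alternates node and edge ids: [n0, e0, n1, e1, ...],
            # so len(path) >= 3 means at least one complete causal hop.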
            if len(path) >= 3:
                confidence = self._score_path(path)
                chain = ReasoningChain(
                    chain_id=config.generate_chain_id(path),
                    path=path,
                    conclusion="causal_chain",
                    confidence=confidence
                )
                chains.append(chain)
        return chains

    def _score_path(self, path: list) -> float:
        """Score a path's confidence from its edge weights."""
        edge_scores = []
        for item_id in path:
            edge = self.graph.get_edge(item_id)
            if edge:
                edge_scores.append(edge.weight * edge.confidence)
        if not edge_scores:
            return 0.3
        avg = sum(edge_scores) / len(edge_scores)
        length_penalty = 1.0 / (1.0 + 0.1 * len(edge_scores))
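        # Worked example: edge scores [0.9, 0.8, 0.7] give avg = 0.8 and
        # penalty = 1 / (1 + 0.1*3) ≈ 0.769, so the path scores ≈ 0.615.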
        return utils.clamp(avg * length_penalty, 0.0, 1.0)

    # ═══════════════════════════════════════════════════
    # STAGE 3: RESPOND
    # ═══════════════════════════════════════════════════

    def _respond(
        self,
        reasoning_result: dict,
        query_analysis: dict,
        session: Session
    ) -> str:
        """
        Generate a natural language response from the reasoning results.
        Uses compositional language generation.
        """
        chains = reasoning_result.get("chains", [])
        confidence = reasoning_result.get("confidence", 0.0)
        direct_nodes = reasoning_result.get("direct_nodes", [])
        direct_edges = reasoning_result.get("direct_edges", [])
        personality = session.personality
        lang = personality.get("language", config.DEFAULT_LANGUAGE)

        # Merge the confidence from reasoning into the query analysis
        query_analysis_with_confidence = dict(query_analysis)
        query_analysis_with_confidence["confidence"] = confidence
        graph_stats = self.graph.get_stats()

        # ── Primary path: chain-based generation ──
        if chains:
            response = self.language.generate_response(
                chains=chains,
                query_analysis=query_analysis_with_confidence,
                personality=personality,
                all_nodes=self.graph.nodes,
                all_edges=self.graph.edges,
                graph_stats=graph_stats
            )
            if response and response.strip():
                return response

        # ── Fallback: direct node-based generation ──
        if direct_nodes:
            response = self.language.generate_from_direct_nodes(
                nodes=direct_nodes,
                edges=direct_edges,
                query_analysis=query_analysis_with_confidence,
                personality=personality,
                all_nodes=self.graph.nodes,
                lang=lang
            )
            if response and response.strip():
                return response

        # ── Last resort: honest uncertainty ──
        return self._generate_uncertainty_response(
            query_analysis, personality, graph_stats, lang
        )

    def _generate_uncertainty_response(
        self,
        query_analysis: dict,
        personality: dict,
        graph_stats: dict,
        lang: str
    ) -> str:
        """
        Generate an honest uncertainty response.
        Built compositionally — NOT a static template.
        """
        # Force the uncertainty structure
        query_analysis_low = dict(query_analysis)
        query_analysis_low["confidence"] = 0.1
        # Generate with an empty chains list
        response = self.language.generate_response(
            chains=[],
            query_analysis=query_analysis_low,
            personality=personality,
            all_nodes=self.graph.nodes,
            all_edges=self.graph.edges,
            graph_stats=graph_stats
        )
        if response and response.strip():
            return response

        # Absolute last fallback (should rarely be reached)
        entities = query_analysis.get("entities", [])
        if entities:
            topic = ", ".join(entities[:2])
        else:
            # Fall back to a language-appropriate generic topic
            topic = "topik tersebut" if lang == "id" else "that topic"
        rng = utils.seeded_random(utils.variation_seed())
        if lang == "id":
            options = [
                f"Saya belum memiliki informasi yang cukup mengenai {topic}. "
                f"Pengetahuan saya akan berkembang seiring waktu dan dengan "
                f"penambahan data yang relevan.",
                f"Mengenai {topic}, pemahaman saya masih terbatas saat ini. "
                f"Dengan berjalannya waktu dan penambahan informasi, saya akan "
                f"mampu membahas topik ini dengan lebih baik.",
                f"Topik {topic} belum tercakup secara memadai dalam pengetahuan "
                f"saya saat ini. Saya terus belajar dan memperluas pemahaman "
                f"saya secara mandiri.",
            ]
        else:
            options = [
                f"I don't have sufficient information about {topic} yet. "
                f"My knowledge grows over time and with the addition of "
                f"relevant data.",
                f"Regarding {topic}, my understanding is currently limited. "
                f"As time goes on and more information is added, I'll be "
                f"able to discuss this topic more thoroughly.",
                f"The topic of {topic} isn't yet well covered in my "
                f"knowledge base. I'm continuously learning and expanding "
                f"my understanding autonomously.",
            ]
        return rng.choice(options)

    # ───────────────────────────────────────────────────
    # POST-PROCESSING
    # ───────────────────────────────────────────────────

    def _extract_user_knowledge(self, message: str):
        """
        Extract knowledge from the user message.
        Delegates to the thinker for knowledge extraction.
        Does NOT store the raw message.
        """
        try:
            self.thinker.extract_from_user_message(message)
        except Exception as e:
            if config.LOG_THINKING_DETAILS:
                print(f"[BRAIN] Knowledge extraction error: {e}")

    def _reinforce_used_knowledge(self, reasoning_result: dict):
        """Reinforce the edges and chains that were used in this response."""
        chains = reasoning_result.get("chains", [])
        for chain in chains:
            # Save and reinforce each chain
            self.graph.save_chain(chain)
            self.graph.reinforce_chain(chain.id)

    # ───────────────────────────────────────────────────
    # UTILITY RESPONSES
    # ───────────────────────────────────────────────────

    def _empty_response(self, session_id: Optional[str], start_time: float) -> dict:
        """Return the response payload for empty/invalid input."""
        processing_time = time.time() - start_time
        return {
            "response": "",
            "session_id": session_id or "",
            "confidence": 0.0,
            "reasoning_depth": 0,
            "nodes_traversed": 0,
            "chains_used": 0,
            "thinking_cycles": self.thinker.total_cycles,
            "processing_time_ms": int(processing_time * 1000)
        }

    # ───────────────────────────────────────────────────
    # STATUS & STATS
    # ───────────────────────────────────────────────────

    def get_status(self) -> dict:
        """Get a comprehensive brain status report."""
        graph_stats = self.graph.get_stats()
        thinker_status = self.thinker.get_status()
        intelligence_score = self.graph.get_intelligence_score()
        avg_response_time = (
            (self._total_response_time / self._total_requests * 1000)
            if self._total_requests > 0 else 0
        )
        return {
            "alive": True,
            "intelligence_score": round(intelligence_score, 2),
            # Graph stats
            "graph": {
                "total_nodes": graph_stats["total_nodes"],
                "total_edges": graph_stats["total_edges"],
                "total_chains": graph_stats["total_chains"],
                "inferred_nodes": graph_stats["inferred_nodes"],
                "inferred_edges": graph_stats["inferred_edges"],
                "max_abstraction_depth": graph_stats["max_abstraction_depth"],
                "avg_connections": graph_stats["avg_connections"],
                "avg_confidence": graph_stats["avg_confidence"],
                "inference_ratio": graph_stats["inference_ratio"],
            },
            # Thinker stats
            "thinker": {
                "running": thinker_status["running"],
                "current_phase": thinker_status["current_phase"],
                "total_cycles": thinker_status["total_cycles"],
                "interval_seconds": thinker_status["interval_seconds"],
                "metrics": thinker_status["metrics"],
            },
            # API stats
            "api": {
                "total_requests": self._total_requests,
                "avg_response_time_ms": round(avg_response_time, 1),
                # active_count is a method, so it must be called
                "active_sessions": self.sessions.active_count(),
            },
            # Memory stats
            "memory": self.graph.memory.get_db_stats(),
        }

    def cleanup(self):
        """Run periodic cleanup tasks."""
        self.sessions.cleanup_expired()

    def shutdown(self):
        """Shut down gracefully."""
        print("[BRAIN] Shutting down...")
        self.thinker.stop()
        self.graph.force_sync()
        self.graph.memory.shutdown()
        print("[BRAIN] Shutdown complete.")