CogniEngine / brain.py
sadidft's picture
Create brain.py
12306e8 verified
"""
Cogni-Engine v1 — Cognitive Engine (Brain)
The central coordinator that connects all components.
Processes user requests through three stages:
1. UNDERSTAND — Parse intent, extract entities, build query
2. REASON — Search graph, traverse, build reasoning chains
3. RESPOND — Generate natural language from chains
Also manages conversation sessions and knowledge extraction.
"""
import time
import threading
from typing import List, Dict, Optional, Tuple, Any
import numpy as np
import config
import utils
from knowledge import KnowledgeGraph, Node, Edge, ReasoningChain
from language import LanguageGenerator
from thinker import Thinker
# ═══════════════════════════════════════════════════════════
# CONVERSATION SESSION
# ═══════════════════════════════════════════════════════════
class Session:
"""
Tracks a multi-turn conversation.
Maintains context window for coherent dialogue.
"""
__slots__ = [
'id', 'messages', 'context_entities', 'context_node_ids',
'system_prompt', 'personality', 'last_active', 'turn_count'
]
def __init__(self, session_id: str, system_prompt: str = ""):
self.id = session_id
self.messages: List[dict] = [] # [{role, content}]
self.context_entities: List[str] = []
self.context_node_ids: List[str] = []
self.system_prompt = system_prompt
self.personality = utils.parse_system_prompt(system_prompt)
self.last_active = time.time()
self.turn_count = 0
def add_message(self, role: str, content: str):
"""Add a message to conversation history."""
self.messages.append({"role": role, "content": content})
# Trim to context window
max_messages = config.CONTEXT_WINDOW_TURNS * 2 # user + assistant pairs
if len(self.messages) > max_messages:
# Keep system prompt awareness but trim old messages
self.messages = self.messages[-max_messages:]
self.last_active = time.time()
self.turn_count += 1
def add_context_entities(self, entities: List[str]):
"""Add discovered entities to session context."""
for e in entities:
if e not in self.context_entities:
self.context_entities.append(e)
# Keep last N entities
self.context_entities = self.context_entities[-30:]
def add_context_nodes(self, node_ids: List[str]):
"""Add discovered node IDs to session context."""
for nid in node_ids:
if nid not in self.context_node_ids:
self.context_node_ids.append(nid)
self.context_node_ids = self.context_node_ids[-50:]
def get_context_text(self) -> str:
"""Get combined context from recent messages."""
recent = self.messages[-config.CONTEXT_WINDOW_TURNS * 2:]
parts = []
for msg in recent:
if msg["role"] == "user":
parts.append(msg["content"])
return " ".join(parts)
def is_expired(self) -> bool:
"""Check if session has expired."""
return (time.time() - self.last_active) > (config.SESSION_TIMEOUT_MINUTES * 60)
# ═══════════════════════════════════════════════════════════
# SESSION MANAGER
# ═══════════════════════════════════════════════════════════
class SessionManager:
"""Manages active conversation sessions."""
def __init__(self):
self._sessions: Dict[str, Session] = {}
self._lock = threading.Lock()
def get_or_create(
self,
session_id: str = None,
system_prompt: str = ""
) -> Session:
"""Get existing session or create new one."""
with self._lock:
if session_id and session_id in self._sessions:
session = self._sessions[session_id]
session.last_active = time.time()
# Update system prompt if changed
if system_prompt and system_prompt != session.system_prompt:
session.system_prompt = system_prompt
session.personality = utils.parse_system_prompt(system_prompt)
return session
# Create new session
new_id = session_id or config.generate_session_id()
session = Session(new_id, system_prompt)
self._sessions[new_id] = session
return session
def remove(self, session_id: str):
"""Remove a session."""
with self._lock:
self._sessions.pop(session_id, None)
def cleanup_expired(self):
"""Remove expired sessions."""
with self._lock:
expired = [
sid for sid, s in self._sessions.items()
if s.is_expired()
]
for sid in expired:
del self._sessions[sid]
if expired:
print(f"[SESSION] Cleaned up {len(expired)} expired sessions")
@property
def active_count(self) -> int:
return len(self._sessions)
# ═══════════════════════════════════════════════════════════
# BRAIN — MAIN COGNITIVE ENGINE
# ═══════════════════════════════════════════════════════════
class Brain:
"""
Central cognitive engine.
Coordinates understanding, reasoning, and response generation.
Usage:
brain = Brain(graph, thinker)
response = brain.process_message(messages, session_id)
"""
def __init__(self, graph: KnowledgeGraph, thinker: Thinker):
self.graph = graph
self.thinker = thinker
self.language = LanguageGenerator()
self.sessions = SessionManager()
# Processing stats
self._total_requests = 0
self._total_response_time = 0.0
self._avg_confidence = 0.0
# ───────────────────────────────────────────────────
# MAIN ENTRY POINT
# ───────────────────────────────────────────────────
def process_message(
self,
messages: List[dict],
session_id: str = None,
temperature: float = None
) -> dict:
"""
Process a chat completion request.
Args:
messages: List of {role, content} messages (OpenAI format)
session_id: Optional session ID for multi-turn
temperature: Response variation (0-1)
Returns:
{
"response": str, # The generated response text
"session_id": str, # Session ID for continuity
"confidence": float, # Response confidence
"reasoning_depth": int, # How deep the reasoning went
"nodes_traversed": int, # How many nodes were visited
"chains_used": int, # How many reasoning chains
"thinking_cycles": int, # Total thinker cycles so far
"processing_time_ms": int # How long this took
}
"""
start_time = time.time()
if temperature is None:
temperature = config.DEFAULT_TEMPERATURE
# ── Extract system prompt and user message ──
system_prompt = ""
user_message = ""
conversation_history = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
if role == "system":
system_prompt = content
elif role == "user":
user_message = content
conversation_history.append(msg)
elif role == "assistant":
conversation_history.append(msg)
if not user_message:
return self._empty_response(session_id, start_time)
# ── Get or create session ──
session = self.sessions.get_or_create(session_id, system_prompt)
session.add_message("user", user_message)
# ── STAGE 1: UNDERSTAND ──
query_analysis = self._understand(
user_message, session, temperature
)
# ── STAGE 2: REASON ──
reasoning_result = self._reason(query_analysis, session)
# ── STAGE 3: RESPOND ──
response_text = self._respond(
reasoning_result, query_analysis, session
)
# ── Post-processing ──
session.add_message("assistant", response_text)
# Extract knowledge from user message (async-safe)
self._extract_user_knowledge(user_message)
# Reinforce used chains and edges
self._reinforce_used_knowledge(reasoning_result)
# Update stats
processing_time = time.time() - start_time
self._total_requests += 1
self._total_response_time += processing_time
result = {
"response": response_text,
"session_id": session.id,
"confidence": reasoning_result.get("confidence", 0.0),
"reasoning_depth": reasoning_result.get("max_depth", 0),
"nodes_traversed": reasoning_result.get("nodes_traversed", 0),
"chains_used": len(reasoning_result.get("chains", [])),
"thinking_cycles": self.thinker.total_cycles,
"processing_time_ms": int(processing_time * 1000)
}
if config.LOG_API_REQUESTS:
print(
f"[BRAIN] Request processed: "
f"confidence={result['confidence']:.2f}, "
f"depth={result['reasoning_depth']}, "
f"nodes={result['nodes_traversed']}, "
f"chains={result['chains_used']}, "
f"time={result['processing_time_ms']}ms"
)
return result
# ═══════════════════════════════════════════════════
# STAGE 1: UNDERSTAND
# ═══════════════════════════════════════════════════
def _understand(
self,
message: str,
session: Session,
temperature: float
) -> dict:
"""
Parse user message to understand what is being asked.
Returns query_analysis dict:
{
intent: str,
intent_confidence: float,
entities: [str],
keywords: [str],
query_vector: np.ndarray,
temperature: float,
is_followup: bool,
context_entities: [str],
query_text: str,
confidence: float (initial, from intent detection)
}
"""
# ── Detect intent ──
intent, intent_confidence = utils.detect_intent(message)
# ── Check if this is a follow-up ──
is_followup = self._is_followup(message, session)
if is_followup and intent == "general":
intent = "followup"
# ── Extract entities ──
entities = utils.extract_entities_simple(message)
# ── Extract keywords ──
keywords = utils.extract_keywords(message, max_keywords=15)
# If no entities found, use keywords as entities
if not entities and keywords:
entities = keywords[:5]
# ── Build context-enriched query ──
query_parts = [message]
if is_followup and session.context_entities:
# Add recent context for follow-up questions
query_parts.extend(session.context_entities[-5:])
query_text = " ".join(query_parts)
# ── Compute query vector ──
query_vector = utils.text_to_vector_tfidf(query_text)
# ── If follow-up, blend with previous context vector ──
if is_followup and session.context_node_ids:
context_vectors = []
for nid in session.context_node_ids[-5:]:
node = self.graph.get_node(nid)
if node:
context_vectors.append(node.vector)
if context_vectors:
context_mean = utils.vector_mean(context_vectors)
# Blend: 70% current query + 30% context
query_vector = utils.normalize(
query_vector * 0.7 + context_mean * 0.3
)
# ── Update session context ──
session.add_context_entities(entities)
return {
"intent": intent,
"intent_confidence": intent_confidence,
"entities": entities,
"keywords": keywords,
"query_vector": query_vector,
"temperature": temperature,
"is_followup": is_followup,
"context_entities": list(session.context_entities),
"query_text": query_text,
"confidence": intent_confidence
}
def _is_followup(self, message: str, session: Session) -> bool:
"""Detect if message is a follow-up to previous conversation."""
if session.turn_count == 0:
return False
message_lower = message.lower().strip()
# Short messages after conversation likely follow-ups
if len(message_lower.split()) <= 5 and session.turn_count > 0:
return True
# Pronoun references
followup_indicators = [
"itu", "tersebut", "nya", "dia", "mereka",
"lanjutkan", "jelaskan lagi", "maksudnya",
"terus", "lalu", "bagaimana dengan",
"it", "that", "they", "them", "those",
"what about", "how about", "tell me more",
"continue", "go on", "elaborate",
"dan", "juga", "selain itu",
]
for indicator in followup_indicators:
if indicator in message_lower:
return True
return False
# ═══════════════════════════════════════════════════
# STAGE 2: REASON
# ═══════════════════════════════════════════════════
def _reason(self, query_analysis: dict, session: Session) -> dict:
"""
Search knowledge graph and build reasoning chains.
Returns reasoning_result dict:
{
chains: [ReasoningChain],
matched_nodes: [(Node, float)],
confidence: float,
max_depth: int,
nodes_traversed: int,
direct_nodes: [Node],
direct_edges: [Edge]
}
"""
query_vector = query_analysis["query_vector"]
entities = query_analysis["entities"]
intent = query_analysis["intent"]
temperature = query_analysis["temperature"]
# ── Step 1: Find matching nodes ──
matched_nodes = self._find_relevant_nodes(
query_vector, entities, session
)
if not matched_nodes:
return {
"chains": [],
"matched_nodes": [],
"confidence": 0.0,
"max_depth": 0,
"nodes_traversed": 0,
"direct_nodes": [],
"direct_edges": []
}
# Track traversed nodes
all_traversed_ids = set()
# ── Step 2: Build reasoning chains ──
start_node_ids = [node.id for node, _ in matched_nodes[:5]]
all_traversed_ids.update(start_node_ids)
chains = self.graph.build_reasoning_chains(
start_nodes=start_node_ids,
max_chains=config.MAX_CHAINS_PER_RESPONSE,
max_depth=config.MAX_TRAVERSAL_DEPTH
)
# Track all nodes in chains
for chain in chains:
for item_id in chain.path:
if item_id in self.graph.nodes:
all_traversed_ids.add(item_id)
# ── Step 3: Intent-specific reasoning ──
if intent == "relation" and len(entities) >= 2:
relation_chains = self._reason_relation(entities, temperature)
chains.extend(relation_chains)
elif intent == "compare" and len(entities) >= 2:
compare_chains = self._reason_comparison(entities, temperature)
chains.extend(compare_chains)
elif intent == "cause":
cause_chains = self._reason_causation(start_node_ids, temperature)
chains.extend(cause_chains)
# ── Step 4: Deduplicate and sort chains ──
seen_chain_ids = set()
unique_chains = []
for c in chains:
if c.id not in seen_chain_ids:
seen_chain_ids.add(c.id)
unique_chains.append(c)
unique_chains.sort(key=lambda c: c.confidence, reverse=True)
unique_chains = unique_chains[:config.MAX_CHAINS_PER_RESPONSE]
# ── Step 5: Calculate overall confidence ──
if unique_chains:
chain_confidences = [c.confidence for c in unique_chains]
max_match_sim = matched_nodes[0][1] if matched_nodes else 0.0
overall_confidence = (
max(chain_confidences) * 0.4 +
(sum(chain_confidences) / len(chain_confidences)) * 0.3 +
max_match_sim * 0.3
)
else:
overall_confidence = matched_nodes[0][1] * 0.4 if matched_nodes else 0.0
overall_confidence = utils.clamp(overall_confidence, 0.0, 1.0)
# ── Step 6: Calculate max reasoning depth ──
max_depth = 0
for chain in unique_chains:
node_count = sum(1 for i in chain.path if i in self.graph.nodes)
max_depth = max(max_depth, node_count)
# ── Collect direct nodes and edges for fallback generation ──
direct_nodes = [node for node, _ in matched_nodes[:10]]
direct_edges = []
for node in direct_nodes:
direct_edges.extend(self.graph.get_all_edges_for(node.id)[:5])
# Update session context with discovered nodes
session.add_context_nodes(list(all_traversed_ids)[:20])
return {
"chains": unique_chains,
"matched_nodes": matched_nodes,
"confidence": round(overall_confidence, 4),
"max_depth": max_depth,
"nodes_traversed": len(all_traversed_ids),
"direct_nodes": direct_nodes,
"direct_edges": direct_edges
}
def _find_relevant_nodes(
self,
query_vector: np.ndarray,
entities: List[str],
session: Session
) -> List[Tuple[Node, float]]:
"""
Find nodes relevant to the query using multiple strategies.
Combines vector similarity with entity matching.
"""
all_matches: Dict[str, Tuple[Node, float]] = {}
# ── Strategy 1: Vector similarity search ──
vector_matches = self.graph.find_similar_nodes(
query_vector,
top_k=config.MAX_NODES_PER_SEARCH,
min_similarity=0.2
)
for node, sim in vector_matches:
if node.id not in all_matches or sim > all_matches[node.id][1]:
all_matches[node.id] = (node, sim)
# ── Strategy 2: Entity exact/fuzzy match ──
for entity in entities:
# Exact match
exact_node = self.graph.get_node_by_content(entity)
if exact_node:
# Boost exact matches
existing_sim = all_matches.get(exact_node.id, (None, 0))[1]
all_matches[exact_node.id] = (exact_node, max(existing_sim, 0.95))
# Fuzzy match via vector
entity_vector = utils.text_to_vector_tfidf(entity)
entity_matches = self.graph.find_similar_nodes(
entity_vector,
top_k=5,
min_similarity=0.4
)
for node, sim in entity_matches:
# Boost because it matched an entity directly
boosted_sim = min(sim * 1.2, 1.0)
if node.id not in all_matches or boosted_sim > all_matches[node.id][1]:
all_matches[node.id] = (node, boosted_sim)
# ── Strategy 3: Context-based (for follow-ups) ──
if session.context_node_ids:
for ctx_nid in session.context_node_ids[-5:]:
ctx_node = self.graph.get_node(ctx_nid)
if ctx_node:
sim = utils.cosine_similarity(query_vector, ctx_node.vector)
if sim > 0.3:
# Context nodes get moderate boost
boosted = min(sim * 1.1, 1.0)
if ctx_nid not in all_matches or boosted > all_matches[ctx_nid][1]:
all_matches[ctx_nid] = (ctx_node, boosted)
# Sort by similarity descending
results = sorted(
all_matches.values(),
key=lambda x: x[1],
reverse=True
)
return results[:config.MAX_NODES_PER_SEARCH]
def _reason_relation(
self, entities: List[str], temperature: float
) -> List[ReasoningChain]:
"""Find relationship between two entities."""
if len(entities) < 2:
return []
chains = []
# Find nodes for both entities
node_a = self.graph.get_node_by_content(entities[0])
node_b = self.graph.get_node_by_content(entities[1])
if not node_a:
matches = self.graph.find_similar_to_text(entities[0], top_k=1, min_similarity=0.4)
if matches:
node_a = matches[0][0]
if not node_b:
matches = self.graph.find_similar_to_text(entities[1], top_k=1, min_similarity=0.4)
if matches:
node_b = matches[0][0]
if not node_a or not node_b:
return []
# Find paths between them
paths = self.graph.find_paths(
node_a.id, node_b.id,
max_depth=config.MAX_TRAVERSAL_DEPTH,
max_paths=3
)
for path in paths:
confidence = self._score_path(path)
chain = ReasoningChain(
chain_id=config.generate_chain_id(path),
path=path,
conclusion=f"{entities[0]}{entities[1]}",
confidence=confidence
)
chains.append(chain)
# Also try reverse direction
reverse_paths = self.graph.find_paths(
node_b.id, node_a.id,
max_depth=config.MAX_TRAVERSAL_DEPTH,
max_paths=2
)
for path in reverse_paths:
confidence = self._score_path(path)
chain = ReasoningChain(
chain_id=config.generate_chain_id(path),
path=path,
conclusion=f"{entities[1]}{entities[0]}",
confidence=confidence
)
chains.append(chain)
return chains
def _reason_comparison(
self, entities: List[str], temperature: float
) -> List[ReasoningChain]:
"""Build comparison reasoning between entities."""
if len(entities) < 2:
return []
chains = []
for entity in entities[:2]:
matches = self.graph.find_similar_to_text(
entity, top_k=1, min_similarity=0.3
)
if matches:
node = matches[0][0]
entity_chains = self.graph.build_reasoning_chains(
[node.id], max_chains=2, max_depth=4
)
chains.extend(entity_chains)
return chains
def _reason_causation(
self, start_node_ids: List[str], temperature: float
) -> List[ReasoningChain]:
"""Follow causal chains from starting nodes."""
chains = []
for nid in start_node_ids[:3]:
# Follow "causes" edges specifically
current = nid
path = [current]
visited = {current}
for _ in range(config.MAX_TRAVERSAL_DEPTH):
cause_edges = [
e for e in self.graph.get_edges_from(current)
if e.relation in ("causes", "leads_to", "results_in")
and e.to_node not in visited
]
if not cause_edges:
break
best = max(cause_edges, key=lambda e: e.confidence)
path.append(best.id)
path.append(best.to_node)
visited.add(best.to_node)
current = best.to_node
if len(path) >= 3:
confidence = self._score_path(path)
chain = ReasoningChain(
chain_id=config.generate_chain_id(path),
path=path,
conclusion="causal_chain",
confidence=confidence
)
chains.append(chain)
return chains
def _score_path(self, path: list) -> float:
"""Score a path for confidence."""
edge_scores = []
for item_id in path:
edge = self.graph.get_edge(item_id)
if edge:
edge_scores.append(edge.weight * edge.confidence)
if not edge_scores:
return 0.3
avg = sum(edge_scores) / len(edge_scores)
length_penalty = 1.0 / (1.0 + 0.1 * len(edge_scores))
return utils.clamp(avg * length_penalty, 0.0, 1.0)
# ═══════════════════════════════════════════════════
# STAGE 3: RESPOND
# ═══════════════════════════════════════════════════
def _respond(
self,
reasoning_result: dict,
query_analysis: dict,
session: Session
) -> str:
"""
Generate natural language response from reasoning results.
Uses compositional language generation.
"""
chains = reasoning_result.get("chains", [])
confidence = reasoning_result.get("confidence", 0.0)
direct_nodes = reasoning_result.get("direct_nodes", [])
direct_edges = reasoning_result.get("direct_edges", [])
personality = session.personality
lang = personality.get("language", config.DEFAULT_LANGUAGE)
# Merge confidence from reasoning into query analysis
query_analysis_with_confidence = dict(query_analysis)
query_analysis_with_confidence["confidence"] = confidence
graph_stats = self.graph.get_stats()
# ── Primary path: Chain-based generation ──
if chains:
response = self.language.generate_response(
chains=chains,
query_analysis=query_analysis_with_confidence,
personality=personality,
all_nodes=self.graph.nodes,
all_edges=self.graph.edges,
graph_stats=graph_stats
)
if response and response.strip():
return response
# ── Fallback: Direct node-based generation ──
if direct_nodes:
response = self.language.generate_from_direct_nodes(
nodes=direct_nodes,
edges=direct_edges,
query_analysis=query_analysis_with_confidence,
personality=personality,
all_nodes=self.graph.nodes,
lang=lang
)
if response and response.strip():
return response
# ── Last resort: Honest uncertainty ──
return self._generate_uncertainty_response(
query_analysis, personality, graph_stats, lang
)
def _generate_uncertainty_response(
self,
query_analysis: dict,
personality: dict,
graph_stats: dict,
lang: str
) -> str:
"""
Generate an honest uncertainty response.
Built compositionally — NOT a static template.
"""
# Force uncertainty structure
query_analysis_low = dict(query_analysis)
query_analysis_low["confidence"] = 0.1
# Create minimal chains list (empty)
response = self.language.generate_response(
chains=[],
query_analysis=query_analysis_low,
personality=personality,
all_nodes=self.graph.nodes,
all_edges=self.graph.edges,
graph_stats=graph_stats
)
if response and response.strip():
return response
# Absolute last fallback (should rarely reach here)
entities = query_analysis.get("entities", [])
topic = ", ".join(entities[:2]) if entities else "topik tersebut"
rng = utils.seeded_random(utils.variation_seed())
if lang == "id":
options = [
f"Saya belum memiliki informasi yang cukup mengenai {topic}. "
f"Pengetahuan saya akan berkembang seiring waktu dan dengan "
f"penambahan data yang relevan.",
f"Mengenai {topic}, pemahaman saya masih terbatas saat ini. "
f"Dengan berjalannya waktu dan penambahan informasi, saya akan "
f"mampu membahas topik ini dengan lebih baik.",
f"Topik {topic} belum tercakup secara memadai dalam pengetahuan "
f"saya saat ini. Saya terus belajar dan memperluas pemahaman "
f"saya secara mandiri.",
]
else:
options = [
f"I don't have sufficient information about {topic} yet. "
f"My knowledge grows over time and with the addition of "
f"relevant data.",
f"Regarding {topic}, my understanding is currently limited. "
f"As time goes on and more information is added, I'll be "
f"able to discuss this topic more thoroughly.",
f"The topic of {topic} isn't yet well covered in my "
f"knowledge base. I'm continuously learning and expanding "
f"my understanding autonomously.",
]
return rng.choice(options)
# ───────────────────────────────────────────────────
# POST-PROCESSING
# ───────────────────────────────────────────────────
def _extract_user_knowledge(self, message: str):
"""
Extract knowledge from user message.
Delegates to thinker for knowledge extraction.
Does NOT store raw message.
"""
try:
self.thinker.extract_from_user_message(message)
except Exception as e:
if config.LOG_THINKING_DETAILS:
print(f"[BRAIN] Knowledge extraction error: {e}")
def _reinforce_used_knowledge(self, reasoning_result: dict):
"""Reinforce edges and chains that were used in this response."""
chains = reasoning_result.get("chains", [])
for chain in chains:
# Save and reinforce chain
self.graph.save_chain(chain)
self.graph.reinforce_chain(chain.id)
# ───────────────────────────────────────────────────
# UTILITY RESPONSES
# ───────────────────────────────────────────────────
def _empty_response(self, session_id: str, start_time: float) -> dict:
"""Return response for empty/invalid input."""
processing_time = time.time() - start_time
return {
"response": "",
"session_id": session_id or "",
"confidence": 0.0,
"reasoning_depth": 0,
"nodes_traversed": 0,
"chains_used": 0,
"thinking_cycles": self.thinker.total_cycles,
"processing_time_ms": int(processing_time * 1000)
}
# ───────────────────────────────────────────────────
# STATUS & STATS
# ───────────────────────────────────────────────────
def get_status(self) -> dict:
"""Get comprehensive brain status."""
graph_stats = self.graph.get_stats()
thinker_status = self.thinker.get_status()
intelligence_score = self.graph.get_intelligence_score()
avg_response_time = (
(self._total_response_time / self._total_requests * 1000)
if self._total_requests > 0 else 0
)
return {
"alive": True,
"intelligence_score": round(intelligence_score, 2),
# Graph stats
"graph": {
"total_nodes": graph_stats["total_nodes"],
"total_edges": graph_stats["total_edges"],
"total_chains": graph_stats["total_chains"],
"inferred_nodes": graph_stats["inferred_nodes"],
"inferred_edges": graph_stats["inferred_edges"],
"max_abstraction_depth": graph_stats["max_abstraction_depth"],
"avg_connections": graph_stats["avg_connections"],
"avg_confidence": graph_stats["avg_confidence"],
"inference_ratio": graph_stats["inference_ratio"],
},
# Thinker stats
"thinker": {
"running": thinker_status["running"],
"current_phase": thinker_status["current_phase"],
"total_cycles": thinker_status["total_cycles"],
"interval_seconds": thinker_status["interval_seconds"],
"metrics": thinker_status["metrics"],
},
# API stats
"api": {
"total_requests": self._total_requests,
"avg_response_time_ms": round(avg_response_time, 1),
"active_sessions": self.sessions.active_count,
},
# Memory stats
"memory": self.graph.memory.get_db_stats(),
}
def cleanup(self):
"""Periodic cleanup tasks."""
self.sessions.cleanup_expired()
def shutdown(self):
"""Graceful shutdown."""
print("[BRAIN] Shutting down...")
self.thinker.stop()
self.graph.force_sync()
self.graph.memory.shutdown()
print("[BRAIN] Shutdown complete.")