Spaces:
Sleeping
Sleeping
| """Memory Agent: unified access to short-term memory, the episodic DB, and the knowledge vector store.""" | |
| import logging | |
| from typing import Any, Dict | |
| from memory.short_term import ShortTermMemory | |
| from memory.episodic_db import EpisodicDB | |
| from memory.knowledge_vector import KnowledgeVectorStore | |
| from api.deps import get_logger, is_remote_brain_only | |
| from agents.supervisor_agent import AgentOS | |
| logger = get_logger("kapo.agent.memory") | |
| class MemoryAgent: | |
| def __init__(self): | |
| self.remote_only = is_remote_brain_only() | |
| self.short = ShortTermMemory() | |
| self.episodic = None if self.remote_only else EpisodicDB() | |
| self.knowledge = None if self.remote_only else KnowledgeVectorStore() | |
| self.agent_os = None if self.remote_only else AgentOS() | |
| def write_short_term(self, key: str, value: Dict[str, Any]) -> None: | |
| if self.remote_only: | |
| return | |
| self.short.set(key, value) | |
| def read_short_term(self, key: str) -> Dict[str, Any] | None: | |
| if self.remote_only: | |
| return None | |
| return self.short.get(key) | |
| def store_experience(self, payload: Dict[str, Any]) -> None: | |
| if self.remote_only or self.episodic is None: | |
| return | |
| self.episodic.insert_experience( | |
| task=payload.get("task", ""), | |
| plan=payload.get("plan", {}), | |
| tools_used=payload.get("tools_used", {}), | |
| result=payload.get("result", {}), | |
| success=1 if payload.get("success") else 0, | |
| ) | |
| def query_knowledge(self, text: str, top_k: int = 3): | |
| if self.remote_only or self.knowledge is None: | |
| return [] | |
| return self.knowledge.query(text, top_k=top_k) | |
| def run_agent_os(self): | |
| """????? ???? ??????? ?????? ??????.""" | |
| if self.remote_only or self.agent_os is None: | |
| return {"skipped": True, "reason": "REMOTE_BRAIN_ONLY"} | |
| return self.agent_os.run() | |