| """Cross-universe memory layer: embeddings, vector DB, knowledge graph, object store stubs."""
|
| from typing import Dict, Any, List
|
| from .adapters.vector_adapter import InMemoryVectorIndex
|
| from .adapters.graph_adapter import NetworkXGraph, Neo4jAdapter
|
| try:
|
| from .adapters.vector_adapter_full import FaissIndex
|
| except Exception:
|
| FaissIndex = None
|
| import json
|
| import os
|
| import os as _os
|
|
|
|
|
| class MemoryManager:
|
| def __init__(self, storage_dir: str = "./memory_store"):
|
|
|
| if _os.environ.get("USE_FAISS", "0") == "1" and FaissIndex is not None:
|
| try:
|
| dim = int(_os.environ.get("FAISS_DIM", "32"))
|
| self.vindex = FaissIndex(dim)
|
| except Exception:
|
| self.vindex = InMemoryVectorIndex()
|
| else:
|
| self.vindex = InMemoryVectorIndex()
|
| self.kg = NetworkXGraph()
|
| self.storage_dir = storage_dir
|
| os.makedirs(self.storage_dir, exist_ok=True)
|
|
|
| self.neo4j = None
|
| try:
|
| if _os.environ.get("USE_NEO4J", "0") == "1":
|
| self.neo4j = Neo4jAdapter(uri=_os.environ.get("NEO4J_URI"), user=_os.environ.get("NEO4J_USER"), password=_os.environ.get("NEO4J_PASSWORD"))
|
| if not self.neo4j.is_available():
|
| self.neo4j = None
|
| except Exception:
|
| self.neo4j = None
|
|
|
| def index_universe(self, universe_id: str, embedding: List[float], metadata: Dict[str, Any] = None):
|
| self.vindex.upsert(universe_id, embedding)
|
| self.kg.add_node(universe_id, **(metadata or {}))
|
| if self.neo4j:
|
| try:
|
| props = {"id": universe_id}
|
| if metadata:
|
| props.update(metadata)
|
| self.neo4j.create_node(labels=["Universe"], props=props)
|
| except Exception:
|
| pass
|
|
|
| def snapshot_universe(self, universe_id: str, universe_obj: Any):
|
| path = os.path.join(self.storage_dir, f"{universe_id}.json")
|
| with open(path, "w", encoding="utf-8") as f:
|
| json.dump(universe_obj, f, default=str)
|
| return path
|
|
|
| def query_similar(self, embedding: List[float], top_k: int = 10):
|
| return self.vindex.search(embedding, top_k=top_k)
|
|
|