Tools / Modules /Memory_Manager.py
chmielvu's picture
Upload folder using huggingface_hub
588592f verified
raw
history blame
11.2 kB
"""
Memory Manager - Graphiti Knowledge Graph Interface.
Provides unified memory operations using the same Graphiti instance
configured for Claude Code's Graphiti MCP server.
Configuration (must match Graphiti MCP):
- FALKORDB_URI: redis://localhost:6379 (default)
- FALKORDB_DATABASE: graphiti (default)
- MISTRAL_API_KEY: Required for entity extraction
- GRAPHITI_GROUP_ID: main (default)
"""
from __future__ import annotations
import os
from datetime import datetime, timezone
from typing import Annotated, Literal, Optional
import gradio as gr
from ._docstrings import autodoc
# Connection settings, read from the environment once at import time.
# The defaults are intended to match the Graphiti MCP server configuration.
FALKORDB_URI = os.environ.get("FALKORDB_URI", "redis://localhost:6379")
FALKORDB_DATABASE = os.environ.get("FALKORDB_DATABASE", "graphiti")
MISTRAL_API_KEY = os.environ.get("MISTRAL_API_KEY", "")
GRAPHITI_GROUP_ID = os.environ.get("GRAPHITI_GROUP_ID", "main")

# Graphiti is treated as usable only when a Mistral API key is configured.
GRAPHITI_AVAILABLE = MISTRAL_API_KEY != ""

# Process-wide client instance; populated on first use by _get_graphiti_client().
_graphiti_client = None
def _get_graphiti_client():
    """Return the shared Graphiti client, building it on first use.

    The graphiti_core imports happen inside the function so that importing
    this module does not fail when the package is not installed.

    Returns:
        The cached client, or ``None`` when MISTRAL_API_KEY is not set or
        when construction failed (the reason is printed to stdout).
    """
    global _graphiti_client

    # Reuse the cached instance; without an API key there is nothing to build.
    if _graphiti_client is not None or not GRAPHITI_AVAILABLE:
        return _graphiti_client

    try:
        from graphiti_core import Graphiti
        from graphiti_core.llm_client import OpenAIClient
        from graphiti_core.driver.falkordb_driver import FalkorDriver

        # NOTE(review): the keyword names passed to FalkorDriver, OpenAIClient
        # and Graphiti below should be confirmed against the installed
        # graphiti_core version; a mismatch surfaces as the "Failed to
        # initialize" message from the generic handler.
        falkor_driver = FalkorDriver(
            uri=FALKORDB_URI,
            database=FALKORDB_DATABASE,
        )

        # Mistral is reached through its OpenAI-compatible endpoint.
        mistral_llm = OpenAIClient(
            api_key=MISTRAL_API_KEY,
            base_url="https://api.mistral.ai/v1",
            model="mistral-large-2411",
        )

        _graphiti_client = Graphiti(
            uri=FALKORDB_URI,
            driver=falkor_driver,
            llm_client=mistral_llm,
        )
    except ImportError as exc:
        print(f"[Memory_Manager] Graphiti not available: {exc}")
        return None
    except Exception as exc:
        print(f"[Memory_Manager] Failed to initialize Graphiti: {exc}")
        return None

    return _graphiti_client
def _format_timestamp() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS UTC``."""
    now_utc = datetime.now(tz=timezone.utc)
    return f"{now_utc:%Y-%m-%d %H:%M:%S} UTC"
# ============================================================================
# Graphiti Memory Operations
# ============================================================================
def _graphiti_save(text: str, tags: str) -> str:
    """Save a memory as a new episode in the Graphiti knowledge graph.

    Args:
        text: Memory text; surrounding whitespace is stripped.
        tags: Comma-separated tags. When non-blank they are appended to the
            episode body as a trailing ``Tags: ...`` paragraph.

    Returns:
        A confirmation message on success, otherwise a message starting with
        ``Error`` (API key missing, client unavailable, or the save failed).
    """
    if not GRAPHITI_AVAILABLE:
        return "Error: MISTRAL_API_KEY not set. Cannot save to Graphiti."
    client = _get_graphiti_client()
    if not client:
        return "Error: Failed to initialize Graphiti client."
    try:
        import asyncio

        # Build the episode body, with the tags as a trailing paragraph.
        episode_body = text.strip()
        clean_tags = (tags or "").strip()
        if clean_tags:
            episode_body = f"{episode_body}\n\nTags: {clean_tags}"

        async def _save():
            return await client.add_episode(
                name=f"Memory {_format_timestamp()}",
                episode_body=episode_body,
                source_description="Memory_Manager tool",
                # add_episode requires a reference time; a memory saved here
                # is dated at the moment of saving.
                reference_time=datetime.now(timezone.utc),
                group_id=GRAPHITI_GROUP_ID,
            )

        # NOTE(review): asyncio.run uses a fresh event loop per call while the
        # client is cached across calls — confirm the driver tolerates that.
        asyncio.run(_save())
        return f"Memory saved to Graphiti knowledge graph (group: {GRAPHITI_GROUP_ID})"
    except Exception as e:
        return f"Error saving to Graphiti: {e}"
def _graphiti_list(limit: int, include_tags: bool) -> str:
    """List recent memories (episodes) stored in Graphiti for the configured group.

    Args:
        limit: Maximum number of episodes to request.
        include_tags: When true, tags saved as a trailing ``Tags: ...``
            paragraph are shown after the preview. The trailer is always
            removed from the preview text itself.

    Returns:
        A header followed by one line per episode (creation time plus the
        first 100 characters of content), a "no memories" message, or a
        message starting with ``Error``.
    """
    if not GRAPHITI_AVAILABLE:
        return "Error: MISTRAL_API_KEY not set. Cannot access Graphiti."
    client = _get_graphiti_client()
    if not client:
        return "Error: Failed to initialize Graphiti client."
    try:
        import asyncio

        async def _list():
            # NOTE(review): graphiti_core's client appears to expose
            # `retrieve_episodes` rather than `get_episodes`. The original
            # call is kept when the method exists so existing setups are
            # unaffected — confirm against the installed version.
            fetch = getattr(client, "get_episodes", None)
            if fetch is not None:
                return await fetch(
                    group_ids=[GRAPHITI_GROUP_ID],
                    limit=limit,
                )
            return await client.retrieve_episodes(
                reference_time=datetime.now(timezone.utc),
                last_n=limit,
                group_ids=[GRAPHITI_GROUP_ID],
            )

        episodes = asyncio.run(_list())
        if not episodes:
            return f"No memories found in Graphiti (group: {GRAPHITI_GROUP_ID})"

        # Marker written by _graphiti_save between the text and its tags.
        tag_marker = "\n\nTags:"

        lines = [f"Graphiti Memories (group: {GRAPHITI_GROUP_ID})", "-" * 50]
        for ep in episodes:
            created = getattr(ep, "created_at", "?")
            content = ep.content if hasattr(ep, "content") else str(ep)
            # A missing body must not abort the whole listing.
            content = "" if content is None else str(content)

            # Split on the LAST marker so a "Tags:" inside the memory text
            # itself is not mistaken for the tag trailer.
            body, found, trailer = content.rpartition(tag_marker)
            tags_str = ""
            if found:
                content = body.strip()
                if include_tags:
                    tags_str = f" | tags: {trailer.strip()}"

            preview = content[:100] + ("..." if len(content) > 100 else "")
            lines.append(f"[{created}] {preview}{tags_str}")
        return "\n".join(lines)
    except Exception as e:
        return f"Error listing from Graphiti: {e}"
def _graphiti_search(query: str, limit: int) -> str:
    """Search the Graphiti knowledge graph within the configured group.

    Args:
        query: Search text passed to the client's ``search`` method.
        limit: Maximum number of results to request.

    Returns:
        A header followed by one numbered line per result, a "no matches"
        message, or a message starting with ``Error``. Results with a
        ``fact`` attribute are rendered as ``source -> target: fact``;
        results with a ``name`` are rendered as ``[name] summary`` (summary
        truncated to 150 characters); anything else is stringified.
    """
    if not GRAPHITI_AVAILABLE:
        return "Error: MISTRAL_API_KEY not set. Cannot search Graphiti."
    client = _get_graphiti_client()
    if not client:
        return "Error: Failed to initialize Graphiti client."
    try:
        import asyncio

        async def _search():
            return await client.search(
                query=query,
                group_ids=[GRAPHITI_GROUP_ID],
                num_results=limit,
            )

        results = asyncio.run(_search())
        if not results:
            return f"No matches found for: {query}"

        lines = [f"Graphiti Search Results for: {query}", "-" * 50]
        for i, result in enumerate(results, 1):
            if hasattr(result, "fact"):
                # Edge/fact result.
                # NOTE(review): graphiti_core edges appear to expose
                # `source_node_uuid` / `target_node_uuid` rather than
                # `source_node` / `target_node`; the original names are still
                # preferred when present — confirm against the installed version.
                source = getattr(
                    result, "source_node", getattr(result, "source_node_uuid", "?")
                )
                target = getattr(
                    result, "target_node", getattr(result, "target_node_uuid", "?")
                )
                lines.append(f"{i}. {source} -> {target}: {result.fact}")
            elif hasattr(result, "name"):
                # Node result; a summary of None is treated as empty so it
                # can be sliced safely.
                summary = getattr(result, "summary", "") or ""
                lines.append(
                    f"{i}. [{result.name}] {summary[:150]}{'...' if len(summary) > 150 else ''}"
                )
            else:
                lines.append(f"{i}. {str(result)[:150]}")
        return "\n".join(lines)
    except Exception as e:
        return f"Error searching Graphiti: {e}"
def _graphiti_delete(memory_id: str) -> str:
    """Explain how to delete a memory; deletion itself is not implemented here.

    Args:
        memory_id: Accepted for interface symmetry but not used.

    Returns:
        An error message when MISTRAL_API_KEY is not set, otherwise a note
        pointing the caller to the Graphiti MCP.
    """
    if GRAPHITI_AVAILABLE:
        # Deleting requires the full episode UUID, which this tool does not handle.
        return (
            "Note: To delete from Graphiti, use the Graphiti MCP directly with the episode UUID. "
            "Memory deletion is not fully implemented in this interface."
        )
    return "Error: MISTRAL_API_KEY not set. Cannot access Graphiti."
# ============================================================================
# Status Check
# ============================================================================
def _get_status() -> str:
    """Describe whether the Graphiti client can be used.

    Returns:
        A ``Status: ...`` message: API key not configured, client
        initialization failed, or connected (with database and group names).
    """
    if not GRAPHITI_AVAILABLE:
        return "Status: MISTRAL_API_KEY not configured"

    if not _get_graphiti_client():
        return "Status: Failed to initialize Graphiti client"

    report = (
        "Status: Connected to Graphiti",
        f"Database: {FALKORDB_DATABASE}",
        f"Group: {GRAPHITI_GROUP_ID}",
    )
    return "\n".join(report)
# ============================================================================
# Main Tool Function
# ============================================================================
# One-paragraph capability summary for the tool, assembled from its sentences.
TOOL_SUMMARY = " ".join(
    (
        "Manage memories in Graphiti knowledge graph (save, list, search, status).",
        "Connects to the same Graphiti instance as the Graphiti MCP server.",
        "Requires MISTRAL_API_KEY for entity extraction and knowledge graph operations.",
    )
)
@autodoc(summary=TOOL_SUMMARY)
def Memory_Manager(
    action: Annotated[Literal["save", "list", "search", "status"], "Action: save | list | search | status"] = "list",
    text: Annotated[Optional[str], "Memory text (Save only)"] = None,
    tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None,
    query: Annotated[Optional[str], "Search query (Search only)"] = None,
    limit: Annotated[int, "Max results (List/Search only)"] = 20,
    include_tags: Annotated[bool, "Include tags in output"] = True,
) -> str:
    """
    Memory Manager - Graphiti Knowledge Graph Interface.
    Connects to the same Graphiti instance used by Claude Code's Graphiti MCP.
    All memories are stored in the knowledge graph with automatic entity extraction
    and relationship detection.
    """
    # Normalize the action; a missing value falls back to "list".
    requested = (action or "list").lower().strip()

    if requested == "status":
        return _get_status()

    if requested == "save":
        memory_text = (text or "").strip()
        if memory_text:
            return _graphiti_save(text=memory_text, tags=tags or "")
        return "Error: 'text' is required when action=save."

    if requested == "list":
        # The result count is clamped to the 1..200 range.
        capped = max(1, min(200, limit))
        return _graphiti_list(limit=capped, include_tags=include_tags)

    if requested == "search":
        search_terms = (query or "").strip()
        if search_terms:
            capped = max(1, min(200, limit))
            return _graphiti_search(query=search_terms, limit=capped)
        return "Error: 'query' is required when action=search."

    return "Error: invalid action (use save|list|search|status)."
def build_interface() -> gr.Interface:
    """Create the Gradio form for the Memory Manager tool.

    The connection status is looked up once, when the interface is built,
    and embedded in the page description.
    """
    status_info = _get_status()

    # Input components, in the same order as Memory_Manager's parameters.
    action_choice = gr.Radio(
        label="Action",
        choices=["save", "list", "search", "status"],
        value="status",
        info="Action to perform",
    )
    text_box = gr.Textbox(label="Text", lines=3, info="Memory text (Save only)")
    tags_box = gr.Textbox(
        label="Tags",
        placeholder="tag1, tag2",
        max_lines=1,
        info="Comma-separated tags (Save only)",
    )
    query_box = gr.Textbox(
        label="Query",
        placeholder="search terms...",
        max_lines=1,
        info="Search query (Search only)",
    )
    limit_slider = gr.Slider(
        1, 200, value=20, step=1, label="Limit", info="Max results (List/Search only)"
    )
    tags_toggle = gr.Checkbox(
        value=True, label="Include Tags", info="Include tags in output"
    )
    controls = [
        action_choice,
        text_box,
        tags_box,
        query_box,
        limit_slider,
        tags_toggle,
    ]

    result_box = gr.Textbox(label="Result", lines=14)
    banner = f"<div style='text-align:center'><strong>{status_info}</strong><br/>Knowledge graph memory with entity extraction</div>"

    return gr.Interface(
        fn=Memory_Manager,
        inputs=controls,
        outputs=result_box,
        title="Memory Manager - Graphiti",
        description=banner,
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
    )
# Names exported by `from <module> import *`: the tool entry point, the
# Gradio interface builder, and two of the Graphiti configuration values.
__all__ = [
    "Memory_Manager",
    "build_interface",
    "GRAPHITI_AVAILABLE",
    "GRAPHITI_GROUP_ID",
]