# Source path: agent-memory/memory/events.py
# Repository page header (kept for provenance): Chris4K, "Upload 17 files", 86a0172 (verified)
"""
Episodic Memory – Past Tasks & Events
=======================================
Stores discrete events / task completions as Markdown files
under memory/events/*.md
Each event has a timestamp, outcome, and optional linked entities.
Supports keyword search and time-range queries.
"""
from __future__ import annotations

import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from .models import MemoryEntry, MemoryTier
class EpisodicMemory:
    """File-backed store for task / event memories.

    Entries live in an in-memory index keyed by entry id and are mirrored to
    ``<base_dir>/<entry id>.md`` via ``MemoryEntry.to_markdown()``.  Any
    ``*.md`` file already present in ``base_dir`` is loaded into the index
    when the store is constructed.
    """

    # Fields that ``update()`` refuses to overwrite.
    _PROTECTED_FIELDS = frozenset({"id", "tier", "created_at"})

    def __init__(self, base_dir: str = "memory/events"):
        """Create the store, ensuring ``base_dir`` exists, and load saved entries.

        Args:
            base_dir: Directory holding one Markdown file per entry.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        # id → MemoryEntry (in-memory index)
        self._index: Dict[str, MemoryEntry] = {}
        self._load_from_disk()

    # ── CRUD ─────────────────────────────────────────────────

    def create(
        self,
        content: str,
        title: str = "",
        tags: Optional[List[str]] = None,
        importance: float = 0.5,
        metadata: Optional[Dict[str, Any]] = None,
        source: str = "",
    ) -> MemoryEntry:
        """Create a new episodic entry, index it and write it to disk.

        Args:
            content: Body text of the event.
            title: Optional title; derived from the first line of ``content``
                when empty.
            tags: Optional tags; copied so later caller-side mutation does not
                leak into the stored entry.
            importance: Relevance weight used when ranking search results.
            metadata: Optional free-form metadata; copied like ``tags``.
            source: Optional origin label stored on the entry.

        Returns:
            The newly created entry.
        """
        # One timestamp for both fields so a fresh entry has
        # created_at == updated_at.
        now = self._now()
        entry = MemoryEntry(
            content=content,
            title=title or self._auto_title(content),
            tier=MemoryTier.EPISODIC,
            tags=list(tags) if tags else [],
            importance=importance,
            metadata=dict(metadata) if metadata else {},
            source=source,
            created_at=now,
            updated_at=now,
        )
        self._index[entry.id] = entry
        self._persist(entry)
        return entry

    def read(self, entry_id: str) -> Optional[MemoryEntry]:
        """Return the entry with ``entry_id``, or ``None`` if it is unknown.

        A successful read increments ``access_count``, refreshes
        ``updated_at`` and rewrites the entry's file.
        """
        entry = self._index.get(entry_id)
        if entry:
            entry.access_count += 1
            entry.updated_at = self._now()
            self._persist(entry)
        return entry

    def update(self, entry_id: str, **kwargs) -> Optional[MemoryEntry]:
        """Set existing attributes on an entry and persist the result.

        Keyword arguments naming ``id``, ``tier`` or ``created_at``, or an
        attribute the entry does not have, are ignored.  ``updated_at`` is
        always reset to the current time.

        Returns:
            The updated entry, or ``None`` if ``entry_id`` is unknown.
        """
        entry = self._index.get(entry_id)
        if not entry:
            return None
        for field, value in kwargs.items():
            if field in self._PROTECTED_FIELDS or not hasattr(entry, field):
                continue
            setattr(entry, field, value)
        entry.updated_at = self._now()
        self._persist(entry)
        return entry

    def delete(self, entry_id: str) -> bool:
        """Remove an entry from the index and delete its file.

        Returns:
            ``True`` if the entry was indexed, ``False`` otherwise.
        """
        if entry_id not in self._index:
            return False
        del self._index[entry_id]
        try:
            self._entry_path(entry_id).unlink()
        except FileNotFoundError:
            # The file is already gone; the index removal is what matters.
            pass
        return True

    def list_entries(
        self,
        tag: Optional[str] = None,
        since: Optional[str] = None,
        until: Optional[str] = None,
        limit: int = 50,
    ) -> List[MemoryEntry]:
        """List events, optionally filtered by tag and/or time range.

        Args:
            tag: Keep only entries carrying this tag.
            since: Inclusive lower bound, compared as a string against
                ``created_at``.
            until: Inclusive upper bound, compared the same way.
            limit: Maximum number of entries returned.

        Returns:
            Matching entries, newest first.
        """
        entries = [
            e
            for e in self._index.values()
            if (not tag or tag in e.tags)
            and (not since or e.created_at >= since)
            and (not until or e.created_at <= until)
        ]
        # newest first
        entries.sort(key=lambda e: e.created_at, reverse=True)
        return entries[:limit]

    def search(self, query: str, limit: int = 10) -> List[MemoryEntry]:
        """Keyword search across episodic memories.

        ``query`` is matched case-insensitively as a substring of the
        entry's title, content and tags.

        Returns:
            At most ``limit`` matches ordered by importance (highest first),
            with ties broken by ``created_at`` (newest first).
        """
        needle = query.lower()
        matches = []
        for entry in self._index.values():
            haystack = f"{entry.title} {entry.content} {' '.join(entry.tags)}".lower()
            if needle in haystack:
                matches.append(entry)
        # rudimentary relevance: importance first, then recency
        matches.sort(key=lambda e: (e.importance, e.created_at), reverse=True)
        return matches[:limit]

    def count(self) -> int:
        """Return the number of indexed entries."""
        return len(self._index)

    # ── timeline helpers ─────────────────────────────────────

    def recent(self, n: int = 10) -> List[MemoryEntry]:
        """Get the N most recent events."""
        entries = sorted(self._index.values(), key=lambda e: e.created_at, reverse=True)
        return entries[:n]

    def by_tag(self, tag: str) -> List[MemoryEntry]:
        """Return every entry carrying ``tag``, in index order."""
        return [e for e in self._index.values() if tag in e.tags]

    # ── persistence ──────────────────────────────────────────

    def _entry_path(self, entry_id: str) -> Path:
        """Return the Markdown file path used for ``entry_id``."""
        return self.base_dir / f"{entry_id}.md"

    def _persist(self, entry: MemoryEntry):
        """Write ``entry`` to its Markdown file.

        The text goes to a sibling ``*.md.tmp`` file first and is then swapped
        into place, so an interrupted write cannot leave a truncated ``.md``
        file behind.  The temporary name does not match the ``*.md`` glob used
        when loading.
        """
        path = self._entry_path(entry.id)
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(entry.to_markdown(), encoding="utf-8")
        os.replace(tmp_path, path)

    def _load_from_disk(self):
        """Populate the index from every ``*.md`` file in ``base_dir``.

        Loading is best-effort: a file that cannot be read or parsed is
        skipped with a logged warning instead of aborting start-up.
        """
        log = logging.getLogger(__name__)
        for md_file in self.base_dir.glob("*.md"):
            try:
                text = md_file.read_text(encoding="utf-8")
                entry = MemoryEntry.from_markdown(text)
                entry.tier = MemoryTier.EPISODIC
                self._index[entry.id] = entry
            except Exception:
                log.warning("Skipping unreadable event file %s", md_file, exc_info=True)

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated
        ``datetime.utcnow().isoformat()`` so stored timestamps stay
        string-comparable with previously written ones.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _auto_title(content: str) -> str:
        """Generate a short title (at most 80 characters) from content.

        Uses the first line of the stripped content, without trailing
        whitespace such as the ``\\r`` of a CRLF line ending; falls back to
        ``"Untitled Event"`` when the content is blank.
        """
        stripped = content.strip()
        first_line = stripped.splitlines()[0].rstrip()[:80] if stripped else ""
        return first_line if first_line else "Untitled Event"