Spaces:
Running
Running
| """ | |
| Episodic Memory β Past Tasks & Events | |
| ======================================= | |
| Stores discrete events / task completions as Markdown files | |
| under memory/events/*.md | |
| Each event has a timestamp, outcome, and optional linked entities. | |
| Supports keyword search and time-range queries. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| from .models import MemoryEntry, MemoryTier | |
| class EpisodicMemory: | |
| """File-backed store for task / event memories.""" | |
| def __init__(self, base_dir: str = "memory/events"): | |
| self.base_dir = Path(base_dir) | |
| self.base_dir.mkdir(parents=True, exist_ok=True) | |
| # id β MemoryEntry (in-memory index) | |
| self._index: Dict[str, MemoryEntry] = {} | |
| self._load_from_disk() | |
| # ββ CRUD βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def create( | |
| self, | |
| content: str, | |
| title: str = "", | |
| tags: Optional[List[str]] = None, | |
| importance: float = 0.5, | |
| metadata: Optional[Dict[str, Any]] = None, | |
| source: str = "", | |
| ) -> MemoryEntry: | |
| entry = MemoryEntry( | |
| content=content, | |
| title=title or self._auto_title(content), | |
| tier=MemoryTier.EPISODIC, | |
| tags=tags or [], | |
| importance=importance, | |
| metadata=metadata or {}, | |
| source=source, | |
| created_at=datetime.utcnow().isoformat(), | |
| updated_at=datetime.utcnow().isoformat(), | |
| ) | |
| self._index[entry.id] = entry | |
| self._persist(entry) | |
| return entry | |
| def read(self, entry_id: str) -> Optional[MemoryEntry]: | |
| entry = self._index.get(entry_id) | |
| if entry: | |
| entry.access_count += 1 | |
| entry.updated_at = datetime.utcnow().isoformat() | |
| self._persist(entry) | |
| return entry | |
| def update(self, entry_id: str, **kwargs) -> Optional[MemoryEntry]: | |
| entry = self._index.get(entry_id) | |
| if not entry: | |
| return None | |
| for k, v in kwargs.items(): | |
| if hasattr(entry, k) and k not in ("id", "tier", "created_at"): | |
| setattr(entry, k, v) | |
| entry.updated_at = datetime.utcnow().isoformat() | |
| self._persist(entry) | |
| return entry | |
| def delete(self, entry_id: str) -> bool: | |
| if entry_id not in self._index: | |
| return False | |
| del self._index[entry_id] | |
| path = self._entry_path(entry_id) | |
| if path.exists(): | |
| path.unlink() | |
| return True | |
| def list_entries( | |
| self, | |
| tag: Optional[str] = None, | |
| since: Optional[str] = None, | |
| until: Optional[str] = None, | |
| limit: int = 50, | |
| ) -> List[MemoryEntry]: | |
| """List events, optionally filtered by tag and/or time range.""" | |
| entries = list(self._index.values()) | |
| if tag: | |
| entries = [e for e in entries if tag in e.tags] | |
| if since: | |
| entries = [e for e in entries if e.created_at >= since] | |
| if until: | |
| entries = [e for e in entries if e.created_at <= until] | |
| # newest first | |
| entries.sort(key=lambda e: e.created_at, reverse=True) | |
| return entries[:limit] | |
| def search(self, query: str, limit: int = 10) -> List[MemoryEntry]: | |
| """Keyword search across episodic memories.""" | |
| q = query.lower() | |
| scored: List[tuple] = [] | |
| for entry in self._index.values(): | |
| text = f"{entry.title} {entry.content} {' '.join(entry.tags)}".lower() | |
| if q in text: | |
| # rudimentary relevance: importance + recency | |
| scored.append((entry, entry.importance)) | |
| scored.sort(key=lambda x: x[1], reverse=True) | |
| return [e for e, _ in scored[:limit]] | |
| def count(self) -> int: | |
| return len(self._index) | |
| # ββ timeline helpers βββββββββββββββββββββββββββββββββββββ | |
| def recent(self, n: int = 10) -> List[MemoryEntry]: | |
| """Get the N most recent events.""" | |
| entries = sorted(self._index.values(), key=lambda e: e.created_at, reverse=True) | |
| return entries[:n] | |
| def by_tag(self, tag: str) -> List[MemoryEntry]: | |
| return [e for e in self._index.values() if tag in e.tags] | |
| # ββ persistence ββββββββββββββββββββββββββββββββββββββββββ | |
| def _entry_path(self, entry_id: str) -> Path: | |
| return self.base_dir / f"{entry_id}.md" | |
| def _persist(self, entry: MemoryEntry): | |
| path = self._entry_path(entry.id) | |
| path.write_text(entry.to_markdown(), encoding="utf-8") | |
| def _load_from_disk(self): | |
| for md_file in self.base_dir.glob("*.md"): | |
| try: | |
| text = md_file.read_text(encoding="utf-8") | |
| entry = MemoryEntry.from_markdown(text) | |
| entry.tier = MemoryTier.EPISODIC | |
| self._index[entry.id] = entry | |
| except Exception: | |
| pass | |
| def _auto_title(content: str) -> str: | |
| """Generate a short title from content.""" | |
| first_line = content.strip().split("\n")[0][:80] | |
| return first_line if first_line else "Untitled Event" | |