| |
| """ |
| Context Memory Cache Manager |
| |
| A sophisticated caching system for NZ Legislation Loophole Analysis that provides: |
| - Hash-based chunk identification for unique content tracking |
| - Multi-level caching (memory + optional disk persistence) |
| - Intelligent cache invalidation based on memory limits |
| - Performance metrics and cache statistics |
| - Thread-safe operations for concurrent processing |
| """ |
|
|
| import hashlib |
| import json |
| import os |
| import time |
| import threading |
| from typing import Dict, Any, Optional, Tuple |
| from functools import lru_cache |
| import sqlite3 |
| from pathlib import Path |
| import psutil |
| import streamlit as st |
|
|
class CacheEntry:
    """A single cached analysis result together with its bookkeeping metadata."""

    def __init__(self, key: str, content: str, analysis_result: Dict[str, Any],
                 model_config: Dict[str, Any], processing_config: Dict[str, Any]):
        # Identity and payload.
        self.key = key
        self.content = content
        self.analysis_result = analysis_result
        self.model_config = model_config
        self.processing_config = processing_config
        # Bookkeeping: creation time is the TTL reference point, last access
        # drives LRU eviction.
        self.created_at = time.time()
        self.last_accessed = time.time()
        self.access_count = 0
        # Approximate footprint: UTF-8 size of the content plus the repr of
        # the result dictionary.
        self.size_bytes = (len(content.encode('utf-8'))
                           + len(str(analysis_result).encode('utf-8')))

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this entry to a plain dictionary (JSON-friendly)."""
        field_names = ('key', 'content', 'analysis_result', 'model_config',
                       'processing_config', 'created_at', 'last_accessed',
                       'access_count', 'size_bytes')
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CacheEntry':
        """Rebuild a CacheEntry from a to_dict()-style dictionary.

        Missing metadata fields fall back to sensible defaults ("now" for
        timestamps, 0 accesses, recomputed size).
        """
        restored = cls(
            data['key'],
            data['content'],
            data['analysis_result'],
            data['model_config'],
            data['processing_config'],
        )
        restored.created_at = data.get('created_at', time.time())
        restored.last_accessed = data.get('last_accessed', time.time())
        restored.access_count = data.get('access_count', 0)
        restored.size_bytes = data.get('size_bytes', restored.size_bytes)
        return restored

    def update_access(self):
        """Record a cache hit: bump the access counter and refresh the LRU timestamp."""
        self.access_count += 1
        self.last_accessed = time.time()
|
class CacheManager:
    """Advanced cache manager for legislation analysis.

    Combines a size-bounded in-memory cache (LRU eviction) with an optional
    SQLite-backed persistent cache.  Keys are SHA-256 hashes of the content
    plus the model/processing configuration, so a cached result is reused
    only for a byte-identical request.  Shared state is guarded by a
    reentrant lock, making the manager safe for concurrent callers.
    """

    def __init__(self, max_memory_mb: int = 1024, persistent: bool = True,
                 cache_dir: str = None, ttl_hours: int = 24):
        """
        Initialize the cache manager

        Args:
            max_memory_mb: Maximum memory to use for caching (MB)
            persistent: Whether to use persistent disk cache
            cache_dir: Directory for persistent cache storage
            ttl_hours: Time-to-live for cache entries (hours)
        """
        self.max_memory_mb = max_memory_mb
        self.persistent = persistent
        self.ttl_hours = ttl_hours
        self.ttl_seconds = ttl_hours * 3600

        # Resolve the cache directory (default: '../cache' next to this
        # module).  parents=True fixes a FileNotFoundError when intermediate
        # directories are missing -- plain mkdir(exist_ok=True) only creates
        # the leaf directory.
        if cache_dir is None:
            cache_dir = os.path.join(os.path.dirname(__file__), '..', 'cache')
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = self.cache_dir / 'cache.db'

        # Reentrant lock: public methods hold it while calling helpers that
        # may acquire it again.
        self.lock = threading.RLock()

        # In-memory cache plus a running byte total used for LRU eviction.
        self.memory_cache: Dict[str, CacheEntry] = {}
        self.memory_size = 0

        # Runtime statistics exposed via get_stats(); 'enabled' is the
        # global on/off switch toggled by enable()/disable().
        self.stats = {
            'hits': 0,
            'misses': 0,
            'entries': 0,
            'memory_usage_mb': 0,
            'evictions': 0,
            'enabled': True
        }

        # Set up the persistent store, then warm the memory cache from it.
        # _init_database() disables persistence on failure, so the second
        # check is not redundant.
        if self.persistent:
            self._init_database()
        if self.persistent:
            self._load_persistent_cache()

    def _init_database(self):
        """Initialize SQLite database for persistent cache.

        On any failure, persistence is disabled and the manager falls back
        to memory-only caching rather than raising.
        """
        try:
            with sqlite3.connect(str(self.db_path)) as conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS cache_entries (
                        key TEXT PRIMARY KEY,
                        data TEXT NOT NULL,
                        created_at REAL NOT NULL,
                        last_accessed REAL NOT NULL,
                        access_count INTEGER DEFAULT 0,
                        size_bytes INTEGER DEFAULT 0
                    )
                ''')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON cache_entries(created_at)')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_last_accessed ON cache_entries(last_accessed)')
        except Exception as e:
            print(f"Warning: Could not initialize persistent cache: {e}")
            self.persistent = False

    def _load_persistent_cache(self):
        """Load existing cache entries from database into the memory cache.

        Expired rows are deleted on the way through; undecodable rows are
        skipped silently.
        """
        if not self.persistent:
            return

        try:
            with sqlite3.connect(str(self.db_path)) as conn:
                cursor = conn.execute('SELECT data FROM cache_entries')
                for row in cursor:
                    try:
                        entry_data = json.loads(row[0])
                        entry = CacheEntry.from_dict(entry_data)
                        if self._is_entry_valid(entry):
                            self._add_to_memory_cache(entry)
                        else:
                            # Expired on disk: purge it so it is not reloaded.
                            conn.execute('DELETE FROM cache_entries WHERE key = ?', (entry.key,))
                    except (json.JSONDecodeError, KeyError):
                        continue
        except Exception as e:
            print(f"Warning: Could not load persistent cache: {e}")

    def _generate_cache_key(self, content: str, model_config: Dict[str, Any],
                            processing_config: Dict[str, Any]) -> str:
        """
        Generate a unique cache key based on content and configuration

        Args:
            content: The text content to be analyzed
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used

        Returns:
            SHA-256 hash string as cache key
        """
        key_data = {
            'content': content,
            'model_config': model_config,
            'processing_config': processing_config
        }

        # sort_keys makes the hash independent of dict insertion order.
        # NOTE(review): assumes both configs are JSON-serializable; a
        # non-serializable value would raise TypeError here.
        key_string = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_string.encode('utf-8')).hexdigest()

    def _is_entry_valid(self, entry: CacheEntry) -> bool:
        """Check if a cache entry is still valid (i.e. has not exceeded the TTL)."""
        if time.time() - entry.created_at > self.ttl_seconds:
            return False
        return True

    def _add_to_memory_cache(self, entry: CacheEntry):
        """Add entry to memory cache, evicting LRU entries to stay under budget.

        If a single entry exceeds the whole budget it is still inserted
        after the cache has been emptied (the loop breaks when empty).
        """
        with self.lock:
            while self.memory_size + entry.size_bytes > self.max_memory_mb * 1024 * 1024:
                if not self.memory_cache:
                    break
                self._evict_lru_entry()

            self.memory_cache[entry.key] = entry
            self.memory_size += entry.size_bytes
            self.stats['entries'] = len(self.memory_cache)
            self.stats['memory_usage_mb'] = self.memory_size / (1024 * 1024)

    def _evict_lru_entry(self):
        """Evict the least recently used entry from the memory cache.

        Persistent copies are intentionally left in place so the entry can
        be reloaded later.  Caller is expected to hold self.lock.
        """
        if not self.memory_cache:
            return

        lru_key = min(self.memory_cache.keys(),
                      key=lambda k: self.memory_cache[k].last_accessed)

        evicted_entry = self.memory_cache.pop(lru_key)
        self.memory_size -= evicted_entry.size_bytes
        self.stats['evictions'] += 1

    def _save_to_persistent_cache(self, entry: CacheEntry):
        """Save entry to persistent cache (upsert by key); best-effort."""
        if not self.persistent:
            return

        try:
            with sqlite3.connect(str(self.db_path)) as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO cache_entries
                    (key, data, created_at, last_accessed, access_count, size_bytes)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    entry.key,
                    json.dumps(entry.to_dict()),
                    entry.created_at,
                    entry.last_accessed,
                    entry.access_count,
                    entry.size_bytes
                ))
        except Exception as e:
            print(f"Warning: Could not save to persistent cache: {e}")

    def get(self, content: str, model_config: Dict[str, Any],
            processing_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Get cached analysis result for given content and configuration

        Args:
            content: Text content to look up
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used

        Returns:
            Cached analysis result or None if not found
        """
        if not self.stats['enabled']:
            self.stats['misses'] += 1
            return None

        cache_key = self._generate_cache_key(content, model_config, processing_config)

        with self.lock:
            # First tier: memory cache.
            if cache_key in self.memory_cache:
                entry = self.memory_cache[cache_key]

                if self._is_entry_valid(entry):
                    entry.update_access()
                    self.stats['hits'] += 1
                    return entry.analysis_result
                else:
                    # Expired: drop from memory and keep BOTH size-related
                    # stats consistent (the original updated only 'entries',
                    # leaving 'memory_usage_mb' stale).
                    self.memory_cache.pop(cache_key)
                    self.memory_size -= entry.size_bytes
                    self.stats['entries'] = len(self.memory_cache)
                    self.stats['memory_usage_mb'] = self.memory_size / (1024 * 1024)

            # Second tier: persistent cache; promote hits back into memory.
            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        cursor = conn.execute('SELECT data FROM cache_entries WHERE key = ?', (cache_key,))
                        row = cursor.fetchone()

                        if row:
                            entry_data = json.loads(row[0])
                            entry = CacheEntry.from_dict(entry_data)

                            if self._is_entry_valid(entry):
                                entry.update_access()
                                self.stats['hits'] += 1

                                # Promote to memory and write back the
                                # refreshed access statistics.
                                self._add_to_memory_cache(entry)
                                self._save_to_persistent_cache(entry)

                                return entry.analysis_result
                except Exception as e:
                    print(f"Warning: Error accessing persistent cache: {e}")

            self.stats['misses'] += 1
            return None

    def put(self, content: str, analysis_result: Dict[str, Any],
            model_config: Dict[str, Any], processing_config: Dict[str, Any]):
        """
        Store analysis result in cache

        Args:
            content: Text content that was analyzed
            analysis_result: Analysis result to cache
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used
        """
        if not self.stats['enabled']:
            return

        cache_key = self._generate_cache_key(content, model_config, processing_config)

        with self.lock:
            entry = CacheEntry(cache_key, content, analysis_result,
                               model_config, processing_config)

            self._add_to_memory_cache(entry)
            # No-op when persistence is disabled (guarded inside the helper).
            self._save_to_persistent_cache(entry)

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics, including derived hit-rate figures."""
        with self.lock:
            total_requests = self.stats['hits'] + self.stats['misses']
            hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0

            return {
                **self.stats,
                'hit_rate': hit_rate,
                'total_requests': total_requests,
                'persistent_enabled': self.persistent,
                'memory_limit_mb': self.max_memory_mb,
                'ttl_hours': self.ttl_hours
            }

    def clear_cache(self):
        """Clear all cache entries (memory and disk) and reset the counters."""
        with self.lock:
            self.memory_cache.clear()
            self.memory_size = 0
            self.stats['entries'] = 0
            self.stats['hits'] = 0
            self.stats['misses'] = 0
            self.stats['evictions'] = 0
            self.stats['memory_usage_mb'] = 0

            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        conn.execute('DELETE FROM cache_entries')
                except Exception as e:
                    print(f"Warning: Could not clear persistent cache: {e}")

    def cleanup_expired_entries(self):
        """Remove expired entries from both the memory cache and the database."""
        current_time = time.time()
        expired_keys = []

        with self.lock:
            # Collect expired keys first; deleting while iterating a dict
            # is not allowed.
            for key, entry in self.memory_cache.items():
                if current_time - entry.created_at > self.ttl_seconds:
                    expired_keys.append(key)
                    self.memory_size -= entry.size_bytes

            for key in expired_keys:
                del self.memory_cache[key]

            self.stats['entries'] = len(self.memory_cache)
            self.stats['memory_usage_mb'] = self.memory_size / (1024 * 1024)

            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        conn.execute('DELETE FROM cache_entries WHERE ? - created_at > ?',
                                     (current_time, self.ttl_seconds))
                except Exception as e:
                    print(f"Warning: Could not cleanup persistent cache: {e}")

    def enable(self):
        """Enable caching"""
        self.stats['enabled'] = True

    def disable(self):
        """Disable caching"""
        self.stats['enabled'] = False

    def export_cache(self, filepath: str):
        """Export cache contents to JSON file.

        Returns True on success, False on a write failure.  Entries present
        in both the memory cache and the database are exported once (the
        original exported such entries twice), and metadata.total_entries
        reflects the number of entries actually written.
        """
        cache_data = {
            'metadata': {
                'exported_at': time.time(),
                'version': '1.0',
                'total_entries': 0  # filled in after collection
            },
            'entries': []
        }

        seen_keys = set()
        with self.lock:
            for entry in self.memory_cache.values():
                cache_data['entries'].append(entry.to_dict())
                seen_keys.add(entry.key)

            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        cursor = conn.execute('SELECT data FROM cache_entries')
                        for row in cursor:
                            try:
                                entry_data = json.loads(row[0])
                                # Skip rows already exported from memory.
                                if entry_data.get('key') in seen_keys:
                                    continue
                                cache_data['entries'].append(entry_data)
                            except json.JSONDecodeError:
                                continue
                except Exception as e:
                    print(f"Warning: Could not export persistent cache: {e}")

        cache_data['metadata']['total_entries'] = len(cache_data['entries'])

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, indent=2, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Error exporting cache: {e}")
            return False

    def import_cache(self, filepath: str):
        """Import cache contents from JSON file.

        Returns the number of entries actually imported; expired entries
        are skipped and (fix) no longer counted.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            imported_count = 0
            for entry_data in cache_data.get('entries', []):
                try:
                    entry = CacheEntry.from_dict(entry_data)
                    if self._is_entry_valid(entry):
                        self._add_to_memory_cache(entry)
                        if self.persistent:
                            self._save_to_persistent_cache(entry)
                        imported_count += 1
                except Exception as e:
                    print(f"Warning: Could not import cache entry: {e}")
                    continue

            return imported_count
        except Exception as e:
            print(f"Error importing cache: {e}")
            return 0
|
|
| |
# Module-level singleton so every caller shares one cache (and one lock).
_cache_instance = None
_cache_lock = threading.Lock()


def get_cache_manager(max_memory_mb: int = 1024, persistent: bool = True,
                      cache_dir: str = None, ttl_hours: int = 24) -> CacheManager:
    """
    Get or create global cache manager instance

    This ensures we have a single cache instance across the application
    while allowing configuration updates.

    Args:
        max_memory_mb: Maximum memory to use for caching (MB)
        persistent: Whether to use persistent disk cache
        cache_dir: Directory for persistent cache storage (only honored on
            first creation; an existing instance keeps its directory)
        ttl_hours: Time-to-live for cache entries (hours)

    Returns:
        The shared CacheManager, reconfigured if the settings changed.
    """
    global _cache_instance

    with _cache_lock:
        if _cache_instance is None:
            _cache_instance = CacheManager(max_memory_mb, persistent, cache_dir, ttl_hours)
        elif (_cache_instance.max_memory_mb != max_memory_mb or
              _cache_instance.persistent != persistent or
              _cache_instance.ttl_hours != ttl_hours):
            # Reconfigure the existing instance in place.
            enabling_persistence = persistent and not _cache_instance.persistent
            _cache_instance.max_memory_mb = max_memory_mb
            _cache_instance.persistent = persistent
            _cache_instance.ttl_hours = ttl_hours
            _cache_instance.ttl_seconds = ttl_hours * 3600
            # Fix: when persistence is turned on for an instance created
            # with persistent=False, the SQLite schema was never created,
            # so every later persistent write would fail. Initialize it now.
            if enabling_persistence:
                _cache_instance._init_database()

        return _cache_instance
|
|