l / streamlit_app /core /cache_manager.py
Princess3's picture
Upload 25 files
c089ca4 verified
#!/usr/bin/env python3
"""
Context Memory Cache Manager
A sophisticated caching system for NZ Legislation Loophole Analysis that provides:
- Hash-based chunk identification for unique content tracking
- Multi-level caching (memory + optional disk persistence)
- Intelligent cache invalidation based on memory limits
- Performance metrics and cache statistics
- Thread-safe operations for concurrent processing
"""
import contextlib
import hashlib
import json
import os
import sqlite3
import threading
import time
from functools import lru_cache
from pathlib import Path
from typing import Dict, Any, Optional, Tuple

import psutil
import streamlit as st
class CacheEntry:
    """A cached analysis result plus the bookkeeping needed to manage it.

    Besides the analysed content and its result, each entry records the
    configurations it was produced with, creation / last-access timestamps
    (seconds since the epoch), an access counter and an approximate size.
    """

    def __init__(self, key: str, content: str, analysis_result: Dict[str, Any],
                 model_config: Dict[str, Any], processing_config: Dict[str, Any]):
        """
        Build a fresh entry stamped with the current time.

        Args:
            key: Cache key identifying this entry
            content: Text content that was analysed
            analysis_result: Result produced for the content
            model_config: Model configuration used for the analysis
            processing_config: Processing configuration used
        """
        self.key = key
        self.content = content
        self.analysis_result = analysis_result
        self.model_config = model_config
        self.processing_config = processing_config

        self.created_at = time.time()
        self.last_accessed = time.time()
        self.access_count = 0

        # Approximate footprint: UTF-8 length of the content plus the UTF-8
        # length of the result's str() form.
        content_bytes = len(content.encode('utf-8'))
        result_bytes = len(str(analysis_result).encode('utf-8'))
        self.size_bytes = content_bytes + result_bytes

    def to_dict(self) -> Dict[str, Any]:
        """Return the entry's attributes as a plain dict for serialization."""
        field_names = (
            'key',
            'content',
            'analysis_result',
            'model_config',
            'processing_config',
            'created_at',
            'last_accessed',
            'access_count',
            'size_bytes',
        )
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CacheEntry':
        """
        Rebuild an entry from a dict shaped like the output of to_dict().

        The five content/config keys are required (a missing one raises
        KeyError); the metadata keys are optional and fall back to the values
        a freshly constructed entry would have.
        """
        restored = cls(
            data['key'],
            data['content'],
            data['analysis_result'],
            data['model_config'],
            data['processing_config'],
        )
        restored.created_at = data.get('created_at', time.time())
        restored.last_accessed = data.get('last_accessed', time.time())
        restored.access_count = data.get('access_count', 0)
        restored.size_bytes = data.get('size_bytes', restored.size_bytes)
        return restored

    def update_access(self):
        """Record one more access and refresh the last-access timestamp."""
        self.access_count += 1
        self.last_accessed = time.time()
class CacheManager:
    """Two-level cache (in-memory dict + optional SQLite file) for analysis results.

    Entries are keyed by a SHA-256 hash of the content and both configuration
    dicts. The in-memory level is bounded by ``max_memory_mb`` and evicts the
    least recently accessed entry when full; the SQLite level, when enabled,
    keeps every stored entry until it expires or the cache is cleared.
    Mutating operations are serialized with a re-entrant lock.
    """

    def __init__(self, max_memory_mb: int = 1024, persistent: bool = True,
                 cache_dir: Optional[str] = None, ttl_hours: int = 24):
        """
        Initialize the cache manager

        Args:
            max_memory_mb: Maximum memory to use for caching (MB)
            persistent: Whether to use persistent disk cache
            cache_dir: Directory for persistent cache storage; defaults to a
                ``cache`` directory next to this file's parent package
            ttl_hours: Time-to-live for cache entries (hours)
        """
        self.max_memory_mb = max_memory_mb
        self.persistent = persistent
        self.ttl_hours = ttl_hours
        self.ttl_seconds = ttl_hours * 3600

        # Set up cache directory (parents=True so a nested path that does not
        # exist yet is created instead of raising FileNotFoundError).
        if cache_dir is None:
            cache_dir = os.path.join(os.path.dirname(__file__), '..', 'cache')
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = self.cache_dir / 'cache.db'

        # Thread synchronization (re-entrant: put()/get() call helpers that
        # take the same lock).
        self.lock = threading.RLock()

        # In-memory cache with LRU eviction
        self.memory_cache: Dict[str, "CacheEntry"] = {}
        self.memory_size = 0  # Current memory usage in bytes

        # Statistics
        self.stats = {
            'hits': 0,
            'misses': 0,
            'entries': 0,
            'memory_usage_mb': 0,
            'evictions': 0,
            'enabled': True
        }

        # Initialize the database and warm the memory cache from it.
        # _init_database() switches self.persistent off on failure, in which
        # case _load_persistent_cache() returns immediately.
        if self.persistent:
            self._init_database()
            self._load_persistent_cache()

    @contextlib.contextmanager
    def _connect(self):
        """Yield a SQLite connection that is committed/rolled back AND closed.

        ``with sqlite3.connect(...)`` on its own only manages the transaction;
        it leaves the connection open. This helper adds the missing close so
        each operation releases its file handle.
        """
        conn = sqlite3.connect(str(self.db_path))
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _refresh_size_stats(self):
        """Sync the 'entries' and 'memory_usage_mb' stats with the memory cache."""
        self.stats['entries'] = len(self.memory_cache)
        self.stats['memory_usage_mb'] = self.memory_size / (1024 * 1024)

    def _init_database(self):
        """Create the SQLite table and indexes; disable persistence on failure."""
        try:
            with self._connect() as conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS cache_entries (
                        key TEXT PRIMARY KEY,
                        data TEXT NOT NULL,
                        created_at REAL NOT NULL,
                        last_accessed REAL NOT NULL,
                        access_count INTEGER DEFAULT 0,
                        size_bytes INTEGER DEFAULT 0
                    )
                ''')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON cache_entries(created_at)')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_last_accessed ON cache_entries(last_accessed)')
        except Exception as e:
            print(f"Warning: Could not initialize persistent cache: {e}")
            self.persistent = False

    def _load_persistent_cache(self):
        """Load still-valid entries from the database and purge expired rows."""
        if not self.persistent:
            return
        try:
            with self._connect() as conn:
                stale_keys = []
                for row_key, raw in conn.execute('SELECT key, data FROM cache_entries'):
                    try:
                        entry = CacheEntry.from_dict(json.loads(raw))
                    except (json.JSONDecodeError, KeyError, TypeError, AttributeError):
                        # Malformed row: skip it rather than abort the whole load.
                        continue
                    if self._is_entry_valid(entry):
                        self._add_to_memory_cache(entry)
                    else:
                        stale_keys.append(row_key)
                # Delete only after the SELECT has been fully consumed, so the
                # table is not modified while it is still being iterated.
                if stale_keys:
                    conn.executemany('DELETE FROM cache_entries WHERE key = ?',
                                     [(k,) for k in stale_keys])
        except Exception as e:
            print(f"Warning: Could not load persistent cache: {e}")

    def _generate_cache_key(self, content: str, model_config: Dict[str, Any],
                            processing_config: Dict[str, Any]) -> str:
        """
        Generate a unique cache key based on content and configuration

        Args:
            content: The text content to be analyzed
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used

        Returns:
            SHA-256 hash string as cache key
        """
        key_data = {
            'content': content,
            'model_config': model_config,
            'processing_config': processing_config
        }
        # sort_keys makes the JSON text (and so the hash) independent of dict
        # insertion order.
        key_string = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_string.encode('utf-8')).hexdigest()

    def _is_entry_valid(self, entry: "CacheEntry") -> bool:
        """Return True while the entry is younger than the configured TTL."""
        return time.time() - entry.created_at <= self.ttl_seconds

    def _add_to_memory_cache(self, entry: "CacheEntry"):
        """Insert (or replace) an entry in memory, evicting LRU entries to fit."""
        with self.lock:
            # Replacing an existing key must release the old entry's bytes
            # first; otherwise memory_size grows on every overwrite.
            previous = self.memory_cache.pop(entry.key, None)
            if previous is not None:
                self.memory_size -= previous.size_bytes

            limit_bytes = self.max_memory_mb * 1024 * 1024
            while self.memory_cache and self.memory_size + entry.size_bytes > limit_bytes:
                self._evict_lru_entry()

            self.memory_cache[entry.key] = entry
            self.memory_size += entry.size_bytes
            self._refresh_size_stats()

    def _evict_lru_entry(self):
        """Evict the least recently used entry from memory cache"""
        if not self.memory_cache:
            return
        # Find entry with oldest last_accessed time
        lru_key = min(self.memory_cache,
                      key=lambda k: self.memory_cache[k].last_accessed)
        evicted_entry = self.memory_cache.pop(lru_key)
        self.memory_size -= evicted_entry.size_bytes
        self.stats['evictions'] += 1
        # Only the in-memory copy is dropped; any row already written to the
        # database is left in place and can be found again by get().

    def _save_to_persistent_cache(self, entry: "CacheEntry"):
        """Insert or replace the entry's row in the database (best effort)."""
        if not self.persistent:
            return
        try:
            with self._connect() as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO cache_entries
                    (key, data, created_at, last_accessed, access_count, size_bytes)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    entry.key,
                    json.dumps(entry.to_dict()),
                    entry.created_at,
                    entry.last_accessed,
                    entry.access_count,
                    entry.size_bytes
                ))
        except Exception as e:
            print(f"Warning: Could not save to persistent cache: {e}")

    def get(self, content: str, model_config: Dict[str, Any],
            processing_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Get cached analysis result for given content and configuration

        Args:
            content: Text content to look up
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used

        Returns:
            Cached analysis result or None if not found
        """
        with self.lock:
            if not self.stats['enabled']:
                self.stats['misses'] += 1
                return None

            cache_key = self._generate_cache_key(content, model_config, processing_config)

            # Check memory cache first
            entry = self.memory_cache.get(cache_key)
            if entry is not None:
                if self._is_entry_valid(entry):
                    entry.update_access()
                    self.stats['hits'] += 1
                    return entry.analysis_result
                # Expired: drop it and keep the size statistics in sync.
                del self.memory_cache[cache_key]
                self.memory_size -= entry.size_bytes
                self._refresh_size_stats()

            # Check persistent cache if not in memory
            if self.persistent:
                try:
                    with self._connect() as conn:
                        row = conn.execute('SELECT data FROM cache_entries WHERE key = ?',
                                           (cache_key,)).fetchone()
                    # The read connection is closed before anything is written
                    # back, so the write below never runs while a read on
                    # another connection is still open.
                    if row:
                        entry = CacheEntry.from_dict(json.loads(row[0]))
                        if self._is_entry_valid(entry):
                            entry.update_access()
                            self.stats['hits'] += 1
                            # Move to memory cache for faster future access
                            self._add_to_memory_cache(entry)
                            # Update persistent cache with new access stats
                            self._save_to_persistent_cache(entry)
                            return entry.analysis_result
                except Exception as e:
                    print(f"Warning: Error accessing persistent cache: {e}")

            self.stats['misses'] += 1
            return None

    def put(self, content: str, analysis_result: Dict[str, Any],
            model_config: Dict[str, Any], processing_config: Dict[str, Any]):
        """
        Store analysis result in cache

        Args:
            content: Text content that was analyzed
            analysis_result: Analysis result to cache
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used
        """
        if not self.stats['enabled']:
            return
        cache_key = self._generate_cache_key(content, model_config, processing_config)
        with self.lock:
            entry = CacheEntry(cache_key, content, analysis_result,
                               model_config, processing_config)
            self._add_to_memory_cache(entry)
            self._save_to_persistent_cache(entry)

    def get_stats(self) -> Dict[str, Any]:
        """Return the raw counters plus hit rate (percent) and configuration."""
        with self.lock:
            total_requests = self.stats['hits'] + self.stats['misses']
            hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0
            return {
                **self.stats,
                'hit_rate': hit_rate,
                'total_requests': total_requests,
                'persistent_enabled': self.persistent,
                'memory_limit_mb': self.max_memory_mb,
                'ttl_hours': self.ttl_hours
            }

    def clear_cache(self):
        """Remove every entry from memory and disk and reset the counters.

        The 'enabled' flag is left as it is.
        """
        with self.lock:
            self.memory_cache.clear()
            self.memory_size = 0
            self.stats['entries'] = 0
            self.stats['hits'] = 0
            self.stats['misses'] = 0
            self.stats['evictions'] = 0
            self.stats['memory_usage_mb'] = 0
            # Clear persistent cache
            if self.persistent:
                try:
                    with self._connect() as conn:
                        conn.execute('DELETE FROM cache_entries')
                except Exception as e:
                    print(f"Warning: Could not clear persistent cache: {e}")

    def cleanup_expired_entries(self):
        """Remove entries older than the TTL from memory and from the database."""
        current_time = time.time()
        with self.lock:
            expired_keys = [
                key for key, entry in self.memory_cache.items()
                if current_time - entry.created_at > self.ttl_seconds
            ]
            for key in expired_keys:
                self.memory_size -= self.memory_cache.pop(key).size_bytes
            self._refresh_size_stats()

            # Clean up persistent cache
            if self.persistent:
                try:
                    with self._connect() as conn:
                        conn.execute('DELETE FROM cache_entries WHERE ? - created_at > ?',
                                     (current_time, self.ttl_seconds))
                except Exception as e:
                    print(f"Warning: Could not cleanup persistent cache: {e}")

    def enable(self):
        """Enable caching"""
        self.stats['enabled'] = True

    def disable(self):
        """Disable caching (get() then always misses and put() is a no-op)."""
        self.stats['enabled'] = False

    def export_cache(self, filepath: str):
        """
        Export cache contents to a JSON file

        Memory entries are written first, followed by database rows whose key
        has not been written yet, so an entry held in both levels appears
        once. ``metadata.total_entries`` is the number of entries written.

        Args:
            filepath: Destination path of the JSON file

        Returns:
            True on success, False if the file could not be written
        """
        entries = []
        with self.lock:
            seen_keys = set()
            for key, entry in self.memory_cache.items():
                entries.append(entry.to_dict())
                seen_keys.add(key)

            # Also export persistent cache entries not already covered above
            if self.persistent:
                try:
                    with self._connect() as conn:
                        for (raw,) in conn.execute('SELECT data FROM cache_entries'):
                            try:
                                entry_data = json.loads(raw)
                            except json.JSONDecodeError:
                                continue
                            row_key = entry_data.get('key') if isinstance(entry_data, dict) else None
                            if row_key is not None:
                                if row_key in seen_keys:
                                    continue
                                seen_keys.add(row_key)
                            entries.append(entry_data)
                except Exception as e:
                    print(f"Warning: Could not export persistent cache: {e}")

        cache_data = {
            'metadata': {
                'exported_at': time.time(),
                'version': '1.0',
                'total_entries': len(entries)
            },
            'entries': entries
        }
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, indent=2, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Error exporting cache: {e}")
            return False

    def import_cache(self, filepath: str):
        """
        Import cache contents from a JSON file written by export_cache()

        Entries that have already expired are skipped; entries that cannot be
        rebuilt are reported and skipped.

        Args:
            filepath: Path of the JSON file to read

        Returns:
            Number of entries imported (0 if the file could not be read)
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            imported_count = 0
            for entry_data in cache_data.get('entries', []):
                try:
                    entry = CacheEntry.from_dict(entry_data)
                    if self._is_entry_valid(entry):
                        self._add_to_memory_cache(entry)
                        if self.persistent:
                            self._save_to_persistent_cache(entry)
                        imported_count += 1
                except Exception as e:
                    print(f"Warning: Could not import cache entry: {e}")
                    continue
            return imported_count
        except Exception as e:
            print(f"Error importing cache: {e}")
            return 0
# Global cache instance for use across the application
_cache_instance = None
_cache_lock = threading.Lock()


def get_cache_manager(max_memory_mb: int = 1024, persistent: bool = True,
                      cache_dir: Optional[str] = None, ttl_hours: int = 24) -> CacheManager:
    """
    Get or create global cache manager instance

    The first call builds the shared CacheManager from the given arguments.
    Later calls return that same instance after applying the requested
    memory limit, TTL and persistence setting to it.

    Args:
        max_memory_mb: Maximum memory to use for caching (MB)
        persistent: Whether to use persistent disk cache
        cache_dir: Directory for persistent cache storage; only used when the
            instance is first created, ignored on later calls
        ttl_hours: Time-to-live for cache entries (hours)

    Returns:
        The process-wide CacheManager instance
    """
    global _cache_instance
    with _cache_lock:
        if _cache_instance is None:
            _cache_instance = CacheManager(max_memory_mb, persistent, cache_dir, ttl_hours)
            return _cache_instance

        manager = _cache_instance
        manager.max_memory_mb = max_memory_mb
        manager.ttl_hours = ttl_hours
        manager.ttl_seconds = ttl_hours * 3600

        if not persistent:
            manager.persistent = False
        elif not manager.persistent:
            # Turning persistence on for an instance that ran without it: the
            # SQLite table may not exist yet, so create it before any write.
            # _init_database() resets the flag to False if that fails.
            manager.persistent = True
            manager._init_database()
        return manager