|
""" |
|
Cache manager for storing and retrieving agent answers. |
|
""" |
|
|
|
import os |
|
import json |
|
import hashlib |
|
from typing import Optional, Dict, Any, List |
|
from datetime import datetime |
|
|
|
class CacheManager:
    """Manages caching of agent answers to avoid redundant processing.

    Answers are stored one-JSON-file-per-question under ``cache_dir``.  Each
    file keeps the full history of answers for its question plus a
    ``cache_valid`` flag that tells readers whether the latest answer is
    trustworthy (error messages and corrupt responses are recorded but
    flagged invalid).
    """

    def __init__(self, cache_dir: str = "cache"):
        # Directory holding the per-question JSON cache files.
        self.cache_dir = cache_dir
        self.ensure_cache_dir()

    def ensure_cache_dir(self):
        """Create cache directory if it doesn't exist."""
        # exist_ok avoids the race between an existence check and creation.
        os.makedirs(self.cache_dir, exist_ok=True)

    def _get_question_hash(self, question: str) -> str:
        """Generate a short hash of the question to use as a filename key.

        MD5 is used only for filename bucketing, not for security; 12 hex
        characters keep names short while making collisions unlikely.
        """
        return hashlib.md5(question.encode('utf-8')).hexdigest()[:12]

    def _get_cache_path(self, question: str) -> str:
        """Get the cache file path for a question."""
        question_hash = self._get_question_hash(question)
        return os.path.join(self.cache_dir, f"question_{question_hash}.json")

    def get_cached_answer(self, question: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the most recent cached answer for a question.

        Args:
            question: The question to look up

        Returns:
            Dictionary with 'answer', 'iterations', 'timestamp',
            'cache_valid' and 'file_name' if cached, None otherwise
        """
        cache_path = self._get_cache_path(question)
        if not os.path.exists(cache_path):
            return None
        try:
            with open(cache_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            answers = data.get('answers', [])
            if not answers:
                return None
            # Only the most recent answer is surfaced to callers; older
            # entries remain in the file as history.
            last_answer = answers[-1]
            return {
                'answer': last_answer.get('answer', ''),
                'iterations': last_answer.get('iterations', 0),
                'timestamp': last_answer.get('timestamp', ''),
                'cache_valid': data.get('cache_valid', False),
                'file_name': data.get('file_name', None)
            }
        except Exception as e:
            # A corrupt or unreadable cache file is treated as a cache miss.
            print(f"Error reading cache: {e}")
            return None

    def cache_answer(self, question: str, answer: Optional[str], iterations: int = 1, file_name: Optional[str] = None) -> bool:
        """
        Cache an answer for a question with iteration count.

        Invalid answers (empty, or matching an error pattern) are still
        appended to the history for debugging, but the file's
        ``cache_valid`` flag is set False so readers know not to trust them.

        Args:
            question: The question that was asked
            answer: The answer to cache
            iterations: Number of iterations/steps used (should be 1-10 typically)
            file_name: Optional associated file name to record on the entry

        Returns:
            True if cached successfully, False otherwise
        """
        cache_path = self._get_cache_path(question)
        cache_valid = bool(answer and self.validate_answer_content(answer))
        now = datetime.now().isoformat()
        try:
            # Load the existing history, or start a fresh record.
            if os.path.exists(cache_path):
                with open(cache_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            else:
                data = {
                    'question': question,
                    'answers': [],
                    'cache_valid': False,
                    'file_name': file_name
                }

            if file_name:
                data['file_name'] = file_name
                print(f"[CacheManager] file_name submitted: {file_name}")

            # Append the answer either way; the flag alone records whether
            # this latest entry passed validation.  (When cache_valid is
            # True the answer is necessarily truthy, so the fallback to ""
            # only ever applies to invalid/empty answers.)
            data['answers'].append({
                'answer': answer if answer else "",
                'iterations': iterations,
                'timestamp': now
            })
            data['cache_valid'] = cache_valid

            with open(cache_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            return True
        except Exception as e:
            print(f"Error caching answer: {e}")
            return False

    def validate_answer_content(self, answer: str) -> bool:
        """
        Validate that answer content is reasonable to cache.
        Error messages and corrupted responses should NOT be cached as valid.

        Args:
            answer: The answer content to validate

        Returns:
            True if answer is valid to cache, False otherwise
        """
        if not answer or not isinstance(answer, str):
            return False

        clean_answer = answer.strip()
        # Anything shorter than 3 characters cannot be a useful answer.
        if len(clean_answer) < 3:
            return False

        # Substrings that indicate the "answer" is really an error message
        # from the LLM stack or the agent runtime.
        error_patterns = [
            'error calling llm',
            'error running agent',
            'error in',
            'error processing',
            'litellm.badrequest',
            'litellm.exception',
            'vertexaiexception',
            'badrequest',
            'invalid_argument',
            'authentication',
            'credentials',
            'api key',
            'traceback',
            'exception occurred',
            'failed to',
            'unable to submit',
            'mimetype parameter',
            'not supported'
        ]

        lower_answer = clean_answer.lower()
        for pattern in error_patterns:
            if pattern in lower_answer:
                print(f"[CacheManager] Rejecting answer containing error pattern: '{pattern}'")
                return False

        # Whole-answer values that indicate a serialization artifact rather
        # than real content.
        corrupt_patterns = [']', '[', '{}', '()', '""', "''", 'null', 'undefined']
        if clean_answer in corrupt_patterns:
            return False

        # Reject answers made up entirely of bracket characters.
        if all(c in '[]{}()' for c in clean_answer):
            return False

        return True

    def clear_cache(self):
        """Clear all cached answers."""
        try:
            for filename in os.listdir(self.cache_dir):
                file_path = os.path.join(self.cache_dir, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
            print("Cache cleared successfully")
        except Exception as e:
            print(f"Error clearing cache: {e}")

    def list_cached_questions(self) -> List[Dict[str, Any]]:
        """List all cached questions with metadata, newest first."""
        cached_questions = []
        try:
            for filename in os.listdir(self.cache_dir):
                if filename.startswith('question_') and filename.endswith('.json'):
                    cache_path = os.path.join(self.cache_dir, filename)
                    with open(cache_path, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                    cached_questions.append({
                        'question': data.get('question', ''),
                        'cache_valid': data.get('cache_valid', False),
                        'file_name': data.get('file_name', None),
                        'last_timestamp': data['answers'][-1]['timestamp'] if data.get('answers') else None
                    })
        except Exception as e:
            print(f"Error listing cached questions: {e}")
        # 'last_timestamp' can be None (empty answers list); coerce to ''
        # so sorted() never compares None with str (TypeError).
        return sorted(cached_questions, key=lambda x: x.get('last_timestamp') or '', reverse=True)

    def cleanup_invalid_cache_entries(self) -> int:
        """
        Clean up cache entries that contain error messages or invalid content.

        Entries whose 'cache_valid' flag is True but whose answers fail
        validation are demoted to cache_valid=False (answers are kept for
        history; nothing is deleted).

        Returns:
            Number of entries cleaned up
        """
        cleaned_count = 0
        try:
            for filename in os.listdir(self.cache_dir):
                if filename.startswith('question_') and filename.endswith('.json'):
                    cache_path = os.path.join(self.cache_dir, filename)

                    try:
                        with open(cache_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)

                        should_cleanup = False

                        # Only entries currently marked valid need checking.
                        if data.get('cache_valid', False):
                            answers = data.get('answers', [])
                            for answer_entry in answers:
                                answer_text = answer_entry.get('answer', '')
                                if not self.validate_answer_content(answer_text):
                                    print(f"Found invalid cached answer in {filename}: {answer_text[:100]}...")
                                    should_cleanup = True
                                    break

                        if should_cleanup:
                            # Demote rather than delete so history survives.
                            data['cache_valid'] = False
                            with open(cache_path, 'w', encoding='utf-8') as f:
                                json.dump(data, f, indent=2)
                            cleaned_count += 1
                            print(f"Cleaned up invalid cache entry: {filename}")

                    except Exception as e:
                        print(f"Error processing cache file {filename}: {e}")
                        continue

        except Exception as e:
            print(f"Error during cache cleanup: {e}")

        print(f"Cache cleanup completed. {cleaned_count} entries cleaned up.")
        return cleaned_count