Spaces:
Running
Running
| import json | |
| import logging | |
| from typing import Any, Dict, Optional | |
| import redis | |
| from redis.exceptions import RedisError | |
| from app.core.settings import settings | |
logger = logging.getLogger(__name__)


class CacheManager:
    """Best-effort Redis cache for JSON-serializable answer payloads.

    Every key is namespaced with ``CACHE_PREFIX``.  When the connection
    cannot be established in ``__init__`` the instance stays usable but
    disabled: reads return ``None``, writes and deletes return ``False`` and
    ``stats`` returns ``{}``.
    """

    # Namespace prepended to every key handled by this class.
    CACHE_PREFIX = "mathminds:cache:"
    # Upper bound on the serialized payload, measured with ``len()`` on the
    # JSON string (characters, not bytes).  Larger payloads are not stored.
    MAX_CACHE_SIZE = 50000

    def __init__(
        self,
        redis_url: Optional[str] = None,
        # Quoted so the annotation is not evaluated when the class is defined.
        connection_pool: "Optional[redis.ConnectionPool]" = None,
    ):
        """Connect to Redis and verify the connection with a ``PING``.

        Args:
            redis_url: Connection URL; falls back to ``settings.REDIS_URL``.
            connection_pool: Existing pool to reuse instead of ``redis_url``.

        On failure ``self.redis_client`` is left as ``None`` and the error is
        logged; the constructor does not raise for connection problems.
        """
        self.redis_url = redis_url or settings.REDIS_URL
        self.redis_client = None
        try:
            if connection_pool:
                # NOTE(review): redis-py appears to take connection options
                # from the pool when one is supplied, so decode_responses may
                # have no effect on this branch -- confirm against the pool's
                # own configuration.
                self.redis_client = redis.Redis(
                    connection_pool=connection_pool,
                    decode_responses=True,
                )
            else:
                self.redis_client = redis.from_url(
                    self.redis_url,
                    decode_responses=True,
                    socket_timeout=2,
                    socket_connect_timeout=2,
                )
            self.redis_client.ping()
            # Never log the raw URL: it may embed a password.
            logger.info(
                "Connected to Redis at %s", self._redact_url(self.redis_url)
            )
        except (RedisError, ValueError) as e:
            # ValueError covers a URL with an unsupported scheme, so a
            # misconfigured URL degrades the same way as an unreachable server.
            logger.error("Redis connection failed: %s", e)
            self.redis_client = None

    @staticmethod
    def _redact_url(url: Optional[str]) -> str:
        """Return *url* with any ``user:password@`` part replaced by ``***@``."""
        text = str(url)
        scheme, sep, rest = text.partition("://")
        if not sep:
            return text
        # rpartition keeps working when the password itself contains "@".
        _, at, location = rest.rpartition("@")
        userinfo = "***@" if at else ""
        return f"{scheme}://{userinfo}{location}"

    def _serialize(self, data: Any) -> str:
        """Encode *data* as JSON, stringifying values JSON cannot represent."""
        return json.dumps(data, default=str)

    def _prefixed(self, key: str) -> str:
        """Return *key* with the class-wide cache namespace prepended."""
        return f"{self.CACHE_PREFIX}{key}"

    def _encode_within_limit(self, answer: Any) -> Optional[str]:
        """Serialize *answer*; return ``None`` if it exceeds ``MAX_CACHE_SIZE``.

        Shared by every write path so the size limit is applied uniformly.
        Serialization errors propagate to the caller.
        """
        payload = self._serialize(answer)
        if len(payload) > self.MAX_CACHE_SIZE:
            logger.warning("Cache skipped: payload too large")
            return None
        return payload

    def get_cached_answer(self, cache_key: str) -> Optional[Dict[str, Any]]:
        """Return the decoded payload stored under *cache_key*.

        Returns ``None`` when the cache is disabled, the key is missing or
        empty, or the read/decode fails (the failure is logged).
        """
        if not self.redis_client:
            return None
        key = self._prefixed(cache_key)
        try:
            data = self.redis_client.get(key)
            if data:
                logger.debug("Cache hit: %s", key)
                return json.loads(data)
            return None
        except (RedisError, ValueError) as e:
            # json.JSONDecodeError is a ValueError; catching the base class
            # also covers undecodable byte payloads.
            logger.error("Cache read error: %s", e)
            return None

    def set_cached_answer(
        self,
        cache_key: str,
        answer: Dict[str, Any],
        ttl: int = 86400,
    ) -> bool:
        """Store *answer* under *cache_key* for *ttl* seconds, overwriting.

        Returns ``True`` when the value was written, ``False`` when the cache
        is disabled, the payload is too large, or the write fails.
        """
        if not self.redis_client:
            return False
        try:
            payload = self._encode_within_limit(answer)
            if payload is None:
                return False
            self.redis_client.setex(self._prefixed(cache_key), ttl, payload)
            return True
        except (RedisError, TypeError, ValueError) as e:
            # json.dumps raises TypeError for unsupported keys and ValueError
            # for circular references.
            logger.error("Cache write failed: %s", e)
            return False

    def set_if_not_exists(
        self,
        cache_key: str,
        answer: Dict[str, Any],
        ttl: int = 86400,
    ) -> bool:
        """Store *answer* for *ttl* seconds only if *cache_key* is absent.

        Returns ``True`` only when this call created the key.  Returns
        ``False`` when the key already exists, the cache is disabled, the
        payload exceeds ``MAX_CACHE_SIZE``, or the write fails.
        """
        if not self.redis_client:
            return False
        try:
            payload = self._encode_within_limit(answer)
            if payload is None:
                return False
            created = self.redis_client.set(
                self._prefixed(cache_key),
                payload,
                ex=ttl,
                nx=True,
            )
            return bool(created)
        except Exception as e:
            # Broad on purpose: this call is best-effort and must not raise.
            logger.error("set_if_not_exists failed: %s", e)
            return False

    def delete(self, cache_key: str) -> bool:
        """Delete *cache_key*; return ``True`` if a key was actually removed."""
        if not self.redis_client:
            return False
        try:
            return bool(self.redis_client.delete(self._prefixed(cache_key)))
        except RedisError as e:
            logger.error("Cache delete failed: %s", e)
            return False

    def stats(self) -> Dict[str, Any]:
        """Return selected fields of the server's ``INFO`` output.

        Returns an empty dict when the cache is disabled or the call fails.
        """
        if not self.redis_client:
            return {}
        try:
            info = self.redis_client.info()
            return {
                "used_memory": info.get("used_memory_human"),
                "connected_clients": info.get("connected_clients"),
                "keyspace_hits": info.get("keyspace_hits"),
                "keyspace_misses": info.get("keyspace_misses"),
            }
        except Exception as e:
            # Diagnostics only: report the failure but never raise.
            logger.error("Cache stats failed: %s", e)
            return {}