last_edit / server /cache_manager.py
Moharek
Deploy Moharek GEO Platform
a74b879
"""
Redis Caching Layer
- Performance optimization
- Session caching
- API response caching
"""
import redis
import json
import os
from typing import Optional, Any
from functools import wraps
import hashlib
# Redis connection
REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
try:
redis_client = redis.from_url(REDIS_URL, decode_responses=True)
redis_client.ping()
REDIS_AVAILABLE = True
except Exception as e:
print(f"⚠️ Redis not available: {e}. Using in-memory cache fallback.")
REDIS_AVAILABLE = False
redis_client = None
# In-memory fallback cache
_memory_cache = {}
class CacheManager:
"""Unified cache manager with Redis fallback"""
def __init__(self):
self.use_redis = REDIS_AVAILABLE
def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
try:
if self.use_redis:
val = redis_client.get(key)
return json.loads(val) if val else None
else:
return _memory_cache.get(key)
except Exception as e:
print(f"Cache get error: {e}")
return None
def set(self, key: str, value: Any, ttl: int = 3600):
"""Set value in cache with TTL"""
try:
serialized = json.dumps(value)
if self.use_redis:
redis_client.setex(key, ttl, serialized)
else:
_memory_cache[key] = value
except Exception as e:
print(f"Cache set error: {e}")
def delete(self, key: str):
"""Delete value from cache"""
try:
if self.use_redis:
redis_client.delete(key)
else:
_memory_cache.pop(key, None)
except Exception as e:
print(f"Cache delete error: {e}")
def clear(self):
"""Clear all cache"""
try:
if self.use_redis:
redis_client.flushdb()
else:
_memory_cache.clear()
except Exception as e:
print(f"Cache clear error: {e}")
def get_many(self, keys: list) -> dict:
"""Get multiple values"""
result = {}
try:
if self.use_redis:
vals = redis_client.mget(keys)
for k, v in zip(keys, vals):
result[k] = json.loads(v) if v else None
else:
for k in keys:
result[k] = _memory_cache.get(k)
except Exception as e:
print(f"Cache get_many error: {e}")
return result
def set_many(self, data: dict, ttl: int = 3600):
"""Set multiple values"""
try:
if self.use_redis:
pipe = redis_client.pipeline()
for k, v in data.items():
pipe.setex(k, ttl, json.dumps(v))
pipe.execute()
else:
_memory_cache.update(data)
except Exception as e:
print(f"Cache set_many error: {e}")
def increment(self, key: str, amount: int = 1) -> int:
"""Increment counter"""
try:
if self.use_redis:
return redis_client.incr(key, amount)
else:
_memory_cache[key] = _memory_cache.get(key, 0) + amount
return _memory_cache[key]
except Exception as e:
print(f"Cache increment error: {e}")
return 0
def decrement(self, key: str, amount: int = 1) -> int:
"""Decrement counter"""
try:
if self.use_redis:
return redis_client.decr(key, amount)
else:
_memory_cache[key] = _memory_cache.get(key, 0) - amount
return _memory_cache[key]
except Exception as e:
print(f"Cache decrement error: {e}")
return 0
cache = CacheManager()
def cache_key(*args, **kwargs) -> str:
"""Generate cache key from arguments"""
key_str = json.dumps({'args': args, 'kwargs': kwargs}, sort_keys=True, default=str)
return hashlib.md5(key_str.encode()).hexdigest()
def cached(ttl: int = 3600, key_prefix: str = ''):
"""Decorator for caching function results"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Generate cache key
func_key = f"{key_prefix or func.__name__}:{cache_key(*args, **kwargs)}"
# Try to get from cache
cached_val = cache.get(func_key)
if cached_val is not None:
return cached_val
# Call function and cache result
result = func(*args, **kwargs)
cache.set(func_key, result, ttl)
return result
return wrapper
return decorator
def cache_invalidate(pattern: str = ''):
"""Invalidate cache by pattern"""
try:
if cache.use_redis:
keys = redis_client.keys(pattern or '*')
if keys:
redis_client.delete(*keys)
else:
if pattern:
to_delete = [k for k in _memory_cache.keys() if pattern in k]
for k in to_delete:
del _memory_cache[k]
else:
_memory_cache.clear()
except Exception as e:
print(f"Cache invalidate error: {e}")
# Session management
class SessionManager:
"""Session management with Redis"""
SESSION_PREFIX = 'session:'
SESSION_TTL = 86400 # 24 hours
@staticmethod
def create(user_id: int, data: dict) -> str:
"""Create session"""
import secrets
session_id = secrets.token_urlsafe(32)
key = f"{SessionManager.SESSION_PREFIX}{session_id}"
cache.set(key, {'user_id': user_id, **data}, SessionManager.SESSION_TTL)
return session_id
@staticmethod
def get(session_id: str) -> Optional[dict]:
"""Get session"""
key = f"{SessionManager.SESSION_PREFIX}{session_id}"
return cache.get(key)
@staticmethod
def update(session_id: str, data: dict):
"""Update session"""
key = f"{SessionManager.SESSION_PREFIX}{session_id}"
session = cache.get(key)
if session:
session.update(data)
cache.set(key, session, SessionManager.SESSION_TTL)
@staticmethod
def delete(session_id: str):
"""Delete session"""
key = f"{SessionManager.SESSION_PREFIX}{session_id}"
cache.delete(key)
# Rate limiting with Redis
class RateLimiter:
"""Rate limiter using Redis"""
def __init__(self, max_requests: int = 100, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
def is_allowed(self, identifier: str) -> bool:
"""Check if request is allowed"""
key = f"rate_limit:{identifier}"
try:
count = cache.increment(key)
if count == 1:
# Set expiry on first request
if cache.use_redis:
redis_client.expire(key, self.window_seconds)
return count <= self.max_requests
except Exception as e:
print(f"Rate limit error: {e}")
return True
def get_remaining(self, identifier: str) -> int:
"""Get remaining requests"""
key = f"rate_limit:{identifier}"
try:
count = cache.get(key) or 0
return max(0, self.max_requests - count)
except Exception as e:
print(f"Rate limit get_remaining error: {e}")
return self.max_requests
def reset(self, identifier: str):
"""Reset rate limit"""
key = f"rate_limit:{identifier}"
cache.delete(key)
# Query result caching
def cache_query_result(query_key: str, ttl: int = 3600):
"""Cache database query results"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_k = f"query:{query_key}:{cache_key(*args, **kwargs)}"
cached_val = cache.get(cache_k)
if cached_val is not None:
return cached_val
result = func(*args, **kwargs)
cache.set(cache_k, result, ttl)
return result
return wrapper
return decorator
# Cache statistics
def get_cache_stats() -> dict:
"""Get cache statistics"""
try:
if cache.use_redis:
info = redis_client.info()
return {
'backend': 'redis',
'used_memory': info.get('used_memory_human', 'N/A'),
'connected_clients': info.get('connected_clients', 0),
'total_commands': info.get('total_commands_processed', 0)
}
else:
return {
'backend': 'memory',
'items': len(_memory_cache),
'memory_usage': 'N/A'
}
except Exception as e:
return {'error': str(e)}
# Job-specific cache invalidation
def invalidate_job_cache(job_id: int):
"""Invalidate all cache entries related to a specific job"""
patterns = [
f"job:{job_id}:*",
f"results:job_{job_id}",
f"analysis:job_{job_id}",
f"audit:job_{job_id}",
f"keywords:job_{job_id}",
f"recommendations:job_{job_id}"
]
for pattern in patterns:
cache_invalidate(pattern)
def invalidate_results_cache():
"""Invalidate all results and analysis cache"""
patterns = [
"results:*",
"analysis:*",
"audit:*",
"query:*"
]
for pattern in patterns:
cache_invalidate(pattern)
def invalidate_url_cache(url: str):
"""Invalidate cache for a specific URL"""
import hashlib
url_hash = hashlib.md5(url.encode()).hexdigest()
patterns = [
f"url:{url_hash}:*",
f"results:{url_hash}",
f"analysis:{url_hash}"
]
for pattern in patterns:
cache_invalidate(pattern)