Spaces:
Sleeping
Sleeping
File size: 4,941 Bytes
a3c7b61 81c40fc 84d1290 a3c7b61 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import time
from typing import Dict, Any, Optional
from backend.credentials import setup_google_credentials
# Runs once at import time, before any cache below is created.
# NOTE(review): presumably configures Google credentials for the backend;
# the helper's behaviour is not visible from this file — confirm in
# backend.credentials.
setup_google_credentials()

# Simple in-memory caches: plain module-level dicts.
# user_cache maps "user_data:<user_id>" to
# {'data': <payload>, 'timestamp': <time.time() when stored>}.
user_cache: Dict[str, Dict[str, Any]] = {}
# routing_cache maps the MD5 hex digest of the lower-cased, stripped message
# to {'route': <route>, 'timestamp': <time.time() when stored>}.
routing_cache: Dict[str, Dict[str, Any]] = {}
CACHE_DURATION = 300  # 5 minutes in seconds
ROUTING_CACHE_DURATION = 1800  # 30 minutes for routing
def get_cache_key(user_id: str) -> str:
    """Build the key under which a user's data is stored in the user cache.

    Args:
        user_id: Identifier of the user.

    Returns:
        The string ``"user_data:<user_id>"``.
    """
    namespace = "user_data"
    return f"{namespace}:{user_id}"
def is_cache_valid(cache_entry: Dict[str, Any], duration: int = CACHE_DURATION) -> bool:
    """Report whether a cache entry is younger than ``duration`` seconds.

    Args:
        cache_entry: Entry dict carrying a ``'timestamp'`` value that is
            compared against ``time.time()``.
        duration: Maximum allowed age in seconds; defaults to
            ``CACHE_DURATION``.

    Returns:
        ``True`` while the entry's age is strictly less than ``duration``.
    """
    age = time.time() - cache_entry['timestamp']
    return age < duration
def get_cached_user_data(user_id: str) -> Optional[Dict[str, Any]]:
    """Return the cached payload for ``user_id`` if it has not expired.

    Args:
        user_id: Identifier of the user whose data is wanted.

    Returns:
        The stored ``'data'`` value when a valid entry exists, otherwise
        ``None``. An expired entry is deleted from ``user_cache`` as a side
        effect of the lookup.
    """
    key = get_cache_key(user_id)

    # Nothing stored for this user.
    if key not in user_cache:
        return None

    entry = user_cache[key]
    if is_cache_valid(entry):
        return entry['data']

    # Entry is too old: evict it so it is not checked again.
    del user_cache[key]
    return None
def cache_user_data(user_id: str, data: Dict[str, Any]) -> None:
    """Store ``data`` for ``user_id`` together with the current time.

    Any existing entry for the same user is overwritten.

    Args:
        user_id: Identifier of the user the data belongs to.
        data: Payload to cache.
    """
    key = get_cache_key(user_id)
    # The timestamp is what the validity check later compares against.
    stamped_entry = dict(data=data, timestamp=time.time())
    user_cache[key] = stamped_entry
    print(f"[CACHE] Cached user data for user: {user_id}")
def get_cached_route(user_message: str) -> Optional[str]:
    """Look up a previously cached routing decision for a message.

    The lookup key is the MD5 hex digest of the lower-cased, stripped
    message, so messages differing only in case or surrounding whitespace
    share one entry.

    Args:
        user_message: Raw message text.

    Returns:
        The cached route when an entry exists and is younger than
        ``ROUTING_CACHE_DURATION`` seconds, otherwise ``None``. An expired
        entry is deleted from ``routing_cache`` as a side effect.
    """
    import hashlib

    normalized = user_message.lower().strip()
    digest = hashlib.md5(normalized.encode()).hexdigest()

    # No routing decision recorded for this message.
    if digest not in routing_cache:
        return None

    entry = routing_cache[digest]
    if is_cache_valid(entry, ROUTING_CACHE_DURATION):
        return entry['route']

    # Expired: drop the stale decision.
    del routing_cache[digest]
    return None
def cache_route(user_message: str, route: str) -> None:
    """Remember the routing decision made for a message.

    The entry is keyed by the MD5 hex digest of the lower-cased, stripped
    message and stamped with the current time. An existing entry for the
    same digest is overwritten.

    Args:
        user_message: Raw message text.
        route: Routing decision to store.
    """
    import hashlib

    normalized = user_message.lower().strip()
    digest = hashlib.md5(normalized.encode()).hexdigest()
    routing_cache[digest] = dict(route=route, timestamp=time.time())
    # Only a short prefix of the digest is logged.
    print(f"[CACHE] Cached route '{route}' for message hash: {digest[:8]}...")
def clear_user_cache(user_id: Optional[str] = None) -> None:
    """Clear the cached data of one user, or of every user.

    Args:
        user_id: User whose entry should be dropped. When ``None`` (the
            default) the entire user cache is emptied. Clearing a user with
            no cached entry is a no-op apart from the log line.
    """
    # Compare against None explicitly: a falsy-but-present ID such as ""
    # must not fall through to the branch that wipes every user's entry.
    if user_id is not None:
        user_cache.pop(get_cache_key(user_id), None)
        print(f"[CACHE] Cleared cache for user: {user_id}")
        return

    # clear() empties the existing dict in place, so the module-level name
    # is never rebound and no ``global`` declaration is required.
    user_cache.clear()
    print("[CACHE] Cleared all user cache")
def get_cache_stats() -> Dict[str, Any]:
    """Summarise the current contents of the user and routing caches.

    Returns:
        A dict with ``"user_cache"`` and ``"routing_cache"`` sections. Each
        section reports ``total_entries``, ``valid_entries`` (age strictly
        below the cache's duration), ``expired_entries`` and
        ``cache_duration_seconds``. Nothing is evicted by this call.
    """
    # One timestamp for both caches so the two sections are consistent.
    now = time.time()

    def _summarize(cache, ttl):
        fresh = sum(1 for item in cache.values() if now - item['timestamp'] < ttl)
        return {
            "total_entries": len(cache),
            "valid_entries": fresh,
            # Every entry that is not fresh counts as expired.
            "expired_entries": len(cache) - fresh,
            "cache_duration_seconds": ttl,
        }

    return {
        "user_cache": _summarize(user_cache, CACHE_DURATION),
        "routing_cache": _summarize(routing_cache, ROUTING_CACHE_DURATION),
    }
def cleanup_expired_cache() -> Dict[str, int]:
    """Evict every expired entry from the user and routing caches.

    An entry is expired once its age is at least the cache's duration
    (``CACHE_DURATION`` for users, ``ROUTING_CACHE_DURATION`` for routes).

    Returns:
        A dict with ``"user_entries_removed"`` and
        ``"routing_entries_removed"`` counts.
    """
    now = time.time()

    # Collect keys first; deleting while iterating a dict is not allowed.
    stale_users = [
        key for key, item in user_cache.items()
        if now - item['timestamp'] >= CACHE_DURATION
    ]
    for key in stale_users:
        del user_cache[key]

    stale_routes = [
        key for key, item in routing_cache.items()
        if now - item['timestamp'] >= ROUTING_CACHE_DURATION
    ]
    for key in stale_routes:
        del routing_cache[key]

    removed_users = len(stale_users)
    removed_routes = len(stale_routes)

    # Stay quiet when there was nothing to clean.
    if removed_users or removed_routes:
        print(f"[CACHE] Cleaned up {removed_users} user entries and {removed_routes} routing entries")

    return {
        "user_entries_removed": removed_users,
        "routing_entries_removed": removed_routes,
    }
|