# What this does?
## Gets a key's Redis cache and stores it in memory for 1 minute.
## This reduces the number of Redis GET requests made by the proxy during high traffic.
### [BETA] This is in beta and might change.
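#
# Key layout (see `async_get_cache` below): cache entries are written as
# "litellm:<api_key>:<call_type>:<hash>" once `redis_namespace` is set, so a
# single Redis SCAN over the "litellm:<api_key>:<call_type>" prefix can
# batch-load every cached response for that key/call-type pair into memory.
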
import traceback
from typing import Literal, Optional

from fastapi import HTTPException

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache, InMemoryCache, RedisCache
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth


class _PROXY_BatchRedisRequests(CustomLogger):
    # Class variables or attributes
    in_memory_cache: Optional[InMemoryCache] = None

    def __init__(self):
        if litellm.cache is not None:
            litellm.cache.async_get_cache = (
                self.async_get_cache
            )  # map the litellm 'get_cache' function to our custom function
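
    # NOTE: constructing this hook rebinds `litellm.cache.async_get_cache` for
    # the whole process, so the in-memory-first lookup in `async_get_cache`
    # below applies to every cache read once the proxy loads this hook.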

    def print_verbose(
        self, print_statement, debug_level: Literal["INFO", "DEBUG"] = "DEBUG"
    ):
        if debug_level == "DEBUG":
            verbose_proxy_logger.debug(print_statement)
        elif debug_level == "INFO":
            verbose_proxy_logger.info(print_statement)
        if litellm.set_verbose is True:
            print(print_statement)  # noqa

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: str,
    ):
        """
        - Get the user key
        - Check if a key starting with `litellm:<api_key>:<call_type>:` exists in-memory
        - If not, batch-fetch the relevant keys from Redis
        """
        try:
            api_key = user_api_key_dict.api_key
            cache_key_name = f"litellm:{api_key}:{call_type}"
            self.in_memory_cache = cache.in_memory_cache

            key_value_dict = {}
            in_memory_cache_exists = False
            for key in cache.in_memory_cache.cache_dict.keys():
                if isinstance(key, str) and key.startswith(cache_key_name):
                    in_memory_cache_exists = True

            if in_memory_cache_exists is False and litellm.cache is not None:
                """
                - Check if `litellm.cache` is Redis-backed
                - Batch-fetch the relevant values
                """
                if litellm.cache.type is not None and isinstance(
                    litellm.cache.cache, RedisCache
                ):
                    self.print_verbose(f"cache_key_name: {cache_key_name}")
                    # Use the SCAN iterator to fetch keys matching the pattern
                    keys = await litellm.cache.cache.async_scan_iter(
                        pattern=cache_key_name, count=100
                    )
                    self.print_verbose(f"redis keys: {keys}")
                    if len(keys) > 0:
                        key_value_dict = (
                            await litellm.cache.cache.async_batch_get_cache(
                                key_list=keys
                            )
                        )

                    ## Add the fetched values to the in-memory cache (1 minute TTL)
                    if len(key_value_dict) > 0:
                        await cache.in_memory_cache.async_set_cache_pipeline(
                            cache_list=list(key_value_dict.items()), ttl=60
                        )

                    ## Set the cache namespace, so subsequent cache writes land under this prefix
                    data["metadata"]["redis_namespace"] = cache_key_name
        except HTTPException as e:
            raise e
        except Exception as e:
            verbose_proxy_logger.error(
                "litellm.proxy.hooks.batch_redis_get.py::async_pre_call_hook(): Exception occurred - {}".format(
                    str(e)
                )
            )
            verbose_proxy_logger.debug(traceback.format_exc())

    async def async_get_cache(self, *args, **kwargs):
        """
        - Check if the cache key is in-memory
        - Else:
            - add missing cache key from REDIS
            - update in-memory cache
            - return redis cache request
        """
        try:  # never block execution
            cache_key: Optional[str] = None
            if "cache_key" in kwargs:
                cache_key = kwargs["cache_key"]
            elif litellm.cache is not None:
                cache_key = litellm.cache.get_cache_key(
                    *args, **kwargs
                )  # returns "<cache_key_name>:<hash>" - we pass redis_namespace in async_pre_call_hook. Done to avoid rewriting the async_set_cache logic
            if (
                cache_key is not None
                and self.in_memory_cache is not None
                and litellm.cache is not None
            ):
                cache_control_args = kwargs.get("cache", {})
                max_age = cache_control_args.get(
                    "s-max-age", cache_control_args.get("s-maxage", float("inf"))
                )
                cached_result = self.in_memory_cache.get_cache(
                    cache_key, *args, **kwargs
                )
                if cached_result is None:
                    cached_result = await litellm.cache.cache.async_get_cache(
                        cache_key, *args, **kwargs
                    )
                    if cached_result is not None:
                        await self.in_memory_cache.async_set_cache(
                            cache_key, cached_result, ttl=60
                        )
                return litellm.cache._get_cache_logic(
                    cached_result=cached_result, max_age=max_age
                )
        except Exception:
            return None
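

if __name__ == "__main__":
    # Illustrative sketch only (not how the proxy wires this hook in
    # production): it runs the pre-call hook against a standalone `DualCache`
    # with a hypothetical `sk-example` key. Without a Redis-backed
    # `litellm.cache`, the hook finds nothing to batch-fetch and leaves
    # `redis_namespace` unset.
    import asyncio

    async def _demo():
        hook = _PROXY_BatchRedisRequests()
        data: dict = {"metadata": {}}
        await hook.async_pre_call_hook(
            user_api_key_dict=UserAPIKeyAuth(api_key="sk-example"),
            cache=DualCache(),
            data=data,
            call_type="acompletion",
        )
        # Prints None here; with a Redis cache configured this would be
        # "litellm:sk-example:acompletion"
        print(data["metadata"].get("redis_namespace"))

    asyncio.run(_demo())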