# What is this?
## Allocates dynamic tpm/rpm quota for a project based on current traffic
## Tracks the number of active projects per minute
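##
## Illustrative example (hypothetical numbers, not defaults): with
## litellm.priority_reservation = {"prod": 0.9, "dev": 0.1}, a model group with
## tpm=100_000, no usage yet this minute, and 4 projects actively calling it,
## a "prod" request sees available_tpm = int(100_000 * 0.9 / 4) = 22_500
## (see check_available_usage below).
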
import asyncio
import os
from typing import List, Literal, Optional, Tuple, Union

from fastapi import HTTPException

import litellm
from litellm import ModelResponse, Router
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.router import ModelGroupInfo
from litellm.utils import get_utc_datetime


class DynamicRateLimiterCache:
    """
    Thin wrapper on DualCache for this file.

    Track number of active projects calling a model.
    """

    def __init__(self, cache: DualCache) -> None:
        self.cache = cache
        self.ttl = 60  # 1 min ttl

    async def async_get_cache(self, model: str) -> Optional[int]:
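        """
        Get the number of active projects that have called this model group in
        the current minute.

        Returns the size of the per-minute set stored under "{HH-MM}:{model}",
        or None if nothing has been tracked yet.
        """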
        dt = get_utc_datetime()
        current_minute = dt.strftime("%H-%M")
        key_name = "{}:{}".format(current_minute, model)
        _response = await self.cache.async_get_cache(key=key_name)
        response: Optional[int] = None
        if _response is not None:
            response = len(_response)
        return response

    async def async_set_cache_sadd(self, model: str, value: List):
        """
        Add value to set.

        Parameters:
        - model: str, the name of the model group
        - value: List[str], the key/team ids to add to the set

        Returns:
        - None

        Raises:
        - Exception, if unable to connect to cache client (if redis caching enabled)
        """
        try:
            dt = get_utc_datetime()
            current_minute = dt.strftime("%H-%M")
            key_name = "{}:{}".format(current_minute, model)
            await self.cache.async_set_cache_sadd(
                key=key_name, value=value, ttl=self.ttl
            )
        except Exception as e:
            verbose_proxy_logger.exception(
                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_set_cache_sadd(): Exception occurred - {}".format(
                    str(e)
                )
            )
            raise e


class _PROXY_DynamicRateLimitHandler(CustomLogger):
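    """
    Proxy hook that splits a model group's remaining tpm/rpm across the projects
    actively calling it, optionally weighted by litellm.priority_reservation.
    """
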
    # Class variables or attributes
    def __init__(self, internal_usage_cache: DualCache):
        self.internal_usage_cache = DynamicRateLimiterCache(cache=internal_usage_cache)

    def update_variables(self, llm_router: Router):
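        """Store the router so model group limits and current usage can be queried."""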
        self.llm_router = llm_router

    async def check_available_usage(
        self, model: str, priority: Optional[str] = None
    ) -> Tuple[
        Optional[int], Optional[int], Optional[int], Optional[int], Optional[int]
    ]:
        """
        For a given model, get its available tpm/rpm.

        Params:
        - model: str, the name of the model in the router model_list
        - priority: Optional[str], the priority for the request.

        Returns:
        - Tuple[available_tpm, available_rpm, remaining_model_tpm, remaining_model_rpm, active_projects]
            - available_tpm: int or null - always 0 or positive.
            - available_rpm: int or null - always 0 or positive.
            - remaining_model_tpm: int or null. If available tpm is int, then this will be too.
            - remaining_model_rpm: int or null. If available rpm is int, then this will be too.
            - active_projects: int or null
        """
        try:
            weight: float = 1
            if (
                litellm.priority_reservation is None
                or priority not in litellm.priority_reservation
            ):
                verbose_proxy_logger.error(
                    "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
                        priority, litellm.priority_reservation
                    )
                )
            elif priority is not None and litellm.priority_reservation is not None:
                if os.getenv("LITELLM_LICENSE", None) is None:
                    verbose_proxy_logger.error(
                        "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
                    )
                else:
                    weight = litellm.priority_reservation[priority]
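            # weight defaults to 1.0 (no reservation); next, split the model group's remaining capacity across active projects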
            active_projects = await self.internal_usage_cache.async_get_cache(
                model=model
            )
            (
                current_model_tpm,
                current_model_rpm,
            ) = await self.llm_router.get_model_group_usage(model_group=model)
            model_group_info: Optional[
                ModelGroupInfo
            ] = self.llm_router.get_model_group_info(model_group=model)
            total_model_tpm: Optional[int] = None
            total_model_rpm: Optional[int] = None
            if model_group_info is not None:
                if model_group_info.tpm is not None:
                    total_model_tpm = model_group_info.tpm
                if model_group_info.rpm is not None:
                    total_model_rpm = model_group_info.rpm

            remaining_model_tpm: Optional[int] = None
            if total_model_tpm is not None and current_model_tpm is not None:
                remaining_model_tpm = total_model_tpm - current_model_tpm
            elif total_model_tpm is not None:
                remaining_model_tpm = total_model_tpm

            remaining_model_rpm: Optional[int] = None
            if total_model_rpm is not None and current_model_rpm is not None:
                remaining_model_rpm = total_model_rpm - current_model_rpm
            elif total_model_rpm is not None:
                remaining_model_rpm = total_model_rpm

            available_tpm: Optional[int] = None
            if remaining_model_tpm is not None:
                if active_projects is not None:
                    available_tpm = int(remaining_model_tpm * weight / active_projects)
                else:
                    available_tpm = int(remaining_model_tpm * weight)

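            # a negative remainder means the model group is already over its limit; clamp to 0 so the pre-call hook rejects the request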
            if available_tpm is not None and available_tpm < 0:
                available_tpm = 0

            available_rpm: Optional[int] = None
            if remaining_model_rpm is not None:
                if active_projects is not None:
                    available_rpm = int(remaining_model_rpm * weight / active_projects)
                else:
                    available_rpm = int(remaining_model_rpm * weight)

            if available_rpm is not None and available_rpm < 0:
                available_rpm = 0

            return (
                available_tpm,
                available_rpm,
                remaining_model_tpm,
                remaining_model_rpm,
                active_projects,
            )
        except Exception as e:
            verbose_proxy_logger.exception(
                "litellm.proxy.hooks.dynamic_rate_limiter.py::check_available_usage: Exception occurred - {}".format(
                    str(e)
                )
            )
            return None, None, None, None, None

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: Literal[
            "completion",
            "text_completion",
            "embeddings",
            "image_generation",
            "moderation",
            "audio_transcription",
            "pass_through_endpoint",
            "rerank",
        ],
    ) -> Optional[
        Union[Exception, str, dict]
    ]:  # raise exception if invalid, return a str for the user to receive - if rejected, or return a modified dictionary for passing into litellm
        """
        - For a model group
        - Check if tpm/rpm available
        - Raise RateLimitError if no tpm/rpm available
        """
if "model" in data: | |
key_priority: Optional[str] = user_api_key_dict.metadata.get( | |
"priority", None | |
) | |
( | |
available_tpm, | |
available_rpm, | |
model_tpm, | |
model_rpm, | |
active_projects, | |
) = await self.check_available_usage( | |
model=data["model"], priority=key_priority | |
) | |
### CHECK TPM ### | |
if available_tpm is not None and available_tpm == 0: | |
raise HTTPException( | |
status_code=429, | |
detail={ | |
"error": "Key={} over available TPM={}. Model TPM={}, Active keys={}".format( | |
user_api_key_dict.api_key, | |
available_tpm, | |
model_tpm, | |
active_projects, | |
) | |
}, | |
) | |
### CHECK RPM ### | |
elif available_rpm is not None and available_rpm == 0: | |
raise HTTPException( | |
status_code=429, | |
detail={ | |
"error": "Key={} over available RPM={}. Model RPM={}, Active keys={}".format( | |
user_api_key_dict.api_key, | |
available_rpm, | |
model_rpm, | |
active_projects, | |
) | |
}, | |
) | |
elif available_rpm is not None or available_tpm is not None: | |
## UPDATE CACHE WITH ACTIVE PROJECT | |
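                ## (sadd into the per-minute set, so each key/project is only counted once per minute)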
                asyncio.create_task(
                    self.internal_usage_cache.async_set_cache_sadd(  # this is a set
                        model=data["model"],  # type: ignore
                        value=[user_api_key_dict.token or "default_key"],
                    )
                )
        return None

    async def async_post_call_success_hook(
        self, data: dict, user_api_key_dict: UserAPIKeyAuth, response
    ):
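        """
        Attach x-ratelimit-* headers to successful ModelResponse objects so callers
        can see the remaining project/model tpm & rpm and the active project count.
        """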
        try:
            if isinstance(response, ModelResponse):
                model_info = self.llm_router.get_model_info(
                    id=response._hidden_params["model_id"]
                )
                assert (
                    model_info is not None
                ), "Model info for model with id={} is None".format(
                    response._hidden_params["model_id"]
                )
                key_priority: Optional[str] = user_api_key_dict.metadata.get(
                    "priority", None
                )
                (
                    available_tpm,
                    available_rpm,
                    model_tpm,
                    model_rpm,
                    active_projects,
                ) = await self.check_available_usage(
                    model=model_info["model_name"], priority=key_priority
                )
                response._hidden_params[
                    "additional_headers"
                ] = {  # Add additional response headers - easier debugging
                    "x-litellm-model_group": model_info["model_name"],
                    "x-ratelimit-remaining-litellm-project-tokens": available_tpm,
                    "x-ratelimit-remaining-litellm-project-requests": available_rpm,
                    "x-ratelimit-remaining-model-tokens": model_tpm,
                    "x-ratelimit-remaining-model-requests": model_rpm,
                    "x-ratelimit-current-active-projects": active_projects,
                }
                return response
            return await super().async_post_call_success_hook(
                data=data,
                user_api_key_dict=user_api_key_dict,
                response=response,
            )
        except Exception as e:
            verbose_proxy_logger.exception(
                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_post_call_success_hook(): Exception occurred - {}".format(
                    str(e)
                )
            )
            return response