Spaces:
Sleeping
Sleeping
#### What this does ####
#    Class for sending Slack Alerts #
import asyncio | |
import datetime | |
import os | |
import random | |
import time | |
from datetime import timedelta | |
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union | |
from openai import APIError | |
import litellm | |
import litellm.litellm_core_utils | |
import litellm.litellm_core_utils.litellm_logging | |
import litellm.types | |
from litellm._logging import verbose_logger, verbose_proxy_logger | |
from litellm.caching.caching import DualCache | |
from litellm.constants import HOURS_IN_A_DAY | |
from litellm.integrations.custom_batch_logger import CustomBatchLogger | |
from litellm.litellm_core_utils.duration_parser import duration_in_seconds | |
from litellm.litellm_core_utils.exception_mapping_utils import ( | |
_add_key_name_and_team_to_alert, | |
) | |
from litellm.llms.custom_httpx.http_handler import ( | |
get_async_httpx_client, | |
httpxSpecialProvider, | |
) | |
from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent | |
from litellm.types.integrations.slack_alerting import * | |
from ..email_templates.templates import * | |
from .batching_handler import send_to_webhook, squash_payloads | |
from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables | |
if TYPE_CHECKING: | |
from litellm.router import Router as _Router | |
Router = _Router | |
else: | |
Router = Any | |
class SlackAlerting(CustomBatchLogger): | |
""" | |
Class for sending Slack Alerts | |
""" | |
# Class variables or attributes | |
def __init__(
    self,
    internal_usage_cache: Optional[DualCache] = None,
    alerting_threshold: Optional[
        float
    ] = None,  # threshold for slow / hanging llm responses (in seconds)
    alerting: Optional[List] = [],
    alert_types: List[AlertType] = DEFAULT_ALERT_TYPES,
    alert_to_webhook_url: Optional[
        Dict[AlertType, Union[List[str], str]]
    ] = None,  # if user wants to separate alerts to diff channels
    alerting_args={},
    default_webhook_url: Optional[str] = None,
    **kwargs,
):
    """
    Configure the alerter and initialise the batching base class.

    Args:
        internal_usage_cache: Cache used by the alert methods for bookkeeping;
            a new `DualCache()` is created when none (or a falsy value) is given.
        alerting_threshold: Threshold in seconds for slow / hanging llm
            responses. Falls back to 300 when None.
        alerting: Stored as `self.alerting`; the alert methods in this class
            return early when it is None.
        alert_types: Stored as `self.alert_types`.
        alert_to_webhook_url: Per-alert-type webhook mapping, normalised via
            `process_slack_alerting_variables`.
        alerting_args: Keyword arguments for `SlackAlertingArgs`; None is
            treated like an empty mapping.
        default_webhook_url: Stored as `self.default_webhook_url`.
        **kwargs: Forwarded to the parent `__init__` together with `flush_lock`.
    """
    if alerting_threshold is None:
        alerting_threshold = 300
    self.alerting_threshold = alerting_threshold

    # The `[]` default in the signature is one list object shared by every
    # call. Store a copy so instances never alias it (or each other).
    self.alerting = list(alerting) if alerting is not None else None
    self.alert_types = alert_types
    self.internal_usage_cache = internal_usage_cache or DualCache()
    self.async_http_handler = get_async_httpx_client(
        llm_provider=httpxSpecialProvider.LoggingCallback
    )
    self.alert_to_webhook_url = process_slack_alerting_variables(
        alert_to_webhook_url=alert_to_webhook_url
    )
    self.is_running = False
    self.alerting_args = SlackAlertingArgs(**(alerting_args or {}))
    self.default_webhook_url = default_webhook_url
    self.flush_lock = asyncio.Lock()
    super().__init__(**kwargs, flush_lock=self.flush_lock)
def update_values(
    self,
    alerting: Optional[List] = None,
    alerting_threshold: Optional[float] = None,
    alert_types: Optional[List[AlertType]] = None,
    alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None,
    alerting_args: Optional[Dict] = None,
    llm_router: Optional[Router] = None,
):
    """
    Update the alerting configuration in place.

    Every argument is optional; an argument left as None keeps the current
    value. Passing `alerting` also schedules `self.periodic_flush()` as a
    background task, so in that case a running event loop is required.

    Args:
        alerting: New value for `self.alerting`.
        alerting_threshold: New slow / hanging threshold in seconds.
        alert_types: New value for `self.alert_types`.
        alert_to_webhook_url: Mapping merged into the existing
            `self.alert_to_webhook_url` (or used as-is when none is set yet),
            after `process_slack_alerting_variables` has processed it.
        alerting_args: Keyword arguments for a fresh `SlackAlertingArgs`.
        llm_router: New value for `self.llm_router`.
    """
    if alerting is not None:
        self.alerting = alerting
        flush_task = asyncio.create_task(self.periodic_flush())
        # The event loop only holds weak references to tasks, so keep a strong
        # one until the task finishes (pattern from the asyncio docs).
        background_tasks = getattr(self, "_background_tasks", None)
        if background_tasks is None:
            background_tasks = set()
            self._background_tasks = background_tasks
        background_tasks.add(flush_task)
        flush_task.add_done_callback(background_tasks.discard)

    if alerting_threshold is not None:
        self.alerting_threshold = alerting_threshold
    if alert_types is not None:
        self.alert_types = alert_types
    if alerting_args is not None:
        self.alerting_args = SlackAlertingArgs(**alerting_args)

    if alert_to_webhook_url is not None:
        processed_urls = process_slack_alerting_variables(
            alert_to_webhook_url=alert_to_webhook_url
        )
        if self.alert_to_webhook_url is None:
            self.alert_to_webhook_url = processed_urls
        else:
            # merge into the existing mapping instead of replacing it
            self.alert_to_webhook_url.update(processed_urls or {})

    if llm_router is not None:
        self.llm_router = llm_router
async def deployment_in_cooldown(self):
    """No-op placeholder: the coroutine completes immediately and returns None."""
    return None
async def deployment_removed_from_cooldown(self):
    """No-op placeholder: the coroutine completes immediately and returns None."""
    return None
def _all_possible_alert_types(self):
    """
    Return every member of the `AlertType` enum as a list.

    Used by the UI to show all supported alert types. Note: this is every
    alert type a user can select, not the alerts configured on this instance.
    """
    return [alert_type for alert_type in AlertType]
def _response_taking_too_long_callback_helper(
    self,
    kwargs,  # kwargs to completion
    start_time,
    end_time,  # start/end time
):
    """
    Extract the values needed for a slow-response alert.

    Returns:
        tuple of (elapsed seconds as float, model, api_base, first 100
        characters of the stringified messages). `messages` falls back to
        `kwargs["input"]` when `kwargs["messages"]` is missing or None.

    Any exception raised while extracting the values propagates to the caller.
    """
    # end_time - start_time is a timedelta; convert it to float seconds
    elapsed_seconds = (end_time - start_time).total_seconds()

    model = kwargs.get("model", "")
    api_base = litellm.get_api_base(
        model=model, optional_params=kwargs.get("litellm_params", {})
    )

    raw_messages = kwargs.get("messages", None)
    if raw_messages is None:
        # if messages does not exist fallback to "input"
        raw_messages = kwargs.get("input", None)

    # only use first 100 chars for alerting
    truncated_messages = str(raw_messages)[:100]
    return elapsed_seconds, model, api_base, truncated_messages
def _get_deployment_latencies_to_alert(self, metadata=None): | |
if metadata is None: | |
return None | |
if "_latency_per_deployment" in metadata: | |
# Translate model_id to -> api_base | |
# _latency_per_deployment is a dictionary that looks like this: | |
""" | |
_latency_per_deployment: { | |
api_base: 0.01336697916666667 | |
} | |
""" | |
_message_to_send = "" | |
_deployment_latencies = metadata["_latency_per_deployment"] | |
if len(_deployment_latencies) == 0: | |
return None | |
_deployment_latency_map: Optional[dict] = None | |
try: | |
# try sorting deployments by latency | |
_deployment_latencies = sorted( | |
_deployment_latencies.items(), key=lambda x: x[1] | |
) | |
_deployment_latency_map = dict(_deployment_latencies) | |
except Exception: | |
pass | |
if _deployment_latency_map is None: | |
return | |
for api_base, latency in _deployment_latency_map.items(): | |
_message_to_send += f"\n{api_base}: {round(latency,2)}s" | |
_message_to_send = "```" + _message_to_send + "```" | |
return _message_to_send | |
async def response_taking_too_long_callback(
    self,
    kwargs,  # kwargs to completion
    completion_response,  # response from completion
    start_time,
    end_time,  # start/end time
):
    """
    Send a "Low" `llm_too_slow` alert when a completed call exceeded
    `self.alerting_threshold` seconds.

    Does nothing when `self.alerting` or `self.alert_types` is None. When the
    call carries `kwargs["litellm_params"]["metadata"]`, the alert is enriched
    with key/team info, per-deployment latencies and the metadata's
    `alerting_metadata`.
    """
    if self.alerting is None or self.alert_types is None:
        return

    (
        elapsed_seconds,
        model,
        api_base,
        messages,
    ) = self._response_taking_too_long_callback_helper(
        kwargs=kwargs,
        start_time=start_time,
        end_time=end_time,
    )

    if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions:
        messages = "Message not logged. litellm.redact_messages_in_exceptions=True"

    request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
    slow_message = f"`Responses are slow - {round(elapsed_seconds,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"

    if not (elapsed_seconds > self.alerting_threshold):
        return

    alerting_metadata: dict = {}
    has_request_metadata = (
        kwargs is not None
        and "litellm_params" in kwargs
        and "metadata" in kwargs["litellm_params"]
    )
    if has_request_metadata:
        _metadata: dict = kwargs["litellm_params"]["metadata"]
        request_info = _add_key_name_and_team_to_alert(
            request_info=request_info, metadata=_metadata
        )

        # add deployment latencies to alert
        latency_block = self._get_deployment_latencies_to_alert(metadata=_metadata)
        if latency_block is not None:
            request_info += f"\nAvailable Deployment Latencies\n{latency_block}"

        if "alerting_metadata" in _metadata:
            alerting_metadata = _metadata["alerting_metadata"]

    await self.send_alert(
        message=slow_message + request_info,
        level="Low",
        alert_type=AlertType.llm_too_slow,
        alerting_metadata=alerting_metadata,
    )
async def async_update_daily_reports(
    self, deployment_metrics: DeploymentMetrics
) -> int:
    """
    Store the perf by deployment in cache
    - Number of failed requests per deployment
    - Latency / output tokens per deployment

    Cache keys are `<deployment id>:<SlackAlertingCacheKeys value>`.

    Returns
        int - count of metrics set (1 - if just latency, 2 - if failed + latency).
        0 is returned when any exception is raised while updating the cache.
    """
    metrics_written = 0
    try:
        ## FAILED REQUESTS ##
        if deployment_metrics.failed_request:
            failed_key = "{}:{}".format(
                deployment_metrics.id,
                SlackAlertingCacheKeys.failed_requests_key.value,
            )
            await self.internal_usage_cache.async_increment_cache(
                key=failed_key,
                value=1,
                parent_otel_span=None,  # no attached request, this is a background operation
            )
            metrics_written += 1

        ## LATENCY ##
        if deployment_metrics.latency_per_output_token is not None:
            latency_key = "{}:{}".format(
                deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
            )
            await self.internal_usage_cache.async_increment_cache(
                key=latency_key,
                value=deployment_metrics.latency_per_output_token,
                parent_otel_span=None,  # no attached request, this is a background operation
            )
            metrics_written += 1
    except Exception:
        return 0
    return metrics_written
async def send_daily_reports(self, router) -> bool:  # noqa: PLR0915
    """
    Send a daily report on:
    - Top 5 deployments with most failed requests
    - Top 5 slowest deployments (normalized by latency/output tokens)

    The per-deployment counters are read from `internal_usage_cache` in one
    batched call. After the report text is assembled they are reset to 0
    (prevents unbounded growth) and the report is sent as a "Low"
    `daily_reports` alert.

    Args:
        router: object providing `get_model_ids()` and `get_model_info(id)`.

    Returns:
        True -> if successfully sent
        False -> if not sent: the cache returned nothing, no metric is above
                 0, or `router.get_model_info` did not return a dict for one
                 of the top failing deployments (in that last case the cached
                 counters are left untouched).
    """

    def _top_positive_indices(values, limit=5):
        # indices of the `limit` largest values, keeping only values > 0
        ranked = sorted(range(len(values)), key=lambda i: values[i], reverse=True)
        return [i for i in ranked[:limit] if values[i] > 0]

    ids = list(router.get_model_ids())

    # get keys
    failed_request_keys = [
        "{}:{}".format(model_id, SlackAlertingCacheKeys.failed_requests_key.value)
        for model_id in ids
    ]
    latency_keys = [
        "{}:{}".format(model_id, SlackAlertingCacheKeys.latency_key.value)
        for model_id in ids
    ]

    # one batched read for both metric families -> fewer cache calls
    combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
        keys=failed_request_keys + latency_keys
    )  # [1, 2, None, ..]

    if combined_metrics_values is None:
        return False
    if not any(val is not None and val > 0 for val in combined_metrics_values):
        return False

    # Missing (None) entries count as 0 so they sort last and are filtered out.
    failed_counts = [
        value if value is not None else 0
        for value in combined_metrics_values[: len(failed_request_keys)]
    ]
    latencies = [
        value if value is not None else 0
        for value in combined_metrics_values[len(failed_request_keys) :]
    ]

    top_5_failed = _top_positive_indices(failed_counts)
    top_5_slowest = _top_positive_indices(latencies)

    # format alert -> return the litellm model name + api base
    message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics 📈: \n\n"

    message += "\n\n*❗️ Top Deployments with Most Failed Requests:*\n\n"
    if not top_5_failed:
        message += "\tNone\n"
    for rank, index in enumerate(top_5_failed, start=1):
        # Use the id itself rather than parsing it back out of the cache key:
        # splitting the key on ":" truncates ids that contain a colon.
        _deployment = router.get_model_info(ids[index])
        if not isinstance(_deployment, dict):
            return False
        deployment_name = _deployment["litellm_params"].get("model", "")
        api_base = litellm.get_api_base(
            model=deployment_name,
            optional_params=_deployment["litellm_params"],
        )
        if api_base is None:
            api_base = ""
        value = failed_counts[index]
        message += f"\t{rank}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n"

    message += "\n\n*😅 Top Slowest Deployments:*\n\n"
    if not top_5_slowest:
        message += "\tNone\n"
    for rank, index in enumerate(top_5_slowest, start=1):
        _deployment = router.get_model_info(ids[index])
        if _deployment is not None:
            deployment_name = _deployment["litellm_params"].get("model", "")
            optional_params = _deployment["litellm_params"]
        else:
            deployment_name = ""
            optional_params = {}
        api_base = litellm.get_api_base(
            model=deployment_name,
            optional_params=optional_params,
        )
        value = round(latencies[index], 3)
        message += f"\t{rank}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n"

    # cache cleanup -> reset values to 0
    combined_metrics_cache_keys = [(key, 0) for key in latency_keys] + [
        (key, 0) for key in failed_request_keys
    ]
    await self.internal_usage_cache.async_set_cache_pipeline(
        cache_list=combined_metrics_cache_keys
    )

    message += f"\n\nNext Run is at: `{time.time() + self.alerting_args.daily_report_frequency}`s"

    # send alert
    await self.send_alert(
        message=message,
        level="Low",
        alert_type=AlertType.daily_reports,
        alerting_metadata={},
    )
    return True
async def response_taking_too_long(
    self,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    type: Literal["hanging_request", "slow_response"] = "hanging_request",
    request_data: Optional[dict] = None,
):
    """
    Alert on a request that is still running after `self.alerting_threshold`
    seconds.

    Only `type == "hanging_request"` does any alerting: the coroutine sleeps
    for the threshold, then returns silently when
    `self._request_is_completed(...)` reports True or when no `request_data`
    was given. Otherwise a "Medium" `llm_requests_hanging` alert is sent with
    the model, truncated messages, api base, optional Langfuse trace URL and
    deployment latencies.

    Does nothing when `self.alerting` or `self.alert_types` is None.
    """
    if self.alerting is None or self.alert_types is None:
        return

    model: str = ""
    request_info = ""
    if request_data is not None:
        model = request_data.get("model", "")
        messages = request_data.get("messages", None)
        if messages is None:
            # if messages does not exist fallback to "input"
            messages = request_data.get("input", None)

        # try casting messages to str and keep the first 100 characters
        try:
            messages = str(messages)[:100]
        except Exception:
            messages = ""

        if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions:
            messages = "Message not logged. litellm.redact_messages_in_exceptions=True"
        request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"

    if type != "hanging_request":
        return

    # Wait out the threshold - this might need to differ for streaming,
    # non-streaming and non-completion (embedding + img) requests.
    await asyncio.sleep(self.alerting_threshold)

    # only alert hanging responses if they have not been marked as success
    if await self._request_is_completed(request_data=request_data) is True:
        return
    if request_data is None:
        return

    alerting_metadata: dict = {}
    deployment = request_data.get("deployment", None)
    request_metadata = request_data.get("metadata", None)
    if deployment is not None and isinstance(request_data["deployment"], dict):
        _api_base = litellm.get_api_base(
            model=model,
            optional_params=request_data["deployment"].get("litellm_params", {}),
        )
        if _api_base is None:
            _api_base = ""
        request_info += f"\nAPI Base: {_api_base}"
    elif request_metadata is not None and isinstance(request_data["metadata"], dict):
        # A hanging request may not have reached the point where the deployment
        # is attached to `request_data`; fall back to the api base stored in
        # the request metadata.
        _metadata: dict = request_data["metadata"]
        _api_base = _metadata.get("api_base", "")
        request_info = _add_key_name_and_team_to_alert(
            request_info=request_info, metadata=_metadata
        )
        if _api_base is None:
            _api_base = ""
        if "alerting_metadata" in _metadata:
            alerting_metadata = _metadata["alerting_metadata"]
        request_info += f"\nAPI Base: `{_api_base}`"

    alerting_message = (
        f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
    )

    if "langfuse" in litellm.success_callback:
        langfuse_url = await _add_langfuse_trace_id_to_alert(
            request_data=request_data,
        )
        if langfuse_url is not None:
            request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)

    # add deployment latencies to alert
    latency_block = self._get_deployment_latencies_to_alert(
        metadata=request_data.get("metadata", {})
    )
    if latency_block is not None:
        request_info += f"\nDeployment Latencies\n{latency_block}"

    await self.send_alert(
        message=alerting_message + request_info,
        level="Medium",
        alert_type=AlertType.llm_requests_hanging,
        alerting_metadata=alerting_metadata,
    )
async def failed_tracking_alert(self, error_message: str, failing_model: str):
    """
    Raise alert when tracking failed for specific model

    The alert is de-duplicated per model: once sent, a marker is cached under
    `budget_alerts:failed_tracking:<failing_model>` for
    `alerting_args.budget_alert_ttl` and no further alert goes out while the
    marker is present.

    Args:
        error_message (str): Error message
        failing_model (str): Model that failed tracking
    """
    # do nothing if alerting is not switched on
    if self.alerting is None or self.alert_types is None:
        return
    if "failed_tracking_spend" not in self.alert_types:
        return

    cache: DualCache = self.internal_usage_cache
    message = "Failed Tracking Cost for " + error_message
    dedup_key = "budget_alerts:failed_tracking:{}".format(failing_model)

    already_sent = await cache.async_get_cache(key=dedup_key)
    if already_sent is not None:
        return

    await self.send_alert(
        message=message,
        level="High",
        alert_type=AlertType.failed_tracking_spend,
        alerting_metadata={},
    )
    await cache.async_set_cache(
        key=dedup_key,
        value="SENT",
        ttl=self.alerting_args.budget_alert_ttl,
    )
async def budget_alerts(  # noqa: PLR0915
    self,
    type: Literal[
        "token_budget",
        "soft_budget",
        "user_budget",
        "team_budget",
        "proxy_budget",
        "projected_limit_exceeded",
    ],
    user_info: CallInfo,
):
    """
    Send a "High" budget alert when spend crosses a budget or a threshold.

    PREVENTITIVE ALERTING - https://github.com/BerriAI/litellm/issues/2727
    - Each (event, id) pair alerts once; a "SENT" marker is cached for
      `alerting_args.budget_alert_ttl`
    - Don't re-alert, if alert already sent

    Events: `budget_crossed` when spend >= max_budget, `threshold_crossed`
    when the remaining share of max_budget is at or below the 5% / 15%
    constants, `soft_budget_crossed` when there is no max_budget and spend >=
    soft_budget. Nothing is sent when alerting is off, "budget_alerts" is not
    an enabled alert type, or neither budget is set.
    """
    _cache: DualCache = self.internal_usage_cache

    # do nothing if alerting is not switched on
    if self.alerting is None or self.alert_types is None:
        return
    if "budget_alerts" not in self.alert_types:
        return

    user_info_json = user_info.model_dump(exclude_none=True)
    user_info_str = self._get_user_info_str(user_info)

    # budget type -> (event group, message prefix)
    budget_kinds = {
        "proxy_budget": ("proxy", "Proxy Budget: "),
        "soft_budget": ("proxy", "Soft Budget Crossed: "),
        "user_budget": ("internal_user", "User Budget: "),
        "team_budget": ("team", "Team Budget: "),
        "token_budget": ("key", "Key Budget: "),
        "projected_limit_exceeded": ("key", "Key Budget: Projected Limit Exceeded"),
    }

    event: Optional[
        Literal[
            "budget_crossed",
            "threshold_crossed",
            "projected_limit_exceeded",
            "soft_budget_crossed",
        ]
    ] = None
    event_group: Optional[
        Literal["internal_user", "team", "key", "proxy", "customer"]
    ] = None
    event_message: str = ""
    webhook_event: Optional[WebhookEvent] = None
    if type in budget_kinds:
        event_group, event_message = budget_kinds[type]  # type: ignore

    _id: Optional[str] = "default_id"  # used for caching
    if type == "user_budget":
        _id = user_info.user_id or _id
    elif type == "team_budget":
        _id = user_info.team_id or _id
    elif type in ("token_budget", "projected_limit_exceeded"):
        _id = user_info.token
    if type == "projected_limit_exceeded":
        event = "projected_limit_exceeded"

    max_budget = user_info.max_budget
    soft_budget = user_info.soft_budget
    if max_budget is None and soft_budget is None:
        return

    # percent of max_budget left to spend
    percent_left: float = 0
    if max_budget is not None and max_budget > 0:
        percent_left = (max_budget - user_info.spend) / max_budget

    # check if crossed budget
    if max_budget is not None:
        if user_info.spend >= max_budget:
            event = "budget_crossed"
            event_message += f"Budget Crossed\n Total Budget:`{max_budget}`"
        elif percent_left <= SLACK_ALERTING_THRESHOLD_5_PERCENT:
            event = "threshold_crossed"
            event_message += "5% Threshold Crossed "
        elif percent_left <= SLACK_ALERTING_THRESHOLD_15_PERCENT:
            event = "threshold_crossed"
            event_message += "15% Threshold Crossed"
    elif soft_budget is not None:
        if user_info.spend >= soft_budget:
            event = "soft_budget_crossed"

    if event is None or event_group is None:
        return

    _cache_key = "budget_alerts:{}:{}".format(event, _id)
    already_sent = await _cache.async_get_cache(key=_cache_key)
    if already_sent is not None:
        return

    webhook_event = WebhookEvent(
        event=event,
        event_group=event_group,
        event_message=event_message,
        **user_info_json,
    )
    await self.send_alert(
        message=event_message + "\n\n" + user_info_str,
        level="High",
        alert_type=AlertType.budget_alerts,
        user_info=webhook_event,
        alerting_metadata={},
    )
    await _cache.async_set_cache(
        key=_cache_key,
        value="SENT",
        ttl=self.alerting_args.budget_alert_ttl,
    )
    return
def _get_user_info_str(self, user_info: CallInfo) -> str: | |
""" | |
Create a standard message for a budget alert | |
""" | |
_all_fields_as_dict = user_info.model_dump(exclude_none=True) | |
_all_fields_as_dict.pop("token") | |
msg = "" | |
for k, v in _all_fields_as_dict.items(): | |
msg += f"*{k}:* `{v}`\n" | |
return msg | |
async def customer_spend_alert(
    self,
    token: Optional[str],
    key_alias: Optional[str],
    end_user_id: Optional[str],
    response_cost: Optional[float],
    max_budget: Optional[float],
):
    """
    Send a `spend_tracked` webhook event for a customer (end user).

    Nothing is sent unless "webhook" is in `self.alerting` and `end_user_id`,
    `token` and `response_cost` are all provided.
    """
    if (
        self.alerting is None
        or "webhook" not in self.alerting
        or end_user_id is None
        or token is None
        or response_cost is None
    ):
        return

    # log customer spend
    spend_event = WebhookEvent(
        spend=response_cost,
        max_budget=max_budget,
        token=token,
        customer_id=end_user_id,
        user_id=None,
        team_id=None,
        user_email=None,
        key_alias=key_alias,
        projected_exceeded_date=None,
        projected_spend=None,
        event="spend_tracked",
        event_group="customer",
        event_message="Customer spend tracked. Customer={}, spend={}".format(
            end_user_id, response_cost
        ),
    )
    await self.send_webhook_alert(webhook_event=spend_event)
def _count_outage_alerts(self, alerts: List[int]) -> str: | |
""" | |
Parameters: | |
- alerts: List[int] -> list of error codes (either 408 or 500+) | |
Returns: | |
- str -> formatted string. This is an alert message, giving a human-friendly description of the errors. | |
""" | |
error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0} | |
for alert in alerts: | |
if alert == 408: | |
error_breakdown["Timeout Errors"] += 1 | |
elif alert >= 500: | |
error_breakdown["API Errors"] += 1 | |
else: | |
error_breakdown["Unknown Errors"] += 1 | |
error_msg = "" | |
for key, value in error_breakdown.items(): | |
if value > 0: | |
error_msg += "\n{}: {}\n".format(key, value) | |
return error_msg | |
def _outage_alert_msg_factory( | |
self, | |
alert_type: Literal["Major", "Minor"], | |
key: Literal["Model", "Region"], | |
key_val: str, | |
provider: str, | |
api_base: Optional[str], | |
outage_value: BaseOutageModel, | |
) -> str: | |
"""Format an alert message for slack""" | |
headers = {f"{key} Name": key_val, "Provider": provider} | |
if api_base is not None: | |
headers["API Base"] = api_base # type: ignore | |
headers_str = "\n" | |
for k, v in headers.items(): | |
headers_str += f"*{k}:* `{v}`\n" | |
return f"""\n\n | |
*⚠️ {alert_type} Service Outage* | |
{headers_str} | |
*Errors:* | |
{self._count_outage_alerts(alerts=outage_value["alerts"])} | |
*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n | |
""" | |
async def region_outage_alerts(
    self,
    exception: APIError,
    deployment_id: str,
) -> None:
    """
    Send slack alert if specific provider region is having an outage.

    Track for 408 (Timeout) and >=500 Error codes. Failures are accumulated
    in `internal_usage_cache` under the key `<provider><region_name>`. A
    "Minor" (level "Medium") and later a "Major" (level "High") alert is sent
    once the matching `alerting_args` threshold is reached and more than one
    deployment in the region has failed. Each alert is sent at most once per
    cached record.
    """
    ## CREATE (PROVIDER+REGION) ID ##
    if self.llm_router is None:
        return
    deployment = self.llm_router.get_deployment(model_id=deployment_id)
    if deployment is None:
        return

    litellm_params = deployment.litellm_params
    model = litellm_params.model

    ### GET PROVIDER ###
    provider = litellm_params.custom_llm_provider
    if provider is None:
        model, provider, _, _ = litellm.get_llm_provider(model=model)

    ### GET REGION ###
    region_name = litellm_params.region_name
    if region_name is None:
        region_name = litellm.utils._get_model_region(
            custom_llm_provider=provider, litellm_params=deployment.litellm_params
        )
    if region_name is None:
        return

    ### UNIQUE CACHE KEY ###
    cache_key = provider + region_name
    outage_value: Optional[
        ProviderRegionOutageModel
    ] = await self.internal_usage_cache.async_get_cache(key=cache_key)

    # only timeouts (408) and server-side errors (>= 500) count as outages
    status_code = getattr(exception, "status_code", None)
    if (
        status_code is None
        or (status_code != 408 and status_code < 500)
        or self.llm_router is None
    ):
        return

    if outage_value is None:
        # first failure for this region: create the record and stop here
        outage_value = ProviderRegionOutageModel(
            provider_region_id=cache_key,
            alerts=[status_code],  # type: ignore
            minor_alert_sent=False,
            major_alert_sent=False,
            last_updated_at=time.time(),
            deployment_ids={deployment_id},
        )
        ## add to cache ##
        await self.internal_usage_cache.async_set_cache(
            key=cache_key,
            value=outage_value,
            ttl=self.alerting_args.region_outage_alert_ttl,
        )
        return

    # cap the list size -> prevent memory leaks
    if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size:
        outage_value["alerts"].append(status_code)  # type: ignore

    _deployment_set = outage_value["deployment_ids"]
    _deployment_set.add(deployment_id)
    outage_value["deployment_ids"] = _deployment_set
    outage_value["last_updated_at"] = time.time()

    alerts_seen = len(outage_value["alerts"])
    # make sure it's not just 1 bad deployment
    several_deployments = len(_deployment_set) > 1

    # (severity label, alert level, "already sent" flag) of the alert to send
    tier = None
    if (
        outage_value["minor_alert_sent"] is False
        and alerts_seen >= self.alerting_args.minor_outage_alert_threshold
        and several_deployments
    ):
        tier = ("Minor", "Medium", "minor_alert_sent")
    elif (
        outage_value["major_alert_sent"] is False
        and alerts_seen >= self.alerting_args.major_outage_alert_threshold
        and several_deployments
    ):
        tier = ("Major", "High", "major_alert_sent")

    if tier is not None:
        severity, level, sent_flag = tier
        msg = self._outage_alert_msg_factory(
            alert_type=severity,  # type: ignore
            key="Region",
            key_val=region_name,
            api_base=None,
            outage_value=outage_value,
            provider=provider,
        )
        await self.send_alert(
            message=msg,
            level=level,  # type: ignore
            alert_type=AlertType.outage_alerts,
            alerting_metadata={},
        )
        # remember that this tier has been announced
        outage_value[sent_flag] = True  # type: ignore

    ## update cache ##
    await self.internal_usage_cache.async_set_cache(
        key=cache_key, value=outage_value
    )
async def outage_alerts(
    self,
    exception: APIError,
    deployment_id: str,
) -> None:
    """
    Send slack alert if model is badly configured / having an outage.

    Only exceptions whose `status_code` is 408 or >= 500 are tracked; any
    other status code (or a missing one) is ignored.

    Cache entry (in `internal_usage_cache`):
        key = model_id (the deployment id)
        value = OutageModel with the collected `alerts`, the minor/major
                "alert sent" flags and `last_updated_at`
        ttl = `alerting_args.outage_alert_ttl` (applied when the entry is created)
        alerts list is capped at `alerting_args.max_outage_alert_list_size`

    A "Minor" (level "Medium") and later a "Major" (level "High") alert is
    sent once the matching `alerting_args` threshold is reached, each at most
    once per cached record.

    This is best-effort: any exception is logged at debug level and not
    re-raised.
    """
    try:
        # Cheap filter first, so irrelevant errors never touch the cache.
        status_code = getattr(exception, "status_code", None)
        if (
            status_code is None
            or (status_code != 408 and status_code < 500)
            or self.llm_router is None
        ):
            return

        outage_value: Optional[OutageModel] = await self.internal_usage_cache.async_get_cache(key=deployment_id)  # type: ignore

        ### EXTRACT MODEL DETAILS ###
        deployment = self.llm_router.get_deployment(model_id=deployment_id)
        if deployment is None:
            return

        model = deployment.litellm_params.model
        provider = deployment.litellm_params.custom_llm_provider
        if provider is None:
            try:
                model, provider, _, _ = litellm.get_llm_provider(model=model)
            except Exception:
                provider = ""
        api_base = litellm.get_api_base(
            model=model, optional_params=deployment.litellm_params
        )

        if outage_value is None:
            # first failure for this deployment: create the record and stop
            outage_value = OutageModel(
                model_id=deployment_id,
                alerts=[status_code],  # type: ignore
                minor_alert_sent=False,
                major_alert_sent=False,
                last_updated_at=time.time(),
            )
            ## add to cache ##
            await self.internal_usage_cache.async_set_cache(
                key=deployment_id,
                value=outage_value,
                ttl=self.alerting_args.outage_alert_ttl,
            )
            return

        # cap the list size -> prevent memory leaks
        if (
            len(outage_value["alerts"])
            < self.alerting_args.max_outage_alert_list_size
        ):
            outage_value["alerts"].append(status_code)  # type: ignore
        outage_value["last_updated_at"] = time.time()

        alerts_seen = len(outage_value["alerts"])

        # (severity label, alert level, "already sent" flag) of the alert to send
        tier = None
        if (
            outage_value["minor_alert_sent"] is False
            and alerts_seen >= self.alerting_args.minor_outage_alert_threshold
        ):
            tier = ("Minor", "Medium", "minor_alert_sent")
        elif (
            outage_value["major_alert_sent"] is False
            and alerts_seen >= self.alerting_args.major_outage_alert_threshold
        ):
            tier = ("Major", "High", "major_alert_sent")

        if tier is not None:
            severity, level, sent_flag = tier
            msg = self._outage_alert_msg_factory(
                alert_type=severity,  # type: ignore
                key="Model",
                key_val=model,
                api_base=api_base,
                outage_value=outage_value,
                provider=provider,
            )
            await self.send_alert(
                message=msg,
                level=level,  # type: ignore
                alert_type=AlertType.outage_alerts,
                alerting_metadata={},
            )
            # remember that this tier has been announced
            outage_value[sent_flag] = True  # type: ignore

        ## update cache ##
        await self.internal_usage_cache.async_set_cache(
            key=deployment_id, value=outage_value
        )
    except Exception as e:
        # Outage tracking must never break the request path, but a failure
        # here should still be visible when debugging.
        verbose_proxy_logger.debug(
            "SlackAlerting.outage_alerts failed for deployment_id=%s: %s",
            deployment_id,
            str(e),
        )
async def model_added_alert(
    self, model_name: str, litellm_model_name: str, passed_model_info: Any
):
    """
    Send a "New Model Added" alert containing a usage snippet and the model's cost info.

    Parameters:
        model_name: str - name shown in the alert and used in the example request
        litellm_model_name: str - key looked up in `litellm.model_cost` when no base model is given
        passed_model_info: Any - object whose optional `base_model` attribute, when set,
            is used as the `litellm.model_cost` lookup key instead
    """
    base_model_from_user = getattr(passed_model_info, "base_model", None)
    if base_model_from_user is not None:
        model_info = litellm.model_cost.get(base_model_from_user, {})
        base_model = f"Base Model: `{base_model_from_user}`\n"
    else:
        model_info = litellm.model_cost.get(litellm_model_name, {})
        base_model = ""

    info_lines = []
    for k, v in model_info.items():
        if k in ("input_cost_per_token", "output_cost_per_token") and isinstance(
            v, (int, float)
        ):
            # when converting to string it should not be 1.63e-06
            # (non-numeric values, e.g. None, are rendered as-is instead of raising)
            v = "{:.8f}".format(v)
        info_lines.append(f"{k}: {v}\n")
    model_info_str = "".join(info_lines)

    proxy_base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")

    # NOTE: base_url is quoted so the snippet in the alert is valid, copy-pasteable Python
    message = f"""
*🚅 New Model Added*
Model Name: `{model_name}`
{base_model}
Usage OpenAI Python SDK:
```
import openai
client = openai.OpenAI(
api_key="your_api_key",
base_url="{proxy_base_url}"
)
response = client.chat.completions.create(
model="{model_name}", # model to send to the proxy
messages = [
{{
"role": "user",
"content": "this is a test request, write a short poem"
}}
]
)
```
Model Info:
```
{model_info_str}
```
"""

    alert_val = self.send_alert(
        message=message,
        level="Low",
        alert_type=AlertType.new_model_added,
        alerting_metadata={},
    )
    # only await when send_alert actually handed back a coroutine
    if alert_val is not None and asyncio.iscoroutine(alert_val):
        await alert_val
async def model_removed_alert(self, model_name: str):
    """Placeholder hook for model-removal alerts; currently does nothing."""
    return None
async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
    """
    Sends structured alert to webhook, if set.

    Currently only implemented for budget alerts

    Returns -> True if the webhook answered with a 2xx status, False otherwise.

    Raises Exception
        - if WEBHOOK_URL is not set
    """
    webhook_url = os.getenv("WEBHOOK_URL", None)
    if webhook_url is None:
        raise Exception("Missing webhook_url from environment")

    response = await self.async_http_handler.post(
        url=webhook_url,
        headers={"Content-type": "application/json"},
        data=webhook_event.model_dump_json(),
    )

    # any 2xx is a successful delivery (receivers commonly answer 201/202/204)
    if 200 <= response.status_code < 300:
        return True

    # use the proxy logger (as the rest of this file does) instead of print
    verbose_proxy_logger.error(
        "Error sending webhook alert. Error=%s", response.text
    )
    return False
async def _check_if_using_premium_email_feature( | |
self, | |
premium_user: bool, | |
email_logo_url: Optional[str] = None, | |
email_support_contact: Optional[str] = None, | |
): | |
from litellm.proxy.proxy_server import CommonProxyErrors, premium_user | |
if premium_user is not True: | |
if email_logo_url is not None or email_support_contact is not None: | |
raise ValueError( | |
f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}" | |
) | |
return | |
async def send_key_created_or_user_invited_email(
    self, webhook_event: WebhookEvent
) -> bool:
    """
    Email the recipient of a `key_created` / `internal_user_created` event.

    The recipient is `webhook_event.user_email`; when that is missing it is
    looked up in the user table via `webhook_event.user_id`.

    Returns -> True if the email was handed to `send_email`, False if email
    alerting is disabled, no recipient could be resolved, or any error occurred.
    """
    try:
        from litellm.proxy.utils import send_email

        if self.alerting is None or "email" not in self.alerting:
            # do nothing if user does not want email alerts
            verbose_proxy_logger.error(
                "Error sending email alert - 'email' not in self.alerting %s",
                self.alerting,
            )
            return False
        from litellm.proxy.proxy_server import premium_user, prisma_client

        email_logo_url = os.getenv(
            "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
        )
        email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
        await self._check_if_using_premium_email_feature(
            premium_user, email_logo_url, email_support_contact
        )
        if email_logo_url is None:
            email_logo_url = LITELLM_LOGO_URL
        if email_support_contact is None:
            email_support_contact = LITELLM_SUPPORT_CONTACT

        event_name = webhook_event.event_message
        recipient_email = webhook_event.user_email
        recipient_user_id = webhook_event.user_id
        if (
            recipient_email is None
            and recipient_user_id is not None
            and prisma_client is not None
        ):
            user_row = await prisma_client.db.litellm_usertable.find_unique(
                where={"user_id": recipient_user_id}
            )
            if user_row is not None:
                recipient_email = user_row.user_email

        if recipient_email is None:
            # nothing to address the email to - report it and stop instead of
            # attempting to send to `None`
            verbose_proxy_logger.error(
                "Trying to send email alert to no recipient",
                extra=webhook_event.model_dump(),
            )
            return False

        key_token = webhook_event.token
        key_budget = webhook_event.max_budget
        base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")

        # generic fallback body, used when the event type is not recognised
        email_html_content = "Alert from LiteLLM Server"
        if webhook_event.event == "key_created":
            email_html_content = KEY_CREATED_EMAIL_TEMPLATE.format(
                email_logo_url=email_logo_url,
                recipient_email=recipient_email,
                key_budget=key_budget,
                key_token=key_token,
                base_url=base_url,
                email_support_contact=email_support_contact,
            )
        elif webhook_event.event == "internal_user_created":
            # GET TEAM NAME
            team_id = webhook_event.team_id
            team_name = "Default Team"
            if team_id is not None and prisma_client is not None:
                team_row = await prisma_client.db.litellm_teamtable.find_unique(
                    where={"team_id": team_id}
                )
                if team_row is not None:
                    team_name = team_row.team_alias or "-"
            email_html_content = USER_INVITED_EMAIL_TEMPLATE.format(
                email_logo_url=email_logo_url,
                recipient_email=recipient_email,
                team_name=team_name,
                base_url=base_url,
                email_support_contact=email_support_contact,
            )
        else:
            verbose_proxy_logger.error(
                "Trying to send email alert on unknown webhook event",
                extra=webhook_event.model_dump(),
            )

        await send_email(
            receiver_email=recipient_email,
            subject=f"LiteLLM: {event_name}",
            html=email_html_content,
        )
        return True
    except Exception as e:
        verbose_proxy_logger.error("Error sending email alert %s", str(e))
        return False
async def send_email_alert_using_smtp(
    self, webhook_event: WebhookEvent, alert_type: str
) -> bool:
    """
    Sends structured Email alert to an SMTP server

    Currently only implemented for budget alerts

    Parameters:
        webhook_event: WebhookEvent - event describing the user / budget
        alert_type: str - accepted for interface compatibility; not used here

    Returns -> True if an email send was dispatched (to the user and/or via the
    team budget alert), False if not.
    """
    from litellm.proxy.proxy_server import premium_user
    from litellm.proxy.utils import send_email

    email_logo_url = os.getenv(
        "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
    )
    email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
    await self._check_if_using_premium_email_feature(
        premium_user, email_logo_url, email_support_contact
    )
    if email_logo_url is None:
        email_logo_url = LITELLM_LOGO_URL
    if email_support_contact is None:
        email_support_contact = LITELLM_SUPPORT_CONTACT

    event_name = webhook_event.event_message
    recipient_email = webhook_event.user_email
    user_name = webhook_event.user_id
    max_budget = webhook_event.max_budget

    # generic fallback body, replaced below for recognised events
    email_html_content = "Alert from LiteLLM Server"
    if webhook_event.event == "budget_crossed":
        email_html_content = f"""
<img src="{email_logo_url}" alt="LiteLLM Logo" width="150" height="50" />
<p> Hi {user_name}, <br/>
Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br />
API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br />
If you have any questions, please send an email to {email_support_contact} <br /> <br />
Best, <br />
The LiteLLM team <br />
"""

    sent = False
    if recipient_email is None:
        # no direct recipient: report it and skip the direct send, but still
        # fall through to the team alert below
        verbose_proxy_logger.error(
            "Trying to send email alert to no recipient",
            extra=webhook_event.model_dump(),
        )
    else:
        await send_email(
            receiver_email=recipient_email,
            subject=f"LiteLLM: {event_name}",
            html=email_html_content,
        )
        sent = True

    if webhook_event.event_group == "team":
        from litellm.integrations.email_alerting import send_team_budget_alert

        await send_team_budget_alert(webhook_event=webhook_event)
        sent = True

    return sent
async def send_alert(
    self,
    message: str,
    level: Literal["Low", "Medium", "High"],
    alert_type: AlertType,
    alerting_metadata: dict,
    user_info: Optional[WebhookEvent] = None,
    **kwargs,
):
    """
    Route an alert to the configured channels (webhook, email, slack).

    Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298

    - Budget alerts carrying `user_info` are forwarded to the webhook and/or
      email senders when those channels are enabled.
    - Slack alerts are formatted and appended to `self.log_queue`; the queue is
      flushed once it reaches `self.batch_size`.

    Parameters:
        level: str - Low|Medium|High
        message: str - what is the alert about
        alert_type: AlertType - must be in `self.alert_types` to reach slack
        alerting_metadata: dict - extra key/values appended to the slack message
        user_info: Optional[WebhookEvent] - structured payload for webhook/email
        **kwargs - extra key/values appended to the slack message

    Raises ValueError
        - if no slack webhook url can be resolved
    """
    if self.alerting is None:
        return

    is_budget_event = alert_type == "budget_alerts" and user_info is not None

    if "webhook" in self.alerting and is_budget_event:
        await self.send_webhook_alert(webhook_event=user_info)

    if "email" in self.alerting and is_budget_event:
        # only send budget alerts over Email
        await self.send_email_alert_using_smtp(
            webhook_event=user_info, alert_type=alert_type
        )

    if "slack" not in self.alerting:
        return
    if alert_type not in self.alert_types:
        return

    from datetime import datetime as _datetime

    # Timestamp shown in the alert
    timestamp = _datetime.now().strftime("%H:%M:%S")
    proxy_url = os.getenv("PROXY_BASE_URL", None)

    # reports / new-model notices are posted verbatim, everything else gets a header
    if alert_type in ("daily_reports", "new_model_added"):
        formatted_message = message
    else:
        formatted_message = (
            f"Level: `{level}`\nTimestamp: `{timestamp}`\n\nMessage: {message}"
        )

    formatted_message += "".join(
        f"\n\n{name}: `{val}`\n\n" for name, val in kwargs.items()
    )
    if alerting_metadata:
        formatted_message += "".join(
            f"\n\n*Alerting Metadata*: \n{name}: `{val}`\n\n"
            for name, val in alerting_metadata.items()
        )
    if proxy_url is not None:
        formatted_message += f"\n\nProxy URL: `{proxy_url}`"

    # resolve destination: per-alert-type mapping, then instance default, then env
    per_type_urls = self.alert_to_webhook_url
    if per_type_urls is not None and alert_type in per_type_urls:
        slack_webhook_url: Optional[Union[str, List[str]]] = per_type_urls[
            alert_type
        ]
    elif self.default_webhook_url is not None:
        slack_webhook_url = self.default_webhook_url
    else:
        slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)

    if slack_webhook_url is None:
        raise ValueError("Missing SLACK_WEBHOOK_URL from environment")

    payload = {"text": formatted_message}
    headers = {"Content-type": "application/json"}

    # a single url is queued once, a list of urls is queued once per entry
    destinations = (
        slack_webhook_url
        if isinstance(slack_webhook_url, list)
        else [slack_webhook_url]
    )
    for destination in destinations:
        self.log_queue.append(
            {
                "url": destination,
                "headers": headers,
                "payload": payload,
                "alert_type": alert_type,
            }
        )

    if len(self.log_queue) >= self.batch_size:
        await self.flush_queue()
async def async_send_batch(self):
    """
    Send everything currently in `self.log_queue`, then empty the queue.

    Queued items are first grouped by `squash_payloads`; each group is
    delivered with one `send_to_webhook` call, and all calls run concurrently.
    Does nothing when the queue is empty.
    """
    if not self.log_queue:
        return

    pending_sends = []
    for grouped in squash_payloads(self.log_queue).values():
        pending_sends.append(
            send_to_webhook(
                slackAlertingInstance=self,
                item=grouped["item"],
                count=grouped["count"],
            )
        )

    await asyncio.gather(*pending_sends)
    self.log_queue.clear()
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
    """
    Log deployment latency.

    Only active when "daily_reports" is among `self.alert_types`. Records the
    elapsed time of the call - divided by the completion token count when the
    response is a `litellm.ModelResponse` reporting a positive count - via
    `async_update_daily_reports`. Errors are logged, never raised.
    """
    try:
        if "daily_reports" not in self.alert_types:
            return

        params = kwargs.get("litellm_params", {}) or {}
        info = params.get("model_info", {}) or {}
        deployment_id = info.get("id", "") or ""

        elapsed: timedelta = end_time - start_time
        latency = elapsed

        if isinstance(response_obj, litellm.ModelResponse):
            usage = getattr(response_obj, "usage", None)
            token_count = getattr(usage, "completion_tokens", None)
            if token_count is not None and token_count > 0:
                latency = float(elapsed.total_seconds() / token_count)

        # no usable token count -> fall back to the raw duration in seconds
        if isinstance(latency, timedelta):
            latency = latency.total_seconds()

        await self.async_update_daily_reports(
            DeploymentMetrics(
                id=deployment_id,
                failed_request=False,
                latency_per_output_token=latency,
                updated_at=litellm.utils.get_utc_datetime(),
            )
        )
    except Exception as e:
        verbose_proxy_logger.error(
            f"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: {str(e)}"
        )
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
    """
    Log failure + deployment latency.

    - When "daily_reports" is enabled, records a failed request for the
      deployment via `async_update_daily_reports`.
    - When the failure is an `APIError`, forwards it to `outage_alerts` and/or
      `region_outage_alerts`, depending on which alert types are enabled.

    This is a logging hook: errors are logged, never raised.
    """
    try:
        # `or {}` / `or ""` guard against keys that are present but None,
        # matching async_log_success_event
        litellm_params = kwargs.get("litellm_params", {}) or {}
        model_info = litellm_params.get("model_info", {}) or {}
        model_id = model_info.get("id", "") or ""

        if "daily_reports" in self.alert_types:
            try:
                await self.async_update_daily_reports(
                    DeploymentMetrics(
                        id=model_id,
                        failed_request=True,
                        latency_per_output_token=None,
                        updated_at=litellm.utils.get_utc_datetime(),
                    )
                )
            except Exception as e:
                # a failed report update must not prevent outage alerting below
                verbose_logger.debug(f"Exception raises -{str(e)}")

        wants_outage = "outage_alerts" in self.alert_types
        wants_region_outage = "region_outage_alerts" in self.alert_types
        exception = kwargs.get("exception")
        if (wants_outage or wants_region_outage) and isinstance(
            exception, APIError
        ):
            if wants_outage:
                await self.outage_alerts(
                    exception=exception,
                    deployment_id=model_id,
                )
            if wants_region_outage:
                await self.region_outage_alerts(
                    exception=exception, deployment_id=model_id
                )
    except Exception as e:
        verbose_proxy_logger.error(
            "[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment failure: %s",
            str(e),
        )
async def _run_scheduler_helper(self, llm_router) -> bool:
    """
    Send the daily report if it is due, based on the timestamp stored in cache.

    On the first run (no timestamp cached) only the current time is stored.
    Otherwise, when the cached value is a float and at least
    `alerting_args.daily_report_frequency` has elapsed since it, the report is
    sent and the cached timestamp refreshed.

    Returns:
    - True -> report sent
    - False -> report not sent
    """
    cache = self.internal_usage_cache
    sent_key = SlackAlertingCacheKeys.report_sent_key.value

    last_sent = await cache.async_get_cache(
        key=sent_key,
        parent_otel_span=None,
    )  # None | float
    now = time.time()

    if last_sent is None:
        # first run: just start the clock
        await cache.async_set_cache(key=sent_key, value=now)
        return False

    if isinstance(last_sent, float):
        if now - last_sent >= self.alerting_args.daily_report_frequency:
            await self.send_daily_reports(router=llm_router)
            # remember when this report went out
            await cache.async_set_cache(key=sent_key, value=now)
            return True

    return False
async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None): | |
""" | |
If 'daily_reports' enabled | |
Ping redis cache every 5 minutes to check if we should send the report | |
If yes -> call send_daily_report() | |
""" | |
if llm_router is None or self.alert_types is None: | |
return | |
if "daily_reports" in self.alert_types: | |
while True: | |
await self._run_scheduler_helper(llm_router=llm_router) | |
interval = random.randint( | |
self.alerting_args.report_check_interval - 3, | |
self.alerting_args.report_check_interval + 3, | |
) # shuffle to prevent collisions | |
await asyncio.sleep(interval) | |
return | |
async def send_weekly_spend_report(
    self,
    time_range: str = "7d",
):
    """
    Send a per-team / per-tag spend report covering the last `time_range`.

    Skipped when alerting is off or "spend_reports" is not enabled, when a
    report for the same date window was already sent (tracked in cache), or
    when there is no spend data. Errors are logged, never raised.

    Args:
        time_range: A string specifying the time range for the report, e.g., "1d", "7d", "30d"
    """
    if self.alerting is None or "spend_reports" not in self.alert_types:
        return

    try:
        from litellm.proxy.spend_tracking.spend_management_endpoints import (
            _get_spend_report_for_time_range,
        )

        # "<N>d" -> N days
        days = int(time_range[:-1])
        if time_range[-1].lower() != "d":
            raise ValueError("Time range must be specified in days, e.g., '7d'")

        window_end = datetime.datetime.now().date()
        window_start = window_end - datetime.timedelta(days=days)
        start_iso = window_start.strftime("%Y-%m-%d")
        end_iso = window_end.strftime("%Y-%m-%d")

        # one report per date window
        sent_marker_key = f"weekly_spend_report_sent_{start_iso}_{end_iso}"
        if await self.internal_usage_cache.async_get_cache(key=sent_marker_key):
            return

        report = await _get_spend_report_for_time_range(
            start_date=start_iso,
            end_date=end_iso,
        )
        if report is None or report == ([], []):
            return
        team_rows, tag_rows = report

        chunks = [
            f"*💸 Spend Report for `{window_start.strftime('%m-%d-%Y')} - {window_end.strftime('%m-%d-%Y')}` ({days} days)*\n"
        ]
        if team_rows is not None:
            chunks.append("\n*Team Spend Report:*\n")
            for row in team_rows:
                amount = round(float(row["total_spend"]), 4)
                chunks.append(f"Team: `{row['team_alias']}` | Spend: `${amount}`\n")
        if tag_rows is not None:
            chunks.append("\n*Tag Spend Report:*\n")
            for row in tag_rows:
                amount = round(float(row["total_spend"]), 4)
                chunks.append(
                    f"Tag: `{row['individual_request_tag']}` | Spend: `${amount}`\n"
                )

        await self.send_alert(
            message="".join(chunks),
            level="Low",
            alert_type=AlertType.spend_reports,
            alerting_metadata={},
        )
        await self.internal_usage_cache.async_set_cache(
            key=sent_marker_key,
            value="SENT",
            ttl=duration_in_seconds(time_range),
        )
    except ValueError as ve:
        verbose_proxy_logger.error(f"Invalid time range format: {ve}")
    except Exception as e:
        verbose_proxy_logger.error(f"Error sending spend report: {e}")
async def send_monthly_spend_report(self):
    """
    Send a per-team / per-tag spend report for the current calendar month.

    The window runs from the first to the last day of today's month. Skipped
    when a report for that window was already sent (tracked in cache) or when
    there is no spend data. Errors are logged, never raised.
    """
    try:
        from calendar import monthrange

        from litellm.proxy.spend_tracking.spend_management_endpoints import (
            _get_spend_report_for_time_range,
        )

        todays_date = datetime.datetime.now().date()
        first_day_of_month = todays_date.replace(day=1)
        # monthrange -> (weekday of the 1st, number of days in the month)
        _, days_in_month = monthrange(todays_date.year, todays_date.month)
        last_day_of_month = first_day_of_month + datetime.timedelta(
            days=days_in_month - 1
        )

        # one report per month window
        _event_cache_key = f"monthly_spend_report_sent_{first_day_of_month.strftime('%Y-%m-%d')}_{last_day_of_month.strftime('%Y-%m-%d')}"
        if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
            return

        _resp = await _get_spend_report_for_time_range(
            start_date=first_day_of_month.strftime("%Y-%m-%d"),
            end_date=last_day_of_month.strftime("%Y-%m-%d"),
        )
        if _resp is None or _resp == ([], []):
            return
        monthly_spend_per_team, monthly_spend_per_tag = _resp

        _spend_message = f"*💸 Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n"
        if monthly_spend_per_team is not None:
            _spend_message += "\n*Team Spend Report:*\n"
            for spend in monthly_spend_per_team:
                # round to 4 decimal places
                _team_spend = round(float(spend["total_spend"]), 4)
                _spend_message += (
                    f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
                )
        if monthly_spend_per_tag is not None:
            _spend_message += "\n*Tag Spend Report:*\n"
            for spend in monthly_spend_per_tag:
                # round to 4 decimal places
                _tag_spend = round(float(spend["total_spend"]), 4)
                _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"

        await self.send_alert(
            message=_spend_message,
            level="Low",
            alert_type=AlertType.spend_reports,
            alerting_metadata={},
        )
        await self.internal_usage_cache.async_set_cache(
            key=_event_cache_key,
            value="SENT",
            ttl=(30 * HOURS_IN_A_DAY * 60 * 60),  # 1 month
        )
    except Exception as e:
        # message names the monthly report so failures are not mistaken for
        # the weekly one
        verbose_proxy_logger.exception("Error sending monthly spend report %s", e)
async def send_fallback_stats_from_prometheus(self):
    """
    Helper to send fallback statistics from prometheus server -> to slack

    Fetches the fallback metric via `get_fallback_metric_from_prometheus` and
    sends it as a "Low" level `fallback_reports` alert. Errors are logged,
    never raised.
    """
    try:
        from litellm.integrations.prometheus_helpers.prometheus_api import (
            get_fallback_metric_from_prometheus,
        )

        fallback_success_info_prometheus = (
            await get_fallback_metric_from_prometheus()
        )
        fallback_message = (
            f"*Fallback Statistics:*\n{fallback_success_info_prometheus}"
        )

        await self.send_alert(
            message=fallback_message,
            level="Low",
            alert_type=AlertType.fallback_reports,
            alerting_metadata={},
        )
    except Exception as e:
        # message names this report so failures are not mistaken for the
        # spend reports
        verbose_proxy_logger.error(
            "Error sending fallback stats from prometheus %s", e
        )
async def send_virtual_key_event_slack(
    self,
    key_event: VirtualKeyEvent,
    alert_type: AlertType,
    event_name: str,
):
    """
    Handles sending Virtual Key related alerts

    The alert lists every `created_by*` field of the event, followed by the
    request arguments (excluding `user_api_key_dict`). Errors are logged,
    never raised.

    Example:
    - New Virtual Key Created
    - Internal User Updated
    - Team Created, Updated, Deleted
    """
    try:
        sections = [f"`{event_name}`\n"]
        event_fields = key_event.model_dump()

        # who performed the action
        sections.append("*Action Done by:*\n")
        sections.extend(
            f"{field}: `{val}`\n"
            for field, val in event_fields.items()
            if "created_by" in field
        )

        # arguments the action was called with
        sections.append("\n*Arguments passed:*\n")
        sections.extend(
            f"{arg}: `{val}`\n"
            for arg, val in key_event.request_kwargs.items()
            if arg != "user_api_key_dict"
        )

        await self.send_alert(
            message="".join(sections),
            level="High",
            alert_type=alert_type,
            alerting_metadata={},
        )
    except Exception as e:
        verbose_proxy_logger.error(
            "Error sending send_virtual_key_event_slack %s", e
        )
    return
async def _request_is_completed(self, request_data: Optional[dict]) -> bool: | |
""" | |
Returns True if the request is completed - either as a success or failure | |
""" | |
if request_data is None: | |
return False | |
if ( | |
request_data.get("litellm_status", "") != "success" | |
and request_data.get("litellm_status", "") != "fail" | |
): | |
## CHECK IF CACHE IS UPDATED | |
litellm_call_id = request_data.get("litellm_call_id", "") | |
status: Optional[str] = await self.internal_usage_cache.async_get_cache( | |
key="request_status:{}".format(litellm_call_id), local_only=True | |
) | |
if status is not None and (status == "success" or status == "fail"): | |
return True | |
return False | |