# used for /metrics endpoint on LiteLLM Proxy
#### What this does ####
# On success, log events to Prometheus
import sys
from datetime import datetime, timedelta
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    List,
    Literal,
    Optional,
    Tuple,
    Union,
    cast,
)
import litellm
from litellm._logging import print_verbose, verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import LiteLLM_TeamTable, UserAPIKeyAuth
from litellm.types.integrations.prometheus import *
from litellm.types.utils import StandardLoggingPayload
from litellm.utils import get_end_user_id_for_cost_tracking

if TYPE_CHECKING:
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
else:
    AsyncIOScheduler = Any


class PrometheusLogger(CustomLogger):
    # Class variables or attributes
    def __init__(
        self,
        **kwargs,
    ):
        try:
            from prometheus_client import Counter, Gauge, Histogram

            from litellm.proxy.proxy_server import CommonProxyErrors, premium_user

            if premium_user is not True:
                verbose_logger.warning(
                    f"🚨🚨🚨 Prometheus Metrics is on LiteLLM Enterprise\n🚨 {CommonProxyErrors.not_premium_user.value}"
                )
                self.litellm_not_a_premium_user_metric = Counter(
                    name="litellm_not_a_premium_user_metric",
                    documentation=f"🚨🚨🚨 Prometheus Metrics is on LiteLLM Enterprise. 🚨 {CommonProxyErrors.not_premium_user.value}",
                )
                return

            self.litellm_proxy_failed_requests_metric = Counter(
                name="litellm_proxy_failed_requests_metric",
                documentation="Total number of failed responses from proxy - the client did not get a success response from litellm proxy",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_proxy_failed_requests_metric"
                ),
            )

            self.litellm_proxy_total_requests_metric = Counter(
                name="litellm_proxy_total_requests_metric",
                documentation="Total number of requests made to the proxy server - track number of client side requests",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_proxy_total_requests_metric"
                ),
            )

            # request latency metrics
            self.litellm_request_total_latency_metric = Histogram(
                "litellm_request_total_latency_metric",
                "Total latency (seconds) for a request to LiteLLM",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_request_total_latency_metric"
                ),
                buckets=LATENCY_BUCKETS,
            )

            self.litellm_llm_api_latency_metric = Histogram(
                "litellm_llm_api_latency_metric",
                "Total latency (seconds) for a model's LLM API call",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_llm_api_latency_metric"
                ),
                buckets=LATENCY_BUCKETS,
            )

            self.litellm_llm_api_time_to_first_token_metric = Histogram(
                "litellm_llm_api_time_to_first_token_metric",
                "Time to first token for a model's LLM API call",
                labelnames=[
                    "model",
                    "hashed_api_key",
                    "api_key_alias",
                    "team",
                    "team_alias",
                ],
                buckets=LATENCY_BUCKETS,
            )

            # Counter for spend
            self.litellm_spend_metric = Counter(
                "litellm_spend_metric",
                "Total spend on LLM requests",
                labelnames=[
                    "end_user",
                    "hashed_api_key",
                    "api_key_alias",
                    "model",
                    "team",
                    "team_alias",
                    "user",
                ],
            )

            # Counter for total_output_tokens
            self.litellm_tokens_metric = Counter(
                "litellm_total_tokens",
                "Total number of input + output tokens from LLM requests",
                labelnames=[
                    "end_user",
                    "hashed_api_key",
                    "api_key_alias",
                    "model",
                    "team",
                    "team_alias",
                    "user",
                ],
            )

            self.litellm_input_tokens_metric = Counter(
                "litellm_input_tokens",
                "Total number of input tokens from LLM requests",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_input_tokens_metric"
                ),
            )

            self.litellm_output_tokens_metric = Counter(
                "litellm_output_tokens",
                "Total number of output tokens from LLM requests",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_output_tokens_metric"
                ),
            )

            # Remaining Budget for Team
            self.litellm_remaining_team_budget_metric = Gauge(
                "litellm_remaining_team_budget_metric",
                "Remaining budget for team",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_remaining_team_budget_metric"
                ),
            )

            # Max Budget for Team
            self.litellm_team_max_budget_metric = Gauge(
                "litellm_team_max_budget_metric",
                "Maximum budget set for team",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_team_max_budget_metric"
                ),
            )

            # Team Budget Reset At
            self.litellm_team_budget_remaining_hours_metric = Gauge(
                "litellm_team_budget_remaining_hours_metric",
                "Remaining hours for team budget to be reset",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_team_budget_remaining_hours_metric"
                ),
            )

            # Remaining Budget for API Key
            self.litellm_remaining_api_key_budget_metric = Gauge(
                "litellm_remaining_api_key_budget_metric",
                "Remaining budget for api key",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_remaining_api_key_budget_metric"
                ),
            )

            # Max Budget for API Key
            self.litellm_api_key_max_budget_metric = Gauge(
                "litellm_api_key_max_budget_metric",
                "Maximum budget set for api key",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_api_key_max_budget_metric"
                ),
            )

            self.litellm_api_key_budget_remaining_hours_metric = Gauge(
                "litellm_api_key_budget_remaining_hours_metric",
                "Remaining hours for api key budget to be reset",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_api_key_budget_remaining_hours_metric"
                ),
            )

            ########################################
            # LiteLLM Virtual API KEY metrics
            ########################################
            # Remaining MODEL RPM limit for API Key
            self.litellm_remaining_api_key_requests_for_model = Gauge(
                "litellm_remaining_api_key_requests_for_model",
                "Remaining Requests API Key can make for model (model based rpm limit on key)",
                labelnames=["hashed_api_key", "api_key_alias", "model"],
            )

            # Remaining MODEL TPM limit for API Key
            self.litellm_remaining_api_key_tokens_for_model = Gauge(
                "litellm_remaining_api_key_tokens_for_model",
                "Remaining Tokens API Key can make for model (model based tpm limit on key)",
                labelnames=["hashed_api_key", "api_key_alias", "model"],
            )

            ########################################
            # LLM API Deployment Metrics / analytics
            ########################################
            # Remaining Rate Limit for model
            self.litellm_remaining_requests_metric = Gauge(
                "litellm_remaining_requests",
                "LLM Deployment Analytics - remaining requests for model, returned from LLM API Provider",
                labelnames=[
                    "model_group",
                    "api_provider",
                    "api_base",
                    "litellm_model_name",
                    "hashed_api_key",
                    "api_key_alias",
                ],
            )

            self.litellm_remaining_tokens_metric = Gauge(
                "litellm_remaining_tokens",
                "remaining tokens for model, returned from LLM API Provider",
                labelnames=[
                    "model_group",
                    "api_provider",
                    "api_base",
                    "litellm_model_name",
                    "hashed_api_key",
                    "api_key_alias",
                ],
            )

            self.litellm_overhead_latency_metric = Histogram(
                "litellm_overhead_latency_metric",
                "Latency overhead (milliseconds) added by LiteLLM processing",
                labelnames=[
                    "model_group",
                    "api_provider",
                    "api_base",
                    "litellm_model_name",
                    "hashed_api_key",
                    "api_key_alias",
                ],
                buckets=LATENCY_BUCKETS,
            )

            # llm api provider budget metrics
            self.litellm_provider_remaining_budget_metric = Gauge(
                "litellm_provider_remaining_budget_metric",
                "Remaining budget for provider - used when you set provider budget limits",
                labelnames=["api_provider"],
            )

            # Get all keys
            _logged_llm_labels = [
                UserAPIKeyLabelNames.v2_LITELLM_MODEL_NAME.value,
                UserAPIKeyLabelNames.MODEL_ID.value,
                UserAPIKeyLabelNames.API_BASE.value,
                UserAPIKeyLabelNames.API_PROVIDER.value,
            ]
            team_and_key_labels = [
                "hashed_api_key",
                "api_key_alias",
                "team",
                "team_alias",
            ]

            # Metric for deployment state
            self.litellm_deployment_state = Gauge(
                "litellm_deployment_state",
                "LLM Deployment Analytics - The state of the deployment: 0 = healthy, 1 = partial outage, 2 = complete outage",
                labelnames=_logged_llm_labels,
            )

            self.litellm_deployment_cooled_down = Counter(
                "litellm_deployment_cooled_down",
                "LLM Deployment Analytics - Number of times a deployment has been cooled down by LiteLLM load balancing logic. exception_status is the status of the exception that caused the deployment to be cooled down",
                labelnames=_logged_llm_labels + [EXCEPTION_STATUS],
            )

            self.litellm_deployment_success_responses = Counter(
                name="litellm_deployment_success_responses",
                documentation="LLM Deployment Analytics - Total number of successful LLM API calls via litellm",
                labelnames=[REQUESTED_MODEL] + _logged_llm_labels + team_and_key_labels,
            )
            self.litellm_deployment_failure_responses = Counter(
                name="litellm_deployment_failure_responses",
                documentation="LLM Deployment Analytics - Total number of failed LLM API calls for a specific LLM deployment. exception_status is the status of the exception from the llm api",
                labelnames=[REQUESTED_MODEL]
                + _logged_llm_labels
                + EXCEPTION_LABELS
                + team_and_key_labels,
            )
            self.litellm_deployment_failure_by_tag_responses = Counter(
                "litellm_deployment_failure_by_tag_responses",
                "Total number of failed LLM API calls for a specific LLM deployment by custom metadata tags",
                labelnames=[
                    UserAPIKeyLabelNames.REQUESTED_MODEL.value,
                    UserAPIKeyLabelNames.TAG.value,
                ]
                + _logged_llm_labels
                + EXCEPTION_LABELS,
            )
            self.litellm_deployment_total_requests = Counter(
                name="litellm_deployment_total_requests",
                documentation="LLM Deployment Analytics - Total number of LLM API calls via litellm - success + failure",
                labelnames=[REQUESTED_MODEL] + _logged_llm_labels + team_and_key_labels,
            )

            # Deployment Latency tracking
            self.litellm_deployment_latency_per_output_token = Histogram(
                name="litellm_deployment_latency_per_output_token",
                documentation="LLM Deployment Analytics - Latency per output token",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_deployment_latency_per_output_token"
                ),
            )

            self.litellm_deployment_successful_fallbacks = Counter(
                "litellm_deployment_successful_fallbacks",
                "LLM Deployment Analytics - Number of successful fallback requests from primary model -> fallback model",
                PrometheusMetricLabels.get_labels(
                    "litellm_deployment_successful_fallbacks"
                ),
            )
            self.litellm_deployment_failed_fallbacks = Counter(
                "litellm_deployment_failed_fallbacks",
                "LLM Deployment Analytics - Number of failed fallback requests from primary model -> fallback model",
                PrometheusMetricLabels.get_labels(
                    "litellm_deployment_failed_fallbacks"
                ),
            )

            self.litellm_llm_api_failed_requests_metric = Counter(
                name="litellm_llm_api_failed_requests_metric",
                documentation="deprecated - use litellm_proxy_failed_requests_metric",
                labelnames=[
                    "end_user",
                    "hashed_api_key",
                    "api_key_alias",
                    "model",
                    "team",
                    "team_alias",
                    "user",
                ],
            )

            self.litellm_requests_metric = Counter(
                name="litellm_requests_metric",
                documentation="deprecated - use litellm_proxy_total_requests_metric. Total number of LLM calls to litellm - track total per API Key, team, user",
                labelnames=PrometheusMetricLabels.get_labels(
                    label_name="litellm_requests_metric"
                ),
            )

        except Exception as e:
            print_verbose(f"Got exception on init prometheus client {str(e)}")
            raise e

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        verbose_logger.debug(
            f"prometheus Logging - Enters success logging function for kwargs {kwargs}"
        )

        # unpack kwargs
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object"
        )

        if standard_logging_payload is None or not isinstance(
            standard_logging_payload, dict
        ):
            raise ValueError(
                f"standard_logging_object is required, got={standard_logging_payload}"
            )

        model = kwargs.get("model", "")
        litellm_params = kwargs.get("litellm_params", {}) or {}
        _metadata = litellm_params.get("metadata", {})
        end_user_id = get_end_user_id_for_cost_tracking(
            litellm_params, service_type="prometheus"
        )
        user_id = standard_logging_payload["metadata"]["user_api_key_user_id"]
        user_api_key = standard_logging_payload["metadata"]["user_api_key_hash"]
        user_api_key_alias = standard_logging_payload["metadata"]["user_api_key_alias"]
        user_api_team = standard_logging_payload["metadata"]["user_api_key_team_id"]
        user_api_team_alias = standard_logging_payload["metadata"][
            "user_api_key_team_alias"
        ]
        output_tokens = standard_logging_payload["completion_tokens"]
        tokens_used = standard_logging_payload["total_tokens"]
        response_cost = standard_logging_payload["response_cost"]
        _requester_metadata = standard_logging_payload["metadata"].get(
            "requester_metadata"
        )
        # standard_logging_payload is guaranteed to be a dict at this point
        _tags = standard_logging_payload["request_tags"]

        print_verbose(
            f"inside track_prometheus_metrics, model {model}, response_cost {response_cost}, tokens_used {tokens_used}, end_user_id {end_user_id}, user_api_key {user_api_key}"
        )

        enum_values = UserAPIKeyLabelValues(
            end_user=end_user_id,
            hashed_api_key=user_api_key,
            api_key_alias=user_api_key_alias,
            requested_model=standard_logging_payload["model_group"],
            team=user_api_team,
            team_alias=user_api_team_alias,
            user=user_id,
            user_email=standard_logging_payload["metadata"]["user_api_key_user_email"],
            status_code="200",
            model=model,
            litellm_model_name=model,
            tags=_tags,
            model_id=standard_logging_payload["model_id"],
            api_base=standard_logging_payload["api_base"],
            api_provider=standard_logging_payload["custom_llm_provider"],
            exception_status=None,
            exception_class=None,
            custom_metadata_labels=get_custom_labels_from_metadata(
                metadata=standard_logging_payload["metadata"].get("requester_metadata")
                or {}
            ),
        )

        if (
            user_api_key is not None
            and isinstance(user_api_key, str)
            and user_api_key.startswith("sk-")
        ):
            from litellm.proxy.utils import hash_token

            user_api_key = hash_token(user_api_key)

        # increment total LLM requests and spend metric
        self._increment_top_level_request_and_spend_metrics(
            end_user_id=end_user_id,
            user_api_key=user_api_key,
            user_api_key_alias=user_api_key_alias,
            model=model,
            user_api_team=user_api_team,
            user_api_team_alias=user_api_team_alias,
            user_id=user_id,
            response_cost=response_cost,
            enum_values=enum_values,
        )

        # input, output, total token metrics
        self._increment_token_metrics(
            # why type ignore below?
            # 1. We just checked if isinstance(standard_logging_payload, dict). Pyright complains.
            # 2. Pyright does not allow us to run isinstance(standard_logging_payload, StandardLoggingPayload) <- this would be ideal
            standard_logging_payload=standard_logging_payload,  # type: ignore
            end_user_id=end_user_id,
            user_api_key=user_api_key,
            user_api_key_alias=user_api_key_alias,
            model=model,
            user_api_team=user_api_team,
            user_api_team_alias=user_api_team_alias,
            user_id=user_id,
            enum_values=enum_values,
        )

        # remaining budget metrics
        await self._increment_remaining_budget_metrics(
            user_api_team=user_api_team,
            user_api_team_alias=user_api_team_alias,
            user_api_key=user_api_key,
            user_api_key_alias=user_api_key_alias,
            litellm_params=litellm_params,
            response_cost=response_cost,
        )

        # set proxy virtual key rpm/tpm metrics
        self._set_virtual_key_rate_limit_metrics(
            user_api_key=user_api_key,
            user_api_key_alias=user_api_key_alias,
            kwargs=kwargs,
            metadata=_metadata,
        )

        # set latency metrics
        self._set_latency_metrics(
            kwargs=kwargs,
            model=model,
            user_api_key=user_api_key,
            user_api_key_alias=user_api_key_alias,
            user_api_team=user_api_team,
            user_api_team_alias=user_api_team_alias,
            enum_values=enum_values,
        )

        # set x-ratelimit headers
        self.set_llm_deployment_success_metrics(
            kwargs, start_time, end_time, enum_values, output_tokens
        )

        if (
            standard_logging_payload["stream"] is True
        ):  # log successful streaming requests from logging event hook.
            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_proxy_total_requests_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_proxy_total_requests_metric.labels(**_labels).inc()

    def _increment_token_metrics(
        self,
        standard_logging_payload: StandardLoggingPayload,
        end_user_id: Optional[str],
        user_api_key: Optional[str],
        user_api_key_alias: Optional[str],
        model: Optional[str],
        user_api_team: Optional[str],
        user_api_team_alias: Optional[str],
        user_id: Optional[str],
        enum_values: UserAPIKeyLabelValues,
    ):
        # token metrics
        self.litellm_tokens_metric.labels(
            end_user_id,
            user_api_key,
            user_api_key_alias,
            model,
            user_api_team,
            user_api_team_alias,
            user_id,
        ).inc(standard_logging_payload["total_tokens"])

        _labels = prometheus_label_factory(
            supported_enum_labels=PrometheusMetricLabels.get_labels(
                label_name="litellm_input_tokens_metric"
            ),
            enum_values=enum_values,
        )
        self.litellm_input_tokens_metric.labels(**_labels).inc(
            standard_logging_payload["prompt_tokens"]
        )

        _labels = prometheus_label_factory(
            supported_enum_labels=PrometheusMetricLabels.get_labels(
                label_name="litellm_output_tokens_metric"
            ),
            enum_values=enum_values,
        )
        self.litellm_output_tokens_metric.labels(**_labels).inc(
            standard_logging_payload["completion_tokens"]
        )

    async def _increment_remaining_budget_metrics(
        self,
        user_api_team: Optional[str],
        user_api_team_alias: Optional[str],
        user_api_key: Optional[str],
        user_api_key_alias: Optional[str],
        litellm_params: dict,
        response_cost: float,
    ):
        _team_spend = litellm_params.get("metadata", {}).get(
            "user_api_key_team_spend", None
        )
        _team_max_budget = litellm_params.get("metadata", {}).get(
            "user_api_key_team_max_budget", None
        )
        _api_key_spend = litellm_params.get("metadata", {}).get(
            "user_api_key_spend", None
        )
        _api_key_max_budget = litellm_params.get("metadata", {}).get(
            "user_api_key_max_budget", None
        )

        await self._set_api_key_budget_metrics_after_api_request(
            user_api_key=user_api_key,
            user_api_key_alias=user_api_key_alias,
            response_cost=response_cost,
            key_max_budget=_api_key_max_budget,
            key_spend=_api_key_spend,
        )

        await self._set_team_budget_metrics_after_api_request(
            user_api_team=user_api_team,
            user_api_team_alias=user_api_team_alias,
            team_spend=_team_spend,
            team_max_budget=_team_max_budget,
            response_cost=response_cost,
        )

    def _increment_top_level_request_and_spend_metrics(
        self,
        end_user_id: Optional[str],
        user_api_key: Optional[str],
        user_api_key_alias: Optional[str],
        model: Optional[str],
        user_api_team: Optional[str],
        user_api_team_alias: Optional[str],
        user_id: Optional[str],
        response_cost: float,
        enum_values: UserAPIKeyLabelValues,
    ):
        _labels = prometheus_label_factory(
            supported_enum_labels=PrometheusMetricLabels.get_labels(
                label_name="litellm_requests_metric"
            ),
            enum_values=enum_values,
        )
        self.litellm_requests_metric.labels(**_labels).inc()

        self.litellm_spend_metric.labels(
            end_user_id,
            user_api_key,
            user_api_key_alias,
            model,
            user_api_team,
            user_api_team_alias,
            user_id,
        ).inc(response_cost)

    def _set_virtual_key_rate_limit_metrics(
        self,
        user_api_key: Optional[str],
        user_api_key_alias: Optional[str],
        kwargs: dict,
        metadata: dict,
    ):
        from litellm.proxy.common_utils.callback_utils import (
            get_model_group_from_litellm_kwargs,
        )

        # Set remaining rpm/tpm for API Key + model
        # see parallel_request_limiter.py - variables are set there
        model_group = get_model_group_from_litellm_kwargs(kwargs)
        remaining_requests_variable_name = (
            f"litellm-key-remaining-requests-{model_group}"
        )
        remaining_tokens_variable_name = f"litellm-key-remaining-tokens-{model_group}"

        remaining_requests = (
            metadata.get(remaining_requests_variable_name, sys.maxsize) or sys.maxsize
        )
        remaining_tokens = (
            metadata.get(remaining_tokens_variable_name, sys.maxsize) or sys.maxsize
        )

        self.litellm_remaining_api_key_requests_for_model.labels(
            user_api_key, user_api_key_alias, model_group
        ).set(remaining_requests)

        self.litellm_remaining_api_key_tokens_for_model.labels(
            user_api_key, user_api_key_alias, model_group
        ).set(remaining_tokens)

    def _set_latency_metrics(
        self,
        kwargs: dict,
        model: Optional[str],
        user_api_key: Optional[str],
        user_api_key_alias: Optional[str],
        user_api_team: Optional[str],
        user_api_team_alias: Optional[str],
        enum_values: UserAPIKeyLabelValues,
    ):
        # latency metrics
        end_time: datetime = kwargs.get("end_time") or datetime.now()
        start_time: Optional[datetime] = kwargs.get("start_time")
        api_call_start_time = kwargs.get("api_call_start_time", None)
        completion_start_time = kwargs.get("completion_start_time", None)

        time_to_first_token_seconds = self._safe_duration_seconds(
            start_time=api_call_start_time,
            end_time=completion_start_time,
        )
        if (
            time_to_first_token_seconds is not None
            and kwargs.get("stream", False) is True  # only emit for streaming requests
        ):
            self.litellm_llm_api_time_to_first_token_metric.labels(
                model,
                user_api_key,
                user_api_key_alias,
                user_api_team,
                user_api_team_alias,
            ).observe(time_to_first_token_seconds)
        else:
            verbose_logger.debug(
                "Time to first token metric not emitted, stream option in model_parameters is not True"
            )

        api_call_total_time_seconds = self._safe_duration_seconds(
            start_time=api_call_start_time,
            end_time=end_time,
        )
        if api_call_total_time_seconds is not None:
            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_llm_api_latency_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_llm_api_latency_metric.labels(**_labels).observe(
                api_call_total_time_seconds
            )

        # total request latency
        total_time_seconds = self._safe_duration_seconds(
            start_time=start_time,
            end_time=end_time,
        )
        if total_time_seconds is not None:
            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_request_total_latency_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_request_total_latency_metric.labels(**_labels).observe(
                total_time_seconds
            )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        verbose_logger.debug(
            f"prometheus Logging - Enters failure logging function for kwargs {kwargs}"
        )

        # unpack kwargs
        model = kwargs.get("model", "")
        standard_logging_payload: StandardLoggingPayload = kwargs.get(
            "standard_logging_object", {}
        )
        litellm_params = kwargs.get("litellm_params", {}) or {}
        end_user_id = get_end_user_id_for_cost_tracking(
            litellm_params, service_type="prometheus"
        )
        user_id = standard_logging_payload["metadata"]["user_api_key_user_id"]
        user_api_key = standard_logging_payload["metadata"]["user_api_key_hash"]
        user_api_key_alias = standard_logging_payload["metadata"]["user_api_key_alias"]
        user_api_team = standard_logging_payload["metadata"]["user_api_key_team_id"]
        user_api_team_alias = standard_logging_payload["metadata"][
            "user_api_key_team_alias"
        ]

        try:
            self.litellm_llm_api_failed_requests_metric.labels(
                end_user_id,
                user_api_key,
                user_api_key_alias,
                model,
                user_api_team,
                user_api_team_alias,
                user_id,
            ).inc()

            self.set_llm_deployment_failure_metrics(kwargs)
        except Exception as e:
            verbose_logger.exception(
                "prometheus Layer Error(): Exception occurred - {}".format(str(e))
            )

    async def async_post_call_failure_hook(
        self,
        request_data: dict,
        original_exception: Exception,
        user_api_key_dict: UserAPIKeyAuth,
    ):
        """
        Track client side failures

        Proxy level tracking - failed client side requests

        labelnames=[
            "end_user",
            "hashed_api_key",
            "api_key_alias",
            REQUESTED_MODEL,
            "team",
            "team_alias",
        ] + EXCEPTION_LABELS,
        """
        try:
            _tags = cast(List[str], request_data.get("tags") or [])
            enum_values = UserAPIKeyLabelValues(
                end_user=user_api_key_dict.end_user_id,
                user=user_api_key_dict.user_id,
                user_email=user_api_key_dict.user_email,
                hashed_api_key=user_api_key_dict.api_key,
                api_key_alias=user_api_key_dict.key_alias,
                team=user_api_key_dict.team_id,
                team_alias=user_api_key_dict.team_alias,
                requested_model=request_data.get("model", ""),
                status_code=str(getattr(original_exception, "status_code", None)),
                exception_status=str(getattr(original_exception, "status_code", None)),
                exception_class=self._get_exception_class_name(original_exception),
                tags=_tags,
            )
            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_proxy_failed_requests_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_proxy_failed_requests_metric.labels(**_labels).inc()

            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_proxy_total_requests_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_proxy_total_requests_metric.labels(**_labels).inc()
        except Exception as e:
            verbose_logger.exception(
                "prometheus Layer Error(): Exception occurred - {}".format(str(e))
            )

    async def async_post_call_success_hook(
        self, data: dict, user_api_key_dict: UserAPIKeyAuth, response
    ):
        """
        Proxy level tracking - triggered when the proxy responds with a success response to the client
        """
        try:
            enum_values = UserAPIKeyLabelValues(
                end_user=user_api_key_dict.end_user_id,
                hashed_api_key=user_api_key_dict.api_key,
                api_key_alias=user_api_key_dict.key_alias,
                requested_model=data.get("model", ""),
                team=user_api_key_dict.team_id,
                team_alias=user_api_key_dict.team_alias,
                user=user_api_key_dict.user_id,
                user_email=user_api_key_dict.user_email,
                status_code="200",
            )

            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_proxy_total_requests_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_proxy_total_requests_metric.labels(**_labels).inc()
        except Exception as e:
            verbose_logger.exception(
                "prometheus Layer Error(): Exception occurred - {}".format(str(e))
            )

    def set_llm_deployment_failure_metrics(self, request_kwargs: dict):
        """
        Sets Failure metrics when an LLM API call fails

        - mark the deployment as partial outage
        - increment deployment failure responses metric
        - increment deployment total requests metric

        Args:
            request_kwargs: dict
        """
        try:
            verbose_logger.debug("setting remaining tokens requests metric")
            standard_logging_payload: StandardLoggingPayload = request_kwargs.get(
                "standard_logging_object", {}
            )
            _litellm_params = request_kwargs.get("litellm_params", {}) or {}
            litellm_model_name = request_kwargs.get("model", None)
            model_group = standard_logging_payload.get("model_group", None)
            api_base = standard_logging_payload.get("api_base", None)
            model_id = standard_logging_payload.get("model_id", None)
            exception: Exception = request_kwargs.get("exception", None)
            llm_provider = _litellm_params.get("custom_llm_provider", None)

            """
            log these labels
            ["litellm_model_name", "model_id", "api_base", "api_provider"]
            """
            self.set_deployment_partial_outage(
                litellm_model_name=litellm_model_name,
                model_id=model_id,
                api_base=api_base,
                api_provider=llm_provider,
            )
            self.litellm_deployment_failure_responses.labels(
                litellm_model_name=litellm_model_name,
                model_id=model_id,
                api_base=api_base,
                api_provider=llm_provider,
                exception_status=str(getattr(exception, "status_code", None)),
                exception_class=self._get_exception_class_name(exception),
                requested_model=model_group,
                hashed_api_key=standard_logging_payload["metadata"][
                    "user_api_key_hash"
                ],
                api_key_alias=standard_logging_payload["metadata"][
                    "user_api_key_alias"
                ],
                team=standard_logging_payload["metadata"]["user_api_key_team_id"],
                team_alias=standard_logging_payload["metadata"][
                    "user_api_key_team_alias"
                ],
            ).inc()

            # tag based tracking
            if standard_logging_payload is not None and isinstance(
                standard_logging_payload, dict
            ):
                _tags = standard_logging_payload["request_tags"]
                for tag in _tags:
                    self.litellm_deployment_failure_by_tag_responses.labels(
                        **{
                            UserAPIKeyLabelNames.REQUESTED_MODEL.value: model_group,
                            UserAPIKeyLabelNames.TAG.value: tag,
                            UserAPIKeyLabelNames.v2_LITELLM_MODEL_NAME.value: litellm_model_name,
                            UserAPIKeyLabelNames.MODEL_ID.value: model_id,
                            UserAPIKeyLabelNames.API_BASE.value: api_base,
                            UserAPIKeyLabelNames.API_PROVIDER.value: llm_provider,
                            UserAPIKeyLabelNames.EXCEPTION_CLASS.value: exception.__class__.__name__,
                            UserAPIKeyLabelNames.EXCEPTION_STATUS.value: str(
                                getattr(exception, "status_code", None)
                            ),
                        }
                    ).inc()

            self.litellm_deployment_total_requests.labels(
                litellm_model_name=litellm_model_name,
                model_id=model_id,
                api_base=api_base,
                api_provider=llm_provider,
                requested_model=model_group,
                hashed_api_key=standard_logging_payload["metadata"][
                    "user_api_key_hash"
                ],
                api_key_alias=standard_logging_payload["metadata"][
                    "user_api_key_alias"
                ],
                team=standard_logging_payload["metadata"]["user_api_key_team_id"],
                team_alias=standard_logging_payload["metadata"][
                    "user_api_key_team_alias"
                ],
            ).inc()
        except Exception as e:
            verbose_logger.debug(
                "Prometheus Error: set_llm_deployment_failure_metrics. Exception occurred - {}".format(
                    str(e)
                )
            )

    def set_llm_deployment_success_metrics(
        self,
        request_kwargs: dict,
        start_time,
        end_time,
        enum_values: UserAPIKeyLabelValues,
        output_tokens: float = 1.0,
    ):
        try:
            verbose_logger.debug("setting remaining tokens requests metric")
            standard_logging_payload: Optional[
                StandardLoggingPayload
            ] = request_kwargs.get("standard_logging_object")

            if standard_logging_payload is None:
                return

            model_group = standard_logging_payload["model_group"]
            api_base = standard_logging_payload["api_base"]
            _response_headers = request_kwargs.get("response_headers")
            _litellm_params = request_kwargs.get("litellm_params", {}) or {}
            _metadata = _litellm_params.get("metadata", {})
            litellm_model_name = request_kwargs.get("model", None)
            llm_provider = _litellm_params.get("custom_llm_provider", None)
            _model_info = _metadata.get("model_info") or {}
            model_id = _model_info.get("id", None)
            remaining_requests: Optional[int] = None
            remaining_tokens: Optional[int] = None
            if additional_headers := standard_logging_payload["hidden_params"][
                "additional_headers"
            ]:
                # OpenAI / OpenAI Compatible headers
                remaining_requests = additional_headers.get(
                    "x_ratelimit_remaining_requests", None
                )
                remaining_tokens = additional_headers.get(
                    "x_ratelimit_remaining_tokens", None
                )

            if litellm_overhead_time_ms := standard_logging_payload[
                "hidden_params"
            ].get("litellm_overhead_time_ms"):
                self.litellm_overhead_latency_metric.labels(
                    model_group,
                    llm_provider,
                    api_base,
                    litellm_model_name,
                    standard_logging_payload["metadata"]["user_api_key_hash"],
                    standard_logging_payload["metadata"]["user_api_key_alias"],
                ).observe(
                    litellm_overhead_time_ms / 1000
                )  # set as seconds

            if remaining_requests:
                """
                "model_group",
                "api_provider",
                "api_base",
                "litellm_model_name"
                """
                self.litellm_remaining_requests_metric.labels(
                    model_group,
                    llm_provider,
                    api_base,
                    litellm_model_name,
                    standard_logging_payload["metadata"]["user_api_key_hash"],
                    standard_logging_payload["metadata"]["user_api_key_alias"],
                ).set(remaining_requests)

            if remaining_tokens:
                self.litellm_remaining_tokens_metric.labels(
                    model_group,
                    llm_provider,
                    api_base,
                    litellm_model_name,
                    standard_logging_payload["metadata"]["user_api_key_hash"],
                    standard_logging_payload["metadata"]["user_api_key_alias"],
                ).set(remaining_tokens)

            """
            log these labels
            ["litellm_model_name", "requested_model", "model_id", "api_base", "api_provider"]
            """
            self.set_deployment_healthy(
                litellm_model_name=litellm_model_name,
                model_id=model_id,
                api_base=api_base,
                api_provider=llm_provider,
            )

            self.litellm_deployment_success_responses.labels(
                litellm_model_name=litellm_model_name,
                model_id=model_id,
                api_base=api_base,
                api_provider=llm_provider,
                requested_model=model_group,
                hashed_api_key=standard_logging_payload["metadata"][
                    "user_api_key_hash"
                ],
                api_key_alias=standard_logging_payload["metadata"][
                    "user_api_key_alias"
                ],
                team=standard_logging_payload["metadata"]["user_api_key_team_id"],
                team_alias=standard_logging_payload["metadata"][
                    "user_api_key_team_alias"
                ],
            ).inc()

            self.litellm_deployment_total_requests.labels(
                litellm_model_name=litellm_model_name,
                model_id=model_id,
                api_base=api_base,
                api_provider=llm_provider,
                requested_model=model_group,
                hashed_api_key=standard_logging_payload["metadata"][
                    "user_api_key_hash"
                ],
                api_key_alias=standard_logging_payload["metadata"][
                    "user_api_key_alias"
                ],
                team=standard_logging_payload["metadata"]["user_api_key_team_id"],
                team_alias=standard_logging_payload["metadata"][
                    "user_api_key_team_alias"
                ],
            ).inc()

            # Track deployment Latency
            response_ms: timedelta = end_time - start_time
            time_to_first_token_response_time: Optional[timedelta] = None

            if (
                request_kwargs.get("stream", None) is not None
                and request_kwargs["stream"] is True
            ):
                # only log ttft for streaming request
                time_to_first_token_response_time = (
                    request_kwargs.get("completion_start_time", end_time) - start_time
                )

            # use the metric that is not None
            # if streaming - use time_to_first_token_response
            # if not streaming - use response_ms
            _latency: timedelta = time_to_first_token_response_time or response_ms
            _latency_seconds = _latency.total_seconds()

            # latency per output token
            latency_per_token = None
            if output_tokens is not None and output_tokens > 0:
                latency_per_token = _latency_seconds / output_tokens
                _labels = prometheus_label_factory(
                    supported_enum_labels=PrometheusMetricLabels.get_labels(
                        label_name="litellm_deployment_latency_per_output_token"
                    ),
                    enum_values=enum_values,
                )
                self.litellm_deployment_latency_per_output_token.labels(
                    **_labels
                ).observe(latency_per_token)
        except Exception as e:
            verbose_logger.error(
                "Prometheus Error: set_llm_deployment_success_metrics. Exception occurred - {}".format(
                    str(e)
                )
            )
            return

    @staticmethod
    def _get_exception_class_name(exception: Exception) -> str:
        exception_class_name = ""
        if hasattr(exception, "llm_provider"):
            exception_class_name = getattr(exception, "llm_provider") or ""

        # pretty print the provider name on prometheus
        # eg. `openai` -> `Openai.`
        if len(exception_class_name) >= 1:
            exception_class_name = (
                exception_class_name[0].upper() + exception_class_name[1:] + "."
            )

        exception_class_name += exception.__class__.__name__
        return exception_class_name
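
    # Example (illustrative): for an exception carrying llm_provider="openai"
    # whose class is RateLimitError, the helper above returns
    # "Openai.RateLimitError"; without an llm_provider attribute it returns
    # just "RateLimitError".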

    async def log_success_fallback_event(
        self, original_model_group: str, kwargs: dict, original_exception: Exception
    ):
        """
        Logs a successful LLM fallback event on prometheus
        """
        from litellm.litellm_core_utils.litellm_logging import (
            StandardLoggingMetadata,
            StandardLoggingPayloadSetup,
        )

        verbose_logger.debug(
            "Prometheus: log_success_fallback_event, original_model_group: %s, kwargs: %s",
            original_model_group,
            kwargs,
        )
        _metadata = kwargs.get("metadata", {})
        standard_metadata: StandardLoggingMetadata = (
            StandardLoggingPayloadSetup.get_standard_logging_metadata(
                metadata=_metadata
            )
        )
        _new_model = kwargs.get("model")
        _tags = cast(List[str], kwargs.get("tags") or [])

        enum_values = UserAPIKeyLabelValues(
            requested_model=original_model_group,
            fallback_model=_new_model,
            hashed_api_key=standard_metadata["user_api_key_hash"],
            api_key_alias=standard_metadata["user_api_key_alias"],
            team=standard_metadata["user_api_key_team_id"],
            team_alias=standard_metadata["user_api_key_team_alias"],
            exception_status=str(getattr(original_exception, "status_code", None)),
            exception_class=self._get_exception_class_name(original_exception),
            tags=_tags,
        )
        _labels = prometheus_label_factory(
            supported_enum_labels=PrometheusMetricLabels.get_labels(
                label_name="litellm_deployment_successful_fallbacks"
            ),
            enum_values=enum_values,
        )
        self.litellm_deployment_successful_fallbacks.labels(**_labels).inc()

    async def log_failure_fallback_event(
        self, original_model_group: str, kwargs: dict, original_exception: Exception
    ):
        """
        Logs a failed LLM fallback event on prometheus
        """
        from litellm.litellm_core_utils.litellm_logging import (
            StandardLoggingMetadata,
            StandardLoggingPayloadSetup,
        )

        verbose_logger.debug(
            "Prometheus: log_failure_fallback_event, original_model_group: %s, kwargs: %s",
            original_model_group,
            kwargs,
        )
        _new_model = kwargs.get("model")
        _metadata = kwargs.get("metadata", {})
        _tags = cast(List[str], kwargs.get("tags") or [])
        standard_metadata: StandardLoggingMetadata = (
            StandardLoggingPayloadSetup.get_standard_logging_metadata(
                metadata=_metadata
            )
        )
        enum_values = UserAPIKeyLabelValues(
            requested_model=original_model_group,
            fallback_model=_new_model,
            hashed_api_key=standard_metadata["user_api_key_hash"],
            api_key_alias=standard_metadata["user_api_key_alias"],
            team=standard_metadata["user_api_key_team_id"],
            team_alias=standard_metadata["user_api_key_team_alias"],
            exception_status=str(getattr(original_exception, "status_code", None)),
            exception_class=self._get_exception_class_name(original_exception),
            tags=_tags,
        )

        _labels = prometheus_label_factory(
            supported_enum_labels=PrometheusMetricLabels.get_labels(
                label_name="litellm_deployment_failed_fallbacks"
            ),
            enum_values=enum_values,
        )
        self.litellm_deployment_failed_fallbacks.labels(**_labels).inc()

    def set_litellm_deployment_state(
        self,
        state: int,
        litellm_model_name: str,
        model_id: Optional[str],
        api_base: Optional[str],
        api_provider: str,
    ):
        self.litellm_deployment_state.labels(
            litellm_model_name, model_id, api_base, api_provider
        ).set(state)

    def set_deployment_healthy(
        self,
        litellm_model_name: str,
        model_id: str,
        api_base: str,
        api_provider: str,
    ):
        self.set_litellm_deployment_state(
            0, litellm_model_name, model_id, api_base, api_provider
        )

    def set_deployment_partial_outage(
        self,
        litellm_model_name: str,
        model_id: Optional[str],
        api_base: Optional[str],
        api_provider: str,
    ):
        self.set_litellm_deployment_state(
            1, litellm_model_name, model_id, api_base, api_provider
        )

    def set_deployment_complete_outage(
        self,
        litellm_model_name: str,
        model_id: Optional[str],
        api_base: Optional[str],
        api_provider: str,
    ):
        self.set_litellm_deployment_state(
            2, litellm_model_name, model_id, api_base, api_provider
        )

    def increment_deployment_cooled_down(
        self,
        litellm_model_name: str,
        model_id: str,
        api_base: str,
        api_provider: str,
        exception_status: str,
    ):
        """
        increment metric when litellm.Router / load balancing logic places a deployment in cool down
        """
        self.litellm_deployment_cooled_down.labels(
            litellm_model_name, model_id, api_base, api_provider, exception_status
        ).inc()

    def track_provider_remaining_budget(
        self, provider: str, spend: float, budget_limit: float
    ):
        """
        Track provider remaining budget in Prometheus
        """
        self.litellm_provider_remaining_budget_metric.labels(provider).set(
            self._safe_get_remaining_budget(
                max_budget=budget_limit,
                spend=spend,
            )
        )

    def _safe_get_remaining_budget(
        self, max_budget: Optional[float], spend: Optional[float]
    ) -> float:
        if max_budget is None:
            return float("inf")

        if spend is None:
            return max_budget

        return max_budget - spend

    async def _initialize_budget_metrics(
        self,
        data_fetch_function: Callable[..., Awaitable[Tuple[List[Any], Optional[int]]]],
        set_metrics_function: Callable[[List[Any]], Awaitable[None]],
        data_type: Literal["teams", "keys"],
    ):
        """
        Generic method to initialize budget metrics for teams or API keys.

        Args:
            data_fetch_function: Function to fetch data with pagination.
            set_metrics_function: Function to set metrics for the fetched data.
            data_type: String representing the type of data ("teams" or "keys") for logging purposes.
        """
        from litellm.proxy.proxy_server import prisma_client

        if prisma_client is None:
            return

        try:
            page = 1
            page_size = 50
            data, total_count = await data_fetch_function(
                page_size=page_size, page=page
            )

            if total_count is None:
                total_count = len(data)

            # Calculate total pages needed
            total_pages = (total_count + page_size - 1) // page_size

            # Set metrics for first page of data
            await set_metrics_function(data)

            # Get and set metrics for remaining pages
            for page in range(2, total_pages + 1):
                data, _ = await data_fetch_function(page_size=page_size, page=page)
                await set_metrics_function(data)

        except Exception as e:
            verbose_logger.exception(
                f"Error initializing {data_type} budget metrics: {str(e)}"
            )

    async def _initialize_team_budget_metrics(self):
        """
        Initialize team budget metrics by reusing the generic pagination logic.
        """
        from litellm.proxy.management_endpoints.team_endpoints import (
            get_paginated_teams,
        )
        from litellm.proxy.proxy_server import prisma_client

        if prisma_client is None:
            verbose_logger.debug(
                "Prometheus: skipping team metrics initialization, DB not initialized"
            )
            return

        async def fetch_teams(
            page_size: int, page: int
        ) -> Tuple[List[LiteLLM_TeamTable], Optional[int]]:
            teams, total_count = await get_paginated_teams(
                prisma_client=prisma_client, page_size=page_size, page=page
            )
            if total_count is None:
                total_count = len(teams)
            return teams, total_count

        await self._initialize_budget_metrics(
            data_fetch_function=fetch_teams,
            set_metrics_function=self._set_team_list_budget_metrics,
            data_type="teams",
        )

    async def _initialize_api_key_budget_metrics(self):
        """
        Initialize API key budget metrics by reusing the generic pagination logic.
        """
        from litellm.constants import UI_SESSION_TOKEN_TEAM_ID
        from litellm.proxy.management_endpoints.key_management_endpoints import (
            _list_key_helper,
        )
        from litellm.proxy.proxy_server import prisma_client

        if prisma_client is None:
            verbose_logger.debug(
                "Prometheus: skipping key metrics initialization, DB not initialized"
            )
            return

        async def fetch_keys(
            page_size: int, page: int
        ) -> Tuple[List[Union[str, UserAPIKeyAuth]], Optional[int]]:
            key_list_response = await _list_key_helper(
                prisma_client=prisma_client,
                page=page,
                size=page_size,
                user_id=None,
                team_id=None,
                key_alias=None,
                key_hash=None,
                exclude_team_id=UI_SESSION_TOKEN_TEAM_ID,
                return_full_object=True,
                organization_id=None,
            )
            keys = key_list_response.get("keys", [])
            total_count = key_list_response.get("total_count")
            if total_count is None:
                total_count = len(keys)
            return keys, total_count

        await self._initialize_budget_metrics(
            data_fetch_function=fetch_keys,
            set_metrics_function=self._set_key_list_budget_metrics,
            data_type="keys",
        )

    async def initialize_remaining_budget_metrics(self):
        """
        Handler for initializing remaining budget metrics for all teams to avoid metric discrepancies.

        Runs when prometheus logger starts up.

        - If redis cache is available, we use the pod lock manager to acquire a lock and initialize the metrics.
            - Ensures only one pod emits the metrics at a time.
        - If redis cache is not available, we initialize the metrics directly.
        """
        from litellm.constants import PROMETHEUS_EMIT_BUDGET_METRICS_JOB_NAME
        from litellm.proxy.proxy_server import proxy_logging_obj

        pod_lock_manager = proxy_logging_obj.db_spend_update_writer.pod_lock_manager

        # if using redis, ensure only one pod emits the metrics at a time
        if pod_lock_manager and pod_lock_manager.redis_cache:
            if await pod_lock_manager.acquire_lock(
                cronjob_id=PROMETHEUS_EMIT_BUDGET_METRICS_JOB_NAME
            ):
                try:
                    await self._initialize_remaining_budget_metrics()
                finally:
                    await pod_lock_manager.release_lock(
                        cronjob_id=PROMETHEUS_EMIT_BUDGET_METRICS_JOB_NAME
                    )
        else:
            # if not using redis, initialize the metrics directly
            await self._initialize_remaining_budget_metrics()

    async def _initialize_remaining_budget_metrics(self):
        """
        Helper to initialize remaining budget metrics for all teams and API keys.
        """
        verbose_logger.debug("Emitting key, team budget metrics....")
        await self._initialize_team_budget_metrics()
        await self._initialize_api_key_budget_metrics()

    async def _set_key_list_budget_metrics(
        self, keys: List[Union[str, UserAPIKeyAuth]]
    ):
        """Helper function to set budget metrics for a list of keys"""
        for key in keys:
            if isinstance(key, UserAPIKeyAuth):
                self._set_key_budget_metrics(key)

    async def _set_team_list_budget_metrics(self, teams: List[LiteLLM_TeamTable]):
        """Helper function to set budget metrics for a list of teams"""
        for team in teams:
            self._set_team_budget_metrics(team)

    async def _set_team_budget_metrics_after_api_request(
        self,
        user_api_team: Optional[str],
        user_api_team_alias: Optional[str],
        team_spend: Optional[float],
        team_max_budget: Optional[float],
        response_cost: float,
    ):
        """
        Set team budget metrics after an LLM API request

        - Assemble a LiteLLM_TeamTable object
            - looks up team info from db if not available in metadata
        - Set team budget metrics
        """
        if user_api_team:
            team_object = await self._assemble_team_object(
                team_id=user_api_team,
                team_alias=user_api_team_alias or "",
                spend=team_spend,
                max_budget=team_max_budget,
                response_cost=response_cost,
            )

            self._set_team_budget_metrics(team_object)

    async def _assemble_team_object(
        self,
        team_id: str,
        team_alias: str,
        spend: Optional[float],
        max_budget: Optional[float],
        response_cost: float,
    ) -> LiteLLM_TeamTable:
        """
        Assemble a LiteLLM_TeamTable object

        for fields not available in metadata, we fetch from db

        Fields not available in metadata:
        - `budget_reset_at`
        """
        from litellm.proxy.auth.auth_checks import get_team_object
        from litellm.proxy.proxy_server import prisma_client, user_api_key_cache

        _total_team_spend = (spend or 0) + response_cost
        team_object = LiteLLM_TeamTable(
            team_id=team_id,
            team_alias=team_alias,
            spend=_total_team_spend,
            max_budget=max_budget,
        )
        try:
            team_info = await get_team_object(
                team_id=team_id,
                prisma_client=prisma_client,
                user_api_key_cache=user_api_key_cache,
            )
        except Exception as e:
            verbose_logger.debug(
                f"[Non-Blocking] Prometheus: Error getting team info: {str(e)}"
            )
            return team_object

        if team_info:
            team_object.budget_reset_at = team_info.budget_reset_at

        return team_object

    def _set_team_budget_metrics(
        self,
        team: LiteLLM_TeamTable,
    ):
        """
        Set team budget metrics for a single team

        - Remaining Budget
        - Max Budget
        - Budget Reset At
        """
        enum_values = UserAPIKeyLabelValues(
            team=team.team_id,
            team_alias=team.team_alias or "",
        )
        _labels = prometheus_label_factory(
            supported_enum_labels=PrometheusMetricLabels.get_labels(
                label_name="litellm_remaining_team_budget_metric"
            ),
            enum_values=enum_values,
        )
        self.litellm_remaining_team_budget_metric.labels(**_labels).set(
            self._safe_get_remaining_budget(
                max_budget=team.max_budget,
                spend=team.spend,
            )
        )

        if team.max_budget is not None:
            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_team_max_budget_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_team_max_budget_metric.labels(**_labels).set(team.max_budget)

        if team.budget_reset_at is not None:
            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_team_budget_remaining_hours_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_team_budget_remaining_hours_metric.labels(**_labels).set(
                self._get_remaining_hours_for_budget_reset(
                    budget_reset_at=team.budget_reset_at
                )
            )

    def _set_key_budget_metrics(self, user_api_key_dict: UserAPIKeyAuth):
        """
        Set virtual key budget metrics

        - Remaining Budget
        - Max Budget
        - Budget Reset At
        """
        enum_values = UserAPIKeyLabelValues(
            hashed_api_key=user_api_key_dict.token,
            api_key_alias=user_api_key_dict.key_alias or "",
        )
        _labels = prometheus_label_factory(
            supported_enum_labels=PrometheusMetricLabels.get_labels(
                label_name="litellm_remaining_api_key_budget_metric"
            ),
            enum_values=enum_values,
        )
        self.litellm_remaining_api_key_budget_metric.labels(**_labels).set(
            self._safe_get_remaining_budget(
                max_budget=user_api_key_dict.max_budget,
                spend=user_api_key_dict.spend,
            )
        )

        if user_api_key_dict.max_budget is not None:
            _labels = prometheus_label_factory(
                supported_enum_labels=PrometheusMetricLabels.get_labels(
                    label_name="litellm_api_key_max_budget_metric"
                ),
                enum_values=enum_values,
            )
            self.litellm_api_key_max_budget_metric.labels(**_labels).set(
                user_api_key_dict.max_budget
            )

        if user_api_key_dict.budget_reset_at is not None:
            self.litellm_api_key_budget_remaining_hours_metric.labels(**_labels).set(
                self._get_remaining_hours_for_budget_reset(
                    budget_reset_at=user_api_key_dict.budget_reset_at
                )
            )

    async def _set_api_key_budget_metrics_after_api_request(
        self,
        user_api_key: Optional[str],
        user_api_key_alias: Optional[str],
        response_cost: float,
        key_max_budget: Optional[float],
        key_spend: Optional[float],
    ):
        if user_api_key:
            user_api_key_dict = await self._assemble_key_object(
                user_api_key=user_api_key,
                user_api_key_alias=user_api_key_alias or "",
                key_max_budget=key_max_budget,
                key_spend=key_spend,
                response_cost=response_cost,
            )
            self._set_key_budget_metrics(user_api_key_dict)

    async def _assemble_key_object(
        self,
        user_api_key: str,
        user_api_key_alias: str,
        key_max_budget: Optional[float],
        key_spend: Optional[float],
        response_cost: float,
    ) -> UserAPIKeyAuth:
        """
        Assemble a UserAPIKeyAuth object
        """
        from litellm.proxy.auth.auth_checks import get_key_object
        from litellm.proxy.proxy_server import prisma_client, user_api_key_cache

        _total_key_spend = (key_spend or 0) + response_cost
        user_api_key_dict = UserAPIKeyAuth(
            token=user_api_key,
            key_alias=user_api_key_alias,
            max_budget=key_max_budget,
            spend=_total_key_spend,
        )
        try:
            if user_api_key_dict.token:
                key_object = await get_key_object(
                    hashed_token=user_api_key_dict.token,
                    prisma_client=prisma_client,
                    user_api_key_cache=user_api_key_cache,
                )
                if key_object:
                    user_api_key_dict.budget_reset_at = key_object.budget_reset_at
        except Exception as e:
            verbose_logger.debug(
                f"[Non-Blocking] Prometheus: Error getting key info: {str(e)}"
            )

        return user_api_key_dict

    def _get_remaining_hours_for_budget_reset(self, budget_reset_at: datetime) -> float:
        """
        Get remaining hours for budget reset
        """
        return (
            budget_reset_at - datetime.now(budget_reset_at.tzinfo)
        ).total_seconds() / 3600

    def _safe_duration_seconds(
        self,
        start_time: Any,
        end_time: Any,
    ) -> Optional[float]:
        """
        Compute the duration in seconds between two objects.

        Returns the duration as a float if both start and end are instances of datetime,
        otherwise returns None.
        """
        if isinstance(start_time, datetime) and isinstance(end_time, datetime):
            return (end_time - start_time).total_seconds()
        return None
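

# Usage sketch (illustrative, not part of this module): the logger is normally
# enabled through litellm's callback settings rather than constructed directly,
# e.g.
#
#   import litellm
#   litellm.success_callback = ["prometheus"]
#   litellm.failure_callback = ["prometheus"]
#
# On the proxy, the equivalent is configuring these callbacks under
# `litellm_settings` in the config YAML; metrics are then scraped from the
# /metrics endpoint mounted by _mount_metrics_endpoint below.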


def initialize_budget_metrics_cron_job(scheduler: AsyncIOScheduler):
    """
    Initialize budget metrics as a cron job. This job runs every `PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES` minutes.

    It emits the current remaining budget metrics for all Keys and Teams.
    """
    from litellm.constants import PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES
    from litellm.integrations.custom_logger import CustomLogger
    from litellm.integrations.prometheus import PrometheusLogger

    prometheus_loggers: List[
        CustomLogger
    ] = litellm.logging_callback_manager.get_custom_loggers_for_type(
        callback_type=PrometheusLogger
    )
    # we need to get the initialized prometheus logger instance(s) and call logger.initialize_remaining_budget_metrics() on them
    verbose_logger.debug("found %s prometheus loggers", len(prometheus_loggers))
    if len(prometheus_loggers) > 0:
        prometheus_logger = cast(PrometheusLogger, prometheus_loggers[0])
        verbose_logger.debug(
            "Initializing remaining budget metrics as a cron job executing every %s minutes"
            % PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES
        )
        scheduler.add_job(
            prometheus_logger.initialize_remaining_budget_metrics,
            "interval",
            minutes=PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES,
        )
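

# Wiring sketch (illustrative): the proxy is assumed to pass in its running
# APScheduler instance, roughly:
#
#   from apscheduler.schedulers.asyncio import AsyncIOScheduler
#
#   scheduler = AsyncIOScheduler()
#   initialize_budget_metrics_cron_job(scheduler)
#   scheduler.start()  # budget metrics now refresh on the configured interval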


def _mount_metrics_endpoint(premium_user: bool):
    """
    Mount the Prometheus metrics endpoint.

    Args:
        premium_user (bool): Whether the user is a premium user
    """
    from prometheus_client import make_asgi_app

    from litellm._logging import verbose_proxy_logger
    from litellm.proxy._types import CommonProxyErrors
    from litellm.proxy.proxy_server import app

    if premium_user is not True:
        verbose_proxy_logger.warning(
            f"Prometheus metrics are only available for premium users. {CommonProxyErrors.not_premium_user.value}"
        )

    # Create metrics ASGI app
    metrics_app = make_asgi_app()

    # Mount the metrics app to the app
    app.mount("/metrics", metrics_app)
    verbose_proxy_logger.debug(
        "Starting Prometheus Metrics on /metrics (no authentication)"
    )


def prometheus_label_factory(
    supported_enum_labels: List[str],
    enum_values: UserAPIKeyLabelValues,
    tag: Optional[str] = None,
) -> dict:
    """
    Returns a dictionary of label + values for prometheus.

    Ensures end_user param is not sent to prometheus if it is not supported.
    """
    # Extract dictionary from Pydantic object
    enum_dict = enum_values.model_dump()

    # Filter supported labels
    filtered_labels = {
        label: value
        for label, value in enum_dict.items()
        if label in supported_enum_labels
    }

    if UserAPIKeyLabelNames.END_USER.value in filtered_labels:
        filtered_labels["end_user"] = get_end_user_id_for_cost_tracking(
            litellm_params={"user_api_key_end_user_id": enum_values.end_user},
            service_type="prometheus",
        )

    if enum_values.custom_metadata_labels is not None:
        for key, value in enum_values.custom_metadata_labels.items():
            if key in supported_enum_labels:
                filtered_labels[key] = value

    for label in supported_enum_labels:
        if label not in filtered_labels:
            filtered_labels[label] = None

    return filtered_labels
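

# Example (illustrative): with supported_enum_labels=["team", "team_alias"] and
# enum_values=UserAPIKeyLabelValues(team="t1", team_alias="prod", user="u1"),
# prometheus_label_factory returns {"team": "t1", "team_alias": "prod"} --
# unsupported labels are dropped and any supported label without a value is
# filled with None.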


def get_custom_labels_from_metadata(metadata: dict) -> Dict[str, str]:
    """
    Get custom labels from metadata
    """
    keys = litellm.custom_prometheus_metadata_labels
    if keys is None or len(keys) == 0:
        return {}

    result: Dict[str, str] = {}

    for key in keys:
        # Split the dot notation key into parts
        original_key = key
        key = key.replace("metadata.", "", 1) if key.startswith("metadata.") else key

        keys_parts = key.split(".")
        # Traverse through the dictionary using the parts
        value = metadata
        for part in keys_parts:
            if not isinstance(value, dict):  # guard against non-dict intermediates
                value = None
                break
            value = value.get(part, None)  # Get the value, return None if not found
            if value is None:
                break

        if value is not None and isinstance(value, str):
            result[original_key.replace(".", "_")] = value

    return result
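

# Example (illustrative): with
#   litellm.custom_prometheus_metadata_labels = ["metadata.customer_tier"]
# and request metadata {"customer_tier": "enterprise"}, this returns
# {"metadata_customer_tier": "enterprise"} -- dots are replaced with
# underscores so the key is a valid Prometheus label name.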