# What is this?
## Common Utility file for Logging handler
# Logging function -> log the exact model details + what's being sent | Non-Blocking
import copy
import datetime
import json
import os
import re
import subprocess
import sys
import time
import traceback
import uuid
from datetime import datetime as dt_object
from functools import lru_cache
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, cast

from pydantic import BaseModel

import litellm
from litellm import (
    _custom_logger_compatible_callbacks_literal,
    json_logs,
    log_raw_request_response,
    turn_off_message_logging,
)
from litellm._logging import _is_debugging_on, verbose_logger
from litellm.batches.batch_utils import _handle_completed_batch
from litellm.caching.caching import DualCache, InMemoryCache
from litellm.caching.caching_handler import LLMCachingHandler
from litellm.constants import (
    DEFAULT_MOCK_RESPONSE_COMPLETION_TOKEN_COUNT,
    DEFAULT_MOCK_RESPONSE_PROMPT_TOKEN_COUNT,
)
from litellm.cost_calculator import (
    RealtimeAPITokenUsageProcessor,
    _select_model_name_for_cost_calc,
)
from litellm.integrations.agentops import AgentOps
from litellm.integrations.anthropic_cache_control_hook import AnthropicCacheControlHook
from litellm.integrations.arize.arize import ArizeLogger
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.mlflow import MlflowLogger
from litellm.integrations.pagerduty.pagerduty import PagerDutyAlerting
from litellm.integrations.rag_hooks.bedrock_knowledgebase import (
    BedrockKnowledgeBaseHook,
)
from litellm.litellm_core_utils.get_litellm_params import get_litellm_params
from litellm.litellm_core_utils.llm_cost_calc.tool_call_cost_tracking import (
    StandardBuiltInToolCostTracking,
)
from litellm.litellm_core_utils.model_param_helper import ModelParamHelper
from litellm.litellm_core_utils.redact_messages import (
    redact_message_input_output_from_custom_logger,
    redact_message_input_output_from_logging,
)
from litellm.responses.utils import ResponseAPILoggingUtils
from litellm.types.llms.openai import (
    AllMessageValues,
    Batch,
    FineTuningJob,
    HttpxBinaryResponseContent,
    OpenAIFileObject,
    OpenAIModerationResponse,
    ResponseCompletedEvent,
    ResponsesAPIResponse,
)
from litellm.types.rerank import RerankResponse
from litellm.types.router import CustomPricingLiteLLMParams
from litellm.types.utils import (
    CallTypes,
    DynamicPromptManagementParamLiteral,
    EmbeddingResponse,
    ImageResponse,
    LiteLLMBatch,
    LiteLLMLoggingBaseClass,
    LiteLLMRealtimeStreamLoggingObject,
    ModelResponse,
    ModelResponseStream,
    RawRequestTypedDict,
    StandardBuiltInToolsParams,
    StandardCallbackDynamicParams,
    StandardLoggingAdditionalHeaders,
    StandardLoggingHiddenParams,
    StandardLoggingMCPToolCall,
    StandardLoggingMetadata,
    StandardLoggingModelCostFailureDebugInformation,
    StandardLoggingModelInformation,
    StandardLoggingPayload,
    StandardLoggingPayloadErrorInformation,
    StandardLoggingPayloadStatus,
    StandardLoggingPromptManagementMetadata,
    TextCompletionResponse,
    TranscriptionResponse,
    Usage,
)
from litellm.utils import _get_base_model_from_metadata, executor, print_verbose
from ..integrations.argilla import ArgillaLogger
from ..integrations.arize.arize_phoenix import ArizePhoenixLogger
from ..integrations.athina import AthinaLogger
from ..integrations.azure_storage.azure_storage import AzureBlobStorageLogger
from ..integrations.braintrust_logging import BraintrustLogger
from ..integrations.custom_prompt_management import CustomPromptManagement
from ..integrations.datadog.datadog import DataDogLogger
from ..integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger
from ..integrations.dynamodb import DyanmoDBLogger
from ..integrations.galileo import GalileoObserve
from ..integrations.gcs_bucket.gcs_bucket import GCSBucketLogger
from ..integrations.gcs_pubsub.pub_sub import GcsPubSubLogger
from ..integrations.greenscale import GreenscaleLogger
from ..integrations.helicone import HeliconeLogger
from ..integrations.humanloop import HumanloopLogger
from ..integrations.lago import LagoLogger
from ..integrations.langfuse.langfuse import LangFuseLogger
from ..integrations.langfuse.langfuse_handler import LangFuseHandler
from ..integrations.langfuse.langfuse_prompt_management import LangfusePromptManagement
from ..integrations.langsmith import LangsmithLogger
from ..integrations.literal_ai import LiteralAILogger
from ..integrations.logfire_logger import LogfireLevel, LogfireLogger
from ..integrations.lunary import LunaryLogger
from ..integrations.openmeter import OpenMeterLogger
from ..integrations.opik.opik import OpikLogger
from ..integrations.prometheus import PrometheusLogger
from ..integrations.prompt_layer import PromptLayerLogger
from ..integrations.s3 import S3Logger
from ..integrations.supabase import Supabase
from ..integrations.traceloop import TraceloopLogger
from ..integrations.weights_biases import WeightsBiasesLogger
from .exception_mapping_utils import _get_response_headers
from .initialize_dynamic_callback_params import (
    initialize_standard_callback_dynamic_params as _initialize_standard_callback_dynamic_params,
)
from .specialty_caches.dynamic_logging_cache import DynamicLoggingCache

try:
    from ..proxy.enterprise.enterprise_callbacks.generic_api_callback import (
        GenericAPILogger,
    )
except Exception as e:
    verbose_logger.debug(
        f"[Non-Blocking] Unable to import GenericAPILogger - LiteLLM Enterprise Feature - {str(e)}"
    )

_in_memory_loggers: List[Any] = []

### GLOBAL VARIABLES ###
sentry_sdk_instance = None
capture_exception = None
add_breadcrumb = None
posthog = None
slack_app = None
alerts_channel = None
heliconeLogger = None
athinaLogger = None
promptLayerLogger = None
logfireLogger = None
weightsBiasesLogger = None
customLogger = None
langFuseLogger = None
openMeterLogger = None
lagoLogger = None
dataDogLogger = None
prometheusLogger = None
dynamoLogger = None
s3Logger = None
genericAPILogger = None
greenscaleLogger = None
lunaryLogger = None
supabaseClient = None
callback_list: Optional[List[str]] = []
user_logger_fn = None
additional_details: Optional[Dict[str, str]] = {}
local_cache: Optional[Dict[str, str]] = {}
last_fetched_at = None
last_fetched_at_keys = None
####


class ServiceTraceIDCache:
    def __init__(self) -> None:
        self.cache = InMemoryCache()

    def get_cache(self, litellm_call_id: str, service_name: str) -> Optional[str]:
        key_name = "{}:{}".format(service_name, litellm_call_id)
        response = self.cache.get_cache(key=key_name)
        return response

    def set_cache(self, litellm_call_id: str, service_name: str, trace_id: str) -> None:
        key_name = "{}:{}".format(service_name, litellm_call_id)
        self.cache.set_cache(key=key_name, value=trace_id)
        return None
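
# Example for ServiceTraceIDCache above (illustrative, not executed): trace ids are
# keyed by "<service_name>:<litellm_call_id>", so one call can hold a separate trace
# id per logging service. The values shown here are hypothetical.
#
#   cache = ServiceTraceIDCache()
#   cache.set_cache(litellm_call_id="call-123", service_name="langfuse", trace_id="trace-1")
#   cache.get_cache(litellm_call_id="call-123", service_name="langfuse")  # -> "trace-1"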

in_memory_trace_id_cache = ServiceTraceIDCache()
in_memory_dynamic_logger_cache = DynamicLoggingCache()


class Logging(LiteLLMLoggingBaseClass):
    global supabaseClient, promptLayerLogger, weightsBiasesLogger, logfireLogger, capture_exception, add_breadcrumb, lunaryLogger, prometheusLogger, slack_app

    custom_pricing: bool = False
    stream_options = None

    def __init__(
        self,
        model: str,
        messages,
        stream,
        call_type,
        start_time,
        litellm_call_id: str,
        function_id: str,
        litellm_trace_id: Optional[str] = None,
        dynamic_input_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = None,
        dynamic_success_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = None,
        dynamic_async_success_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = None,
        dynamic_failure_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = None,
        dynamic_async_failure_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = None,
        applied_guardrails: Optional[List[str]] = None,
        kwargs: Optional[Dict] = None,
        log_raw_request_response: bool = False,
    ):
        _input: Optional[Any] = messages  # save original value of messages
        if messages is not None:
            if isinstance(messages, str):
                messages = [
                    {"role": "user", "content": messages}
                ]  # convert text completion input to the chat completion format
            elif (
                isinstance(messages, list)
                and len(messages) > 0
                and isinstance(messages[0], str)
            ):
                new_messages = []
                for m in messages:
                    new_messages.append({"role": "user", "content": m})
                messages = new_messages
        self.model = model
        self.messages = copy.deepcopy(messages)
        self.stream = stream
        self.start_time = start_time  # log the call start time
        self.call_type = call_type
        self.litellm_call_id = litellm_call_id
        self.litellm_trace_id: str = litellm_trace_id or str(uuid.uuid4())
        self.function_id = function_id
        self.streaming_chunks: List[Any] = []  # for generating complete stream response
        self.sync_streaming_chunks: List[Any] = (
            []
        )  # for generating complete stream response
        self.log_raw_request_response = log_raw_request_response

        # Initialize dynamic callbacks
        self.dynamic_input_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_input_callbacks
        self.dynamic_success_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_success_callbacks
        self.dynamic_async_success_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_async_success_callbacks
        self.dynamic_failure_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_failure_callbacks
        self.dynamic_async_failure_callbacks: Optional[
            List[Union[str, Callable, CustomLogger]]
        ] = dynamic_async_failure_callbacks

        # Process dynamic callbacks
        self.process_dynamic_callbacks()

        ## DYNAMIC LANGFUSE / GCS / logging callback KEYS ##
        self.standard_callback_dynamic_params: StandardCallbackDynamicParams = (
            self.initialize_standard_callback_dynamic_params(kwargs)
        )
        self.standard_built_in_tools_params: StandardBuiltInToolsParams = (
            self.initialize_standard_built_in_tools_params(kwargs)
        )

        ## TIME TO FIRST TOKEN LOGGING ##
        self.completion_start_time: Optional[datetime.datetime] = None
        self._llm_caching_handler: Optional[LLMCachingHandler] = None

        # INITIAL LITELLM_PARAMS
        litellm_params = {}
        if kwargs is not None:
            litellm_params = get_litellm_params(**kwargs)
            litellm_params = scrub_sensitive_keys_in_metadata(litellm_params)
        self.litellm_params = litellm_params

        self.model_call_details: Dict[str, Any] = {
            "litellm_trace_id": litellm_trace_id,
            "litellm_call_id": litellm_call_id,
            "input": _input,
            "litellm_params": litellm_params,
            "applied_guardrails": applied_guardrails,
            "model": model,
        }
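
    # Example for __init__ above (illustrative, not executed): constructing a
    # Logging object for a simple chat call. The function_id value is hypothetical.
    #
    #   logging_obj = Logging(
    #       model="gpt-3.5-turbo",
    #       messages=[{"role": "user", "content": "hi"}],
    #       stream=False,
    #       call_type="completion",
    #       start_time=datetime.datetime.now(),
    #       litellm_call_id=str(uuid.uuid4()),
    #       function_id="fn-1",
    #   )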

    def process_dynamic_callbacks(self):
        """
        Initializes CustomLogger-compatible callbacks in the self.dynamic_* callback lists.

        If a callback is in litellm._known_custom_logger_compatible_callbacks, it needs to be initialized and added to the respective dynamic_* callback list.
        """
        # Process input callbacks
        self.dynamic_input_callbacks = self._process_dynamic_callback_list(
            self.dynamic_input_callbacks, dynamic_callbacks_type="input"
        )

        # Process failure callbacks
        self.dynamic_failure_callbacks = self._process_dynamic_callback_list(
            self.dynamic_failure_callbacks, dynamic_callbacks_type="failure"
        )

        # Process async failure callbacks
        self.dynamic_async_failure_callbacks = self._process_dynamic_callback_list(
            self.dynamic_async_failure_callbacks, dynamic_callbacks_type="async_failure"
        )

        # Process success callbacks
        self.dynamic_success_callbacks = self._process_dynamic_callback_list(
            self.dynamic_success_callbacks, dynamic_callbacks_type="success"
        )

        # Process async success callbacks
        self.dynamic_async_success_callbacks = self._process_dynamic_callback_list(
            self.dynamic_async_success_callbacks, dynamic_callbacks_type="async_success"
        )

    def _process_dynamic_callback_list(
        self,
        callback_list: Optional[List[Union[str, Callable, CustomLogger]]],
        dynamic_callbacks_type: Literal[
            "input", "success", "failure", "async_success", "async_failure"
        ],
    ) -> Optional[List[Union[str, Callable, CustomLogger]]]:
        """
        Helper function to initialize CustomLogger-compatible callbacks in the self.dynamic_* callback lists.

        - If a callback is in litellm._known_custom_logger_compatible_callbacks,
        replace the string with the initialized callback class.
        - If a dynamic "success" callback is a known custom-logger-compatible callback, also add it to dynamic_async_success_callbacks.
        - If a dynamic "failure" callback is a known custom-logger-compatible callback, also add it to dynamic_async_failure_callbacks.
        """
        if callback_list is None:
            return None
        processed_list: List[Union[str, Callable, CustomLogger]] = []
        for callback in callback_list:
            if (
                isinstance(callback, str)
                and callback in litellm._known_custom_logger_compatible_callbacks
            ):
                callback_class = _init_custom_logger_compatible_class(
                    callback, internal_usage_cache=None, llm_router=None  # type: ignore
                )
                if callback_class is not None:
                    processed_list.append(callback_class)

                    # If processing dynamic_success_callbacks, add to dynamic_async_success_callbacks
                    if dynamic_callbacks_type == "success":
                        if self.dynamic_async_success_callbacks is None:
                            self.dynamic_async_success_callbacks = []
                        self.dynamic_async_success_callbacks.append(callback_class)
                    elif dynamic_callbacks_type == "failure":
                        if self.dynamic_async_failure_callbacks is None:
                            self.dynamic_async_failure_callbacks = []
                        self.dynamic_async_failure_callbacks.append(callback_class)
            else:
                processed_list.append(callback)
        return processed_list
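
    # Example for _process_dynamic_callback_list above (illustrative, not executed),
    # assuming "langfuse" is registered in litellm._known_custom_logger_compatible_callbacks:
    #
    #   processed = logging_obj._process_dynamic_callback_list(
    #       ["langfuse"], dynamic_callbacks_type="success"
    #   )
    #   # -> [<initialized Langfuse CustomLogger>], which is also appended to
    #   #    logging_obj.dynamic_async_success_callbacks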

    def initialize_standard_callback_dynamic_params(
        self, kwargs: Optional[Dict] = None
    ) -> StandardCallbackDynamicParams:
        """
        Initialize the standard callback dynamic params from the kwargs.

        Checks for keys such as langfuse_secret_key and gcs_bucket_name in kwargs and sets the corresponding attributes in StandardCallbackDynamicParams.
        """
        return _initialize_standard_callback_dynamic_params(kwargs)
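
    # Example for initialize_standard_callback_dynamic_params above (illustrative,
    # not executed; both key values are hypothetical placeholders):
    #
    #   params = logging_obj.initialize_standard_callback_dynamic_params(
    #       {"langfuse_secret_key": "sk-...", "gcs_bucket_name": "my-log-bucket"}
    #   )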

    def initialize_standard_built_in_tools_params(
        self, kwargs: Optional[Dict] = None
    ) -> StandardBuiltInToolsParams:
        """
        Initialize the standard built-in tools params from the kwargs.

        Checks if web_search_options is in kwargs or tools and sets the corresponding attributes in StandardBuiltInToolsParams.
        """
        return StandardBuiltInToolsParams(
            web_search_options=StandardBuiltInToolCostTracking._get_web_search_options(
                kwargs or {}
            ),
            file_search=StandardBuiltInToolCostTracking._get_file_search_tool_call(
                kwargs or {}
            ),
        )

    def update_environment_variables(
        self,
        litellm_params: Dict,
        optional_params: Dict,
        model: Optional[str] = None,
        user: Optional[str] = None,
        **additional_params,
    ):
        self.optional_params = optional_params
        if model is not None:
            self.model = model
        self.user = user
        self.litellm_params = {
            **self.litellm_params,
            **scrub_sensitive_keys_in_metadata(litellm_params),
        }
        self.logger_fn = litellm_params.get("logger_fn", None)
        verbose_logger.debug(f"self.optional_params: {self.optional_params}")

        self.model_call_details.update(
            {
                "model": self.model,
                "messages": self.messages,
                "optional_params": self.optional_params,
                "litellm_params": self.litellm_params,
                "start_time": self.start_time,
                "stream": self.stream,
                "user": user,
                "call_type": str(self.call_type),
                "litellm_call_id": self.litellm_call_id,
                "completion_start_time": self.completion_start_time,
                "standard_callback_dynamic_params": self.standard_callback_dynamic_params,
                **self.optional_params,
                **additional_params,
            }
        )

        ## check if stream options is set ## - used by CustomStreamWrapper for easy instrumentation
        if "stream_options" in additional_params:
            self.stream_options = additional_params["stream_options"]

        ## check if custom pricing set ##
        custom_pricing_keys = CustomPricingLiteLLMParams.model_fields.keys()
        for key in custom_pricing_keys:
            if litellm_params.get(key) is not None:
                self.custom_pricing = True

        if "custom_llm_provider" in self.model_call_details:
            self.custom_llm_provider = self.model_call_details["custom_llm_provider"]

    def should_run_prompt_management_hooks(
        self,
        non_default_params: Dict,
        prompt_id: Optional[str] = None,
    ) -> bool:
        """
        Return True if prompt management hooks should be run
        """
        if prompt_id:
            return True
        if self._should_run_prompt_management_hooks_without_prompt_id(
            non_default_params=non_default_params
        ):
            return True
        return False

    def _should_run_prompt_management_hooks_without_prompt_id(
        self,
        non_default_params: Dict,
    ) -> bool:
        """
        Certain prompt management hooks don't need a `prompt_id` to be passed in; they are triggered
        by dynamic params instead (e.g. AnthropicCacheControlHook and BedrockKnowledgeBaseHook).
        """
        for param in non_default_params:
            if param in DynamicPromptManagementParamLiteral.list_all_params():
                return True
        return False
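
    # Example for the two methods above (illustrative, not executed): a prompt_id
    # always triggers the hooks; without one, any param listed in
    # DynamicPromptManagementParamLiteral does. The param below is assumed to be
    # in that list.
    #
    #   logging_obj.should_run_prompt_management_hooks(
    #       non_default_params={"cache_control_injection_points": []}, prompt_id=None
    #   )  # -> True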

    def get_chat_completion_prompt(
        self,
        model: str,
        messages: List[AllMessageValues],
        non_default_params: Dict,
        prompt_id: Optional[str],
        prompt_variables: Optional[dict],
        prompt_management_logger: Optional[CustomLogger] = None,
    ) -> Tuple[str, List[AllMessageValues], dict]:
        custom_logger = (
            prompt_management_logger
            or self.get_custom_logger_for_prompt_management(
                model=model, non_default_params=non_default_params
            )
        )
        if custom_logger:
            (
                model,
                messages,
                non_default_params,
            ) = custom_logger.get_chat_completion_prompt(
                model=model,
                messages=messages,
                non_default_params=non_default_params or {},
                prompt_id=prompt_id,
                prompt_variables=prompt_variables,
                dynamic_callback_params=self.standard_callback_dynamic_params,
            )
        self.messages = messages
        return model, messages, non_default_params
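
    # Example for get_chat_completion_prompt above (illustrative, not executed):
    # a "langfuse/" model prefix selects the Langfuse prompt management integration,
    # which resolves the prompt template. The prompt id and variables are hypothetical.
    #
    #   model, messages, params = logging_obj.get_chat_completion_prompt(
    #       model="langfuse/my-prompt",
    #       messages=[{"role": "user", "content": "hi"}],
    #       non_default_params={},
    #       prompt_id="my-prompt-id",
    #       prompt_variables={"name": "Ada"},
    #   )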

    async def async_get_chat_completion_prompt(
        self,
        model: str,
        messages: List[AllMessageValues],
        non_default_params: Dict,
        prompt_id: Optional[str],
        prompt_variables: Optional[dict],
        prompt_management_logger: Optional[CustomLogger] = None,
    ) -> Tuple[str, List[AllMessageValues], dict]:
        custom_logger = (
            prompt_management_logger
            or self.get_custom_logger_for_prompt_management(
                model=model, non_default_params=non_default_params
            )
        )
        if custom_logger:
            (
                model,
                messages,
                non_default_params,
            ) = await custom_logger.async_get_chat_completion_prompt(
                model=model,
                messages=messages,
                non_default_params=non_default_params or {},
                prompt_id=prompt_id,
                prompt_variables=prompt_variables,
                dynamic_callback_params=self.standard_callback_dynamic_params,
            )
        self.messages = messages
        return model, messages, non_default_params

    def get_custom_logger_for_prompt_management(
        self, model: str, non_default_params: Dict
    ) -> Optional[CustomLogger]:
        """
        Get a custom logger for prompt management based on model name or available callbacks.

        Args:
            model: The model name to check for a prompt management integration
            non_default_params: Dynamic params that can trigger hooks without a prompt_id

        Returns:
            A CustomLogger instance if one is found, None otherwise
        """
        # First check if model starts with a known custom logger compatible callback
        for callback_name in litellm._known_custom_logger_compatible_callbacks:
            if model.startswith(callback_name):
                custom_logger = _init_custom_logger_compatible_class(
                    logging_integration=callback_name,
                    internal_usage_cache=None,
                    llm_router=None,
                )
                if custom_logger is not None:
                    self.model_call_details["prompt_integration"] = model.split("/")[0]
                    return custom_logger

        # Then check for any registered CustomPromptManagement loggers
        prompt_management_loggers = (
            litellm.logging_callback_manager.get_custom_loggers_for_type(
                callback_type=CustomPromptManagement
            )
        )
        if prompt_management_loggers:
            logger = prompt_management_loggers[0]
            self.model_call_details["prompt_integration"] = logger.__class__.__name__
            return logger

        if anthropic_cache_control_logger := AnthropicCacheControlHook.get_custom_logger_for_anthropic_cache_control_hook(
            non_default_params
        ):
            self.model_call_details["prompt_integration"] = (
                anthropic_cache_control_logger.__class__.__name__
            )
            return anthropic_cache_control_logger

        if bedrock_knowledgebase_logger := BedrockKnowledgeBaseHook.get_initialized_custom_logger(
            non_default_params
        ):
            self.model_call_details["prompt_integration"] = (
                bedrock_knowledgebase_logger.__class__.__name__
            )
            return bedrock_knowledgebase_logger

        return None

    def get_custom_logger_for_anthropic_cache_control_hook(
        self, non_default_params: Dict
    ) -> Optional[CustomLogger]:
        if non_default_params.get("cache_control_injection_points", None):
            custom_logger = _init_custom_logger_compatible_class(
                logging_integration="anthropic_cache_control_hook",
                internal_usage_cache=None,
                llm_router=None,
            )
            return custom_logger
        return None

    def _get_raw_request_body(self, data: Optional[Union[dict, str]]) -> dict:
        if data is None:
            return {"error": "Received empty raw request body"}
        if isinstance(data, str):
            try:
                return json.loads(data)
            except Exception:
                return {
                    "error": "Unable to parse raw request body. Got - {}".format(data)
                }
        return data

    def _get_masked_api_base(self, api_base: str) -> str:
        if "key=" in api_base:
            # Find the position right after "key=" in the string
            key_index = api_base.find("key=") + 4
            # Replace the key value with asterisks, keeping only the last 4 characters
            masked_api_base = api_base[:key_index] + "*" * 5 + api_base[-4:]
        else:
            masked_api_base = api_base
        return str(masked_api_base)
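
    # Example for _get_masked_api_base above (illustrative, not executed):
    #
    #   logging_obj._get_masked_api_base("https://host/v1?key=secret123")
    #   # -> "https://host/v1?key=*****t123"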

    def _pre_call(self, input, api_key, model=None, additional_args={}):
        """
        Common helper function across the sync + async pre-call function
        """
        self.model_call_details["input"] = input
        self.model_call_details["api_key"] = api_key
        self.model_call_details["additional_args"] = additional_args
        self.model_call_details["log_event_type"] = "pre_api_call"
        if (
            model
        ):  # if model name was changed pre-call, overwrite the initial model call name with the new one
            self.model_call_details["model"] = model
        self.model_call_details["litellm_params"]["api_base"] = (
            self._get_masked_api_base(additional_args.get("api_base", ""))
        )

    def pre_call(self, input, api_key, model=None, additional_args={}):  # noqa: PLR0915
        # Log the exact input to the LLM API
        litellm.error_logs["PRE_CALL"] = locals()
        try:
            self._pre_call(
                input=input,
                api_key=api_key,
                model=model,
                additional_args=additional_args,
            )

            # User Logging -> if you pass in a custom logging function
            self._print_llm_call_debugging_log(
                api_base=additional_args.get("api_base", ""),
                headers=additional_args.get("headers", {}),
                additional_args=additional_args,
            )

            # log raw request to provider (like LangFuse) -- if opted in.
            if (
                self.log_raw_request_response is True
                or log_raw_request_response is True
            ):
                _litellm_params = self.model_call_details.get("litellm_params", {})
                _metadata = _litellm_params.get("metadata", {}) or {}
                try:
                    # [Non-blocking Extra Debug Information in metadata]
                    if turn_off_message_logging is True:
                        _metadata["raw_request"] = (
                            "redacted by litellm. \
                            'litellm.turn_off_message_logging=True'"
                        )
                    else:
                        curl_command = self._get_request_curl_command(
                            api_base=additional_args.get("api_base", ""),
                            headers=additional_args.get("headers", {}),
                            additional_args=additional_args,
                            data=additional_args.get("complete_input_dict", {}),
                        )
                        _metadata["raw_request"] = str(curl_command)
                        # split up, so it's easier to parse in the UI
                        self.model_call_details["raw_request_typed_dict"] = (
                            RawRequestTypedDict(
                                raw_request_api_base=str(
                                    additional_args.get("api_base") or ""
                                ),
                                raw_request_body=self._get_raw_request_body(
                                    additional_args.get("complete_input_dict", {})
                                ),
                                raw_request_headers=self._get_masked_headers(
                                    additional_args.get("headers", {}) or {},
                                    ignore_sensitive_headers=True,
                                ),
                                error=None,
                            )
                        )
                except Exception as e:
                    self.model_call_details["raw_request_typed_dict"] = (
                        RawRequestTypedDict(
                            error=str(e),
                        )
                    )
                    _metadata["raw_request"] = (
                        "Unable to Log \
                        raw request: {}".format(
                            str(e)
                        )
                    )
            if self.logger_fn and callable(self.logger_fn):
                try:
                    self.logger_fn(
                        self.model_call_details
                    )  # Expectation: any logger function passed in by the user should accept a dict object
                except Exception as e:
                    verbose_logger.exception(
                        "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format(
                            str(e)
                        )
                    )
            self.model_call_details["api_call_start_time"] = datetime.datetime.now()

            # Input Integration Logging -> If you want to log the fact that an attempt to call the model was made
            callbacks = litellm.input_callback + (self.dynamic_input_callbacks or [])
            for callback in callbacks:
                try:
                    if callback == "supabase" and supabaseClient is not None:
                        verbose_logger.debug("reaches supabase for logging!")
                        model = self.model_call_details["model"]
                        messages = self.model_call_details["input"]
                        verbose_logger.debug(f"supabaseClient: {supabaseClient}")
                        supabaseClient.input_log_event(
                            model=model,
                            messages=messages,
                            end_user=self.model_call_details.get("user", "default"),
                            litellm_call_id=self.litellm_params["litellm_call_id"],
                            print_verbose=print_verbose,
                        )
                    elif callback == "sentry" and add_breadcrumb:
                        try:
                            details_to_log = copy.deepcopy(self.model_call_details)
                        except Exception:
                            details_to_log = self.model_call_details
                        if litellm.turn_off_message_logging:
                            # strip message fields from the copied details before logging
                            details_to_log.pop("messages", None)
                            details_to_log.pop("input", None)
                            details_to_log.pop("prompt", None)
                        add_breadcrumb(
                            category="litellm.llm_call",
                            message=f"Model Call Details pre-call: {details_to_log}",
                            level="info",
                        )
                    elif isinstance(callback, CustomLogger):  # custom logger class
                        callback.log_pre_api_call(
                            model=self.model,
                            messages=self.messages,
                            kwargs=self.model_call_details,
                        )
                    elif (
                        callable(callback) and customLogger is not None
                    ):  # custom logger functions
                        customLogger.log_input_event(
                            model=self.model,
                            messages=self.messages,
                            kwargs=self.model_call_details,
                            print_verbose=print_verbose,
                            callback_func=callback,
                        )
                except Exception as e:
                    verbose_logger.exception(
                        "litellm.Logging.pre_call(): Exception occurred - {}".format(
                            str(e)
                        )
                    )
                    verbose_logger.debug(
                        f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"
                    )
                    if capture_exception:  # log this error to sentry for debugging
                        capture_exception(e)
        except Exception as e:
            verbose_logger.exception(
                "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format(
                    str(e)
                )
            )
            verbose_logger.error(
                f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"
            )
            if capture_exception:  # log this error to sentry for debugging
                capture_exception(e)

    def _print_llm_call_debugging_log(
        self,
        api_base: str,
        headers: dict,
        additional_args: dict,
    ):
        """
        Internal debugging helper function

        Prints the RAW curl command sent from LiteLLM
        """
        if _is_debugging_on():
            if json_logs:
                masked_headers = self._get_masked_headers(headers)
                verbose_logger.debug(
                    "POST Request Sent from LiteLLM",
                    extra={"api_base": api_base, **masked_headers},
                )
            else:
                headers = additional_args.get("headers", {})
                if headers is None:
                    headers = {}
                data = additional_args.get("complete_input_dict", {})
                api_base = str(additional_args.get("api_base", ""))
                curl_command = self._get_request_curl_command(
                    api_base=api_base,
                    headers=headers,
                    additional_args=additional_args,
                    data=data,
                )
                verbose_logger.debug(f"\033[92m{curl_command}\033[0m\n")

    def _get_request_body(self, data: dict) -> str:
        return str(data)

    def _get_request_curl_command(
        self, api_base: str, headers: Optional[dict], additional_args: dict, data: dict
    ) -> str:
        masked_api_base = self._get_masked_api_base(api_base)
        if headers is None:
            headers = {}
        curl_command = "\n\nPOST Request Sent from LiteLLM:\n"
        curl_command += "curl -X POST \\\n"
        curl_command += f"{masked_api_base} \\\n"
        masked_headers = self._get_masked_headers(headers)
        formatted_headers = " ".join(
            [f"-H '{k}: {v}'" for k, v in masked_headers.items()]
        )
        curl_command += (
            f"{formatted_headers} \\\n" if formatted_headers.strip() != "" else ""
        )
        curl_command += f"-d '{self._get_request_body(data)}'\n"
        if additional_args.get("request_str", None) is not None:
            # print the sagemaker / bedrock client request
            curl_command = "\nRequest Sent from LiteLLM:\n"
            curl_command += additional_args.get("request_str", None)
        elif api_base == "":
            curl_command = str(self.model_call_details)
        return curl_command
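
    # Example output of _get_request_curl_command above (illustrative; the exact
    # header masking comes from _get_masked_values, so the masked header value
    # shown here is a hypothetical rendering):
    #
    #   POST Request Sent from LiteLLM:
    #   curl -X POST \
    #   https://api.example.com/v1/chat/completions \
    #   -H 'Authorization: Be**********' \
    #   -d '{'model': 'gpt-3.5-turbo', 'messages': [...]}'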

    def _get_masked_headers(
        self, headers: dict, ignore_sensitive_headers: bool = False
    ) -> dict:
        """
        Internal debugging helper function

        Masks the headers of the request sent from LiteLLM
        """
        return _get_masked_values(
            headers, ignore_sensitive_values=ignore_sensitive_headers
        )

    def post_call(
        self, original_response, input=None, api_key=None, additional_args={}
    ):
        # Log the exact result from the LLM API, for streaming - log the type of response received
        litellm.error_logs["POST_CALL"] = locals()
        if isinstance(original_response, dict):
            original_response = json.dumps(original_response)
        try:
            self.model_call_details["input"] = input
            self.model_call_details["api_key"] = api_key
            self.model_call_details["original_response"] = original_response
            self.model_call_details["additional_args"] = additional_args
            self.model_call_details["log_event_type"] = "post_api_call"
            if json_logs:
                verbose_logger.debug(
                    "RAW RESPONSE:\n{}\n\n".format(
                        self.model_call_details.get(
                            "original_response", self.model_call_details
                        )
                    ),
                )
            else:
                print_verbose(
                    "RAW RESPONSE:\n{}\n\n".format(
                        self.model_call_details.get(
                            "original_response", self.model_call_details
                        )
                    )
                )
            if self.logger_fn and callable(self.logger_fn):
                try:
                    self.logger_fn(
                        self.model_call_details
                    )  # Expectation: any logger function passed in by the user should accept a dict object
                except Exception as e:
                    verbose_logger.exception(
                        "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format(
                            str(e)
                        )
                    )
            original_response = redact_message_input_output_from_logging(
                model_call_details=(
                    self.model_call_details
                    if hasattr(self, "model_call_details")
                    else {}
                ),
                result=original_response,
            )
            # Post-Call Integration Logging -> log the raw response received from the LLM API
            callbacks = litellm.input_callback + (self.dynamic_input_callbacks or [])
            for callback in callbacks:
                try:
                    if callback == "sentry" and add_breadcrumb:
                        verbose_logger.debug("reaches sentry breadcrumbing")
                        try:
                            details_to_log = copy.deepcopy(self.model_call_details)
                        except Exception:
                            details_to_log = self.model_call_details
                        if litellm.turn_off_message_logging:
                            # strip message fields from the copied details before logging
                            details_to_log.pop("messages", None)
                            details_to_log.pop("input", None)
                            details_to_log.pop("prompt", None)
                        add_breadcrumb(
                            category="litellm.llm_call",
                            message=f"Model Call Details post-call: {details_to_log}",
                            level="info",
                        )
                    elif isinstance(callback, CustomLogger):  # custom logger class
                        callback.log_post_api_call(
                            kwargs=self.model_call_details,
                            response_obj=None,
                            start_time=self.start_time,
                            end_time=None,
                        )
                except Exception as e:
                    verbose_logger.exception(
                        "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while post-call logging with integrations {}".format(
                            str(e)
                        )
                    )
                    verbose_logger.debug(
                        f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"
                    )
                    if capture_exception:  # log this error to sentry for debugging
                        capture_exception(e)
        except Exception as e:
            verbose_logger.exception(
                "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while logging {}".format(
                    str(e)
                )
            )

    def get_response_ms(self) -> float:
        return (
            self.model_call_details.get("end_time", datetime.datetime.now())
            - self.model_call_details.get("start_time", datetime.datetime.now())
        ).total_seconds() * 1000
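
    # Example for get_response_ms above (illustrative): a call whose start_time and
    # end_time are 1.5 seconds apart returns 1500.0; if either timestamp is missing,
    # "now" is substituted, so the difference collapses toward 0.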

    def _response_cost_calculator(
        self,
        result: Union[
            ModelResponse,
            ModelResponseStream,
            EmbeddingResponse,
            ImageResponse,
            TranscriptionResponse,
            TextCompletionResponse,
            HttpxBinaryResponseContent,
            RerankResponse,
            Batch,
            FineTuningJob,
            ResponsesAPIResponse,
            ResponseCompletedEvent,
            OpenAIFileObject,
            LiteLLMRealtimeStreamLoggingObject,
            OpenAIModerationResponse,
        ],
        cache_hit: Optional[bool] = None,
        litellm_model_name: Optional[str] = None,
        router_model_id: Optional[str] = None,
    ) -> Optional[float]:
""" | |
Calculate response cost using result + logging object variables. | |
used for consistent cost calculation across response headers + logging integrations. | |
""" | |
        ## RESPONSE COST ##
        custom_pricing = use_custom_pricing_for_model(
            litellm_params=(
                self.litellm_params if hasattr(self, "litellm_params") else None
            )
        )
        prompt = ""  # use for tts cost calc
        _input = self.model_call_details.get("input", None)
        if _input is not None and isinstance(_input, str):
            prompt = _input
        if cache_hit is None:
            cache_hit = self.model_call_details.get("cache_hit", False)
        try:
            response_cost_calculator_kwargs = {
                "response_object": result,
                "model": litellm_model_name or self.model,
                "cache_hit": cache_hit,
                "custom_llm_provider": self.model_call_details.get(
                    "custom_llm_provider", None
                ),
                "base_model": _get_base_model_from_metadata(
                    model_call_details=self.model_call_details
                ),
                "call_type": self.call_type,
                "optional_params": self.optional_params,
                "custom_pricing": custom_pricing,
                "prompt": prompt,
                "standard_built_in_tools_params": self.standard_built_in_tools_params,
                "router_model_id": router_model_id,
            }
        except Exception as e:  # error creating kwargs for cost calculation
            debug_info = StandardLoggingModelCostFailureDebugInformation(
                error_str=str(e),
                traceback_str=_get_traceback_str_for_error(str(e)),
            )
            verbose_logger.debug(
                f"response_cost_failure_debug_information: {debug_info}"
            )
            self.model_call_details["response_cost_failure_debug_information"] = (
                debug_info
            )
            return None

        try:
            response_cost = litellm.response_cost_calculator(
                **response_cost_calculator_kwargs
            )
            verbose_logger.debug(f"response_cost: {response_cost}")
            return response_cost
        except Exception as e:  # error calculating cost
            debug_info = StandardLoggingModelCostFailureDebugInformation(
                error_str=str(e),
                traceback_str=_get_traceback_str_for_error(str(e)),
                model=response_cost_calculator_kwargs["model"],
                cache_hit=response_cost_calculator_kwargs["cache_hit"],
                custom_llm_provider=response_cost_calculator_kwargs[
                    "custom_llm_provider"
                ],
                base_model=response_cost_calculator_kwargs["base_model"],
                call_type=response_cost_calculator_kwargs["call_type"],
                custom_pricing=response_cost_calculator_kwargs["custom_pricing"],
            )
            verbose_logger.debug(
                f"response_cost_failure_debug_information: {debug_info}"
            )
            self.model_call_details["response_cost_failure_debug_information"] = (
                debug_info
            )
            return None

    async def _response_cost_calculator_async(
        self,
        result: Union[
            ModelResponse,
            ModelResponseStream,
            EmbeddingResponse,
            ImageResponse,
            TranscriptionResponse,
            TextCompletionResponse,
            HttpxBinaryResponseContent,
            RerankResponse,
            Batch,
            FineTuningJob,
        ],
        cache_hit: Optional[bool] = None,
    ) -> Optional[float]:
        return self._response_cost_calculator(result=result, cache_hit=cache_hit)

    def should_run_callback(
        self, callback: litellm.CALLBACK_TYPES, litellm_params: dict, event_hook: str
    ) -> bool:
        if litellm.global_disable_no_log_param:
            return True

        if litellm_params.get("no-log", False) is True:
            # proxy cost tracking callbacks should still run
            if not (
                isinstance(callback, CustomLogger)
                and "_PROXY_" in callback.__class__.__name__
            ):
                verbose_logger.debug(
                    f"no-log request, skipping logging for {event_hook} event"
                )
                return False
        return True
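
    # Example for should_run_callback above (illustrative, not executed): with
    # litellm_params={"no-log": True}, ordinary callbacks are skipped, while a
    # CustomLogger whose class name contains "_PROXY_" (proxy cost tracking)
    # still runs. _PROXY_MyLogger is a hypothetical class name.
    #
    #   logging_obj.should_run_callback("langfuse", {"no-log": True}, "success_handler")
    #   # -> False
    #   logging_obj.should_run_callback(_PROXY_MyLogger(), {"no-log": True}, "success_handler")
    #   # -> True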

    def _update_completion_start_time(self, completion_start_time: datetime.datetime):
        self.completion_start_time = completion_start_time
        self.model_call_details["completion_start_time"] = self.completion_start_time

    def _success_handler_helper_fn(
        self,
        result=None,
        start_time=None,
        end_time=None,
        cache_hit=None,
        standard_logging_object: Optional[StandardLoggingPayload] = None,
    ):
        try:
            if start_time is None:
                start_time = self.start_time
            if end_time is None:
                end_time = datetime.datetime.now()
            if self.completion_start_time is None:
                self.completion_start_time = end_time
                self.model_call_details["completion_start_time"] = (
                    self.completion_start_time
                )
            self.model_call_details["log_event_type"] = "successful_api_call"
            self.model_call_details["end_time"] = end_time
            self.model_call_details["cache_hit"] = cache_hit

            if self.call_type == CallTypes.anthropic_messages.value:
                result = self._handle_anthropic_messages_response_logging(result=result)
            ## if model in model cost map - log the response cost
            ## else set cost to None
            logging_result = result

            if self.call_type == CallTypes.arealtime.value and isinstance(result, list):
                combined_usage_object = RealtimeAPITokenUsageProcessor.collect_and_combine_usage_from_realtime_stream_results(
                    results=result
                )
                logging_result = (
                    RealtimeAPITokenUsageProcessor.create_logging_realtime_object(
                        usage=combined_usage_object,
                        results=result,
                    )
                )

                # self.model_call_details[
                #     "response_cost"
                # ] = handle_realtime_stream_cost_calculation(
                #     results=result,
                #     combined_usage_object=combined_usage_object,
                #     custom_llm_provider=self.custom_llm_provider,
                #     litellm_model_name=self.model,
                # )
                # self.model_call_details["combined_usage_object"] = combined_usage_object
            if (
                standard_logging_object is None
                and result is not None
                and self.stream is not True
            ):
                if (
                    isinstance(logging_result, ModelResponse)
                    or isinstance(logging_result, ModelResponseStream)
                    or isinstance(logging_result, EmbeddingResponse)
                    or isinstance(logging_result, ImageResponse)
                    or isinstance(logging_result, TranscriptionResponse)
                    or isinstance(logging_result, TextCompletionResponse)
                    or isinstance(logging_result, HttpxBinaryResponseContent)  # tts
                    or isinstance(logging_result, RerankResponse)
                    or isinstance(logging_result, FineTuningJob)
                    or isinstance(logging_result, LiteLLMBatch)
                    or isinstance(logging_result, ResponsesAPIResponse)
                    or isinstance(logging_result, OpenAIFileObject)
                    or isinstance(logging_result, LiteLLMRealtimeStreamLoggingObject)
                    or isinstance(logging_result, OpenAIModerationResponse)
                ):
                    ## HIDDEN PARAMS ##
                    hidden_params = getattr(logging_result, "_hidden_params", {})
                    if hidden_params:
                        # add to metadata for logging
                        if self.model_call_details.get("litellm_params") is not None:
                            self.model_call_details["litellm_params"].setdefault(
                                "metadata", {}
                            )
                            if (
                                self.model_call_details["litellm_params"]["metadata"]
                                is None
                            ):
                                self.model_call_details["litellm_params"][
                                    "metadata"
                                ] = {}

                            self.model_call_details["litellm_params"]["metadata"][  # type: ignore
                                "hidden_params"
                            ] = getattr(
                                logging_result, "_hidden_params", {}
                            )
                    ## RESPONSE COST - Only calculate if not in hidden_params ##
                    if "response_cost" in hidden_params:
                        self.model_call_details["response_cost"] = hidden_params[
                            "response_cost"
                        ]
                    else:
                        self.model_call_details["response_cost"] = (
                            self._response_cost_calculator(result=logging_result)
                        )

                    ## STANDARDIZED LOGGING PAYLOAD
                    self.model_call_details["standard_logging_object"] = (
                        get_standard_logging_object_payload(
                            kwargs=self.model_call_details,
                            init_response_obj=logging_result,
                            start_time=start_time,
                            end_time=end_time,
                            logging_obj=self,
                            status="success",
                            standard_built_in_tools_params=self.standard_built_in_tools_params,
                        )
                    )
                elif isinstance(result, dict) or isinstance(result, list):
                    ## STANDARDIZED LOGGING PAYLOAD
                    self.model_call_details["standard_logging_object"] = (
                        get_standard_logging_object_payload(
                            kwargs=self.model_call_details,
                            init_response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            logging_obj=self,
                            status="success",
                            standard_built_in_tools_params=self.standard_built_in_tools_params,
                        )
                    )
            elif standard_logging_object is not None:
                self.model_call_details["standard_logging_object"] = (
                    standard_logging_object
                )
            else:  # streaming chunks + image gen.
                self.model_call_details["response_cost"] = None

            if (
                litellm.max_budget
                and self.stream is False
                and result is not None
                and isinstance(result, dict)
                and "content" in result
            ):
                time_diff = (end_time - start_time).total_seconds()
                float_diff = float(time_diff)
                litellm._current_cost += litellm.completion_cost(
                    model=self.model,
                    prompt="",
                    completion=getattr(result, "content", ""),
                    total_time=float_diff,
                    standard_built_in_tools_params=self.standard_built_in_tools_params,
                )
            return start_time, end_time, result
        except Exception as e:
            raise Exception(f"[Non-Blocking] LiteLLM.Success_Call Error: {str(e)}")

    def success_handler(  # noqa: PLR0915
        self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs
    ):
        verbose_logger.debug(
            f"Logging Details LiteLLM-Success Call: Cache_hit={cache_hit}"
        )
        start_time, end_time, result = self._success_handler_helper_fn(
            start_time=start_time,
            end_time=end_time,
            result=result,
            cache_hit=cache_hit,
            standard_logging_object=kwargs.get("standard_logging_object", None),
        )
        try:
            ## BUILD COMPLETE STREAMED RESPONSE
            complete_streaming_response: Optional[
                Union[ModelResponse, TextCompletionResponse, ResponsesAPIResponse]
            ] = None
if "complete_streaming_response" in self.model_call_details: | |
return # break out of this. | |
            complete_streaming_response = self._get_assembled_streaming_response(
                result=result,
                start_time=start_time,
                end_time=end_time,
                is_async=False,
                streaming_chunks=self.sync_streaming_chunks,
            )
            if complete_streaming_response is not None:
                verbose_logger.debug(
                    "Logging Details LiteLLM-Success Call streaming complete"
                )
                self.model_call_details["complete_streaming_response"] = (
                    complete_streaming_response
                )
                self.model_call_details["response_cost"] = (
                    self._response_cost_calculator(result=complete_streaming_response)
                )
                ## STANDARDIZED LOGGING PAYLOAD
                self.model_call_details["standard_logging_object"] = (
                    get_standard_logging_object_payload(
                        kwargs=self.model_call_details,
                        init_response_obj=complete_streaming_response,
                        start_time=start_time,
                        end_time=end_time,
                        logging_obj=self,
                        status="success",
                        standard_built_in_tools_params=self.standard_built_in_tools_params,
                    )
                )
            callbacks = self.get_combined_callback_list(
                dynamic_success_callbacks=self.dynamic_success_callbacks,
                global_callbacks=litellm.success_callback,
            )

            ## REDACT MESSAGES ##
            result = redact_message_input_output_from_logging(
                model_call_details=(
                    self.model_call_details
                    if hasattr(self, "model_call_details")
                    else {}
                ),
                result=result,
            )
            ## LOGGING HOOK ##
            for callback in callbacks:
                if isinstance(callback, CustomLogger):
                    self.model_call_details, result = callback.logging_hook(
                        kwargs=self.model_call_details,
                        result=result,
                        call_type=self.call_type,
                    )

            for callback in callbacks:
                try:
                    litellm_params = self.model_call_details.get("litellm_params", {})
                    should_run = self.should_run_callback(
                        callback=callback,
                        litellm_params=litellm_params,
                        event_hook="success_handler",
                    )
                    if not should_run:
                        continue
                    if callback == "promptlayer" and promptLayerLogger is not None:
                        print_verbose("reaches promptlayer for logging!")
                        promptLayerLogger.log_event(
                            kwargs=self.model_call_details,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                        )
                    if callback == "supabase" and supabaseClient is not None:
                        print_verbose("reaches supabase for logging!")
                        kwargs = self.model_call_details
                        # this only logs streaming once: complete_streaming_response exists, i.e. when the stream ends
                        if self.stream:
                            if "complete_streaming_response" not in kwargs:
                                continue
                            else:
                                print_verbose("reaches supabase for streaming logging!")
                                result = kwargs["complete_streaming_response"]

                        model = kwargs["model"]
                        messages = kwargs["messages"]
                        optional_params = kwargs.get("optional_params", {})
                        litellm_params = kwargs.get("litellm_params", {})
                        supabaseClient.log_event(
                            model=model,
                            messages=messages,
                            end_user=optional_params.get("user", "default"),
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            litellm_call_id=litellm_params.get(
                                "litellm_call_id", str(uuid.uuid4())
                            ),
                            print_verbose=print_verbose,
                        )
                    if callback == "wandb" and weightsBiasesLogger is not None:
                        print_verbose("reaches wandb for logging!")
                        weightsBiasesLogger.log_event(
                            kwargs=self.model_call_details,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                        )
                    if callback == "logfire" and logfireLogger is not None:
                        verbose_logger.debug("reaches logfire for success logging!")
                        kwargs = {}
                        for k, v in self.model_call_details.items():
                            if (
                                k != "original_response"
                            ):  # copy.deepcopy raises errors as this could be a coroutine
                                kwargs[k] = v
                        # this only logs streaming once: complete_streaming_response exists, i.e. when the stream ends
                        if self.stream:
                            if "complete_streaming_response" not in kwargs:
                                continue
                            else:
                                print_verbose("reaches logfire for streaming logging!")
                                result = kwargs["complete_streaming_response"]

                        logfireLogger.log_event(
                            kwargs=self.model_call_details,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                            level=LogfireLevel.INFO.value,  # type: ignore
                        )
                    if callback == "lunary" and lunaryLogger is not None:
                        print_verbose("reaches lunary for logging!")
                        model = self.model
                        kwargs = self.model_call_details

                        input = kwargs.get("messages", kwargs.get("input", None))

                        type = (
                            "embed"
                            if self.call_type == CallTypes.embedding.value
                            else "llm"
                        )
                        # this only logs streaming once: complete_streaming_response exists, i.e. when the stream ends
                        if self.stream:
                            if "complete_streaming_response" not in kwargs:
                                continue
                            else:
                                result = kwargs["complete_streaming_response"]

                        lunaryLogger.log_event(
                            type=type,
                            kwargs=kwargs,
                            event="end",
                            model=model,
                            input=input,
                            user_id=kwargs.get("user", None),
                            # user_props=self.model_call_details.get("user_props", None),
                            extra=kwargs.get("optional_params", {}),
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            run_id=self.litellm_call_id,
                            print_verbose=print_verbose,
                        )
                    if callback == "helicone" and heliconeLogger is not None:
                        print_verbose("reaches helicone for logging!")
                        model = self.model
                        messages = self.model_call_details["input"]
                        kwargs = self.model_call_details
                        # this only logs streaming once: complete_streaming_response exists, i.e. when the stream ends
                        if self.stream:
                            if "complete_streaming_response" not in kwargs:
                                continue
                            else:
                                print_verbose("reaches helicone for streaming logging!")
                                result = kwargs["complete_streaming_response"]

                        heliconeLogger.log_success(
                            model=model,
                            messages=messages,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                            kwargs=kwargs,
                        )
                    if callback == "langfuse":
                        global langFuseLogger
                        print_verbose("reaches langfuse for success logging!")
                        kwargs = {}
                        for k, v in self.model_call_details.items():
                            if (
                                k != "original_response"
                            ):  # copy.deepcopy raises errors as this could be a coroutine
                                kwargs[k] = v
                        # this only logs streaming once: complete_streaming_response exists, i.e. when the stream ends
                        if self.stream:
                            verbose_logger.debug(
                                f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
                            )
                            if complete_streaming_response is None:
                                continue
                            else:
                                print_verbose("reaches langfuse for streaming logging!")
                                result = kwargs["complete_streaming_response"]
                        langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
                            globalLangfuseLogger=langFuseLogger,
                            standard_callback_dynamic_params=self.standard_callback_dynamic_params,
                            in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
                        )
                        if langfuse_logger_to_use is not None:
                            _response = langfuse_logger_to_use.log_event_on_langfuse(
                                kwargs=kwargs,
                                response_obj=result,
                                start_time=start_time,
                                end_time=end_time,
                                user_id=kwargs.get("user", None),
                            )
                            if _response is not None and isinstance(_response, dict):
                                _trace_id = _response.get("trace_id", None)
                                if _trace_id is not None:
                                    in_memory_trace_id_cache.set_cache(
                                        litellm_call_id=self.litellm_call_id,
                                        service_name="langfuse",
                                        trace_id=_trace_id,
                                    )
                    if callback == "generic":
                        global genericAPILogger
verbose_logger.debug("reaches langfuse for success logging!") | |
                        kwargs = {}
                        for k, v in self.model_call_details.items():
                            if (
                                k != "original_response"
                            ):  # copy.deepcopy raises errors as this could be a coroutine
                                kwargs[k] = v
                        # this only logs streaming once: complete_streaming_response exists, i.e. when the stream ends
                        if self.stream:
                            verbose_logger.debug(
                                f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
                            )
                            if complete_streaming_response is None:
                                continue
                            else:
print_verbose("reaches langfuse for streaming logging!") | |
result = kwargs["complete_streaming_response"] | |
if genericAPILogger is None: | |
genericAPILogger = GenericAPILogger() # type: ignore | |
genericAPILogger.log_event( | |
kwargs=kwargs, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
user_id=kwargs.get("user", None), | |
print_verbose=print_verbose, | |
) | |
if callback == "greenscale" and greenscaleLogger is not None: | |
kwargs = {} | |
for k, v in self.model_call_details.items(): | |
if ( | |
k != "original_response" | |
): # copy.deepcopy raises errors as this could be a coroutine | |
kwargs[k] = v | |
                        # this only logs streaming once: complete_streaming_response exists, i.e. when the stream ends
                        if self.stream:
                            verbose_logger.debug(
                                f"is complete_streaming_response in kwargs: {kwargs.get('complete_streaming_response', None)}"
                            )
                            if complete_streaming_response is None:
                                continue
                            else:
                                print_verbose(
                                    "reaches greenscale for streaming logging!"
                                )
                                result = kwargs["complete_streaming_response"]

                        greenscaleLogger.log_event(
                            kwargs=kwargs,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                        )
                    if callback == "athina" and athinaLogger is not None:
                        deep_copy = {}
                        for k, v in self.model_call_details.items():
                            deep_copy[k] = v
                        athinaLogger.log_event(
                            kwargs=deep_copy,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                        )
                    if callback == "traceloop":
                        deep_copy = {}
                        for k, v in self.model_call_details.items():
                            if k != "original_response":
                                deep_copy[k] = v
                        traceloopLogger.log_event(
                            kwargs=deep_copy,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            user_id=kwargs.get("user", None),
                            print_verbose=print_verbose,
                        )
                    if callback == "s3":
                        global s3Logger
                        if s3Logger is None:
                            s3Logger = S3Logger()
                        if self.stream:
                            if "complete_streaming_response" in self.model_call_details:
                                print_verbose(
                                    "S3Logger Logger: Got Stream Event - Completed Stream Response"
                                )
                                s3Logger.log_event(
                                    kwargs=self.model_call_details,
                                    response_obj=self.model_call_details[
                                        "complete_streaming_response"
                                    ],
                                    start_time=start_time,
                                    end_time=end_time,
                                    print_verbose=print_verbose,
                                )
                            else:
                                print_verbose(
                                    "S3Logger Logger: Got Stream Event - No complete stream response as yet"
                                )
                        else:
                            s3Logger.log_event(
                                kwargs=self.model_call_details,
                                response_obj=result,
                                start_time=start_time,
                                end_time=end_time,
                                print_verbose=print_verbose,
                            )
                    if (
                        callback == "openmeter"
                        and self.model_call_details.get("litellm_params", {}).get(
                            "acompletion", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "aembedding", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "aimage_generation", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "atranscription", False
                        )
                        is not True
                    ):
                        global openMeterLogger
                        if openMeterLogger is None:
                            print_verbose("Instantiates openmeter client")
                            openMeterLogger = OpenMeterLogger()
                        if self.stream and complete_streaming_response is None:
                            openMeterLogger.log_stream_event(
                                kwargs=self.model_call_details,
                                response_obj=result,
                                start_time=start_time,
                                end_time=end_time,
                            )
                        else:
                            if self.stream and complete_streaming_response:
                                self.model_call_details["complete_response"] = (
                                    self.model_call_details.get(
                                        "complete_streaming_response", {}
                                    )
                                )
                                result = self.model_call_details["complete_response"]
                            openMeterLogger.log_success_event(
                                kwargs=self.model_call_details,
                                response_obj=result,
                                start_time=start_time,
                                end_time=end_time,
                            )
                    if (
                        isinstance(callback, CustomLogger)
                        and self.model_call_details.get("litellm_params", {}).get(
                            "acompletion", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "aembedding", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "aimage_generation", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "atranscription", False
                        )
                        is not True
                        and self.call_type
                        != CallTypes.pass_through.value  # pass-through endpoints call async_log_success_event
                    ):  # custom logger class
                        if self.stream and complete_streaming_response is None:
                            callback.log_stream_event(
                                kwargs=self.model_call_details,
                                response_obj=result,
                                start_time=start_time,
                                end_time=end_time,
                            )
                        else:
                            if self.stream and complete_streaming_response:
                                self.model_call_details["complete_response"] = (
                                    self.model_call_details.get(
                                        "complete_streaming_response", {}
                                    )
                                )
                                result = self.model_call_details["complete_response"]
                            callback.log_success_event(
                                kwargs=self.model_call_details,
                                response_obj=result,
                                start_time=start_time,
                                end_time=end_time,
                            )
                    if (
                        callable(callback) is True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "acompletion", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "aembedding", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "aimage_generation", False
                        )
                        is not True
                        and self.model_call_details.get("litellm_params", {}).get(
                            "atranscription", False
                        )
                        is not True
                        and customLogger is not None
                    ):  # custom logger functions
                        print_verbose(
                            "success callbacks: Running Custom Callback Function - {}".format(
                                callback
                            )
                        )
                        customLogger.log_event(
                            kwargs=self.model_call_details,
                            response_obj=result,
                            start_time=start_time,
                            end_time=end_time,
                            print_verbose=print_verbose,
                            callback_func=callback,
                        )
                except Exception as e:
                    print_verbose(
                        f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging with integrations {traceback.format_exc()}"
                    )
                    print_verbose(
                        f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}"
                    )
                    if capture_exception:  # log this error to sentry for debugging
                        capture_exception(e)
        except Exception as e:
            verbose_logger.exception(
                "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {}".format(
                    str(e)
                ),
            )
async def async_success_handler( # noqa: PLR0915 | |
self, result=None, start_time=None, end_time=None, cache_hit=None, **kwargs | |
): | |
""" | |
Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions. | |
""" | |
print_verbose( | |
"Logging Details LiteLLM-Async Success Call, cache_hit={}".format(cache_hit) | |
) | |
## CALCULATE COST FOR BATCH JOBS | |
if self.call_type == CallTypes.aretrieve_batch.value and isinstance( | |
result, LiteLLMBatch | |
): | |
response_cost, batch_usage, batch_models = await _handle_completed_batch( | |
batch=result, custom_llm_provider=self.custom_llm_provider | |
) | |
result._hidden_params["response_cost"] = response_cost | |
result._hidden_params["batch_models"] = batch_models | |
result.usage = batch_usage | |
start_time, end_time, result = self._success_handler_helper_fn( | |
start_time=start_time, | |
end_time=end_time, | |
result=result, | |
cache_hit=cache_hit, | |
standard_logging_object=kwargs.get("standard_logging_object", None), | |
) | |
## BUILD COMPLETE STREAMED RESPONSE | |
if "async_complete_streaming_response" in self.model_call_details: | |
return # break out of this. | |
complete_streaming_response: Optional[ | |
Union[ModelResponse, TextCompletionResponse, ResponsesAPIResponse] | |
] = self._get_assembled_streaming_response( | |
result=result, | |
start_time=start_time, | |
end_time=end_time, | |
is_async=True, | |
streaming_chunks=self.streaming_chunks, | |
) | |
if complete_streaming_response is not None: | |
print_verbose("Async success callbacks: Got a complete streaming response") | |
self.model_call_details["async_complete_streaming_response"] = ( | |
complete_streaming_response | |
) | |
try: | |
if self.model_call_details.get("cache_hit", False) is True: | |
self.model_call_details["response_cost"] = 0.0 | |
else: | |
# check if base_model set on azure | |
_get_base_model_from_metadata( | |
model_call_details=self.model_call_details | |
) | |
# base_model defaults to None if not set on model_info | |
self.model_call_details["response_cost"] = ( | |
self._response_cost_calculator( | |
result=complete_streaming_response | |
) | |
) | |
verbose_logger.debug( | |
f"Model={self.model}; cost={self.model_call_details['response_cost']}" | |
) | |
except litellm.NotFoundError: | |
verbose_logger.warning( | |
f"Model={self.model} not found in completion cost map. Setting 'response_cost' to None" | |
) | |
self.model_call_details["response_cost"] = None | |
## STANDARDIZED LOGGING PAYLOAD | |
self.model_call_details["standard_logging_object"] = ( | |
get_standard_logging_object_payload( | |
kwargs=self.model_call_details, | |
init_response_obj=complete_streaming_response, | |
start_time=start_time, | |
end_time=end_time, | |
logging_obj=self, | |
status="success", | |
standard_built_in_tools_params=self.standard_built_in_tools_params, | |
) | |
) | |
callbacks = self.get_combined_callback_list( | |
dynamic_success_callbacks=self.dynamic_async_success_callbacks, | |
global_callbacks=litellm._async_success_callback, | |
) | |
result = redact_message_input_output_from_logging( | |
model_call_details=( | |
self.model_call_details if hasattr(self, "model_call_details") else {} | |
), | |
result=result, | |
) | |
## LOGGING HOOK ## | |
for callback in callbacks: | |
if isinstance(callback, CustomGuardrail): | |
from litellm.types.guardrails import GuardrailEventHooks | |
if ( | |
callback.should_run_guardrail( | |
data=self.model_call_details, | |
event_type=GuardrailEventHooks.logging_only, | |
) | |
is not True | |
): | |
continue | |
self.model_call_details, result = await callback.async_logging_hook( | |
kwargs=self.model_call_details, | |
result=result, | |
call_type=self.call_type, | |
) | |
elif isinstance(callback, CustomLogger): | |
result = redact_message_input_output_from_custom_logger( | |
result=result, litellm_logging_obj=self, custom_logger=callback | |
) | |
self.model_call_details, result = await callback.async_logging_hook( | |
kwargs=self.model_call_details, | |
result=result, | |
call_type=self.call_type, | |
) | |
for callback in callbacks: | |
# check if callback can run for this request | |
litellm_params = self.model_call_details.get("litellm_params", {}) | |
should_run = self.should_run_callback( | |
callback=callback, | |
litellm_params=litellm_params, | |
event_hook="async_success_handler", | |
) | |
if not should_run: | |
continue | |
try: | |
if callback == "openmeter" and openMeterLogger is not None: | |
if self.stream is True: | |
if ( | |
"async_complete_streaming_response" | |
in self.model_call_details | |
): | |
await openMeterLogger.async_log_success_event( | |
kwargs=self.model_call_details, | |
response_obj=self.model_call_details[ | |
"async_complete_streaming_response" | |
], | |
start_time=start_time, | |
end_time=end_time, | |
) | |
else: | |
await openMeterLogger.async_log_stream_event( # [TODO]: move this to being an async log stream event function | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
) | |
else: | |
await openMeterLogger.async_log_success_event( | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
) | |
if isinstance(callback, CustomLogger): # custom logger class | |
if self.stream is True: | |
if ( | |
"async_complete_streaming_response" | |
in self.model_call_details | |
): | |
await callback.async_log_success_event( | |
kwargs=self.model_call_details, | |
response_obj=self.model_call_details[ | |
"async_complete_streaming_response" | |
], | |
start_time=start_time, | |
end_time=end_time, | |
) | |
else: | |
await callback.async_log_stream_event( # [TODO]: move this to being an async log stream event function | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
) | |
else: | |
await callback.async_log_success_event( | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
) | |
if callable(callback): # custom logger functions | |
global customLogger | |
if customLogger is None: | |
customLogger = CustomLogger() | |
if self.stream: | |
if ( | |
"async_complete_streaming_response" | |
in self.model_call_details | |
): | |
await customLogger.async_log_event( | |
kwargs=self.model_call_details, | |
response_obj=self.model_call_details[ | |
"async_complete_streaming_response" | |
], | |
start_time=start_time, | |
end_time=end_time, | |
print_verbose=print_verbose, | |
callback_func=callback, | |
) | |
else: | |
await customLogger.async_log_event( | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
print_verbose=print_verbose, | |
callback_func=callback, | |
) | |
if callback == "dynamodb": | |
global dynamoLogger | |
if dynamoLogger is None: | |
dynamoLogger = DyanmoDBLogger() | |
if self.stream: | |
if ( | |
"async_complete_streaming_response" | |
in self.model_call_details | |
): | |
print_verbose( | |
"DynamoDB Logger: Got Stream Event - Completed Stream Response" | |
) | |
await dynamoLogger._async_log_event( | |
kwargs=self.model_call_details, | |
response_obj=self.model_call_details[ | |
"async_complete_streaming_response" | |
], | |
start_time=start_time, | |
end_time=end_time, | |
print_verbose=print_verbose, | |
) | |
else: | |
print_verbose( | |
"DynamoDB Logger: Got Stream Event - No complete stream response as yet" | |
) | |
else: | |
await dynamoLogger._async_log_event( | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
print_verbose=print_verbose, | |
) | |
            except Exception:
                verbose_logger.error(
                    f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
                )
def _failure_handler_helper_fn( | |
self, exception, traceback_exception, start_time=None, end_time=None | |
): | |
if start_time is None: | |
start_time = self.start_time | |
if end_time is None: | |
end_time = datetime.datetime.now() | |
# on some exceptions, model_call_details is not always initialized, this ensures that we still log those exceptions | |
if not hasattr(self, "model_call_details"): | |
self.model_call_details = {} | |
self.model_call_details["log_event_type"] = "failed_api_call" | |
self.model_call_details["exception"] = exception | |
self.model_call_details["traceback_exception"] = traceback_exception | |
self.model_call_details["end_time"] = end_time | |
self.model_call_details.setdefault("original_response", None) | |
self.model_call_details["response_cost"] = 0 | |
if hasattr(exception, "headers") and isinstance(exception.headers, dict): | |
self.model_call_details.setdefault("litellm_params", {}) | |
metadata = ( | |
self.model_call_details["litellm_params"].get("metadata", {}) or {} | |
) | |
metadata.update(exception.headers) | |
## STANDARDIZED LOGGING PAYLOAD | |
self.model_call_details["standard_logging_object"] = ( | |
get_standard_logging_object_payload( | |
kwargs=self.model_call_details, | |
init_response_obj={}, | |
start_time=start_time, | |
end_time=end_time, | |
logging_obj=self, | |
status="failure", | |
error_str=str(exception), | |
original_exception=exception, | |
standard_built_in_tools_params=self.standard_built_in_tools_params, | |
) | |
) | |
return start_time, end_time | |
async def special_failure_handlers(self, exception: Exception): | |
""" | |
Custom events, emitted for specific failures. | |
Currently just for router model group rate limit error | |
""" | |
from litellm.types.router import RouterErrors | |
litellm_params: dict = self.model_call_details.get("litellm_params") or {} | |
metadata = litellm_params.get("metadata") or {} | |
## BASE CASE ## check if rate limit error for model group size 1 | |
is_base_case = False | |
if metadata.get("model_group_size") is not None: | |
model_group_size = metadata.get("model_group_size") | |
if isinstance(model_group_size, int) and model_group_size == 1: | |
is_base_case = True | |
## check if special error ## | |
if ( | |
RouterErrors.no_deployments_available.value not in str(exception) | |
and is_base_case is False | |
): | |
return | |
## get original model group ## | |
model_group = metadata.get("model_group") or None | |
for callback in litellm._async_failure_callback: | |
if isinstance(callback, CustomLogger): # custom logger class | |
await callback.log_model_group_rate_limit_error( | |
exception=exception, | |
original_model_group=model_group, | |
kwargs=self.model_call_details, | |
) # type: ignore | |
def failure_handler( # noqa: PLR0915 | |
self, exception, traceback_exception, start_time=None, end_time=None | |
): | |
verbose_logger.debug( | |
f"Logging Details LiteLLM-Failure Call: {litellm.failure_callback}" | |
) | |
try: | |
start_time, end_time = self._failure_handler_helper_fn( | |
exception=exception, | |
traceback_exception=traceback_exception, | |
start_time=start_time, | |
end_time=end_time, | |
) | |
callbacks = self.get_combined_callback_list( | |
dynamic_success_callbacks=self.dynamic_failure_callbacks, | |
global_callbacks=litellm.failure_callback, | |
) | |
            result = None  # result sent to all loggers; init this to None in case it's not created
result = redact_message_input_output_from_logging( | |
model_call_details=( | |
self.model_call_details | |
if hasattr(self, "model_call_details") | |
else {} | |
), | |
result=result, | |
) | |
for callback in callbacks: | |
try: | |
if callback == "lunary" and lunaryLogger is not None: | |
print_verbose("reaches lunary for logging error!") | |
model = self.model | |
input = self.model_call_details["input"] | |
_type = ( | |
"embed" | |
if self.call_type == CallTypes.embedding.value | |
else "llm" | |
) | |
lunaryLogger.log_event( | |
kwargs=self.model_call_details, | |
type=_type, | |
event="error", | |
user_id=self.model_call_details.get("user", "default"), | |
model=model, | |
input=input, | |
error=traceback_exception, | |
run_id=self.litellm_call_id, | |
start_time=start_time, | |
end_time=end_time, | |
print_verbose=print_verbose, | |
) | |
if callback == "sentry": | |
print_verbose("sending exception to sentry") | |
if capture_exception: | |
capture_exception(exception) | |
else: | |
print_verbose( | |
f"capture exception not initialized: {capture_exception}" | |
) | |
elif callback == "supabase" and supabaseClient is not None: | |
print_verbose("reaches supabase for logging!") | |
print_verbose(f"supabaseClient: {supabaseClient}") | |
supabaseClient.log_event( | |
model=self.model if hasattr(self, "model") else "", | |
messages=self.messages, | |
end_user=self.model_call_details.get("user", "default"), | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
litellm_call_id=self.model_call_details["litellm_call_id"], | |
print_verbose=print_verbose, | |
) | |
if ( | |
callable(callback) and customLogger is not None | |
): # custom logger functions | |
customLogger.log_event( | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
print_verbose=print_verbose, | |
callback_func=callback, | |
) | |
if ( | |
isinstance(callback, CustomLogger) | |
and self.model_call_details.get("litellm_params", {}).get( | |
"acompletion", False | |
) | |
is not True | |
and self.model_call_details.get("litellm_params", {}).get( | |
"aembedding", False | |
) | |
is not True | |
): # custom logger class | |
callback.log_failure_event( | |
start_time=start_time, | |
end_time=end_time, | |
response_obj=result, | |
kwargs=self.model_call_details, | |
) | |
if callback == "langfuse": | |
global langFuseLogger | |
verbose_logger.debug("reaches langfuse for logging failure") | |
kwargs = {} | |
for k, v in self.model_call_details.items(): | |
if ( | |
k != "original_response" | |
): # copy.deepcopy raises errors as this could be a coroutine | |
kwargs[k] = v | |
langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( | |
globalLangfuseLogger=langFuseLogger, | |
standard_callback_dynamic_params=self.standard_callback_dynamic_params, | |
in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, | |
) | |
_response = langfuse_logger_to_use.log_event_on_langfuse( | |
start_time=start_time, | |
end_time=end_time, | |
response_obj=None, | |
user_id=kwargs.get("user", None), | |
status_message=str(exception), | |
level="ERROR", | |
kwargs=self.model_call_details, | |
) | |
if _response is not None and isinstance(_response, dict): | |
_trace_id = _response.get("trace_id", None) | |
if _trace_id is not None: | |
in_memory_trace_id_cache.set_cache( | |
litellm_call_id=self.litellm_call_id, | |
service_name="langfuse", | |
trace_id=_trace_id, | |
) | |
if callback == "traceloop": | |
traceloopLogger.log_event( | |
start_time=start_time, | |
end_time=end_time, | |
response_obj=None, | |
user_id=self.model_call_details.get("user", None), | |
print_verbose=print_verbose, | |
status_message=str(exception), | |
level="ERROR", | |
kwargs=self.model_call_details, | |
) | |
if callback == "logfire" and logfireLogger is not None: | |
verbose_logger.debug("reaches logfire for failure logging!") | |
kwargs = {} | |
for k, v in self.model_call_details.items(): | |
if ( | |
k != "original_response" | |
): # copy.deepcopy raises errors as this could be a coroutine | |
kwargs[k] = v | |
kwargs["exception"] = exception | |
logfireLogger.log_event( | |
kwargs=kwargs, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
level=LogfireLevel.ERROR.value, # type: ignore | |
print_verbose=print_verbose, | |
) | |
except Exception as e: | |
print_verbose( | |
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging with integrations {str(e)}" | |
) | |
print_verbose( | |
f"LiteLLM.Logging: is sentry capture exception initialized {capture_exception}" | |
) | |
if capture_exception: # log this error to sentry for debugging | |
capture_exception(e) | |
except Exception as e: | |
verbose_logger.exception( | |
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging {}".format( | |
str(e) | |
) | |
) | |
async def async_failure_handler( | |
self, exception, traceback_exception, start_time=None, end_time=None | |
): | |
""" | |
Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions. | |
""" | |
await self.special_failure_handlers(exception=exception) | |
start_time, end_time = self._failure_handler_helper_fn( | |
exception=exception, | |
traceback_exception=traceback_exception, | |
start_time=start_time, | |
end_time=end_time, | |
) | |
callbacks = self.get_combined_callback_list( | |
dynamic_success_callbacks=self.dynamic_async_failure_callbacks, | |
global_callbacks=litellm._async_failure_callback, | |
) | |
        result = None  # result sent to all loggers; init this to None in case it's not created
for callback in callbacks: | |
try: | |
if isinstance(callback, CustomLogger): # custom logger class | |
await callback.async_log_failure_event( | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
) # type: ignore | |
if ( | |
callable(callback) and customLogger is not None | |
): # custom logger functions | |
await customLogger.async_log_event( | |
kwargs=self.model_call_details, | |
response_obj=result, | |
start_time=start_time, | |
end_time=end_time, | |
print_verbose=print_verbose, | |
callback_func=callback, | |
) | |
except Exception as e: | |
                verbose_logger.exception(
                    "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure logging {}\nCallback={}".format(
                        str(e), callback
                    )
                )
def _get_trace_id(self, service_name: Literal["langfuse"]) -> Optional[str]: | |
""" | |
For the given service (e.g. langfuse), return the trace_id actually logged. | |
Used for constructing the url in slack alerting. | |
Returns: | |
- str: The logged trace id | |
- None: If trace id not yet emitted. | |
""" | |
trace_id: Optional[str] = None | |
if service_name == "langfuse": | |
trace_id = in_memory_trace_id_cache.get_cache( | |
litellm_call_id=self.litellm_call_id, service_name=service_name | |
) | |
return trace_id | |
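    # A minimal sketch of the lookup above (trace id value is hypothetical):
    #
    #   logging_obj._get_trace_id(service_name="langfuse")
    #   # -> "trace-abc123" once langfuse has logged the call, else None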
def _get_callback_object(self, service_name: Literal["langfuse"]) -> Optional[Any]: | |
""" | |
Return dynamic callback object. | |
Meant to solve issue when doing key-based/team-based logging | |
""" | |
global langFuseLogger | |
if service_name == "langfuse": | |
            if langFuseLogger is None or (
                (
                    self.standard_callback_dynamic_params.get("langfuse_public_key")
                    is not None
                    and self.standard_callback_dynamic_params.get(
                        "langfuse_public_key"
                    )
                    != langFuseLogger.public_key
                )
                or (
                    self.standard_callback_dynamic_params.get("langfuse_host")
                    is not None
                    and self.standard_callback_dynamic_params.get("langfuse_host")
                    != langFuseLogger.langfuse_host
                )
            ):
return LangFuseLogger( | |
langfuse_public_key=self.standard_callback_dynamic_params.get( | |
"langfuse_public_key" | |
), | |
langfuse_secret=self.standard_callback_dynamic_params.get( | |
"langfuse_secret" | |
), | |
langfuse_host=self.standard_callback_dynamic_params.get( | |
"langfuse_host" | |
), | |
) | |
return langFuseLogger | |
return None | |
def handle_sync_success_callbacks_for_async_calls( | |
self, | |
result: Any, | |
start_time: datetime.datetime, | |
end_time: datetime.datetime, | |
) -> None: | |
""" | |
Handles calling success callbacks for Async calls. | |
Why: Some callbacks - `langfuse`, `s3` are sync callbacks. We need to call them in the executor. | |
""" | |
if self._should_run_sync_callbacks_for_async_calls() is False: | |
return | |
executor.submit( | |
self.success_handler, | |
result, | |
start_time, | |
end_time, | |
) | |
def _should_run_sync_callbacks_for_async_calls(self) -> bool: | |
""" | |
Returns: | |
            - bool: True if sync callbacks should be run for async calls, e.g. `langfuse`, `s3`
""" | |
_combined_sync_callbacks = self.get_combined_callback_list( | |
dynamic_success_callbacks=self.dynamic_success_callbacks, | |
global_callbacks=litellm.success_callback, | |
) | |
_filtered_success_callbacks = self._remove_internal_custom_logger_callbacks( | |
_combined_sync_callbacks | |
) | |
_filtered_success_callbacks = self._remove_internal_litellm_callbacks( | |
_filtered_success_callbacks | |
) | |
return len(_filtered_success_callbacks) > 0 | |
def get_combined_callback_list( | |
self, dynamic_success_callbacks: Optional[List], global_callbacks: List | |
) -> List: | |
if dynamic_success_callbacks is None: | |
return global_callbacks | |
return list(set(dynamic_success_callbacks + global_callbacks)) | |
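    # Sketch of the combination semantics above: dynamic callbacks, when set,
    # are merged with the globals and de-duplicated via set(), so ordering is
    # not preserved.
    #
    #   logging_obj.get_combined_callback_list(
    #       dynamic_success_callbacks=["langfuse"],
    #       global_callbacks=["s3", "langfuse"],
    #   )
    #   # -> ["langfuse", "s3"] (in some order)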
def _remove_internal_litellm_callbacks(self, callbacks: List) -> List: | |
""" | |
Creates a filtered list of callbacks, excluding internal LiteLLM callbacks. | |
Args: | |
callbacks: List of callback functions/strings to filter | |
Returns: | |
List of filtered callbacks with internal ones removed | |
""" | |
filtered = [ | |
cb for cb in callbacks if not self._is_internal_litellm_proxy_callback(cb) | |
] | |
verbose_logger.debug(f"Filtered callbacks: {filtered}") | |
return filtered | |
def _get_callback_name(self, cb) -> str: | |
""" | |
Helper to get the name of a callback function | |
Args: | |
cb: The callback function/string to get the name of | |
Returns: | |
The name of the callback | |
""" | |
if hasattr(cb, "__name__"): | |
return cb.__name__ | |
if hasattr(cb, "__func__"): | |
return cb.__func__.__name__ | |
return str(cb) | |
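    # Sketch:
    #
    #   def my_callback(kwargs): ...
    #
    #   logging_obj._get_callback_name(my_callback)  # -> "my_callback"
    #   logging_obj._get_callback_name("langfuse")   # -> "langfuse" (falls through to str(cb))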
def _is_internal_litellm_proxy_callback(self, cb) -> bool: | |
"""Helper to check if a callback is internal""" | |
INTERNAL_PREFIXES = [ | |
"_PROXY", | |
"_service_logger.ServiceLogging", | |
"sync_deployment_callback_on_success", | |
] | |
if isinstance(cb, str): | |
return False | |
if not callable(cb): | |
return True | |
cb_name = self._get_callback_name(cb) | |
return any(prefix in cb_name for prefix in INTERNAL_PREFIXES) | |
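    # Sketch of the filtering behavior (callback names are hypothetical):
    #
    #   def sync_deployment_callback_on_success(kwargs): ...
    #   def my_team_callback(kwargs): ...
    #
    #   logging_obj._remove_internal_litellm_callbacks(
    #       ["langfuse", sync_deployment_callback_on_success, my_team_callback]
    #   )
    #   # -> ["langfuse", my_team_callback]; strings are kept, callables whose
    #   #    name matches an internal prefix are dropped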
def _remove_internal_custom_logger_callbacks(self, callbacks: List) -> List: | |
""" | |
Removes internal custom logger callbacks from the list. | |
""" | |
_new_callbacks = [] | |
for _c in callbacks: | |
if isinstance(_c, CustomLogger): | |
continue | |
elif ( | |
isinstance(_c, str) | |
and _c in litellm._known_custom_logger_compatible_callbacks | |
): | |
continue | |
_new_callbacks.append(_c) | |
return _new_callbacks | |
def _get_assembled_streaming_response( | |
self, | |
result: Union[ | |
ModelResponse, | |
TextCompletionResponse, | |
ModelResponseStream, | |
ResponseCompletedEvent, | |
Any, | |
], | |
start_time: datetime.datetime, | |
end_time: datetime.datetime, | |
is_async: bool, | |
streaming_chunks: List[Any], | |
) -> Optional[Union[ModelResponse, TextCompletionResponse, ResponsesAPIResponse]]: | |
if isinstance(result, ModelResponse): | |
return result | |
elif isinstance(result, TextCompletionResponse): | |
return result | |
elif isinstance(result, ResponseCompletedEvent): | |
return result.response | |
return None | |
def _handle_anthropic_messages_response_logging(self, result: Any) -> ModelResponse: | |
""" | |
Handles logging for Anthropic messages responses. | |
Args: | |
result: The response object from the model call | |
Returns: | |
            The response object from the model call
        - For non-streaming responses, we transform the response to a ModelResponse object.
        - For streaming responses, the anthropic_messages handler calls success_handler with an assembled ModelResponse.
""" | |
if self.stream and isinstance(result, ModelResponse): | |
return result | |
result = litellm.AnthropicConfig().transform_response( | |
raw_response=self.model_call_details["httpx_response"], | |
model_response=litellm.ModelResponse(), | |
model=self.model, | |
messages=[], | |
logging_obj=self, | |
optional_params={}, | |
api_key="", | |
request_data={}, | |
encoding=litellm.encoding, | |
json_mode=False, | |
litellm_params={}, | |
) | |
return result | |
def _get_masked_values( | |
sensitive_object: dict, | |
ignore_sensitive_values: bool = False, | |
mask_all_values: bool = False, | |
unmasked_length: int = 4, | |
number_of_asterisks: Optional[int] = 4, | |
) -> dict: | |
""" | |
Internal debugging helper function | |
Masks the headers of the request sent from LiteLLM | |
Args: | |
masked_length: Optional length for the masked portion (number of *). If set, will use exactly this many * | |
regardless of original string length. The total length will be unmasked_length + masked_length. | |
""" | |
sensitive_keywords = [ | |
"authorization", | |
"token", | |
"key", | |
"secret", | |
] | |
return { | |
k: ( | |
( | |
v[: unmasked_length // 2] | |
+ "*" * number_of_asterisks | |
+ v[-unmasked_length // 2 :] | |
) | |
if ( | |
isinstance(v, str) | |
and len(v) > unmasked_length | |
and number_of_asterisks is not None | |
) | |
else ( | |
( | |
v[: unmasked_length // 2] | |
+ "*" * (len(v) - unmasked_length) | |
+ v[-unmasked_length // 2 :] | |
) | |
if (isinstance(v, str) and len(v) > unmasked_length) | |
else "*****" | |
) | |
) | |
for k, v in sensitive_object.items() | |
if not ignore_sensitive_values | |
or not any( | |
sensitive_keyword in k.lower() for sensitive_keyword in sensitive_keywords | |
) | |
} | |
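# Sketch of the masking above with the defaults (value is hypothetical):
#
#   _get_masked_values({"Authorization": "sk-1234567890abcdef"})
#   # -> {"Authorization": "sk****ef"}
#   # (unmasked_length=4 keeps 2 chars on each side; number_of_asterisks=4
#   #  inserts exactly four asterisks)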
def set_callbacks(callback_list, function_id=None): # noqa: PLR0915 | |
""" | |
Globally sets the callback client | |
""" | |
global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, supabaseClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger | |
try: | |
for callback in callback_list: | |
if callback == "sentry": | |
try: | |
import sentry_sdk | |
except ImportError: | |
print_verbose("Package 'sentry_sdk' is missing. Installing it...") | |
subprocess.check_call( | |
[sys.executable, "-m", "pip", "install", "sentry_sdk"] | |
) | |
import sentry_sdk | |
sentry_sdk_instance = sentry_sdk | |
                sentry_trace_rate = os.environ.get("SENTRY_API_TRACE_RATE", "1.0")
sentry_sdk_instance.init( | |
dsn=os.environ.get("SENTRY_DSN"), | |
traces_sample_rate=float(sentry_trace_rate), # type: ignore | |
) | |
capture_exception = sentry_sdk_instance.capture_exception | |
add_breadcrumb = sentry_sdk_instance.add_breadcrumb | |
elif callback == "posthog": | |
try: | |
from posthog import Posthog | |
except ImportError: | |
print_verbose("Package 'posthog' is missing. Installing it...") | |
subprocess.check_call( | |
[sys.executable, "-m", "pip", "install", "posthog"] | |
) | |
from posthog import Posthog | |
posthog = Posthog( | |
project_api_key=os.environ.get("POSTHOG_API_KEY"), | |
host=os.environ.get("POSTHOG_API_URL"), | |
) | |
elif callback == "slack": | |
try: | |
from slack_bolt import App | |
except ImportError: | |
print_verbose("Package 'slack_bolt' is missing. Installing it...") | |
subprocess.check_call( | |
[sys.executable, "-m", "pip", "install", "slack_bolt"] | |
) | |
from slack_bolt import App | |
slack_app = App( | |
token=os.environ.get("SLACK_API_TOKEN"), | |
signing_secret=os.environ.get("SLACK_API_SECRET"), | |
) | |
alerts_channel = os.environ["SLACK_API_CHANNEL"] | |
print_verbose(f"Initialized Slack App: {slack_app}") | |
elif callback == "traceloop": | |
traceloopLogger = TraceloopLogger() | |
elif callback == "athina": | |
athinaLogger = AthinaLogger() | |
print_verbose("Initialized Athina Logger") | |
elif callback == "helicone": | |
heliconeLogger = HeliconeLogger() | |
elif callback == "lunary": | |
lunaryLogger = LunaryLogger() | |
elif callback == "promptlayer": | |
promptLayerLogger = PromptLayerLogger() | |
elif callback == "langfuse": | |
langFuseLogger = LangFuseLogger( | |
langfuse_public_key=None, langfuse_secret=None, langfuse_host=None | |
) | |
elif callback == "openmeter": | |
openMeterLogger = OpenMeterLogger() | |
elif callback == "datadog": | |
dataDogLogger = DataDogLogger() | |
elif callback == "dynamodb": | |
dynamoLogger = DyanmoDBLogger() | |
elif callback == "s3": | |
s3Logger = S3Logger() | |
elif callback == "wandb": | |
weightsBiasesLogger = WeightsBiasesLogger() | |
elif callback == "logfire": | |
logfireLogger = LogfireLogger() | |
elif callback == "supabase": | |
print_verbose("instantiating supabase") | |
supabaseClient = Supabase() | |
elif callback == "greenscale": | |
greenscaleLogger = GreenscaleLogger() | |
print_verbose("Initialized Greenscale Logger") | |
elif callable(callback): | |
customLogger = CustomLogger() | |
    except Exception:
        raise
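# Typical usage (a sketch): called once during setup with the configured
# callback names, after which the module-level logger globals
# (langFuseLogger, s3Logger, ...) are initialized and reused by the
# success/failure handlers above.
#
#   litellm.success_callback = ["langfuse", "s3"]
#   set_callbacks(["langfuse", "s3"])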
def _init_custom_logger_compatible_class( # noqa: PLR0915 | |
logging_integration: _custom_logger_compatible_callbacks_literal, | |
internal_usage_cache: Optional[DualCache], | |
llm_router: Optional[ | |
Any | |
], # expect litellm.Router, but typing errors due to circular import | |
custom_logger_init_args: Optional[dict] = {}, | |
) -> Optional[CustomLogger]: | |
""" | |
Initialize a custom logger compatible class | |
""" | |
try: | |
custom_logger_init_args = custom_logger_init_args or {} | |
if logging_integration == "agentops": # Add AgentOps initialization | |
for callback in _in_memory_loggers: | |
if isinstance(callback, AgentOps): | |
return callback # type: ignore | |
agentops_logger = AgentOps() | |
_in_memory_loggers.append(agentops_logger) | |
return agentops_logger # type: ignore | |
elif logging_integration == "lago": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LagoLogger): | |
return callback # type: ignore | |
lago_logger = LagoLogger() | |
_in_memory_loggers.append(lago_logger) | |
return lago_logger # type: ignore | |
elif logging_integration == "openmeter": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpenMeterLogger): | |
return callback # type: ignore | |
_openmeter_logger = OpenMeterLogger() | |
_in_memory_loggers.append(_openmeter_logger) | |
return _openmeter_logger # type: ignore | |
elif logging_integration == "braintrust": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, BraintrustLogger): | |
return callback # type: ignore | |
braintrust_logger = BraintrustLogger() | |
_in_memory_loggers.append(braintrust_logger) | |
return braintrust_logger # type: ignore | |
elif logging_integration == "langsmith": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LangsmithLogger): | |
return callback # type: ignore | |
_langsmith_logger = LangsmithLogger() | |
_in_memory_loggers.append(_langsmith_logger) | |
return _langsmith_logger # type: ignore | |
elif logging_integration == "argilla": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, ArgillaLogger): | |
return callback # type: ignore | |
_argilla_logger = ArgillaLogger() | |
_in_memory_loggers.append(_argilla_logger) | |
return _argilla_logger # type: ignore | |
elif logging_integration == "literalai": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LiteralAILogger): | |
return callback # type: ignore | |
_literalai_logger = LiteralAILogger() | |
_in_memory_loggers.append(_literalai_logger) | |
return _literalai_logger # type: ignore | |
elif logging_integration == "prometheus": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, PrometheusLogger): | |
return callback # type: ignore | |
_prometheus_logger = PrometheusLogger() | |
_in_memory_loggers.append(_prometheus_logger) | |
return _prometheus_logger # type: ignore | |
elif logging_integration == "datadog": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, DataDogLogger): | |
return callback # type: ignore | |
_datadog_logger = DataDogLogger() | |
_in_memory_loggers.append(_datadog_logger) | |
return _datadog_logger # type: ignore | |
elif logging_integration == "datadog_llm_observability": | |
_datadog_llm_obs_logger = DataDogLLMObsLogger() | |
_in_memory_loggers.append(_datadog_llm_obs_logger) | |
return _datadog_llm_obs_logger # type: ignore | |
elif logging_integration == "gcs_bucket": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, GCSBucketLogger): | |
return callback # type: ignore | |
_gcs_bucket_logger = GCSBucketLogger() | |
_in_memory_loggers.append(_gcs_bucket_logger) | |
return _gcs_bucket_logger # type: ignore | |
elif logging_integration == "azure_storage": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, AzureBlobStorageLogger): | |
return callback # type: ignore | |
_azure_storage_logger = AzureBlobStorageLogger() | |
_in_memory_loggers.append(_azure_storage_logger) | |
return _azure_storage_logger # type: ignore | |
elif logging_integration == "opik": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpikLogger): | |
return callback # type: ignore | |
_opik_logger = OpikLogger() | |
_in_memory_loggers.append(_opik_logger) | |
return _opik_logger # type: ignore | |
elif logging_integration == "arize": | |
from litellm.integrations.opentelemetry import ( | |
OpenTelemetry, | |
OpenTelemetryConfig, | |
) | |
arize_config = ArizeLogger.get_arize_config() | |
if arize_config.endpoint is None: | |
raise ValueError( | |
"No valid endpoint found for Arize, please set 'ARIZE_ENDPOINT' to your GRPC endpoint or 'ARIZE_HTTP_ENDPOINT' to your HTTP endpoint" | |
) | |
otel_config = OpenTelemetryConfig( | |
exporter=arize_config.protocol, | |
endpoint=arize_config.endpoint, | |
) | |
os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ( | |
f"space_key={arize_config.space_key},api_key={arize_config.api_key}" | |
) | |
for callback in _in_memory_loggers: | |
if ( | |
isinstance(callback, ArizeLogger) | |
and callback.callback_name == "arize" | |
): | |
return callback # type: ignore | |
_arize_otel_logger = ArizeLogger(config=otel_config, callback_name="arize") | |
_in_memory_loggers.append(_arize_otel_logger) | |
return _arize_otel_logger # type: ignore | |
elif logging_integration == "arize_phoenix": | |
from litellm.integrations.opentelemetry import ( | |
OpenTelemetry, | |
OpenTelemetryConfig, | |
) | |
arize_phoenix_config = ArizePhoenixLogger.get_arize_phoenix_config() | |
otel_config = OpenTelemetryConfig( | |
exporter=arize_phoenix_config.protocol, | |
endpoint=arize_phoenix_config.endpoint, | |
) | |
# auth can be disabled on local deployments of arize phoenix | |
if arize_phoenix_config.otlp_auth_headers is not None: | |
os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ( | |
arize_phoenix_config.otlp_auth_headers | |
) | |
for callback in _in_memory_loggers: | |
if ( | |
isinstance(callback, OpenTelemetry) | |
and callback.callback_name == "arize_phoenix" | |
): | |
return callback # type: ignore | |
_otel_logger = OpenTelemetry( | |
config=otel_config, callback_name="arize_phoenix" | |
) | |
_in_memory_loggers.append(_otel_logger) | |
return _otel_logger # type: ignore | |
elif logging_integration == "otel": | |
from litellm.integrations.opentelemetry import OpenTelemetry | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpenTelemetry): | |
return callback # type: ignore | |
otel_logger = OpenTelemetry( | |
**_get_custom_logger_settings_from_proxy_server( | |
callback_name=logging_integration | |
) | |
) | |
_in_memory_loggers.append(otel_logger) | |
return otel_logger # type: ignore | |
elif logging_integration == "galileo": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, GalileoObserve): | |
return callback # type: ignore | |
galileo_logger = GalileoObserve() | |
_in_memory_loggers.append(galileo_logger) | |
return galileo_logger # type: ignore | |
elif logging_integration == "logfire": | |
if "LOGFIRE_TOKEN" not in os.environ: | |
raise ValueError("LOGFIRE_TOKEN not found in environment variables") | |
from litellm.integrations.opentelemetry import ( | |
OpenTelemetry, | |
OpenTelemetryConfig, | |
) | |
otel_config = OpenTelemetryConfig( | |
exporter="otlp_http", | |
endpoint="https://logfire-api.pydantic.dev/v1/traces", | |
headers=f"Authorization={os.getenv('LOGFIRE_TOKEN')}", | |
) | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpenTelemetry): | |
return callback # type: ignore | |
_otel_logger = OpenTelemetry(config=otel_config) | |
_in_memory_loggers.append(_otel_logger) | |
return _otel_logger # type: ignore | |
elif logging_integration == "dynamic_rate_limiter": | |
from litellm.proxy.hooks.dynamic_rate_limiter import ( | |
_PROXY_DynamicRateLimitHandler, | |
) | |
for callback in _in_memory_loggers: | |
if isinstance(callback, _PROXY_DynamicRateLimitHandler): | |
return callback # type: ignore | |
if internal_usage_cache is None: | |
raise Exception( | |
"Internal Error: Cache cannot be empty - internal_usage_cache={}".format( | |
internal_usage_cache | |
) | |
) | |
dynamic_rate_limiter_obj = _PROXY_DynamicRateLimitHandler( | |
internal_usage_cache=internal_usage_cache | |
) | |
if llm_router is not None and isinstance(llm_router, litellm.Router): | |
dynamic_rate_limiter_obj.update_variables(llm_router=llm_router) | |
_in_memory_loggers.append(dynamic_rate_limiter_obj) | |
return dynamic_rate_limiter_obj # type: ignore | |
elif logging_integration == "langtrace": | |
if "LANGTRACE_API_KEY" not in os.environ: | |
raise ValueError("LANGTRACE_API_KEY not found in environment variables") | |
from litellm.integrations.opentelemetry import ( | |
OpenTelemetry, | |
OpenTelemetryConfig, | |
) | |
otel_config = OpenTelemetryConfig( | |
exporter="otlp_http", | |
endpoint="https://langtrace.ai/api/trace", | |
) | |
os.environ["OTEL_EXPORTER_OTLP_TRACES_HEADERS"] = ( | |
f"api_key={os.getenv('LANGTRACE_API_KEY')}" | |
) | |
for callback in _in_memory_loggers: | |
if ( | |
isinstance(callback, OpenTelemetry) | |
and callback.callback_name == "langtrace" | |
): | |
return callback # type: ignore | |
_otel_logger = OpenTelemetry(config=otel_config, callback_name="langtrace") | |
_in_memory_loggers.append(_otel_logger) | |
return _otel_logger # type: ignore | |
elif logging_integration == "mlflow": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, MlflowLogger): | |
return callback # type: ignore | |
_mlflow_logger = MlflowLogger() | |
_in_memory_loggers.append(_mlflow_logger) | |
return _mlflow_logger # type: ignore | |
elif logging_integration == "langfuse": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LangfusePromptManagement): | |
return callback | |
langfuse_logger = LangfusePromptManagement() | |
_in_memory_loggers.append(langfuse_logger) | |
return langfuse_logger # type: ignore | |
elif logging_integration == "pagerduty": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, PagerDutyAlerting): | |
return callback | |
pagerduty_logger = PagerDutyAlerting(**custom_logger_init_args) | |
_in_memory_loggers.append(pagerduty_logger) | |
return pagerduty_logger # type: ignore | |
elif logging_integration == "anthropic_cache_control_hook": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, AnthropicCacheControlHook): | |
return callback | |
anthropic_cache_control_hook = AnthropicCacheControlHook() | |
_in_memory_loggers.append(anthropic_cache_control_hook) | |
return anthropic_cache_control_hook # type: ignore | |
elif logging_integration == "bedrock_knowledgebase_hook": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, BedrockKnowledgeBaseHook): | |
return callback | |
bedrock_knowledgebase_hook = BedrockKnowledgeBaseHook() | |
_in_memory_loggers.append(bedrock_knowledgebase_hook) | |
return bedrock_knowledgebase_hook # type: ignore | |
elif logging_integration == "gcs_pubsub": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, GcsPubSubLogger): | |
return callback | |
_gcs_pubsub_logger = GcsPubSubLogger() | |
_in_memory_loggers.append(_gcs_pubsub_logger) | |
return _gcs_pubsub_logger # type: ignore | |
elif logging_integration == "humanloop": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, HumanloopLogger): | |
return callback | |
humanloop_logger = HumanloopLogger() | |
_in_memory_loggers.append(humanloop_logger) | |
return humanloop_logger # type: ignore | |
except Exception as e: | |
verbose_logger.exception( | |
f"[Non-Blocking Error] Error initializing custom logger: {e}" | |
) | |
return None | |
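# Sketch of the caching contract above: re-initializing the same integration
# returns the instance already stored in _in_memory_loggers instead of
# constructing a new one (assuming initialization succeeded).
#
#   first = _init_custom_logger_compatible_class("prometheus", None, None)
#   second = _init_custom_logger_compatible_class("prometheus", None, None)
#   assert first is second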
def get_custom_logger_compatible_class( # noqa: PLR0915 | |
logging_integration: _custom_logger_compatible_callbacks_literal, | |
) -> Optional[CustomLogger]: | |
try: | |
if logging_integration == "lago": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LagoLogger): | |
return callback | |
elif logging_integration == "openmeter": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpenMeterLogger): | |
return callback | |
elif logging_integration == "braintrust": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, BraintrustLogger): | |
return callback | |
elif logging_integration == "galileo": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, GalileoObserve): | |
return callback | |
elif logging_integration == "langsmith": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LangsmithLogger): | |
return callback | |
elif logging_integration == "argilla": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, ArgillaLogger): | |
return callback | |
elif logging_integration == "literalai": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LiteralAILogger): | |
return callback | |
elif logging_integration == "prometheus": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, PrometheusLogger): | |
return callback | |
elif logging_integration == "datadog": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, DataDogLogger): | |
return callback | |
elif logging_integration == "datadog_llm_observability": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, DataDogLLMObsLogger): | |
return callback | |
elif logging_integration == "gcs_bucket": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, GCSBucketLogger): | |
return callback | |
elif logging_integration == "azure_storage": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, AzureBlobStorageLogger): | |
return callback | |
elif logging_integration == "opik": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpikLogger): | |
return callback | |
elif logging_integration == "langfuse": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, LangfusePromptManagement): | |
return callback | |
elif logging_integration == "otel": | |
from litellm.integrations.opentelemetry import OpenTelemetry | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpenTelemetry): | |
return callback | |
elif logging_integration == "arize": | |
if "ARIZE_SPACE_KEY" not in os.environ: | |
raise ValueError("ARIZE_SPACE_KEY not found in environment variables") | |
if "ARIZE_API_KEY" not in os.environ: | |
raise ValueError("ARIZE_API_KEY not found in environment variables") | |
for callback in _in_memory_loggers: | |
if ( | |
isinstance(callback, ArizeLogger) | |
and callback.callback_name == "arize" | |
): | |
return callback | |
elif logging_integration == "logfire": | |
if "LOGFIRE_TOKEN" not in os.environ: | |
raise ValueError("LOGFIRE_TOKEN not found in environment variables") | |
from litellm.integrations.opentelemetry import OpenTelemetry | |
for callback in _in_memory_loggers: | |
if isinstance(callback, OpenTelemetry): | |
return callback # type: ignore | |
elif logging_integration == "dynamic_rate_limiter": | |
from litellm.proxy.hooks.dynamic_rate_limiter import ( | |
_PROXY_DynamicRateLimitHandler, | |
) | |
for callback in _in_memory_loggers: | |
if isinstance(callback, _PROXY_DynamicRateLimitHandler): | |
return callback # type: ignore | |
elif logging_integration == "langtrace": | |
from litellm.integrations.opentelemetry import OpenTelemetry | |
if "LANGTRACE_API_KEY" not in os.environ: | |
raise ValueError("LANGTRACE_API_KEY not found in environment variables") | |
for callback in _in_memory_loggers: | |
if ( | |
isinstance(callback, OpenTelemetry) | |
and callback.callback_name == "langtrace" | |
): | |
return callback | |
elif logging_integration == "mlflow": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, MlflowLogger): | |
return callback | |
elif logging_integration == "pagerduty": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, PagerDutyAlerting): | |
return callback | |
elif logging_integration == "anthropic_cache_control_hook": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, AnthropicCacheControlHook): | |
return callback | |
elif logging_integration == "bedrock_knowledgebase_hook": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, BedrockKnowledgeBaseHook): | |
return callback | |
elif logging_integration == "gcs_pubsub": | |
for callback in _in_memory_loggers: | |
if isinstance(callback, GcsPubSubLogger): | |
return callback | |
return None | |
except Exception as e: | |
verbose_logger.exception( | |
f"[Non-Blocking Error] Error getting custom logger: {e}" | |
) | |
return None | |
def _get_custom_logger_settings_from_proxy_server(callback_name: str) -> Dict: | |
""" | |
Get the settings for a custom logger from the proxy server config.yaml | |
Proxy server config.yaml defines callback_settings as: | |
callback_settings: | |
otel: | |
message_logging: False | |
""" | |
from litellm.proxy.proxy_server import callback_settings | |
if callback_settings: | |
return dict(callback_settings.get(callback_name, {})) | |
return {} | |
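# Sketch: with the config.yaml from the docstring above, the returned settings
# are forwarded as keyword arguments when constructing the logger, e.g. for
# the "otel" integration:
#
#   settings = _get_custom_logger_settings_from_proxy_server("otel")
#   # settings == {"message_logging": False}
#   # -> OpenTelemetry(**settings)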
def use_custom_pricing_for_model(litellm_params: Optional[dict]) -> bool: | |
""" | |
Check if the model uses custom pricing | |
Returns True if any of `SPECIAL_MODEL_INFO_PARAMS` are present in `litellm_params` or `model_info` | |
""" | |
if litellm_params is None: | |
return False | |
metadata: dict = litellm_params.get("metadata", {}) or {} | |
model_info: dict = metadata.get("model_info", {}) or {} | |
custom_pricing_keys = CustomPricingLiteLLMParams.model_fields.keys() | |
for key in custom_pricing_keys: | |
if litellm_params.get(key, None) is not None: | |
return True | |
elif model_info.get(key, None) is not None: | |
return True | |
return False | |
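# Sketch (assuming input_cost_per_token is among the
# CustomPricingLiteLLMParams fields):
#
#   use_custom_pricing_for_model({"input_cost_per_token": 1e-06})   # -> True
#   use_custom_pricing_for_model({"metadata": {"model_info": {}}})  # -> False
#   use_custom_pricing_for_model(None)                              # -> False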
def is_valid_sha256_hash(value: str) -> bool: | |
# Check if the value is a valid SHA-256 hash (64 hexadecimal characters) | |
return bool(re.fullmatch(r"[a-fA-F0-9]{64}", value)) | |
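# Example:
#
#   import hashlib
#   is_valid_sha256_hash(hashlib.sha256(b"sk-1234").hexdigest())  # -> True
#   is_valid_sha256_hash("sk-1234")  # -> False (not 64 hex chars)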
class StandardLoggingPayloadSetup: | |
    @staticmethod
    def cleanup_timestamps(
start_time: Union[dt_object, float], | |
end_time: Union[dt_object, float], | |
completion_start_time: Union[dt_object, float], | |
) -> Tuple[float, float, float]: | |
""" | |
Convert datetime objects to floats | |
Args: | |
start_time: Union[dt_object, float] | |
end_time: Union[dt_object, float] | |
completion_start_time: Union[dt_object, float] | |
Returns: | |
Tuple[float, float, float]: A tuple containing the start time, end time, and completion start time as floats. | |
""" | |
if isinstance(start_time, datetime.datetime): | |
start_time_float = start_time.timestamp() | |
elif isinstance(start_time, float): | |
start_time_float = start_time | |
else: | |
raise ValueError( | |
f"start_time is required, got={start_time} of type {type(start_time)}" | |
) | |
if isinstance(end_time, datetime.datetime): | |
end_time_float = end_time.timestamp() | |
elif isinstance(end_time, float): | |
end_time_float = end_time | |
else: | |
raise ValueError( | |
f"end_time is required, got={end_time} of type {type(end_time)}" | |
) | |
if isinstance(completion_start_time, datetime.datetime): | |
completion_start_time_float = completion_start_time.timestamp() | |
elif isinstance(completion_start_time, float): | |
completion_start_time_float = completion_start_time | |
else: | |
completion_start_time_float = end_time_float | |
return start_time_float, end_time_float, completion_start_time_float | |
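    # Sketch of the conversion above:
    #
    #   start = datetime.datetime(2024, 1, 1)
    #   end = start + datetime.timedelta(seconds=2)
    #   StandardLoggingPayloadSetup.cleanup_timestamps(start, end, end)
    #   # -> (start.timestamp(), end.timestamp(), end.timestamp())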
    @staticmethod
    def get_standard_logging_metadata(
metadata: Optional[Dict[str, Any]], | |
litellm_params: Optional[dict] = None, | |
prompt_integration: Optional[str] = None, | |
applied_guardrails: Optional[List[str]] = None, | |
mcp_tool_call_metadata: Optional[StandardLoggingMCPToolCall] = None, | |
usage_object: Optional[dict] = None, | |
) -> StandardLoggingMetadata: | |
""" | |
Clean and filter the metadata dictionary to include only the specified keys in StandardLoggingMetadata. | |
Args: | |
metadata (Optional[Dict[str, Any]]): The original metadata dictionary. | |
Returns: | |
StandardLoggingMetadata: A StandardLoggingMetadata object containing the cleaned metadata. | |
Note: | |
- If the input metadata is None or not a dictionary, an empty StandardLoggingMetadata object is returned. | |
- If 'user_api_key' is present in metadata and is a valid SHA256 hash, it's stored as 'user_api_key_hash'. | |
""" | |
prompt_management_metadata: Optional[ | |
StandardLoggingPromptManagementMetadata | |
] = None | |
if litellm_params is not None: | |
prompt_id = cast(Optional[str], litellm_params.get("prompt_id", None)) | |
prompt_variables = cast( | |
Optional[dict], litellm_params.get("prompt_variables", None) | |
) | |
if prompt_id is not None and prompt_integration is not None: | |
prompt_management_metadata = StandardLoggingPromptManagementMetadata( | |
prompt_id=prompt_id, | |
prompt_variables=prompt_variables, | |
prompt_integration=prompt_integration, | |
) | |
# Initialize with default values | |
clean_metadata = StandardLoggingMetadata( | |
user_api_key_hash=None, | |
user_api_key_alias=None, | |
user_api_key_team_id=None, | |
user_api_key_org_id=None, | |
user_api_key_user_id=None, | |
user_api_key_team_alias=None, | |
user_api_key_user_email=None, | |
spend_logs_metadata=None, | |
requester_ip_address=None, | |
requester_metadata=None, | |
user_api_key_end_user_id=None, | |
prompt_management_metadata=prompt_management_metadata, | |
applied_guardrails=applied_guardrails, | |
mcp_tool_call_metadata=mcp_tool_call_metadata, | |
usage_object=usage_object, | |
) | |
if isinstance(metadata, dict): | |
# Filter the metadata dictionary to include only the specified keys | |
supported_keys = StandardLoggingMetadata.__annotations__.keys() | |
for key in supported_keys: | |
if key in metadata: | |
clean_metadata[key] = metadata[key] # type: ignore | |
if metadata.get("user_api_key") is not None: | |
if is_valid_sha256_hash(str(metadata.get("user_api_key"))): | |
clean_metadata["user_api_key_hash"] = metadata.get( | |
"user_api_key" | |
) # this is the hash | |
_potential_requester_metadata = metadata.get( | |
"metadata", None | |
) # check if user passed metadata in the sdk request - e.g. metadata for langsmith logging - https://docs.litellm.ai/docs/observability/langsmith_integration#set-langsmith-fields | |
if ( | |
clean_metadata["requester_metadata"] is None | |
and _potential_requester_metadata is not None | |
and isinstance(_potential_requester_metadata, dict) | |
): | |
clean_metadata["requester_metadata"] = _potential_requester_metadata | |
return clean_metadata | |
    @staticmethod
    def get_usage_from_response_obj(
response_obj: Optional[dict], combined_usage_object: Optional[Usage] = None | |
) -> Usage: | |
## BASE CASE ## | |
if combined_usage_object is not None: | |
return combined_usage_object | |
if response_obj is None: | |
return Usage( | |
prompt_tokens=0, | |
completion_tokens=0, | |
total_tokens=0, | |
) | |
        usage = response_obj.get("usage", None) or {}
        if not isinstance(usage, (dict, Usage)):
return Usage( | |
prompt_tokens=0, | |
completion_tokens=0, | |
total_tokens=0, | |
) | |
elif isinstance(usage, Usage): | |
return usage | |
elif isinstance(usage, dict): | |
if ResponseAPILoggingUtils._is_response_api_usage(usage): | |
return ( | |
ResponseAPILoggingUtils._transform_response_api_usage_to_chat_usage( | |
usage | |
) | |
) | |
return Usage(**usage) | |
raise ValueError(f"usage is required, got={usage} of type {type(usage)}") | |
    @staticmethod
    def get_model_cost_information(
base_model: Optional[str], | |
custom_pricing: Optional[bool], | |
custom_llm_provider: Optional[str], | |
init_response_obj: Union[Any, BaseModel, dict], | |
) -> StandardLoggingModelInformation: | |
model_cost_name = _select_model_name_for_cost_calc( | |
model=None, | |
completion_response=init_response_obj, # type: ignore | |
base_model=base_model, | |
custom_pricing=custom_pricing, | |
) | |
if model_cost_name is None: | |
model_cost_information = StandardLoggingModelInformation( | |
model_map_key="", model_map_value=None | |
) | |
else: | |
try: | |
_model_cost_information = litellm.get_model_info( | |
model=model_cost_name, custom_llm_provider=custom_llm_provider | |
) | |
model_cost_information = StandardLoggingModelInformation( | |
model_map_key=model_cost_name, | |
model_map_value=_model_cost_information, | |
) | |
except Exception: | |
verbose_logger.debug( # keep in debug otherwise it will trigger on every call | |
"Model={} is not mapped in model cost map. Defaulting to None model_cost_information for standard_logging_payload".format( | |
model_cost_name | |
) | |
) | |
model_cost_information = StandardLoggingModelInformation( | |
model_map_key=model_cost_name, model_map_value=None | |
) | |
return model_cost_information | |
    @staticmethod
    def get_final_response_obj(
response_obj: dict, init_response_obj: Union[Any, BaseModel, dict], kwargs: dict | |
) -> Optional[Union[dict, str, list]]: | |
""" | |
Get final response object after redacting the message input/output from logging | |
""" | |
if response_obj is not None: | |
final_response_obj: Optional[Union[dict, str, list]] = response_obj | |
elif isinstance(init_response_obj, list) or isinstance(init_response_obj, str): | |
final_response_obj = init_response_obj | |
else: | |
final_response_obj = None | |
modified_final_response_obj = redact_message_input_output_from_logging( | |
model_call_details=kwargs, | |
result=final_response_obj, | |
) | |
if modified_final_response_obj is not None and isinstance( | |
modified_final_response_obj, BaseModel | |
): | |
final_response_obj = modified_final_response_obj.model_dump() | |
else: | |
final_response_obj = modified_final_response_obj | |
return final_response_obj | |
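
    # Illustrative sketch (not executed): redaction runs before the response is
    # logged, so with e.g. `litellm.turn_off_message_logging = True` the returned
    # object has its message content redacted. Variable names are hypothetical.
    #
    #   final = StandardLoggingPayloadSetup.get_final_response_obj(
    #       response_obj=response.model_dump(),
    #       init_response_obj=response,
    #       kwargs=model_call_details,
    #   )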
    @staticmethod
    def get_additional_headers(
        additional_headers: Optional[dict],
    ) -> Optional[StandardLoggingAdditionalHeaders]:
        if additional_headers is None:
            return None

        additional_logging_headers: StandardLoggingAdditionalHeaders = {}
        for key in StandardLoggingAdditionalHeaders.__annotations__.keys():
            # map the typed-dict key (e.g. `x_ratelimit_remaining_requests`)
            # to its HTTP header form (e.g. `x-ratelimit-remaining-requests`)
            _key = key.lower().replace("_", "-")
            if _key in additional_headers:
                try:
                    additional_logging_headers[key] = int(additional_headers[_key])  # type: ignore
                except (ValueError, TypeError):
                    verbose_logger.debug(
                        f"Could not convert {additional_headers[_key]} to int for key {key}."
                    )
        return additional_logging_headers
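
    # Illustrative sketch (not executed): typed-dict keys are matched against
    # their kebab-case HTTP header form, and values are coerced to int.
    #
    #   headers = {"x-ratelimit-remaining-requests": "99"}
    #   parsed = StandardLoggingPayloadSetup.get_additional_headers(headers)
    #   # -> {"x_ratelimit_remaining_requests": 99}, assuming the typed dict
    #   # declares that key; non-numeric values are skipped with a debug log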
    @staticmethod
    def get_hidden_params(
        hidden_params: Optional[dict],
    ) -> StandardLoggingHiddenParams:
        clean_hidden_params = StandardLoggingHiddenParams(
            model_id=None,
            cache_key=None,
            api_base=None,
            response_cost=None,
            additional_headers=None,
            litellm_overhead_time_ms=None,
            batch_models=None,
            litellm_model_name=None,
            usage_object=None,
        )
        if hidden_params is not None:
            for key in StandardLoggingHiddenParams.__annotations__.keys():
                if key in hidden_params:
                    if key == "additional_headers":
                        clean_hidden_params["additional_headers"] = (
                            StandardLoggingPayloadSetup.get_additional_headers(
                                hidden_params[key]
                            )
                        )
                    else:
                        clean_hidden_params[key] = hidden_params[key]  # type: ignore
        return clean_hidden_params
    @staticmethod
    def strip_trailing_slash(api_base: Optional[str]) -> Optional[str]:
        if api_base:
            return api_base.rstrip("/")
        return api_base
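
    # Illustrative sketch (not executed):
    #   StandardLoggingPayloadSetup.strip_trailing_slash("https://api.openai.com/")
    #   # -> "https://api.openai.com"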
    @staticmethod
    def get_error_information(
        original_exception: Optional[Exception],
    ) -> StandardLoggingPayloadErrorInformation:
        error_status: str = str(getattr(original_exception, "status_code", ""))
        error_class: str = (
            str(original_exception.__class__.__name__) if original_exception else ""
        )
        _llm_provider_in_exception = getattr(original_exception, "llm_provider", "")

        # Get traceback information, limited to the first 100 frames
        # (`traceback` is already imported at module level)
        traceback_info = ""
        if original_exception:
            tb = getattr(original_exception, "__traceback__", None)
            if tb:
                tb_lines = traceback.format_tb(tb)
                traceback_info = "".join(tb_lines[:100])

        return StandardLoggingPayloadErrorInformation(
            error_code=error_status,
            error_class=error_class,
            llm_provider=_llm_provider_in_exception,
            traceback=traceback_info,
            error_message=str(original_exception) if original_exception else "",
        )
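
    # Illustrative sketch (not executed): fields are best-effort - attributes
    # like `status_code` and `llm_provider` are read with getattr, so a plain
    # Exception still produces a valid (mostly empty) error-information object.
    #
    #   try:
    #       raise ValueError("boom")
    #   except ValueError as e:
    #       info = StandardLoggingPayloadSetup.get_error_information(e)
    #       # info["error_class"] == "ValueError"; info["traceback"] is populated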
    @staticmethod
    def get_response_time(
        start_time_float: float,
        end_time_float: float,
        completion_start_time_float: float,
        stream: bool,
    ) -> float:
        """
        Get the response time for the LLM response.

        For streaming responses this is the time to first token
        (completion_start_time - start_time); for non-streaming responses it is
        the total wall-clock time (end_time - start_time).

        Args:
            start_time_float: start time of the LLM call
            end_time_float: end time of the LLM call
            completion_start_time_float: timestamp at which the first token was received (for streaming responses)
            stream: True when a stream response is returned

        Returns:
            float: the response time in seconds
        """
        if stream is True:
            return completion_start_time_float - start_time_float
        else:
            return end_time_float - start_time_float
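
    # Worked example (not executed), assuming a call that started at t=100.0s,
    # emitted its first token at t=100.4s, and finished at t=103.0s:
    #
    #   StandardLoggingPayloadSetup.get_response_time(100.0, 103.0, 100.4, stream=True)
    #   # -> 0.4  (time to first token)
    #   StandardLoggingPayloadSetup.get_response_time(100.0, 103.0, 100.4, stream=False)
    #   # -> 3.0  (total wall-clock time)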
    @staticmethod
    def _get_standard_logging_payload_trace_id(
        logging_obj: Logging,
        litellm_params: dict,
    ) -> str:
        """
        Returns the `litellm_trace_id` for this request.

        This helps link requests that belong to the same session when multiple
        requests are made within it.
        """
        dynamic_litellm_session_id = litellm_params.get("litellm_session_id")
        dynamic_litellm_trace_id = litellm_params.get("litellm_trace_id")

        # Note: `litellm_session_id` is the recommended param for session tracking;
        # `litellm_trace_id` is an internal litellm param
        if dynamic_litellm_session_id:
            return str(dynamic_litellm_session_id)
        elif dynamic_litellm_trace_id:
            return str(dynamic_litellm_trace_id)
        else:
            return logging_obj.litellm_trace_id
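
    # Illustrative sketch (not executed): precedence is
    # litellm_session_id > litellm_trace_id > logging_obj.litellm_trace_id.
    #
    #   litellm_params = {"litellm_session_id": "sess-1", "litellm_trace_id": "tr-1"}
    #   # -> "sess-1" is returned, since the session id takes priority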


def get_standard_logging_object_payload(
    kwargs: Optional[dict],
    init_response_obj: Union[Any, BaseModel, dict],
    start_time: dt_object,
    end_time: dt_object,
    logging_obj: Logging,
    status: StandardLoggingPayloadStatus,
    error_str: Optional[str] = None,
    original_exception: Optional[Exception] = None,
    standard_built_in_tools_params: Optional[StandardBuiltInToolsParams] = None,
) -> Optional[StandardLoggingPayload]:
    try:
        kwargs = kwargs or {}

        hidden_params: Optional[dict] = None
        if init_response_obj is None:
            response_obj = {}
        elif isinstance(init_response_obj, BaseModel):
            response_obj = init_response_obj.model_dump()
            hidden_params = getattr(init_response_obj, "_hidden_params", None)
        elif isinstance(init_response_obj, dict):
            response_obj = init_response_obj
        else:
            response_obj = {}

        if original_exception is not None and hidden_params is None:
            response_headers = _get_response_headers(original_exception)
            if response_headers is not None:
                hidden_params = dict(
                    StandardLoggingHiddenParams(
                        additional_headers=StandardLoggingPayloadSetup.get_additional_headers(
                            dict(response_headers)
                        ),
                        model_id=None,
                        cache_key=None,
                        api_base=None,
                        response_cost=None,
                        litellm_overhead_time_ms=None,
                        batch_models=None,
                        litellm_model_name=None,
                        usage_object=None,
                    )
                )
        # this payload is standardized so it can be reused across s3, DynamoDB,
        # and Langfuse logging
        litellm_params = kwargs.get("litellm_params", {})
        proxy_server_request = litellm_params.get("proxy_server_request") or {}
        metadata: dict = (
            litellm_params.get("litellm_metadata")
            or litellm_params.get("metadata", None)
            or {}
        )
        completion_start_time = kwargs.get("completion_start_time", end_time)
        call_type = kwargs.get("call_type")
        cache_hit = kwargs.get("cache_hit", False)
        usage = StandardLoggingPayloadSetup.get_usage_from_response_obj(
            response_obj=response_obj,
            combined_usage_object=cast(
                Optional[Usage], kwargs.get("combined_usage_object")
            ),
        )
        id = response_obj.get("id", kwargs.get("litellm_call_id"))

        _model_id = metadata.get("model_info", {}).get("id", "")
        _model_group = metadata.get("model_group", "")

        request_tags = (
            metadata.get("tags", [])
            if isinstance(metadata.get("tags", []), list)
            else []
        )

        # cleanup timestamps
        (
            start_time_float,
            end_time_float,
            completion_start_time_float,
        ) = StandardLoggingPayloadSetup.cleanup_timestamps(
            start_time=start_time,
            end_time=end_time,
            completion_start_time=completion_start_time,
        )
        response_time = StandardLoggingPayloadSetup.get_response_time(
            start_time_float=start_time_float,
            end_time_float=end_time_float,
            completion_start_time_float=completion_start_time_float,
            stream=kwargs.get("stream", False),
        )
        # clean up litellm hidden params
        clean_hidden_params = StandardLoggingPayloadSetup.get_hidden_params(
            hidden_params
        )
        # clean up litellm metadata
        clean_metadata = StandardLoggingPayloadSetup.get_standard_logging_metadata(
            metadata=metadata,
            litellm_params=litellm_params,
            prompt_integration=kwargs.get("prompt_integration", None),
            applied_guardrails=kwargs.get("applied_guardrails", None),
            mcp_tool_call_metadata=kwargs.get("mcp_tool_call_metadata", None),
            usage_object=usage.model_dump(),
        )

        _request_body = proxy_server_request.get("body", {})
        end_user_id = clean_metadata["user_api_key_end_user_id"] or _request_body.get(
            "user", None
        )  # maintain backwards compatibility with the old request-body check

        saved_cache_cost: float = 0.0
        if cache_hit is True:
            id = f"{id}_cache_hit{time.time()}"  # give cache hits a unique id, so the original request id is not duplicated
            saved_cache_cost = (
                logging_obj._response_cost_calculator(
                    result=init_response_obj, cache_hit=False  # type: ignore
                )
                or 0.0
            )
        ## Get model cost information ##
        base_model = _get_base_model_from_metadata(model_call_details=kwargs)
        custom_pricing = use_custom_pricing_for_model(litellm_params=litellm_params)
        model_cost_information = StandardLoggingPayloadSetup.get_model_cost_information(
            base_model=base_model,
            custom_pricing=custom_pricing,
            custom_llm_provider=kwargs.get("custom_llm_provider"),
            init_response_obj=init_response_obj,
        )
        response_cost: float = kwargs.get("response_cost", 0) or 0.0

        error_information = StandardLoggingPayloadSetup.get_error_information(
            original_exception=original_exception,
        )

        ## get final response object ##
        final_response_obj = StandardLoggingPayloadSetup.get_final_response_obj(
            response_obj=response_obj,
            init_response_obj=init_response_obj,
            kwargs=kwargs,
        )

        stream: Optional[bool] = None
        if (
            kwargs.get("complete_streaming_response") is not None
            or kwargs.get("async_complete_streaming_response") is not None
        ):
            stream = True
        payload: StandardLoggingPayload = StandardLoggingPayload(
            id=str(id),
            trace_id=StandardLoggingPayloadSetup._get_standard_logging_payload_trace_id(
                logging_obj=logging_obj,
                litellm_params=litellm_params,
            ),
            call_type=call_type or "",
            cache_hit=cache_hit,
            stream=stream,
            status=status,
            custom_llm_provider=cast(Optional[str], kwargs.get("custom_llm_provider")),
            saved_cache_cost=saved_cache_cost,
            startTime=start_time_float,
            endTime=end_time_float,
            completionStartTime=completion_start_time_float,
            response_time=response_time,
            model=kwargs.get("model", "") or "",
            metadata=clean_metadata,
            cache_key=clean_hidden_params["cache_key"],
            response_cost=response_cost,
            total_tokens=usage.total_tokens,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            request_tags=request_tags,
            end_user=end_user_id or "",
            api_base=StandardLoggingPayloadSetup.strip_trailing_slash(
                litellm_params.get("api_base", "")
            )
            or "",
            model_group=_model_group,
            model_id=_model_id,
            requester_ip_address=clean_metadata.get("requester_ip_address", None),
            messages=kwargs.get("messages"),
            response=final_response_obj,
            model_parameters=ModelParamHelper.get_standard_logging_model_parameters(
                kwargs.get("optional_params", None) or {}
            ),
            hidden_params=clean_hidden_params,
            model_map_information=model_cost_information,
            error_str=error_str,
            error_information=error_information,
            response_cost_failure_debug_info=kwargs.get(
                "response_cost_failure_debug_information"
            ),
            guardrail_information=metadata.get(
                "standard_logging_guardrail_information", None
            ),
            standard_built_in_tools_params=standard_built_in_tools_params,
        )

        emit_standard_logging_payload(payload)
        return payload
    except Exception as e:
        verbose_logger.exception(
            "Error creating standard logging object - {}".format(str(e))
        )
        return None
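

# Illustrative sketch (not executed): a minimal call into the payload builder,
# assuming a `logging_obj` from an in-flight request with populated
# `model_call_details`. Any exception inside the builder is swallowed and
# logged, so callers must handle a None return.
#
#   payload = get_standard_logging_object_payload(
#       kwargs=logging_obj.model_call_details,
#       init_response_obj=response,
#       start_time=start_time,
#       end_time=end_time,
#       logging_obj=logging_obj,
#       status="success",
#   )
#   if payload is not None:
#       emit_standard_logging_payload(payload)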


def emit_standard_logging_payload(payload: StandardLoggingPayload):
    if os.getenv("LITELLM_PRINT_STANDARD_LOGGING_PAYLOAD"):
        verbose_logger.info(json.dumps(payload, indent=4))


def get_standard_logging_metadata(
    metadata: Optional[Dict[str, Any]],
) -> StandardLoggingMetadata:
    """
    Clean and filter the metadata dictionary to include only the keys declared on StandardLoggingMetadata.

    Args:
        metadata (Optional[Dict[str, Any]]): The original metadata dictionary.

    Returns:
        StandardLoggingMetadata: A StandardLoggingMetadata object containing the cleaned metadata.

    Note:
        - If the input metadata is None or not a dictionary, an empty StandardLoggingMetadata object is returned.
        - If 'user_api_key' is present in metadata and is a valid SHA256 hash, it's stored as 'user_api_key_hash'.
    """
    # Initialize with default values
    clean_metadata = StandardLoggingMetadata(
        user_api_key_hash=None,
        user_api_key_alias=None,
        user_api_key_team_id=None,
        user_api_key_org_id=None,
        user_api_key_user_id=None,
        user_api_key_user_email=None,
        user_api_key_team_alias=None,
        spend_logs_metadata=None,
        requester_ip_address=None,
        requester_metadata=None,
        user_api_key_end_user_id=None,
        prompt_management_metadata=None,
        applied_guardrails=None,
        mcp_tool_call_metadata=None,
        usage_object=None,
    )
    if isinstance(metadata, dict):
        # Filter the metadata dictionary to include only the specified keys
        clean_metadata = StandardLoggingMetadata(
            **{  # type: ignore
                key: metadata[key]
                for key in StandardLoggingMetadata.__annotations__.keys()
                if key in metadata
            }
        )
        if metadata.get("user_api_key") is not None:
            if is_valid_sha256_hash(str(metadata.get("user_api_key"))):
                clean_metadata["user_api_key_hash"] = metadata.get(
                    "user_api_key"
                )  # this is the hash
    return clean_metadata
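

# Illustrative sketch (not executed): only annotated keys survive the filter,
# and a valid SHA-256 `user_api_key` is copied into `user_api_key_hash`.
#
#   meta = get_standard_logging_metadata(
#       {"user_api_key": "a" * 64, "unknown_key": "dropped"}
#   )
#   # meta["user_api_key_hash"] == "a" * 64; "unknown_key" is not carried over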


def scrub_sensitive_keys_in_metadata(litellm_params: Optional[dict]):
    if litellm_params is None:
        litellm_params = {}

    metadata = litellm_params.get("metadata", {}) or {}

    ## check user_api_key_metadata for sensitive logging keys
    cleaned_user_api_key_metadata = {}
    if "user_api_key_metadata" in metadata and isinstance(
        metadata["user_api_key_metadata"], dict
    ):
        for k, v in metadata["user_api_key_metadata"].items():
            if k == "logging":  # prevent logging user logging keys
                cleaned_user_api_key_metadata[k] = (
                    "scrubbed_by_litellm_for_sensitive_keys"
                )
            else:
                cleaned_user_api_key_metadata[k] = v

        metadata["user_api_key_metadata"] = cleaned_user_api_key_metadata

    litellm_params["metadata"] = metadata
    return litellm_params
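

# Illustrative sketch (not executed): a `logging` entry under
# `user_api_key_metadata` is replaced before anything is emitted.
#
#   params = {"metadata": {"user_api_key_metadata": {"logging": [...], "team": "a"}}}
#   scrubbed = scrub_sensitive_keys_in_metadata(params)
#   # scrubbed["metadata"]["user_api_key_metadata"] ==
#   #   {"logging": "scrubbed_by_litellm_for_sensitive_keys", "team": "a"}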


# integration helper function
def modify_integration(integration_name, integration_params):
    global supabaseClient
    if integration_name == "supabase":
        if "table_name" in integration_params:
            Supabase.supabase_table_name = integration_params["table_name"]


@lru_cache(maxsize=128)  # decorator restored per the docstring below; maxsize=128 is an assumed default
def _get_traceback_str_for_error(error_str: str) -> str:
    """
    Function wrapped with lru_cache to limit the number of times
    `traceback.format_exc()` is called per unique error string.
    """
    return traceback.format_exc()


# used for unit testing (typing names are already imported at the top of this module)
from decimal import Decimal


def create_dummy_standard_logging_payload() -> StandardLoggingPayload:
    # First create the nested objects with proper typing
    model_info = StandardLoggingModelInformation(
        model_map_key="gpt-3.5-turbo", model_map_value=None
    )
    metadata = StandardLoggingMetadata(  # type: ignore
        user_api_key_hash="test_hash",
        user_api_key_alias="test_alias",
        user_api_key_team_id="test_team",
        user_api_key_user_id="test_user",
        user_api_key_team_alias="test_team_alias",
        user_api_key_org_id=None,
        spend_logs_metadata=None,
        requester_ip_address="127.0.0.1",
        requester_metadata=None,
        user_api_key_end_user_id="test_end_user",
    )
    hidden_params = StandardLoggingHiddenParams(
        model_id=None,
        cache_key=None,
        api_base=None,
        response_cost=None,
        additional_headers=None,
        litellm_overhead_time_ms=None,
        batch_models=None,
        litellm_model_name=None,
        usage_object=None,
    )

    # Numeric values for the dummy payload (kept as Decimal, as in the original test helper)
    response_cost = Decimal("0.1")
    start_time = Decimal("1234567890.0")
    end_time = Decimal("1234567891.0")
    completion_start_time = Decimal("1234567890.5")
    saved_cache_cost = Decimal("0.0")

    # Create messages and response with proper typing
    messages: List[Dict[str, str]] = [{"role": "user", "content": "Hello, world!"}]
    response: Dict[str, List[Dict[str, Dict[str, str]]]] = {
        "choices": [{"message": {"content": "Hi there!"}}]
    }

    # Main payload initialization
    return StandardLoggingPayload(  # type: ignore
        id="test_id",
        call_type="completion",
        stream=False,
        response_cost=response_cost,
        response_cost_failure_debug_info=None,
        status="success",
        total_tokens=int(
            DEFAULT_MOCK_RESPONSE_PROMPT_TOKEN_COUNT
            + DEFAULT_MOCK_RESPONSE_COMPLETION_TOKEN_COUNT
        ),
        prompt_tokens=int(DEFAULT_MOCK_RESPONSE_PROMPT_TOKEN_COUNT),
        completion_tokens=int(DEFAULT_MOCK_RESPONSE_COMPLETION_TOKEN_COUNT),
        startTime=start_time,
        endTime=end_time,
        completionStartTime=completion_start_time,
        model_map_information=model_info,
        model="gpt-3.5-turbo",
        model_id="model-123",
        model_group="openai-gpt",
        custom_llm_provider="openai",
        api_base="https://api.openai.com",
        metadata=metadata,
        cache_hit=False,
        cache_key=None,
        saved_cache_cost=saved_cache_cost,
        request_tags=[],
        end_user=None,
        requester_ip_address="127.0.0.1",
        messages=messages,
        response=response,
        error_str=None,
        model_parameters={"stream": True},
        hidden_params=hidden_params,
    )
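

# Illustrative sketch (not executed): the dummy payload is intended for unit
# tests that need a fully-populated StandardLoggingPayload without making a
# real LLM call.
#
#   payload = create_dummy_standard_logging_payload()
#   assert payload["model"] == "gpt-3.5-turbo"
#   assert payload["status"] == "success"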