""" | |
DataDog Integration - sends logs to /api/v2/log | |
DD Reference API: https://docs.datadoghq.com/api/latest/logs | |
`async_log_success_event` - used by litellm proxy to send logs to datadog | |
`log_success_event` - sync version of logging to DataDog, only used on litellm Python SDK, if user opts in to using sync functions | |
async_log_success_event: will store batch of DD_MAX_BATCH_SIZE in memory and flush to Datadog once it reaches DD_MAX_BATCH_SIZE or every 5 seconds | |
async_service_failure_hook: Logs failures from Redis, Postgres (Adjacent systems), as 'WARNING' on DataDog | |
For batching specific details see CustomBatchLogger class | |
""" | |
import asyncio
import datetime
import json
import os
import traceback
import uuid
from datetime import datetime as datetimeObj
from typing import Any, List, Optional, Union

import httpx
from httpx import Response

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.llms.custom_httpx.http_handler import (
    _get_httpx_client,
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.types.integrations.base_health_check import IntegrationHealthCheckStatus
from litellm.types.integrations.datadog import *
from litellm.types.services import ServiceLoggerPayload, ServiceTypes
from litellm.types.utils import StandardLoggingPayload

from ..additional_logging_utils import AdditionalLoggingUtils
# DD_MAX_BATCH_SIZE (imported via litellm.types.integrations.datadog above) is
# the max number of logs the DD API can accept per request

# specify which ServiceTypes are logged as success events to DD
# (we don't want to spam DD traces with a large number of service types)
DD_LOGGED_SUCCESS_SERVICE_TYPES = [
    ServiceTypes.RESET_BUDGET_JOB,
]

class DataDogLogger(
    CustomBatchLogger,
    AdditionalLoggingUtils,
):
    # Class variables or attributes
    def __init__(
        self,
        **kwargs,
    ):
        """
        Initializes the datadog logger, checks if the correct env variables are set

        Required environment variables:
        `DD_API_KEY` - your datadog api key
        `DD_SITE` - your datadog site, example = `"us5.datadoghq.com"`
        """
        try:
            verbose_logger.debug("Datadog: in init datadog logger")
            # check if the correct env variables are set
            if os.getenv("DD_API_KEY", None) is None:
                raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<your-api-key>'")
            if os.getenv("DD_SITE", None) is None:
                raise Exception("DD_SITE is not set in .env, set 'DD_SITE=<your-dd-site>'")

            self.async_client = get_async_httpx_client(
                llm_provider=httpxSpecialProvider.LoggingCallback
            )
            self.DD_API_KEY = os.getenv("DD_API_KEY")
            self.intake_url = (
                f"https://http-intake.logs.{os.getenv('DD_SITE')}/api/v2/logs"
            )

            ###################################
            # OPTIONAL - only used for testing
            dd_base_url: Optional[str] = (
                os.getenv("_DATADOG_BASE_URL")
                or os.getenv("DATADOG_BASE_URL")
                or os.getenv("DD_BASE_URL")
            )
            if dd_base_url is not None:
                self.intake_url = f"{dd_base_url}/api/v2/logs"
            ###################################
            self.sync_client = _get_httpx_client()
            asyncio.create_task(self.periodic_flush())
            self.flush_lock = asyncio.Lock()
            super().__init__(
                **kwargs, flush_lock=self.flush_lock, batch_size=DD_MAX_BATCH_SIZE
            )
        except Exception as e:
            verbose_logger.exception(
                f"Datadog: Got exception on init Datadog client {str(e)}"
            )
            raise e
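
    # Testing note (illustrative): pointing the logger at a local mock intake,
    # e.g. DD_BASE_URL="http://localhost:8080", makes __init__ rewrite
    # self.intake_url to "http://localhost:8080/api/v2/logs" so batches can be
    # inspected without hitting the real DataDog API.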
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Async log success events to Datadog

        - Creates a Datadog payload
        - Adds the payload to the in-memory log queue
        - The queue is flushed every `flush_interval` seconds or once it reaches `batch_size` entries

        Errors are logged via verbose_logger.exception and never raised (non-blocking).
        """
        try:
            verbose_logger.debug(
                "Datadog: Logging - Enters logging function for model %s", kwargs
            )
            await self._log_async_event(kwargs, response_obj, start_time, end_time)
        except Exception as e:
            verbose_logger.exception(
                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
            )
    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """
        Async log failure events to Datadog; same batching path as success events.
        """
        try:
            verbose_logger.debug(
                "Datadog: Logging - Enters logging function for model %s", kwargs
            )
            await self._log_async_event(kwargs, response_obj, start_time, end_time)
        except Exception as e:
            verbose_logger.exception(
                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
            )
    async def async_send_batch(self):
        """
        Sends the in-memory log queue to the datadog api

        Logs are sent to /api/v2/logs

        DD Ref: https://docs.datadoghq.com/api/latest/logs/

        Errors are logged via verbose_logger.exception and never raised (non-blocking).
        """
        try:
            if not self.log_queue:
                verbose_logger.debug("Datadog: log_queue is empty; nothing to send")
                return

            verbose_logger.debug(
                "Datadog - about to flush %s events on %s",
                len(self.log_queue),
                self.intake_url,
            )

            response = await self.async_send_compressed_data(self.log_queue)
            if response.status_code == 413:
                verbose_logger.exception(DD_ERRORS.DATADOG_413_ERROR.value)
                return

            response.raise_for_status()
            if response.status_code != 202:
                raise Exception(
                    f"Response from datadog API status_code: {response.status_code}, text: {response.text}"
                )

            verbose_logger.debug(
                "Datadog: Response from datadog API status_code: %s, text: %s",
                response.status_code,
                response.text,
            )
        except Exception as e:
            verbose_logger.exception(
                f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}"
            )
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Sync log success events to Datadog

        - Creates a Datadog payload
        - Immediately sends it to the DD API (no batching)
        """
        try:
            if litellm.datadog_use_v1 is True:
                dd_payload = self._create_v0_logging_payload(
                    kwargs=kwargs,
                    response_obj=response_obj,
                    start_time=start_time,
                    end_time=end_time,
                )
            else:
                dd_payload = self.create_datadog_logging_payload(
                    kwargs=kwargs,
                    response_obj=response_obj,
                    start_time=start_time,
                    end_time=end_time,
                )

            response = self.sync_client.post(
                url=self.intake_url,
                json=dd_payload,  # type: ignore
                headers={
                    "DD-API-KEY": self.DD_API_KEY,
                },
            )

            response.raise_for_status()
            if response.status_code != 202:
                raise Exception(
                    f"Response from datadog API status_code: {response.status_code}, text: {response.text}"
                )

            verbose_logger.debug(
                "Datadog: Response from datadog API status_code: %s, text: %s",
                response.status_code,
                response.text,
            )
        except Exception as e:
            verbose_logger.exception(
                f"Datadog Layer Error - {str(e)}\n{traceback.format_exc()}"
            )
    async def _log_async_event(self, kwargs, response_obj, start_time, end_time):
        dd_payload = self.create_datadog_logging_payload(
            kwargs=kwargs,
            response_obj=response_obj,
            start_time=start_time,
            end_time=end_time,
        )

        self.log_queue.append(dd_payload)
        verbose_logger.debug(
            f"Datadog, event added to queue. Will flush in {self.flush_interval} seconds..."
        )

        if len(self.log_queue) >= self.batch_size:
            await self.async_send_batch()
    def _create_datadog_logging_payload_helper(
        self,
        standard_logging_object: StandardLoggingPayload,
        status: DataDogStatus,
    ) -> DatadogPayload:
        json_payload = json.dumps(standard_logging_object, default=str)
        verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)
        dd_payload = DatadogPayload(
            ddsource=self._get_datadog_source(),
            ddtags=self._get_datadog_tags(
                standard_logging_object=standard_logging_object
            ),
            hostname=self._get_datadog_hostname(),
            message=json_payload,
            service=self._get_datadog_service(),
            status=status,
        )
        return dd_payload
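
    # Illustrative shape of the resulting payload (field values are examples;
    # the real values come from the env-var getters defined later in this class):
    #
    #   DatadogPayload(
    #       ddsource="litellm",                      # DD_SOURCE
    #       ddtags="env:unknown,service:litellm,...",
    #       hostname="my-pod",                       # HOSTNAME
    #       message='{"model": "gpt-4", ...}',       # JSON-serialized StandardLoggingPayload
    #       service="litellm-server",                # DD_SERVICE
    #       status="info",
    #   )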
    def create_datadog_logging_payload(
        self,
        kwargs: Union[dict, Any],
        response_obj: Any,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
    ) -> DatadogPayload:
        """
        Helper function to create a datadog payload for logging

        Args:
            kwargs (Union[dict, Any]): request kwargs
            response_obj (Any): llm api response
            start_time (datetime.datetime): start time of request
            end_time (datetime.datetime): end time of request

        Returns:
            DatadogPayload: defined in types.py
        """
        standard_logging_object: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object", None
        )
        if standard_logging_object is None:
            raise ValueError("standard_logging_object not found in kwargs")

        status = DataDogStatus.INFO
        if standard_logging_object.get("status") == "failure":
            status = DataDogStatus.ERROR

        # Build the initial payload
        self.truncate_standard_logging_payload_content(standard_logging_object)
        dd_payload = self._create_datadog_logging_payload_helper(
            standard_logging_object=standard_logging_object,
            status=status,
        )
        return dd_payload
    async def async_send_compressed_data(self, data: List) -> Response:
        """
        Async helper to send compressed data to datadog self.intake_url

        Datadog recommends using gzip to compress data
        https://docs.datadoghq.com/api/latest/logs/

        "Datadog recommends sending your logs compressed. Add the Content-Encoding: gzip header to the request when sending"
        """
        import gzip

        compressed_data = gzip.compress(json.dumps(data, default=str).encode("utf-8"))
        response = await self.async_client.post(
            url=self.intake_url,
            data=compressed_data,  # type: ignore
            headers={
                "DD-API-KEY": self.DD_API_KEY,
                "Content-Encoding": "gzip",
                "Content-Type": "application/json",
            },
        )
        return response
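
    # For reference, the equivalent raw request (illustrative; assumes the
    # batch has already been serialized and gzip-compressed into logs.json.gz):
    #
    #   curl -X POST "https://http-intake.logs.${DD_SITE}/api/v2/logs" \
    #        -H "DD-API-KEY: ${DD_API_KEY}" \
    #        -H "Content-Encoding: gzip" \
    #        -H "Content-Type: application/json" \
    #        --data-binary @logs.json.gz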
    async def async_service_failure_hook(
        self,
        payload: ServiceLoggerPayload,
        error: Optional[str] = "",
        parent_otel_span: Optional[Any] = None,
        start_time: Optional[Union[datetimeObj, float]] = None,
        end_time: Optional[Union[float, datetimeObj]] = None,
        event_metadata: Optional[dict] = None,
    ):
        """
        Logs failures from Redis, Postgres (adjacent systems), as 'WARNING' on DataDog

        - example - Redis is failing / erroring, will be logged on DataDog
        """
        try:
            _payload_dict = payload.model_dump()
            _payload_dict.update(event_metadata or {})
            _dd_message_str = json.dumps(_payload_dict, default=str)
            _dd_payload = DatadogPayload(
                ddsource=self._get_datadog_source(),
                ddtags=self._get_datadog_tags(),
                hostname=self._get_datadog_hostname(),
                message=_dd_message_str,
                service=self._get_datadog_service(),
                status=DataDogStatus.WARN,
            )
            self.log_queue.append(_dd_payload)
        except Exception as e:
            verbose_logger.exception(
                f"Datadog: Logger - Exception in async_service_failure_hook: {e}"
            )
    async def async_service_success_hook(
        self,
        payload: ServiceLoggerPayload,
        error: Optional[str] = "",
        parent_otel_span: Optional[Any] = None,
        start_time: Optional[Union[datetimeObj, float]] = None,
        end_time: Optional[Union[float, datetimeObj]] = None,
        event_metadata: Optional[dict] = None,
    ):
        """
        Logs success from Redis, Postgres (adjacent systems), as 'INFO' on DataDog

        Only the service types in DD_LOGGED_SUCCESS_SERVICE_TYPES are logged;
        logging every success event would be spammy on DataDog.
        """
        try:
            # intentionally done. Don't want to log all service types to DD
            if payload.service not in DD_LOGGED_SUCCESS_SERVICE_TYPES:
                return

            _payload_dict = payload.model_dump()
            _payload_dict.update(event_metadata or {})
            _dd_message_str = json.dumps(_payload_dict, default=str)
            _dd_payload = DatadogPayload(
                ddsource=self._get_datadog_source(),
                ddtags=self._get_datadog_tags(),
                hostname=self._get_datadog_hostname(),
                message=_dd_message_str,
                service=self._get_datadog_service(),
                status=DataDogStatus.INFO,
            )
            self.log_queue.append(_dd_payload)
        except Exception as e:
            verbose_logger.exception(
                f"Datadog: Logger - Exception in async_service_success_hook: {e}"
            )
    def _create_v0_logging_payload(
        self,
        kwargs: Union[dict, Any],
        response_obj: Any,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
    ) -> DatadogPayload:
        """
        Note: this is the legacy version of the DataDog logging payload

        (Not recommended) If you want this to get logged set `litellm.datadog_use_v1 = True`
        """
        litellm_params = kwargs.get("litellm_params", {})
        metadata = (
            litellm_params.get("metadata", {}) or {}
        )  # if litellm_params['metadata'] == None
        messages = kwargs.get("messages")
        optional_params = kwargs.get("optional_params", {})
        call_type = kwargs.get("call_type", "litellm.completion")
        cache_hit = kwargs.get("cache_hit", False)
        usage = response_obj["usage"]
        id = response_obj.get("id", str(uuid.uuid4()))
        usage = dict(usage)

        try:
            response_time = (end_time - start_time).total_seconds() * 1000
        except Exception:
            response_time = None

        try:
            response_obj = dict(response_obj)
        except Exception:
            pass

        # Clean metadata before logging - never log raw metadata;
        # raw metadata can contain circular references which lead to infinite recursion,
        # so we clean out all extra litellm metadata params before logging
        clean_metadata = {}
        if isinstance(metadata, dict):
            for key, value in metadata.items():
                # clean litellm metadata before logging
                if key in [
                    "endpoint",
                    "caching_groups",
                    "previous_models",
                ]:
                    continue
                else:
                    clean_metadata[key] = value

        # Build the initial payload
        payload = {
            "id": id,
            "call_type": call_type,
            "cache_hit": cache_hit,
            "start_time": start_time,
            "end_time": end_time,
            "response_time": response_time,
            "model": kwargs.get("model", ""),
            "user": kwargs.get("user", ""),
            "model_parameters": optional_params,
            "spend": kwargs.get("response_cost", 0),
            "messages": messages,
            "response": response_obj,
            "usage": usage,
            "metadata": clean_metadata,
        }

        json_payload = json.dumps(payload, default=str)
        verbose_logger.debug("Datadog: Logger - Logging payload = %s", json_payload)

        dd_payload = DatadogPayload(
            ddsource=self._get_datadog_source(),
            ddtags=self._get_datadog_tags(),
            hostname=self._get_datadog_hostname(),
            message=json_payload,
            service=self._get_datadog_service(),
            status=DataDogStatus.INFO,
        )
        return dd_payload
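
    # Illustrative legacy `message` content (keys match the payload dict built
    # above; values are examples only):
    #
    #   {"id": "chatcmpl-123", "call_type": "litellm.completion",
    #    "cache_hit": false, "response_time": 840.2, "model": "gpt-4",
    #    "spend": 0.0021, "messages": [...], "response": {...},
    #    "usage": {"prompt_tokens": 10, "completion_tokens": 20, ...},
    #    "metadata": {...}}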
    @staticmethod
    def _get_datadog_tags(
        standard_logging_object: Optional[StandardLoggingPayload] = None,
    ) -> str:
        """
        Get the datadog tags for the request

        DD tags need to be in the format:
        - tags: ["user_handle:dog@gmail.com", "app_version:1.0.0"]
        """
        base_tags = {
            "env": os.getenv("DD_ENV", "unknown"),
            "service": os.getenv("DD_SERVICE", "litellm"),
            "version": os.getenv("DD_VERSION", "unknown"),
            "HOSTNAME": DataDogLogger._get_datadog_hostname(),
            "POD_NAME": os.getenv("POD_NAME", "unknown"),
        }

        tags = [f"{k}:{v}" for k, v in base_tags.items()]

        if standard_logging_object:
            _request_tags: List[str] = (
                standard_logging_object.get("request_tags", []) or []
            )
            request_tags = [f"request_tag:{tag}" for tag in _request_tags]
            tags.extend(request_tags)

        return ",".join(tags)
    @staticmethod
    def _get_datadog_source():
        return os.getenv("DD_SOURCE", "litellm")

    @staticmethod
    def _get_datadog_service():
        return os.getenv("DD_SERVICE", "litellm-server")

    @staticmethod
    def _get_datadog_hostname():
        return os.getenv("HOSTNAME", "")

    @staticmethod
    def _get_datadog_env():
        return os.getenv("DD_ENV", "unknown")

    @staticmethod
    def _get_datadog_pod_name():
        return os.getenv("POD_NAME", "unknown")
    async def async_health_check(self) -> IntegrationHealthCheckStatus:
        """
        Check if the service is healthy
        """
        from litellm.litellm_core_utils.litellm_logging import (
            create_dummy_standard_logging_payload,
        )

        standard_logging_object = create_dummy_standard_logging_payload()
        dd_payload = self._create_datadog_logging_payload_helper(
            standard_logging_object=standard_logging_object,
            status=DataDogStatus.INFO,
        )
        log_queue = [dd_payload]
        response = await self.async_send_compressed_data(log_queue)
        try:
            response.raise_for_status()
            return IntegrationHealthCheckStatus(
                status="healthy",
                error_message=None,
            )
        except httpx.HTTPStatusError as e:
            return IntegrationHealthCheckStatus(
                status="unhealthy",
                error_message=e.response.text,
            )
        except Exception as e:
            return IntegrationHealthCheckStatus(
                status="unhealthy",
                error_message=str(e),
            )
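
    # Illustrative use from an async context (hypothetical caller; assumes
    # IntegrationHealthCheckStatus supports key access as constructed above):
    #   status = await logger.async_health_check()
    #   assert status["status"] in ("healthy", "unhealthy")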
    async def get_request_response_payload(
        self,
        request_id: str,
        start_time_utc: Optional[datetimeObj],
        end_time_utc: Optional[datetimeObj],
    ) -> Optional[dict]:
        # Not implemented for the DataDog integration; returns None.
        return None