#### What this does #### # On success + failure, log events to Supabase import dotenv, os import requests dotenv.load_dotenv() # Loading env variables using dotenv import traceback import datetime, subprocess, sys import litellm, uuid from litellm._logging import print_verbose class S3Logger: # Class variables or attributes def __init__( self, s3_bucket_name=None, s3_region_name=None, s3_api_version=None, s3_use_ssl=True, s3_verify=None, s3_endpoint_url=None, s3_aws_access_key_id=None, s3_aws_secret_access_key=None, s3_aws_session_token=None, s3_config=None, **kwargs, ): import boto3 try: print_verbose("in init s3 logger") if litellm.s3_callback_params is not None: # read in .env variables - example os.environ/AWS_BUCKET_NAME for key, value in litellm.s3_callback_params.items(): if type(value) is str and value.startswith("os.environ/"): litellm.s3_callback_params[key] = litellm.get_secret(value) # now set s3 params from litellm.s3_logger_params s3_bucket_name = litellm.s3_callback_params.get("s3_bucket_name") s3_region_name = litellm.s3_callback_params.get("s3_region_name") s3_api_version = litellm.s3_callback_params.get("s3_api_version") s3_use_ssl = litellm.s3_callback_params.get("s3_use_ssl") s3_verify = litellm.s3_callback_params.get("s3_verify") s3_endpoint_url = litellm.s3_callback_params.get("s3_endpoint_url") s3_aws_access_key_id = litellm.s3_callback_params.get( "s3_aws_access_key_id" ) s3_aws_secret_access_key = litellm.s3_callback_params.get( "s3_aws_secret_access_key" ) s3_aws_session_token = litellm.s3_callback_params.get( "s3_aws_session_token" ) s3_config = litellm.s3_callback_params.get("s3_config") # done reading litellm.s3_callback_params self.bucket_name = s3_bucket_name # Create an S3 client with custom endpoint URL self.s3_client = boto3.client( "s3", region_name=s3_region_name, endpoint_url=s3_endpoint_url, api_version=s3_api_version, use_ssl=s3_use_ssl, verify=s3_verify, aws_access_key_id=s3_aws_access_key_id, aws_secret_access_key=s3_aws_secret_access_key, aws_session_token=s3_aws_session_token, config=s3_config, **kwargs, ) except Exception as e: print_verbose(f"Got exception on init s3 client {str(e)}") raise e async def _async_log_event( self, kwargs, response_obj, start_time, end_time, print_verbose ): self.log_event(kwargs, response_obj, start_time, end_time, print_verbose) def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose): try: print_verbose(f"s3 Logging - Enters logging function for model {kwargs}") # construct payload to send to s3 # follows the same params as langfuse.py litellm_params = kwargs.get("litellm_params", {}) metadata = ( litellm_params.get("metadata", {}) or {} ) # if litellm_params['metadata'] == None messages = kwargs.get("messages") optional_params = kwargs.get("optional_params", {}) call_type = kwargs.get("call_type", "litellm.completion") cache_hit = kwargs.get("cache_hit", False) usage = response_obj["usage"] id = response_obj.get("id", str(uuid.uuid4())) # Build the initial payload payload = { "id": id, "call_type": call_type, "cache_hit": cache_hit, "startTime": start_time, "endTime": end_time, "model": kwargs.get("model", ""), "user": kwargs.get("user", ""), "modelParameters": optional_params, "messages": messages, "response": response_obj, "usage": usage, "metadata": metadata, } # Ensure everything in the payload is converted to str for key, value in payload.items(): try: payload[key] = str(value) except: # non blocking if it can't cast to a str pass s3_object_key = ( payload["id"] + "-time=" + str(start_time) ) # we need the s3 key to include the time, so we log cache hits too import json payload = json.dumps(payload) print_verbose(f"\ns3 Logger - Logging payload = {payload}") response = self.s3_client.put_object( Bucket=self.bucket_name, Key=s3_object_key, Body=payload, ContentType="application/json", ContentLanguage="en", ContentDisposition=f'inline; filename="{key}.json"', ) print_verbose(f"Response from s3:{str(response)}") print_verbose(f"s3 Layer Logging - final response object: {response_obj}") return response except Exception as e: traceback.print_exc() print_verbose(f"s3 Layer Error - {str(e)}\n{traceback.format_exc()}") pass