### What this tests #### ## This test asserts the type of data passed into each method of the custom callback handler import sys, os, time, inspect, asyncio, traceback from datetime import datetime import pytest sys.path.insert(0, os.path.abspath("../..")) from typing import Optional, Literal, List, Union from litellm import completion, embedding, Cache import litellm from litellm.integrations.custom_logger import CustomLogger # Test Scenarios (test across completion, streaming, embedding) ## 1: Pre-API-Call ## 2: Post-API-Call ## 3: On LiteLLM Call success ## 4: On LiteLLM Call failure ## 5. Caching # Test models ## 1. OpenAI ## 2. Azure OpenAI ## 3. Non-OpenAI/Azure - e.g. Bedrock # Test interfaces ## 1. litellm.completion() + litellm.embeddings() ## refer to test_custom_callback_input_router.py for the router + proxy tests class CompletionCustomHandler( CustomLogger ): # https://docs.litellm.ai/docs/observability/custom_callback#callback-class """ The set of expected inputs to a custom handler for a """ # Class variables or attributes def __init__(self): self.errors = [] self.states: Optional[ List[ Literal[ "sync_pre_api_call", "async_pre_api_call", "post_api_call", "sync_stream", "async_stream", "sync_success", "async_success", "sync_failure", "async_failure", ] ] ] = [] def log_pre_api_call(self, model, messages, kwargs): try: self.states.append("sync_pre_api_call") ## MODEL assert isinstance(model, str) ## MESSAGES assert isinstance(messages, list) ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) except Exception as e: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) def log_post_api_call(self, kwargs, response_obj, start_time, end_time): try: self.states.append("post_api_call") ## START TIME assert isinstance(start_time, datetime) ## END TIME assert end_time == None ## RESPONSE OBJECT assert response_obj == None ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) assert isinstance(kwargs["input"], (list, dict, str)) assert isinstance(kwargs["api_key"], (str, type(None))) assert ( isinstance( kwargs["original_response"], (str, litellm.CustomStreamWrapper) ) or inspect.iscoroutine(kwargs["original_response"]) or inspect.isasyncgen(kwargs["original_response"]) ) assert isinstance(kwargs["additional_args"], (dict, type(None))) assert isinstance(kwargs["log_event_type"], str) except: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time): try: self.states.append("async_stream") ## START TIME assert isinstance(start_time, datetime) ## END TIME assert isinstance(end_time, datetime) ## RESPONSE OBJECT assert isinstance(response_obj, litellm.ModelResponse) ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) and isinstance( kwargs["messages"][0], dict ) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) assert ( isinstance(kwargs["input"], list) and isinstance(kwargs["input"][0], dict) ) or isinstance(kwargs["input"], (dict, str)) assert isinstance(kwargs["api_key"], (str, type(None))) assert ( isinstance( kwargs["original_response"], (str, litellm.CustomStreamWrapper) ) or inspect.isasyncgen(kwargs["original_response"]) or inspect.iscoroutine(kwargs["original_response"]) ) assert isinstance(kwargs["additional_args"], (dict, type(None))) assert isinstance(kwargs["log_event_type"], str) except: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) def log_success_event(self, kwargs, response_obj, start_time, end_time): try: self.states.append("sync_success") ## START TIME assert isinstance(start_time, datetime) ## END TIME assert isinstance(end_time, datetime) ## RESPONSE OBJECT assert isinstance(response_obj, litellm.ModelResponse) ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) and isinstance( kwargs["messages"][0], dict ) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) assert ( isinstance(kwargs["input"], list) and isinstance(kwargs["input"][0], dict) ) or isinstance(kwargs["input"], (dict, str)) assert isinstance(kwargs["api_key"], (str, type(None))) assert isinstance( kwargs["original_response"], (str, litellm.CustomStreamWrapper) ) assert isinstance(kwargs["additional_args"], (dict, type(None))) assert isinstance(kwargs["log_event_type"], str) except: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) def log_failure_event(self, kwargs, response_obj, start_time, end_time): try: self.states.append("sync_failure") ## START TIME assert isinstance(start_time, datetime) ## END TIME assert isinstance(end_time, datetime) ## RESPONSE OBJECT assert response_obj == None ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) and isinstance( kwargs["messages"][0], dict ) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) assert ( isinstance(kwargs["input"], list) and isinstance(kwargs["input"][0], dict) ) or isinstance(kwargs["input"], (dict, str)) assert isinstance(kwargs["api_key"], (str, type(None))) assert ( isinstance( kwargs["original_response"], (str, litellm.CustomStreamWrapper) ) or kwargs["original_response"] == None ) assert isinstance(kwargs["additional_args"], (dict, type(None))) assert isinstance(kwargs["log_event_type"], str) except: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) async def async_log_pre_api_call(self, model, messages, kwargs): try: self.states.append("async_pre_api_call") ## MODEL assert isinstance(model, str) ## MESSAGES assert isinstance(messages, list) and isinstance(messages[0], dict) ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) and isinstance( kwargs["messages"][0], dict ) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) except Exception as e: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): try: self.states.append("async_success") ## START TIME assert isinstance(start_time, datetime) ## END TIME assert isinstance(end_time, datetime) ## RESPONSE OBJECT assert isinstance( response_obj, (litellm.ModelResponse, litellm.EmbeddingResponse) ) ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) assert isinstance(kwargs["input"], (list, dict, str)) assert isinstance(kwargs["api_key"], (str, type(None))) assert ( isinstance( kwargs["original_response"], (str, litellm.CustomStreamWrapper) ) or inspect.isasyncgen(kwargs["original_response"]) or inspect.iscoroutine(kwargs["original_response"]) ) assert isinstance(kwargs["additional_args"], (dict, type(None))) assert isinstance(kwargs["log_event_type"], str) assert kwargs["cache_hit"] is None or isinstance(kwargs["cache_hit"], bool) except: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): try: self.states.append("async_failure") ## START TIME assert isinstance(start_time, datetime) ## END TIME assert isinstance(end_time, datetime) ## RESPONSE OBJECT assert response_obj == None ## KWARGS assert isinstance(kwargs["model"], str) assert isinstance(kwargs["messages"], list) assert isinstance(kwargs["optional_params"], dict) assert isinstance(kwargs["litellm_params"], dict) assert isinstance(kwargs["start_time"], (datetime, type(None))) assert isinstance(kwargs["stream"], bool) assert isinstance(kwargs["user"], (str, type(None))) assert isinstance(kwargs["input"], (list, str, dict)) assert isinstance(kwargs["api_key"], (str, type(None))) assert ( isinstance( kwargs["original_response"], (str, litellm.CustomStreamWrapper) ) or inspect.isasyncgen(kwargs["original_response"]) or inspect.iscoroutine(kwargs["original_response"]) or kwargs["original_response"] == None ) assert isinstance(kwargs["additional_args"], (dict, type(None))) assert isinstance(kwargs["log_event_type"], str) except: print(f"Assertion Error: {traceback.format_exc()}") self.errors.append(traceback.format_exc()) # COMPLETION ## Test OpenAI + sync def test_chat_openai_stream(): try: customHandler = CompletionCustomHandler() litellm.callbacks = [customHandler] response = litellm.completion( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hi 👋 - i'm sync openai"}], ) ## test streaming response = litellm.completion( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}], stream=True, ) for chunk in response: continue ## test failure callback try: response = litellm.completion( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}], api_key="my-bad-key", stream=True, ) for chunk in response: continue except: pass time.sleep(1) print(f"customHandler.errors: {customHandler.errors}") assert len(customHandler.errors) == 0 litellm.callbacks = [] except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # test_chat_openai_stream() ## Test OpenAI + Async @pytest.mark.asyncio async def test_async_chat_openai_stream(): try: customHandler = CompletionCustomHandler() litellm.callbacks = [customHandler] response = await litellm.acompletion( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}], ) ## test streaming response = await litellm.acompletion( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}], stream=True, ) async for chunk in response: continue ## test failure callback try: response = await litellm.acompletion( model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}], api_key="my-bad-key", stream=True, ) async for chunk in response: continue except: pass time.sleep(1) print(f"customHandler.errors: {customHandler.errors}") assert len(customHandler.errors) == 0 litellm.callbacks = [] except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # asyncio.run(test_async_chat_openai_stream()) ## Test Azure + sync def test_chat_azure_stream(): try: customHandler = CompletionCustomHandler() litellm.callbacks = [customHandler] response = litellm.completion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": "Hi 👋 - i'm sync azure"}], ) # test streaming response = litellm.completion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": "Hi 👋 - i'm sync azure"}], stream=True, ) for chunk in response: continue # test failure callback try: response = litellm.completion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": "Hi 👋 - i'm sync azure"}], api_key="my-bad-key", stream=True, ) for chunk in response: continue except: pass time.sleep(1) print(f"customHandler.errors: {customHandler.errors}") assert len(customHandler.errors) == 0 litellm.callbacks = [] except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # test_chat_azure_stream() ## Test Azure + Async @pytest.mark.asyncio async def test_async_chat_azure_stream(): try: customHandler = CompletionCustomHandler() litellm.callbacks = [customHandler] response = await litellm.acompletion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": "Hi 👋 - i'm async azure"}], ) ## test streaming response = await litellm.acompletion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": "Hi 👋 - i'm async azure"}], stream=True, ) async for chunk in response: continue # test failure callback try: response = await litellm.acompletion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": "Hi 👋 - i'm async azure"}], api_key="my-bad-key", stream=True, ) async for chunk in response: continue except: pass await asyncio.sleep(1) print(f"customHandler.errors: {customHandler.errors}") assert len(customHandler.errors) == 0 litellm.callbacks = [] except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") asyncio.run(test_async_chat_azure_stream()) ## Test Bedrock + sync def test_chat_bedrock_stream(): try: customHandler = CompletionCustomHandler() litellm.callbacks = [customHandler] response = litellm.completion( model="bedrock/anthropic.claude-v1", messages=[{"role": "user", "content": "Hi 👋 - i'm sync bedrock"}], ) # test streaming response = litellm.completion( model="bedrock/anthropic.claude-v1", messages=[{"role": "user", "content": "Hi 👋 - i'm sync bedrock"}], stream=True, ) for chunk in response: continue # test failure callback try: response = litellm.completion( model="bedrock/anthropic.claude-v1", messages=[{"role": "user", "content": "Hi 👋 - i'm sync bedrock"}], aws_region_name="my-bad-region", stream=True, ) for chunk in response: continue except: pass time.sleep(1) print(f"customHandler.errors: {customHandler.errors}") assert len(customHandler.errors) == 0 litellm.callbacks = [] except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # test_chat_bedrock_stream() ## Test Bedrock + Async @pytest.mark.asyncio async def test_async_chat_bedrock_stream(): try: customHandler = CompletionCustomHandler() litellm.callbacks = [customHandler] response = await litellm.acompletion( model="bedrock/anthropic.claude-v1", messages=[{"role": "user", "content": "Hi 👋 - i'm async bedrock"}], ) # test streaming response = await litellm.acompletion( model="bedrock/anthropic.claude-v1", messages=[{"role": "user", "content": "Hi 👋 - i'm async bedrock"}], stream=True, ) print(f"response: {response}") async for chunk in response: print(f"chunk: {chunk}") continue ## test failure callback try: response = await litellm.acompletion( model="bedrock/anthropic.claude-v1", messages=[{"role": "user", "content": "Hi 👋 - i'm async bedrock"}], aws_region_name="my-bad-key", stream=True, ) async for chunk in response: continue except: pass time.sleep(1) print(f"customHandler.errors: {customHandler.errors}") assert len(customHandler.errors) == 0 litellm.callbacks = [] except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # asyncio.run(test_async_chat_bedrock_stream()) # Text Completion ## Test OpenAI text completion + Async @pytest.mark.asyncio async def test_async_text_completion_openai_stream(): try: customHandler = CompletionCustomHandler() litellm.callbacks = [customHandler] response = await litellm.atext_completion( model="gpt-3.5-turbo", prompt="Hi 👋 - i'm async text completion openai", ) # test streaming response = await litellm.atext_completion( model="gpt-3.5-turbo", prompt="Hi 👋 - i'm async text completion openai", stream=True, ) async for chunk in response: print(f"chunk: {chunk}") continue ## test failure callback try: response = await litellm.atext_completion( model="gpt-3.5-turbo", prompt="Hi 👋 - i'm async text completion openai", stream=True, api_key="my-bad-key", ) async for chunk in response: continue except: pass time.sleep(1) print(f"customHandler.errors: {customHandler.errors}") assert len(customHandler.errors) == 0 litellm.callbacks = [] except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # EMBEDDING ## Test OpenAI + Async @pytest.mark.asyncio async def test_async_embedding_openai(): try: customHandler_success = CompletionCustomHandler() customHandler_failure = CompletionCustomHandler() litellm.callbacks = [customHandler_success] response = await litellm.aembedding( model="azure/azure-embedding-model", input=["good morning from litellm"] ) await asyncio.sleep(1) print(f"customHandler_success.errors: {customHandler_success.errors}") print(f"customHandler_success.states: {customHandler_success.states}") assert len(customHandler_success.errors) == 0 assert len(customHandler_success.states) == 3 # pre, post, success # test failure callback litellm.callbacks = [customHandler_failure] try: response = await litellm.aembedding( model="text-embedding-ada-002", input=["good morning from litellm"], api_key="my-bad-key", ) except: pass await asyncio.sleep(1) print(f"customHandler_failure.errors: {customHandler_failure.errors}") print(f"customHandler_failure.states: {customHandler_failure.states}") assert len(customHandler_failure.errors) == 0 assert len(customHandler_failure.states) == 3 # pre, post, failure except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # asyncio.run(test_async_embedding_openai()) ## Test Azure + Async @pytest.mark.asyncio async def test_async_embedding_azure(): try: customHandler_success = CompletionCustomHandler() customHandler_failure = CompletionCustomHandler() litellm.callbacks = [customHandler_success] response = await litellm.aembedding( model="azure/azure-embedding-model", input=["good morning from litellm"] ) await asyncio.sleep(1) print(f"customHandler_success.errors: {customHandler_success.errors}") print(f"customHandler_success.states: {customHandler_success.states}") assert len(customHandler_success.errors) == 0 assert len(customHandler_success.states) == 3 # pre, post, success # test failure callback litellm.callbacks = [customHandler_failure] try: response = await litellm.aembedding( model="azure/azure-embedding-model", input=["good morning from litellm"], api_key="my-bad-key", ) except: pass await asyncio.sleep(1) print(f"customHandler_failure.errors: {customHandler_failure.errors}") print(f"customHandler_failure.states: {customHandler_failure.states}") assert len(customHandler_failure.errors) == 0 assert len(customHandler_failure.states) == 3 # pre, post, success except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # asyncio.run(test_async_embedding_azure()) ## Test Bedrock + Async @pytest.mark.asyncio async def test_async_embedding_bedrock(): try: customHandler_success = CompletionCustomHandler() customHandler_failure = CompletionCustomHandler() litellm.callbacks = [customHandler_success] litellm.set_verbose = True response = await litellm.aembedding( model="bedrock/cohere.embed-multilingual-v3", input=["good morning from litellm"], aws_region_name="os.environ/AWS_REGION_NAME_2", ) await asyncio.sleep(1) print(f"customHandler_success.errors: {customHandler_success.errors}") print(f"customHandler_success.states: {customHandler_success.states}") assert len(customHandler_success.errors) == 0 assert len(customHandler_success.states) == 3 # pre, post, success # test failure callback litellm.callbacks = [customHandler_failure] try: response = await litellm.aembedding( model="bedrock/cohere.embed-multilingual-v3", input=["good morning from litellm"], aws_region_name="my-bad-region", ) except: pass await asyncio.sleep(1) print(f"customHandler_failure.errors: {customHandler_failure.errors}") print(f"customHandler_failure.states: {customHandler_failure.states}") assert len(customHandler_failure.errors) == 0 assert len(customHandler_failure.states) == 3 # pre, post, success except Exception as e: pytest.fail(f"An exception occurred: {str(e)}") # asyncio.run(test_async_embedding_bedrock()) # CACHING ## Test Azure - completion, embedding @pytest.mark.asyncio async def test_async_completion_azure_caching(): customHandler_caching = CompletionCustomHandler() litellm.cache = Cache( type="redis", host=os.environ["REDIS_HOST"], port=os.environ["REDIS_PORT"], password=os.environ["REDIS_PASSWORD"], ) litellm.callbacks = [customHandler_caching] unique_time = time.time() response1 = await litellm.acompletion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": f"Hi 👋 - i'm async azure {unique_time}"}], caching=True, ) await asyncio.sleep(1) print(f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}") response2 = await litellm.acompletion( model="azure/chatgpt-v-2", messages=[{"role": "user", "content": f"Hi 👋 - i'm async azure {unique_time}"}], caching=True, ) await asyncio.sleep(1) # success callbacks are done in parallel print( f"customHandler_caching.states post-cache hit: {customHandler_caching.states}" ) assert len(customHandler_caching.errors) == 0 assert len(customHandler_caching.states) == 4 # pre, post, success, success @pytest.mark.asyncio async def test_async_embedding_azure_caching(): print("Testing custom callback input - Azure Caching") customHandler_caching = CompletionCustomHandler() litellm.cache = Cache( type="redis", host=os.environ["REDIS_HOST"], port=os.environ["REDIS_PORT"], password=os.environ["REDIS_PASSWORD"], ) litellm.callbacks = [customHandler_caching] unique_time = time.time() response1 = await litellm.aembedding( model="azure/azure-embedding-model", input=[f"good morning from litellm1 {unique_time}"], caching=True, ) await asyncio.sleep(1) # set cache is async for aembedding() response2 = await litellm.aembedding( model="azure/azure-embedding-model", input=[f"good morning from litellm1 {unique_time}"], caching=True, ) await asyncio.sleep(1) # success callbacks are done in parallel print(customHandler_caching.states) assert len(customHandler_caching.errors) == 0 assert len(customHandler_caching.states) == 4 # pre, post, success, success # asyncio.run( # test_async_embedding_azure_caching() # )