### What this tests #### import sys, os, time, inspect, asyncio, traceback import pytest sys.path.insert(0, os.path.abspath('../..')) from litellm import completion, embedding import litellm from litellm.integrations.custom_logger import CustomLogger class MyCustomHandler(CustomLogger): complete_streaming_response_in_callback = "" def __init__(self): self.success: bool = False # type: ignore self.failure: bool = False # type: ignore self.async_success: bool = False # type: ignore self.async_success_embedding: bool = False # type: ignore self.async_failure: bool = False # type: ignore self.async_failure_embedding: bool = False # type: ignore self.async_completion_kwargs = None # type: ignore self.async_embedding_kwargs = None # type: ignore self.async_embedding_response = None # type: ignore self.async_completion_kwargs_fail = None # type: ignore self.async_embedding_kwargs_fail = None # type: ignore self.stream_collected_response = None # type: ignore self.sync_stream_collected_response = None # type: ignore self.user = None # type: ignore self.data_sent_to_api: dict = {} def log_pre_api_call(self, model, messages, kwargs): print(f"Pre-API Call") self.data_sent_to_api = kwargs["additional_args"].get("complete_input_dict", {}) def log_post_api_call(self, kwargs, response_obj, start_time, end_time): print(f"Post-API Call") def log_stream_event(self, kwargs, response_obj, start_time, end_time): print(f"On Stream") def log_success_event(self, kwargs, response_obj, start_time, end_time): print(f"On Success") self.success = True if kwargs.get("stream") == True: self.sync_stream_collected_response = response_obj def log_failure_event(self, kwargs, response_obj, start_time, end_time): print(f"On Failure") self.failure = True async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): print(f"On Async success") print(f"received kwargs user: {kwargs['user']}") self.async_success = True if kwargs.get("model") == "text-embedding-ada-002": self.async_success_embedding = True self.async_embedding_kwargs = kwargs self.async_embedding_response = response_obj if kwargs.get("stream") == True: self.stream_collected_response = response_obj self.async_completion_kwargs = kwargs self.user = kwargs.get("user", None) async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): print(f"On Async Failure") self.async_failure = True if kwargs.get("model") == "text-embedding-ada-002": self.async_failure_embedding = True self.async_embedding_kwargs_fail = kwargs self.async_completion_kwargs_fail = kwargs class TmpFunction: complete_streaming_response_in_callback = "" async_success: bool = False async def async_test_logging_fn(self, kwargs, completion_obj, start_time, end_time): print(f"ON ASYNC LOGGING") self.async_success = True print(f'kwargs.get("complete_streaming_response"): {kwargs.get("complete_streaming_response")}') self.complete_streaming_response_in_callback = kwargs.get("complete_streaming_response") def test_async_chat_openai_stream(): try: tmp_function = TmpFunction() # litellm.set_verbose = True litellm.success_callback = [tmp_function.async_test_logging_fn] complete_streaming_response = "" async def call_gpt(): nonlocal complete_streaming_response response = await litellm.acompletion(model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "Hi 👋 - i'm openai" }], stream=True) async for chunk in response: complete_streaming_response += chunk["choices"][0]["delta"]["content"] or "" print(complete_streaming_response) asyncio.run(call_gpt()) complete_streaming_response = complete_streaming_response.strip("'") response1 = tmp_function.complete_streaming_response_in_callback["choices"][0]["message"]["content"] response2 = complete_streaming_response # assert [ord(c) for c in response1] == [ord(c) for c in response2] assert response1 == response2 assert tmp_function.async_success == True except Exception as e: print(e) pytest.fail(f"An error occurred - {str(e)}") # test_async_chat_openai_stream() def test_completion_azure_stream_moderation_failure(): try: customHandler = MyCustomHandler() litellm.callbacks = [customHandler] messages = [ {"role": "system", "content": "You are a helpful assistant."}, { "role": "user", "content": "how do i kill someone", }, ] try: response = completion( model="azure/chatgpt-v-2", messages=messages, stream=True ) for chunk in response: print(f"chunk: {chunk}") continue except Exception as e: print(e) time.sleep(1) assert customHandler.failure == True except Exception as e: pytest.fail(f"Error occurred: {e}") def test_async_custom_handler_stream(): try: # [PROD Test] - Do not DELETE # checks if the model response available in the async + stream callbacks is equal to the received response customHandler2 = MyCustomHandler() litellm.callbacks = [customHandler2] litellm.set_verbose = False messages = [ {"role": "system", "content": "You are a helpful assistant."}, { "role": "user", "content": "write 1 sentence about litellm being amazing", }, ] complete_streaming_response = "" async def test_1(): nonlocal complete_streaming_response response = await litellm.acompletion( model="azure/chatgpt-v-2", messages=messages, stream=True ) async for chunk in response: complete_streaming_response += chunk["choices"][0]["delta"]["content"] or "" print(complete_streaming_response) asyncio.run(test_1()) response_in_success_handler = customHandler2.stream_collected_response response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"] print("\n\n") print("response_in_success_handler: ", response_in_success_handler) print("complete_streaming_response: ", complete_streaming_response) assert response_in_success_handler == complete_streaming_response except Exception as e: pytest.fail(f"Error occurred: {e}") # test_async_custom_handler_stream() def test_azure_completion_stream(): # [PROD Test] - Do not DELETE # test if completion() + sync custom logger get the same complete stream response try: # checks if the model response available in the async + stream callbacks is equal to the received response customHandler2 = MyCustomHandler() litellm.callbacks = [customHandler2] litellm.set_verbose = False messages = [ {"role": "system", "content": "You are a helpful assistant."}, { "role": "user", "content": "write 1 sentence about litellm being amazing", }, ] complete_streaming_response = "" response = litellm.completion( model="azure/chatgpt-v-2", messages=messages, stream=True ) for chunk in response: complete_streaming_response += chunk["choices"][0]["delta"]["content"] or "" print(complete_streaming_response) time.sleep(0.5) # wait 1/2 second before checking callbacks response_in_success_handler = customHandler2.sync_stream_collected_response response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"] print("\n\n") print("response_in_success_handler: ", response_in_success_handler) print("complete_streaming_response: ", complete_streaming_response) assert response_in_success_handler == complete_streaming_response except Exception as e: pytest.fail(f"Error occurred: {e}") @pytest.mark.asyncio async def test_async_custom_handler_completion(): try: customHandler_success = MyCustomHandler() customHandler_failure = MyCustomHandler() # success assert customHandler_success.async_success == False litellm.callbacks = [customHandler_success] response = await litellm.acompletion( model="gpt-3.5-turbo", messages=[{ "role": "user", "content": "hello from litellm test", }] ) await asyncio.sleep(1) assert customHandler_success.async_success == True, "async success is not set to True even after success" assert customHandler_success.async_completion_kwargs.get("model") == "gpt-3.5-turbo" # failure litellm.callbacks = [customHandler_failure] messages = [ {"role": "system", "content": "You are a helpful assistant."}, { "role": "user", "content": "how do i kill someone", }, ] assert customHandler_failure.async_failure == False try: response = await litellm.acompletion( model="gpt-3.5-turbo", messages=messages, api_key="my-bad-key", ) except: pass assert customHandler_failure.async_failure == True, "async failure is not set to True even after failure" assert customHandler_failure.async_completion_kwargs_fail.get("model") == "gpt-3.5-turbo" assert len(str(customHandler_failure.async_completion_kwargs_fail.get("exception"))) > 10 # expect APIError("OpenAIException - Error code: 401 - {'error': {'message': 'Incorrect API key provided: test. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}"), 'traceback_exception': 'Traceback (most recent call last):\n File "/Users/ishaanjaffer/Github/litellm/litellm/llms/openai.py", line 269, in acompletion\n response = await openai_aclient.chat.completions.create(**data)\n File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/openai/resources/chat/completions.py", line 119 litellm.callbacks = [] print("Passed setting async failure") except Exception as e: pytest.fail(f"An exception occurred - {str(e)}") # asyncio.run(test_async_custom_handler_completion()) @pytest.mark.asyncio async def test_async_custom_handler_embedding(): try: customHandler_embedding = MyCustomHandler() litellm.callbacks = [customHandler_embedding] # success assert customHandler_embedding.async_success_embedding == False response = await litellm.aembedding( model="text-embedding-ada-002", input = ["hello world"], ) await asyncio.sleep(1) assert customHandler_embedding.async_success_embedding == True, "async_success_embedding is not set to True even after success" assert customHandler_embedding.async_embedding_kwargs.get("model") == "text-embedding-ada-002" assert customHandler_embedding.async_embedding_response["usage"]["prompt_tokens"] ==2 print("Passed setting async success: Embedding") # failure assert customHandler_embedding.async_failure_embedding == False try: response = await litellm.aembedding( model="text-embedding-ada-002", input = ["hello world"], api_key="my-bad-key", ) except: pass assert customHandler_embedding.async_failure_embedding == True, "async failure embedding is not set to True even after failure" assert customHandler_embedding.async_embedding_kwargs_fail.get("model") == "text-embedding-ada-002" assert len(str(customHandler_embedding.async_embedding_kwargs_fail.get("exception"))) > 10 # exppect APIError("OpenAIException - Error code: 401 - {'error': {'message': 'Incorrect API key provided: test. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}"), 'traceback_exception': 'Traceback (most recent call last):\n File "/Users/ishaanjaffer/Github/litellm/litellm/llms/openai.py", line 269, in acompletion\n response = await openai_aclient.chat.completions.create(**data)\n File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/openai/resources/chat/completions.py", line 119 except Exception as e: pytest.fail(f"An exception occurred - {str(e)}") # asyncio.run(test_async_custom_handler_embedding()) @pytest.mark.asyncio async def test_async_custom_handler_embedding_optional_param(): """ Tests if the openai optional params for embedding - user + encoding_format, are logged """ customHandler_optional_params = MyCustomHandler() litellm.callbacks = [customHandler_optional_params] response = await litellm.aembedding( model="azure/azure-embedding-model", input = ["hello world"], user = "John" ) await asyncio.sleep(1) # success callback is async assert customHandler_optional_params.user == "John" assert customHandler_optional_params.user == customHandler_optional_params.data_sent_to_api["user"] # asyncio.run(test_async_custom_handler_embedding_optional_param()) @pytest.mark.asyncio async def test_async_custom_handler_embedding_optional_param_bedrock(): """ Tests if the openai optional params for embedding - user + encoding_format, are logged but makes sure these are not sent to the non-openai/azure endpoint (raises errors). """ litellm.drop_params = True litellm.set_verbose = True customHandler_optional_params = MyCustomHandler() litellm.callbacks = [customHandler_optional_params] response = await litellm.aembedding( model="bedrock/amazon.titan-embed-text-v1", input = ["hello world"], user = "John" ) await asyncio.sleep(1) # success callback is async assert customHandler_optional_params.user == "John" assert "user" not in customHandler_optional_params.data_sent_to_api def test_redis_cache_completion_stream(): from litellm import Cache # Important Test - This tests if we can add to streaming cache, when custom callbacks are set import random try: print("\nrunning test_redis_cache_completion_stream") litellm.set_verbose = True random_number = random.randint(1, 100000) # add a random number to ensure it's always adding / reading from cache messages = [{"role": "user", "content": f"write a one sentence poem about: {random_number}"}] litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD']) print("test for caching, streaming + completion") response1 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True) response_1_content = "" for chunk in response1: print(chunk) response_1_content += chunk.choices[0].delta.content or "" print(response_1_content) time.sleep(0.1) # sleep for 0.1 seconds allow set cache to occur response2 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True) response_2_content = "" for chunk in response2: print(chunk) response_2_content += chunk.choices[0].delta.content or "" print("\nresponse 1", response_1_content) print("\nresponse 2", response_2_content) assert response_1_content == response_2_content, f"Response 1 != Response 2. Same params, Response 1{response_1_content} != Response 2{response_2_content}" litellm.success_callback = [] litellm._async_success_callback = [] litellm.cache = None except Exception as e: print(e) litellm.success_callback = [] raise e # test_redis_cache_completion_stream()