import sys, os, time, asyncio

import pytest

sys.path.insert(0, os.path.abspath('../..'))

from litellm import completion, embedding
import litellm
from litellm.integrations.custom_logger import CustomLogger
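

# MyCustomHandler implements litellm's CustomLogger interface. Each hook
# records that it fired (and stashes the kwargs/response objects it received)
# so the tests below can assert on exactly what litellm passed to the callbacks.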
class MyCustomHandler(CustomLogger):
    complete_streaming_response_in_callback = ""

    def __init__(self):
        self.success: bool = False
        self.failure: bool = False
        self.async_success: bool = False
        self.async_success_embedding: bool = False
        self.async_failure: bool = False
        self.async_failure_embedding: bool = False

        self.async_completion_kwargs = None
        self.async_embedding_kwargs = None
        self.async_embedding_response = None

        self.async_completion_kwargs_fail = None
        self.async_embedding_kwargs_fail = None

        self.stream_collected_response = None
        self.sync_stream_collected_response = None
        self.user = None
        self.data_sent_to_api: dict = {}

    def log_pre_api_call(self, model, messages, kwargs):
        print("Pre-API Call")
        # Capture the exact request payload litellm is about to send to the provider.
        self.data_sent_to_api = kwargs["additional_args"].get("complete_input_dict", {})

    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
        print("Post-API Call")

    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
        print("On Stream")

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("On Success")
        self.success = True
        if kwargs.get("stream") is True:
            self.sync_stream_collected_response = response_obj

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print("On Failure")
        self.failure = True

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("On Async Success")
        print(f"received kwargs user: {kwargs['user']}")
        self.async_success = True
        if kwargs.get("model") == "text-embedding-ada-002":
            self.async_success_embedding = True
            self.async_embedding_kwargs = kwargs
            self.async_embedding_response = response_obj
        if kwargs.get("stream") is True:
            # For streamed calls, litellm passes the rebuilt (non-streaming) response here.
            self.stream_collected_response = response_obj
        self.async_completion_kwargs = kwargs
        self.user = kwargs.get("user", None)

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print("On Async Failure")
        self.async_failure = True
        if kwargs.get("model") == "text-embedding-ada-002":
            self.async_failure_embedding = True
            self.async_embedding_kwargs_fail = kwargs
        self.async_completion_kwargs_fail = kwargs


class TmpFunction:
    complete_streaming_response_in_callback = ""
    async_success: bool = False

    async def async_test_logging_fn(self, kwargs, completion_obj, start_time, end_time):
        print("ON ASYNC LOGGING")
        self.async_success = True
        print(f'kwargs.get("complete_streaming_response"): {kwargs.get("complete_streaming_response")}')
        self.complete_streaming_response_in_callback = kwargs.get("complete_streaming_response")
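

# Verifies that an async function registered via litellm.success_callback for a
# streamed acompletion call receives the rebuilt full response in
# kwargs["complete_streaming_response"], matching the chunks seen client-side.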
def test_async_chat_openai_stream():
    try:
        tmp_function = TmpFunction()
        litellm.success_callback = [tmp_function.async_test_logging_fn]
        complete_streaming_response = ""

        async def call_gpt():
            nonlocal complete_streaming_response
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hi 👋 - i'm openai"}],
                stream=True,
            )
            async for chunk in response:
                complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
                print(complete_streaming_response)

        asyncio.run(call_gpt())
        complete_streaming_response = complete_streaming_response.strip("'")
        response1 = tmp_function.complete_streaming_response_in_callback["choices"][0]["message"]["content"]
        response2 = complete_streaming_response

        assert response1 == response2
        assert tmp_function.async_success is True
    except Exception as e:
        print(e)
        pytest.fail(f"An error occurred - {str(e)}")
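

# A sync streaming call that fails mid-stream (here, the prompt should trip
# Azure's content moderation) must still fire the handler's log_failure_event.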
def test_completion_azure_stream_moderation_failure():
    try:
        customHandler = MyCustomHandler()
        litellm.callbacks = [customHandler]
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "how do i kill someone"},
        ]
        try:
            response = completion(
                model="azure/chatgpt-v-2", messages=messages, stream=True
            )
            for chunk in response:
                print(f"chunk: {chunk}")
        except Exception as e:
            print(e)
        time.sleep(1)
        assert customHandler.failure is True
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
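

# Async streaming: async_log_success_event should receive the rebuilt complete
# response, whose content must match the chunks collected client-side.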
def test_async_custom_handler_stream():
    try:
        customHandler2 = MyCustomHandler()
        litellm.callbacks = [customHandler2]
        litellm.set_verbose = False
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "write 1 sentence about litellm being amazing"},
        ]
        complete_streaming_response = ""

        async def test_1():
            nonlocal complete_streaming_response
            response = await litellm.acompletion(
                model="azure/chatgpt-v-2",
                messages=messages,
                stream=True,
            )
            async for chunk in response:
                complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
                print(complete_streaming_response)

        asyncio.run(test_1())

        response_in_success_handler = customHandler2.stream_collected_response
        response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"]
        print("\n\n")
        print("response_in_success_handler: ", response_in_success_handler)
        print("complete_streaming_response: ", complete_streaming_response)
        assert response_in_success_handler == complete_streaming_response
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
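

# Sync counterpart of the test above: log_success_event should receive the
# rebuilt streaming response for a synchronous completion() call.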
def test_azure_completion_stream():
    try:
        customHandler2 = MyCustomHandler()
        litellm.callbacks = [customHandler2]
        litellm.set_verbose = False
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "write 1 sentence about litellm being amazing"},
        ]
        complete_streaming_response = ""

        response = litellm.completion(
            model="azure/chatgpt-v-2",
            messages=messages,
            stream=True,
        )
        for chunk in response:
            complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
            print(complete_streaming_response)

        time.sleep(0.5)  # give the success callback time to run
        response_in_success_handler = customHandler2.sync_stream_collected_response
        response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"]
        print("\n\n")
        print("response_in_success_handler: ", response_in_success_handler)
        print("complete_streaming_response: ", complete_streaming_response)
        assert response_in_success_handler == complete_streaming_response
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
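

# Exercises both async completion callback paths: a successful call sets
# async_success (with the logged kwargs), while a call made with a bad API key
# sets async_failure and stores the exception in the failure kwargs.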
@pytest.mark.asyncio
async def test_async_custom_handler_completion():
    try:
        customHandler_success = MyCustomHandler()
        customHandler_failure = MyCustomHandler()

        # Success case
        assert customHandler_success.async_success is False
        litellm.callbacks = [customHandler_success]
        response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "hello from litellm test"}],
        )
        await asyncio.sleep(1)
        assert customHandler_success.async_success is True, "async_success is not set to True even after success"
        assert customHandler_success.async_completion_kwargs.get("model") == "gpt-3.5-turbo"

        # Failure case - a bad api key should trigger the async failure callback
        litellm.callbacks = [customHandler_failure]
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "how do i kill someone"},
        ]

        assert customHandler_failure.async_failure is False
        try:
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=messages,
                api_key="my-bad-key",
            )
        except Exception:
            pass
        assert customHandler_failure.async_failure is True, "async_failure is not set to True even after failure"
        assert customHandler_failure.async_completion_kwargs_fail.get("model") == "gpt-3.5-turbo"
        assert len(str(customHandler_failure.async_completion_kwargs_fail.get("exception"))) > 10  # the exception is stored in the failure kwargs
        litellm.callbacks = []
        print("Passed setting async failure")
    except Exception as e:
        pytest.fail(f"An exception occurred - {str(e)}")
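

# Same success/failure coverage, but for aembedding(); also checks that the
# logged response object reports the expected prompt token usage.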
@pytest.mark.asyncio
async def test_async_custom_handler_embedding():
    try:
        customHandler_embedding = MyCustomHandler()
        litellm.callbacks = [customHandler_embedding]

        assert customHandler_embedding.async_success_embedding is False
        response = await litellm.aembedding(
            model="text-embedding-ada-002",
            input=["hello world"],
        )
        await asyncio.sleep(1)
        assert customHandler_embedding.async_success_embedding is True, "async_success_embedding is not set to True even after success"
        assert customHandler_embedding.async_embedding_kwargs.get("model") == "text-embedding-ada-002"
        assert customHandler_embedding.async_embedding_response["usage"]["prompt_tokens"] == 2
        print("Passed setting async success: Embedding")

        assert customHandler_embedding.async_failure_embedding is False
        try:
            response = await litellm.aembedding(
                model="text-embedding-ada-002",
                input=["hello world"],
                api_key="my-bad-key",
            )
        except Exception:
            pass
        assert customHandler_embedding.async_failure_embedding is True, "async_failure_embedding is not set to True even after failure"
        assert customHandler_embedding.async_embedding_kwargs_fail.get("model") == "text-embedding-ada-002"
        assert len(str(customHandler_embedding.async_embedding_kwargs_fail.get("exception"))) > 10
    except Exception as e:
        pytest.fail(f"An exception occurred - {str(e)}")
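

# The `user` param should reach both the handler (via the logged kwargs) and
# the actual request payload captured in log_pre_api_call (data_sent_to_api).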
@pytest.mark.asyncio
async def test_async_custom_handler_embedding_optional_param():
    """
    Tests that the OpenAI optional params for embedding - `user` + `encoding_format` -
    are logged.
    """
    customHandler_optional_params = MyCustomHandler()
    litellm.callbacks = [customHandler_optional_params]
    response = await litellm.aembedding(
        model="azure/azure-embedding-model",
        input=["hello world"],
        user="John",
    )
    await asyncio.sleep(1)
    assert customHandler_optional_params.user == "John"
    assert customHandler_optional_params.user == customHandler_optional_params.data_sent_to_api["user"]
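

# With litellm.drop_params = True, OpenAI-only params like `user` should still
# be logged to the handler, but stripped from the payload sent to Bedrock.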
@pytest.mark.asyncio
async def test_async_custom_handler_embedding_optional_param_bedrock():
    """
    Tests that the OpenAI optional params for embedding - `user` + `encoding_format` -
    are logged, but ensures they are not sent to a non-OpenAI/Azure endpoint
    (where they would raise errors).
    """
    litellm.drop_params = True
    litellm.set_verbose = True
    customHandler_optional_params = MyCustomHandler()
    litellm.callbacks = [customHandler_optional_params]
    response = await litellm.aembedding(
        model="bedrock/amazon.titan-embed-text-v1",
        input=["hello world"],
        user="John",
    )
    await asyncio.sleep(1)
    assert customHandler_optional_params.user == "John"
    assert "user" not in customHandler_optional_params.data_sent_to_api
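

# Streaming responses should be cached too: two identical streamed completions
# against a Redis cache must reassemble to identical content.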
def test_redis_cache_completion_stream():
    from litellm import Cache
    import random

    try:
        print("\nrunning test_redis_cache_completion_stream")
        litellm.set_verbose = True
        random_number = random.randint(1, 100000)  # randomize the prompt so we never hit a stale cache entry
        messages = [{"role": "user", "content": f"write a one sentence poem about: {random_number}"}]
        litellm.cache = Cache(
            type="redis",
            host=os.environ["REDIS_HOST"],
            port=os.environ["REDIS_PORT"],
            password=os.environ["REDIS_PASSWORD"],
        )
        print("test for caching, streaming + completion")
        response1 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True)
        response_1_content = ""
        for chunk in response1:
            print(chunk)
            response_1_content += chunk.choices[0].delta.content or ""
        print(response_1_content)

        time.sleep(0.1)  # give the cache a moment to be written
        response2 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True)
        response_2_content = ""
        for chunk in response2:
            print(chunk)
            response_2_content += chunk.choices[0].delta.content or ""
        print("\nresponse 1", response_1_content)
        print("\nresponse 2", response_2_content)
        assert response_1_content == response_2_content, f"Response 1 != Response 2. Same params. Response 1: {response_1_content}; Response 2: {response_2_content}"
        litellm.success_callback = []
        litellm._async_success_callback = []
        litellm.cache = None
    except Exception as e:
        print(e)
        litellm.success_callback = []
        raise e