|
|
|
|
|
import sys, os, time, inspect, asyncio, traceback |
|
from datetime import datetime |
|
import pytest |
|
|
|
sys.path.insert(0, os.path.abspath("../..")) |
|
from typing import Optional, Literal, List, Union |
|
from litellm import completion, embedding, Cache |
|
import litellm |
|
from litellm.integrations.custom_logger import CustomLogger |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CompletionCustomHandler(
    CustomLogger
):
    """
    Records which callback hooks fire (in ``self.states``) and type-checks the
    inputs litellm passes to each hook.

    Assertion failures are collected in ``self.errors`` instead of raising, so
    the tests below can simply assert ``len(handler.errors) == 0`` after their
    litellm calls complete.
    """

    def __init__(self):
        # Tracebacks from failed assertions inside the hooks.
        self.errors = []
        # Ordered record of which hooks fired; tests assert on its length.
        self.states: Optional[
            List[
                Literal[
                    "sync_pre_api_call",
                    "async_pre_api_call",
                    "post_api_call",
                    "sync_stream",
                    "async_stream",
                    "sync_success",
                    "async_success",
                    "sync_failure",
                    "async_failure",
                ]
            ]
        ] = []

    def log_pre_api_call(self, model, messages, kwargs):
        """Validate inputs passed to the sync pre-API-call hook."""
        try:
            self.states.append("sync_pre_api_call")
            # Positional args
            assert isinstance(model, str)
            assert isinstance(messages, list)
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list)
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())

    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
        """Validate inputs passed to the post-API-call hook (fires before the response is parsed)."""
        try:
            self.states.append("post_api_call")
            # end_time / response_obj are not yet populated at this stage.
            assert isinstance(start_time, datetime)
            assert end_time is None
            assert response_obj is None
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list)
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
            assert isinstance(kwargs["input"], (list, dict, str))
            assert isinstance(kwargs["api_key"], (str, type(None)))
            # original_response may be a raw string, a stream wrapper, or an
            # unawaited coroutine / async generator for async calls.
            assert (
                isinstance(
                    kwargs["original_response"], (str, litellm.CustomStreamWrapper)
                )
                or inspect.iscoroutine(kwargs["original_response"])
                or inspect.isasyncgen(kwargs["original_response"])
            )
            assert isinstance(kwargs["additional_args"], (dict, type(None)))
            assert isinstance(kwargs["log_event_type"], str)
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())

    async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time):
        """Validate inputs passed to the async per-chunk streaming hook."""
        try:
            self.states.append("async_stream")
            assert isinstance(start_time, datetime)
            assert isinstance(end_time, datetime)
            assert isinstance(response_obj, litellm.ModelResponse)
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list) and isinstance(
                kwargs["messages"][0], dict
            )
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
            # input is either a list of message dicts, or a raw dict/str prompt
            assert (
                isinstance(kwargs["input"], list)
                and isinstance(kwargs["input"][0], dict)
            ) or isinstance(kwargs["input"], (dict, str))
            assert isinstance(kwargs["api_key"], (str, type(None)))
            assert (
                isinstance(
                    kwargs["original_response"], (str, litellm.CustomStreamWrapper)
                )
                or inspect.isasyncgen(kwargs["original_response"])
                or inspect.iscoroutine(kwargs["original_response"])
            )
            assert isinstance(kwargs["additional_args"], (dict, type(None)))
            assert isinstance(kwargs["log_event_type"], str)
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Validate inputs passed to the sync success hook."""
        try:
            self.states.append("sync_success")
            assert isinstance(start_time, datetime)
            assert isinstance(end_time, datetime)
            assert isinstance(response_obj, litellm.ModelResponse)
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list) and isinstance(
                kwargs["messages"][0], dict
            )
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
            assert (
                isinstance(kwargs["input"], list)
                and isinstance(kwargs["input"][0], dict)
            ) or isinstance(kwargs["input"], (dict, str))
            assert isinstance(kwargs["api_key"], (str, type(None)))
            assert isinstance(
                kwargs["original_response"], (str, litellm.CustomStreamWrapper)
            )
            assert isinstance(kwargs["additional_args"], (dict, type(None)))
            assert isinstance(kwargs["log_event_type"], str)
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Validate inputs passed to the sync failure hook."""
        try:
            self.states.append("sync_failure")
            assert isinstance(start_time, datetime)
            assert isinstance(end_time, datetime)
            # No parsed response on failure.
            assert response_obj is None
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list) and isinstance(
                kwargs["messages"][0], dict
            )
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
            assert (
                isinstance(kwargs["input"], list)
                and isinstance(kwargs["input"][0], dict)
            ) or isinstance(kwargs["input"], (dict, str))
            assert isinstance(kwargs["api_key"], (str, type(None)))
            # original_response may be absent if the call failed before the API responded.
            assert (
                isinstance(
                    kwargs["original_response"], (str, litellm.CustomStreamWrapper)
                )
                or kwargs["original_response"] is None
            )
            assert isinstance(kwargs["additional_args"], (dict, type(None)))
            assert isinstance(kwargs["log_event_type"], str)
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())

    async def async_log_pre_api_call(self, model, messages, kwargs):
        """Validate inputs passed to the async pre-API-call hook."""
        try:
            self.states.append("async_pre_api_call")
            # Positional args
            assert isinstance(model, str)
            assert isinstance(messages, list) and isinstance(messages[0], dict)
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list) and isinstance(
                kwargs["messages"][0], dict
            )
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Validate inputs passed to the async success hook (chat + embeddings)."""
        try:
            self.states.append("async_success")
            assert isinstance(start_time, datetime)
            assert isinstance(end_time, datetime)
            # Embedding tests reuse this handler, so EmbeddingResponse is allowed.
            assert isinstance(
                response_obj, (litellm.ModelResponse, litellm.EmbeddingResponse)
            )
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list)
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
            assert isinstance(kwargs["input"], (list, dict, str))
            assert isinstance(kwargs["api_key"], (str, type(None)))
            assert (
                isinstance(
                    kwargs["original_response"], (str, litellm.CustomStreamWrapper)
                )
                or inspect.isasyncgen(kwargs["original_response"])
                or inspect.iscoroutine(kwargs["original_response"])
            )
            assert isinstance(kwargs["additional_args"], (dict, type(None)))
            assert isinstance(kwargs["log_event_type"], str)
            assert kwargs["cache_hit"] is None or isinstance(kwargs["cache_hit"], bool)
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Validate inputs passed to the async failure hook."""
        try:
            self.states.append("async_failure")
            assert isinstance(start_time, datetime)
            assert isinstance(end_time, datetime)
            # No parsed response on failure.
            assert response_obj is None
            # kwargs payload
            assert isinstance(kwargs["model"], str)
            assert isinstance(kwargs["messages"], list)
            assert isinstance(kwargs["optional_params"], dict)
            assert isinstance(kwargs["litellm_params"], dict)
            assert isinstance(kwargs["start_time"], (datetime, type(None)))
            assert isinstance(kwargs["stream"], bool)
            assert isinstance(kwargs["user"], (str, type(None)))
            assert isinstance(kwargs["input"], (list, str, dict))
            assert isinstance(kwargs["api_key"], (str, type(None)))
            assert (
                isinstance(
                    kwargs["original_response"], (str, litellm.CustomStreamWrapper)
                )
                or inspect.isasyncgen(kwargs["original_response"])
                or inspect.iscoroutine(kwargs["original_response"])
                or kwargs["original_response"] is None
            )
            assert isinstance(kwargs["additional_args"], (dict, type(None)))
            assert isinstance(kwargs["log_event_type"], str)
        except Exception:  # narrow from bare except; record, don't raise
            print(f"Assertion Error: {traceback.format_exc()}")
            self.errors.append(traceback.format_exc())
|
|
|
|
|
|
|
|
|
def test_chat_openai_stream():
    """Sync OpenAI: exercise non-stream, stream, and failure paths, then check callbacks."""
    try:
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # Normal completion
        response = litellm.completion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hi π - i'm sync openai"}],
        )

        # Streaming completion — drain the stream so all chunk callbacks fire
        response = litellm.completion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hi π - i'm openai"}],
            stream=True,
        )
        for chunk in response:
            continue

        # Failure path: a bad api key should trigger the failure callback
        try:
            response = litellm.completion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hi π - i'm openai"}],
                api_key="my-bad-key",
                stream=True,
            )
            for chunk in response:
                continue
        except Exception:
            pass  # the auth error itself is expected
        time.sleep(1)  # give async-dispatched callbacks time to run
        print(f"customHandler.errors: {customHandler.errors}")
        assert len(customHandler.errors) == 0
        litellm.callbacks = []
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_chat_openai_stream():
    """Async OpenAI: exercise non-stream, stream, and failure paths, then check callbacks."""
    try:
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # Normal completion
        response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hi π - i'm openai"}],
        )

        # Streaming completion — drain the stream so all chunk callbacks fire
        response = await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hi π - i'm openai"}],
            stream=True,
        )
        async for chunk in response:
            continue

        # Failure path: a bad api key should trigger the failure callback
        try:
            response = await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hi π - i'm openai"}],
                api_key="my-bad-key",
                stream=True,
            )
            async for chunk in response:
                continue
        except Exception:
            pass  # the auth error itself is expected
        # was time.sleep(1): blocking the event loop prevents the async
        # callbacks from running before the assertion below
        await asyncio.sleep(1)
        print(f"customHandler.errors: {customHandler.errors}")
        assert len(customHandler.errors) == 0
        litellm.callbacks = []
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_chat_azure_stream():
    """Sync Azure: exercise non-stream, stream, and failure paths, then check callbacks."""
    try:
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # Normal completion
        response = litellm.completion(
            model="azure/chatgpt-v-2",
            messages=[{"role": "user", "content": "Hi π - i'm sync azure"}],
        )

        # Streaming completion — drain the stream so all chunk callbacks fire
        response = litellm.completion(
            model="azure/chatgpt-v-2",
            messages=[{"role": "user", "content": "Hi π - i'm sync azure"}],
            stream=True,
        )
        for chunk in response:
            continue

        # Failure path: a bad api key should trigger the failure callback
        try:
            response = litellm.completion(
                model="azure/chatgpt-v-2",
                messages=[{"role": "user", "content": "Hi π - i'm sync azure"}],
                api_key="my-bad-key",
                stream=True,
            )
            for chunk in response:
                continue
        except Exception:
            pass  # the auth error itself is expected
        time.sleep(1)  # give async-dispatched callbacks time to run
        print(f"customHandler.errors: {customHandler.errors}")
        assert len(customHandler.errors) == 0
        litellm.callbacks = []
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_chat_azure_stream():
    """Async Azure: exercise non-stream, stream, and failure paths, then check callbacks."""
    try:
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # Normal completion
        response = await litellm.acompletion(
            model="azure/chatgpt-v-2",
            messages=[{"role": "user", "content": "Hi π - i'm async azure"}],
        )

        # Streaming completion — drain the stream so all chunk callbacks fire
        response = await litellm.acompletion(
            model="azure/chatgpt-v-2",
            messages=[{"role": "user", "content": "Hi π - i'm async azure"}],
            stream=True,
        )
        async for chunk in response:
            continue

        # Failure path: a bad api key should trigger the failure callback
        try:
            response = await litellm.acompletion(
                model="azure/chatgpt-v-2",
                messages=[{"role": "user", "content": "Hi π - i'm async azure"}],
                api_key="my-bad-key",
                stream=True,
            )
            async for chunk in response:
                continue
        except Exception:
            pass  # the auth error itself is expected
        await asyncio.sleep(1)  # give async callbacks time to run
        print(f"customHandler.errors: {customHandler.errors}")
        assert len(customHandler.errors) == 0
        litellm.callbacks = []
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
# Guarded: an unconditional asyncio.run() here executed a live network test at
# import time, which breaks `pytest` collection of this module.
if __name__ == "__main__":
    asyncio.run(test_async_chat_azure_stream())
|
|
|
|
|
|
|
def test_chat_bedrock_stream():
    """Sync Bedrock: exercise non-stream, stream, and failure paths, then check callbacks."""
    try:
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # Normal completion
        response = litellm.completion(
            model="bedrock/anthropic.claude-v1",
            messages=[{"role": "user", "content": "Hi π - i'm sync bedrock"}],
        )

        # Streaming completion — drain the stream so all chunk callbacks fire
        response = litellm.completion(
            model="bedrock/anthropic.claude-v1",
            messages=[{"role": "user", "content": "Hi π - i'm sync bedrock"}],
            stream=True,
        )
        for chunk in response:
            continue

        # Failure path: a bad region should trigger the failure callback
        try:
            response = litellm.completion(
                model="bedrock/anthropic.claude-v1",
                messages=[{"role": "user", "content": "Hi π - i'm sync bedrock"}],
                aws_region_name="my-bad-region",
                stream=True,
            )
            for chunk in response:
                continue
        except Exception:
            pass  # the region error itself is expected
        time.sleep(1)  # give async-dispatched callbacks time to run
        print(f"customHandler.errors: {customHandler.errors}")
        assert len(customHandler.errors) == 0
        litellm.callbacks = []
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_chat_bedrock_stream():
    """Async Bedrock: exercise non-stream, stream, and failure paths, then check callbacks."""
    try:
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # Normal completion
        response = await litellm.acompletion(
            model="bedrock/anthropic.claude-v1",
            messages=[{"role": "user", "content": "Hi π - i'm async bedrock"}],
        )

        # Streaming completion — drain the stream so all chunk callbacks fire
        response = await litellm.acompletion(
            model="bedrock/anthropic.claude-v1",
            messages=[{"role": "user", "content": "Hi π - i'm async bedrock"}],
            stream=True,
        )
        print(f"response: {response}")
        async for chunk in response:
            print(f"chunk: {chunk}")
            continue

        # Failure path: a bad region should trigger the failure callback
        try:
            response = await litellm.acompletion(
                model="bedrock/anthropic.claude-v1",
                messages=[{"role": "user", "content": "Hi π - i'm async bedrock"}],
                aws_region_name="my-bad-key",
                stream=True,
            )
            async for chunk in response:
                continue
        except Exception:
            pass  # the region error itself is expected
        # was time.sleep(1): blocking the event loop prevents the async
        # callbacks from running before the assertion below
        await asyncio.sleep(1)
        print(f"customHandler.errors: {customHandler.errors}")
        assert len(customHandler.errors) == 0
        litellm.callbacks = []
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_text_completion_openai_stream():
    """Async OpenAI text-completion: non-stream, stream, and failure paths, then check callbacks."""
    try:
        customHandler = CompletionCustomHandler()
        litellm.callbacks = [customHandler]
        # Normal text completion
        response = await litellm.atext_completion(
            model="gpt-3.5-turbo",
            prompt="Hi π - i'm async text completion openai",
        )

        # Streaming text completion — drain the stream so all chunk callbacks fire
        response = await litellm.atext_completion(
            model="gpt-3.5-turbo",
            prompt="Hi π - i'm async text completion openai",
            stream=True,
        )
        async for chunk in response:
            print(f"chunk: {chunk}")
            continue

        # Failure path: a bad api key should trigger the failure callback
        try:
            response = await litellm.atext_completion(
                model="gpt-3.5-turbo",
                prompt="Hi π - i'm async text completion openai",
                stream=True,
                api_key="my-bad-key",
            )
            async for chunk in response:
                continue
        except Exception:
            pass  # the auth error itself is expected
        # was time.sleep(1): blocking the event loop prevents the async
        # callbacks from running before the assertion below
        await asyncio.sleep(1)
        print(f"customHandler.errors: {customHandler.errors}")
        assert len(customHandler.errors) == 0
        litellm.callbacks = []
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_embedding_openai():
    """Async embedding (OpenAI): success and failure paths; expect 3 states each."""
    try:
        customHandler_success = CompletionCustomHandler()
        customHandler_failure = CompletionCustomHandler()
        litellm.callbacks = [customHandler_success]
        # NOTE(review): the success call uses an Azure model despite the test
        # name — presumably intentional reuse of a working deployment; confirm.
        response = await litellm.aembedding(
            model="azure/azure-embedding-model", input=["good morning from litellm"]
        )
        await asyncio.sleep(1)  # give async callbacks time to run
        print(f"customHandler_success.errors: {customHandler_success.errors}")
        print(f"customHandler_success.states: {customHandler_success.states}")
        assert len(customHandler_success.errors) == 0
        assert len(customHandler_success.states) == 3  # pre, post, success

        # Failure path: a bad api key should trigger the failure callback
        litellm.callbacks = [customHandler_failure]
        try:
            response = await litellm.aembedding(
                model="text-embedding-ada-002",
                input=["good morning from litellm"],
                api_key="my-bad-key",
            )
        except Exception:
            pass  # the auth error itself is expected
        await asyncio.sleep(1)
        print(f"customHandler_failure.errors: {customHandler_failure.errors}")
        print(f"customHandler_failure.states: {customHandler_failure.states}")
        assert len(customHandler_failure.errors) == 0
        assert len(customHandler_failure.states) == 3  # pre, post, failure
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_embedding_azure():
    """Async embedding (Azure): success and failure paths; expect 3 states each."""
    try:
        customHandler_success = CompletionCustomHandler()
        customHandler_failure = CompletionCustomHandler()
        litellm.callbacks = [customHandler_success]
        response = await litellm.aembedding(
            model="azure/azure-embedding-model", input=["good morning from litellm"]
        )
        await asyncio.sleep(1)  # give async callbacks time to run
        print(f"customHandler_success.errors: {customHandler_success.errors}")
        print(f"customHandler_success.states: {customHandler_success.states}")
        assert len(customHandler_success.errors) == 0
        assert len(customHandler_success.states) == 3  # pre, post, success

        # Failure path: a bad api key should trigger the failure callback
        litellm.callbacks = [customHandler_failure]
        try:
            response = await litellm.aembedding(
                model="azure/azure-embedding-model",
                input=["good morning from litellm"],
                api_key="my-bad-key",
            )
        except Exception:
            pass  # the auth error itself is expected
        await asyncio.sleep(1)
        print(f"customHandler_failure.errors: {customHandler_failure.errors}")
        print(f"customHandler_failure.states: {customHandler_failure.states}")
        assert len(customHandler_failure.errors) == 0
        assert len(customHandler_failure.states) == 3  # pre, post, failure
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_embedding_bedrock():
    """Async embedding (Bedrock): success and failure paths; expect 3 states each."""
    try:
        customHandler_success = CompletionCustomHandler()
        customHandler_failure = CompletionCustomHandler()
        litellm.callbacks = [customHandler_success]
        litellm.set_verbose = True
        response = await litellm.aembedding(
            model="bedrock/cohere.embed-multilingual-v3",
            input=["good morning from litellm"],
            aws_region_name="os.environ/AWS_REGION_NAME_2",
        )
        await asyncio.sleep(1)  # give async callbacks time to run
        print(f"customHandler_success.errors: {customHandler_success.errors}")
        print(f"customHandler_success.states: {customHandler_success.states}")
        assert len(customHandler_success.errors) == 0
        assert len(customHandler_success.states) == 3  # pre, post, success

        # Failure path: a bad region should trigger the failure callback
        litellm.callbacks = [customHandler_failure]
        try:
            response = await litellm.aembedding(
                model="bedrock/cohere.embed-multilingual-v3",
                input=["good morning from litellm"],
                aws_region_name="my-bad-region",
            )
        except Exception:
            pass  # the region error itself is expected
        await asyncio.sleep(1)
        print(f"customHandler_failure.errors: {customHandler_failure.errors}")
        print(f"customHandler_failure.states: {customHandler_failure.states}")
        assert len(customHandler_failure.errors) == 0
        assert len(customHandler_failure.states) == 3  # pre, post, failure
    except Exception as e:
        pytest.fail(f"An exception occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_completion_azure_caching():
    """Issue the same async Azure chat request twice with redis caching enabled;
    the second call should be a cache hit, yielding 4 callback states total."""
    customHandler_caching = CompletionCustomHandler()
    litellm.cache = Cache(
        type="redis",
        host=os.environ["REDIS_HOST"],
        port=os.environ["REDIS_PORT"],
        password=os.environ["REDIS_PASSWORD"],
    )
    litellm.callbacks = [customHandler_caching]
    unique_time = time.time()  # makes the prompt unique per test run
    response1 = await litellm.acompletion(
        caching=True,
        model="azure/chatgpt-v-2",
        messages=[{"role": "user", "content": f"Hi π - i'm async azure {unique_time}"}],
    )
    await asyncio.sleep(1)
    states_before_hit = f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}"
    print(states_before_hit)
    response2 = await litellm.acompletion(
        caching=True,
        model="azure/chatgpt-v-2",
        messages=[{"role": "user", "content": f"Hi π - i'm async azure {unique_time}"}],
    )
    await asyncio.sleep(1)
    states_after_hit = f"customHandler_caching.states post-cache hit: {customHandler_caching.states}"
    print(states_after_hit)
    assert len(customHandler_caching.errors) == 0
    # 3 states for the real call + 1 success state for the cache hit
    assert len(customHandler_caching.states) == 4
|
|
|
|
|
@pytest.mark.asyncio
async def test_async_embedding_azure_caching():
    """Issue the same async Azure embedding twice with redis caching enabled;
    the second call should be a cache hit, yielding 4 callback states total."""
    print("Testing custom callback input - Azure Caching")
    customHandler_caching = CompletionCustomHandler()
    litellm.cache = Cache(
        type="redis",
        host=os.environ["REDIS_HOST"],
        port=os.environ["REDIS_PORT"],
        password=os.environ["REDIS_PASSWORD"],
    )
    litellm.callbacks = [customHandler_caching]
    unique_time = time.time()  # makes the input unique per test run
    response1 = await litellm.aembedding(
        caching=True,
        model="azure/azure-embedding-model",
        input=[f"good morning from litellm1 {unique_time}"],
    )
    await asyncio.sleep(1)
    response2 = await litellm.aembedding(
        caching=True,
        model="azure/azure-embedding-model",
        input=[f"good morning from litellm1 {unique_time}"],
    )
    await asyncio.sleep(1)
    print(customHandler_caching.states)
    assert len(customHandler_caching.errors) == 0
    # 3 states for the real call + 1 success state for the cache hit
    assert len(customHandler_caching.states) == 4
|
|
|
|
|
|
|
|
|
|
|
|