### What this tests ####
## Tests the litellm CustomLogger integration - sync + async callbacks for completion, streaming, embedding, and caching
import sys, os, time, inspect, asyncio, traceback
import pytest
sys.path.insert(0, os.path.abspath('../..'))

from litellm import completion, embedding
import litellm
from litellm.integrations.custom_logger import CustomLogger
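
# MyCustomHandler subclasses CustomLogger and records which callback hooks fired
# (sync/async success + failure, for completion and embedding calls), along with the
# kwargs and response objects passed to them. The tests below attach an instance via
# `litellm.callbacks = [handler]` and assert on these flags.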

class MyCustomHandler(CustomLogger):
    complete_streaming_response_in_callback = ""
    def __init__(self):
        self.success: bool = False                  # type: ignore
        self.failure: bool = False                  # type: ignore
        self.async_success: bool = False            # type: ignore
        self.async_success_embedding: bool = False  # type: ignore
        self.async_failure: bool = False            # type: ignore
        self.async_failure_embedding: bool = False  # type: ignore

        self.async_completion_kwargs = None         # type: ignore
        self.async_embedding_kwargs = None          # type: ignore
        self.async_embedding_response = None        # type: ignore

        self.async_completion_kwargs_fail = None    # type: ignore
        self.async_embedding_kwargs_fail = None     # type: ignore

        self.stream_collected_response = None       # type: ignore
        self.sync_stream_collected_response = None       # type: ignore
        self.user = None # type: ignore
        self.data_sent_to_api: dict = {}

    def log_pre_api_call(self, model, messages, kwargs): 
        print(f"Pre-API Call")
        self.data_sent_to_api = kwargs["additional_args"].get("complete_input_dict", {})
    
    def log_post_api_call(self, kwargs, response_obj, start_time, end_time): 
        print(f"Post-API Call")
    
    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
        print(f"On Stream")
        
    def log_success_event(self, kwargs, response_obj, start_time, end_time): 
        print(f"On Success")
        self.success = True
        if kwargs.get("stream") == True:
            self.sync_stream_collected_response = response_obj


    def log_failure_event(self, kwargs, response_obj, start_time, end_time): 
        print(f"On Failure")
        self.failure = True

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): 
        print(f"On Async success")
        print(f"received kwargs user: {kwargs['user']}")
        self.async_success = True
        if kwargs.get("model") == "text-embedding-ada-002":
            self.async_success_embedding = True
            self.async_embedding_kwargs = kwargs
            self.async_embedding_response = response_obj
        if kwargs.get("stream") == True:
            self.stream_collected_response = response_obj
        self.async_completion_kwargs = kwargs
        self.user = kwargs.get("user", None)
    
    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): 
        print(f"On Async Failure")
        self.async_failure = True
        if kwargs.get("model") == "text-embedding-ada-002":
            self.async_failure_embedding = True
            self.async_embedding_kwargs_fail = kwargs
        
        self.async_completion_kwargs_fail = kwargs
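
# TmpFunction provides a bare async callback (not a CustomLogger subclass). It is
# registered via `litellm.success_callback` and captures the `complete_streaming_response`
# that litellm assembles once a streamed call finishes.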

class TmpFunction:
    complete_streaming_response_in_callback = ""
    async_success: bool = False
    async def async_test_logging_fn(self, kwargs, completion_obj, start_time, end_time):
        print(f"ON ASYNC LOGGING")
        self.async_success = True
        print(f'kwargs.get("complete_streaming_response"): {kwargs.get("complete_streaming_response")}')
        self.complete_streaming_response_in_callback = kwargs.get("complete_streaming_response")


def test_async_chat_openai_stream():
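    """
    Streams an async gpt-3.5-turbo completion, concatenates the chunk contents locally,
    and asserts that the callback's `complete_streaming_response` matches the locally
    built string.
    """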
    try:
        tmp_function = TmpFunction()
        # litellm.set_verbose = True
        litellm.success_callback = [tmp_function.async_test_logging_fn]
        complete_streaming_response = ""
        async def call_gpt():
            nonlocal complete_streaming_response
            response = await litellm.acompletion(model="gpt-3.5-turbo",
                                messages=[{
                                    "role": "user",
                                    "content": "Hi 👋 - i'm openai"
                                }],
                                stream=True)
            async for chunk in response: 
                complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
                print(complete_streaming_response)
        asyncio.run(call_gpt())
        complete_streaming_response = complete_streaming_response.strip("'")
        response1 = tmp_function.complete_streaming_response_in_callback["choices"][0]["message"]["content"]
        response2 = complete_streaming_response
        # assert [ord(c) for c in response1] == [ord(c) for c in response2]
        assert response1 == response2
        assert tmp_function.async_success == True
    except Exception as e:
        print(e)
        pytest.fail(f"An error occurred - {str(e)}")
# test_async_chat_openai_stream()

def test_completion_azure_stream_moderation_failure():
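    """
    Sends a prompt expected to trip Azure content moderation on a streaming completion
    and asserts that the sync failure callback fired.
    """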
    try:
        customHandler = MyCustomHandler()
        litellm.callbacks = [customHandler]
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user",
                "content": "how do i kill someone",
            },
        ]
        try: 
            response = completion(
                model="azure/chatgpt-v-2", messages=messages, stream=True
            )
            for chunk in response: 
                print(f"chunk: {chunk}")
                continue
        except Exception as e:
            print(e)
        time.sleep(1)
        assert customHandler.failure == True
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")


def test_async_custom_handler_stream():
    try:
        # [PROD Test] - Do not DELETE 
        # checks if the model response available in the async + stream callbacks is equal to the received response
        customHandler2 = MyCustomHandler()
        litellm.callbacks = [customHandler2]
        litellm.set_verbose = False
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user",
                "content": "write 1 sentence about litellm being amazing",
            },
        ]
        complete_streaming_response = ""
        async def test_1():
            nonlocal complete_streaming_response
            response = await litellm.acompletion(
                model="azure/chatgpt-v-2", 
                messages=messages,
                stream=True
            )
            async for chunk in response: 
                complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
                print(complete_streaming_response)
        
        asyncio.run(test_1())

        response_in_success_handler = customHandler2.stream_collected_response
        response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"]
        print("\n\n")
        print("response_in_success_handler: ", response_in_success_handler)
        print("complete_streaming_response: ", complete_streaming_response)
        assert response_in_success_handler == complete_streaming_response
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")
# test_async_custom_handler_stream()


def test_azure_completion_stream():
    # [PROD Test] - Do not DELETE 
    # test if completion() + sync custom logger get the same complete stream response
    try:
        # checks if the model response available in the async + stream callbacks is equal to the received response
        customHandler2 = MyCustomHandler()
        litellm.callbacks = [customHandler2]
        litellm.set_verbose = False
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user",
                "content": "write 1 sentence about litellm being amazing",
            },
        ]
        complete_streaming_response = ""

        response = litellm.completion(
            model="azure/chatgpt-v-2", 
            messages=messages,
            stream=True
        )
        for chunk in response: 
            complete_streaming_response += chunk["choices"][0]["delta"]["content"] or ""
            print(complete_streaming_response)
        
        time.sleep(0.5) # wait 1/2 second before checking callbacks
        response_in_success_handler = customHandler2.sync_stream_collected_response
        response_in_success_handler = response_in_success_handler["choices"][0]["message"]["content"]
        print("\n\n")
        print("response_in_success_handler: ", response_in_success_handler)
        print("complete_streaming_response: ", complete_streaming_response)
        assert response_in_success_handler == complete_streaming_response
    except Exception as e:
        pytest.fail(f"Error occurred: {e}")

@pytest.mark.asyncio
async def test_async_custom_handler_completion(): 
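    """
    Checks the async success callback after a normal acompletion call, then checks the
    async failure callback (and the logged exception string) after a call made with a
    bad API key.
    """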
    try: 
        customHandler_success = MyCustomHandler()
        customHandler_failure = MyCustomHandler()
        # success
        assert customHandler_success.async_success == False
        litellm.callbacks = [customHandler_success]
        response = await litellm.acompletion(
                model="gpt-3.5-turbo", 
                messages=[{
                    "role": "user",
                    "content": "hello from litellm test",
                }]
            )
        await asyncio.sleep(1)
        assert customHandler_success.async_success == True, "async success is not set to True even after success"
        assert customHandler_success.async_completion_kwargs.get("model") == "gpt-3.5-turbo"
        # failure
        litellm.callbacks = [customHandler_failure]
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {
                "role": "user",
                "content": "how do i kill someone",
            },
        ]

        assert customHandler_failure.async_failure == False 
        try: 
            response = await litellm.acompletion(
                        model="gpt-3.5-turbo", 
                        messages=messages,
                        api_key="my-bad-key",
                    )
        except Exception:
            pass
        assert customHandler_failure.async_failure == True, "async failure is not set to True even after failure"        
        assert customHandler_failure.async_completion_kwargs_fail.get("model") == "gpt-3.5-turbo"
        assert len(str(customHandler_failure.async_completion_kwargs_fail.get("exception"))) > 10 # expect a long exception string, e.g. APIError("OpenAIException - Error code: 401 - ... 'invalid_api_key' ...")
        litellm.callbacks = []
        print("Passed setting async failure")
    except Exception as e:
        pytest.fail(f"An exception occurred - {str(e)}")
# asyncio.run(test_async_custom_handler_completion())

@pytest.mark.asyncio
async def test_async_custom_handler_embedding(): 
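    """
    Same as the completion test above, but for aembedding: verifies the async success
    callback (model name + prompt_tokens) and the async failure callback when a bad
    API key is used.
    """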
    try: 
        customHandler_embedding = MyCustomHandler()
        litellm.callbacks = [customHandler_embedding]
        # success
        assert customHandler_embedding.async_success_embedding == False
        response = await litellm.aembedding(
                model="text-embedding-ada-002", 
                input = ["hello world"],
            )
        await asyncio.sleep(1)
        assert customHandler_embedding.async_success_embedding == True, "async_success_embedding is not set to True even after success"
        assert customHandler_embedding.async_embedding_kwargs.get("model") == "text-embedding-ada-002"
        assert customHandler_embedding.async_embedding_response["usage"]["prompt_tokens"] == 2
        print("Passed setting async success: Embedding")
        # failure 
        assert customHandler_embedding.async_failure_embedding == False
        try: 
            response = await litellm.aembedding(
                        model="text-embedding-ada-002", 
                        input = ["hello world"],
                        api_key="my-bad-key",
                    )
        except Exception:
            pass
        assert customHandler_embedding.async_failure_embedding == True, "async failure embedding is not set to True even after failure"        
        assert customHandler_embedding.async_embedding_kwargs_fail.get("model") == "text-embedding-ada-002"
        assert len(str(customHandler_embedding.async_embedding_kwargs_fail.get("exception"))) > 10 # expect a long exception string, e.g. APIError("OpenAIException - Error code: 401 - ... 'invalid_api_key' ...")
    except Exception as e:
        pytest.fail(f"An exception occurred - {str(e)}")
# asyncio.run(test_async_custom_handler_embedding())

@pytest.mark.asyncio
async def test_async_custom_handler_embedding_optional_param(): 
    """
    Tests if the openai optional params for embedding - user + encoding_format, 
    are logged
    """
    customHandler_optional_params = MyCustomHandler()
    litellm.callbacks = [customHandler_optional_params]
    response = await litellm.aembedding(
                model="azure/azure-embedding-model", 
                input = ["hello world"],
                user = "John"
            )
    await asyncio.sleep(1) # success callback is async 
    assert customHandler_optional_params.user == "John"
    assert customHandler_optional_params.user == customHandler_optional_params.data_sent_to_api["user"]

# asyncio.run(test_async_custom_handler_embedding_optional_param())

@pytest.mark.asyncio
async def test_async_custom_handler_embedding_optional_param_bedrock(): 
    """
    Tests if the openai optional params for embedding - user + encoding_format, 
    are logged

    but makes sure these are not sent to the non-openai/azure endpoint (raises errors).
    """
    litellm.drop_params = True
    litellm.set_verbose = True
    customHandler_optional_params = MyCustomHandler()
    litellm.callbacks = [customHandler_optional_params]
    response = await litellm.aembedding(
                model="bedrock/amazon.titan-embed-text-v1", 
                input = ["hello world"],
                user = "John"
            )
    await asyncio.sleep(1) # success callback is async 
    assert customHandler_optional_params.user == "John"
    assert "user" not in customHandler_optional_params.data_sent_to_api


def test_redis_cache_completion_stream():
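    """
    Verifies that streaming completions are written to and read back from the Redis
    cache while custom callbacks are set: two identical streaming requests should
    yield identical content.
    """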
    from litellm import Cache
    # Important Test - This tests if we can add to streaming cache, when custom callbacks are set 
    import random
    try:
        print("\nrunning test_redis_cache_completion_stream")
        litellm.set_verbose = True
        random_number = random.randint(1, 100000) # add a random number to ensure it's always adding / reading from cache
        messages = [{"role": "user", "content": f"write a one sentence poem about: {random_number}"}]
        litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
        print("test for caching, streaming + completion")
        response1 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True)
        response_1_content = ""
        for chunk in response1:
            print(chunk)
            response_1_content += chunk.choices[0].delta.content or ""
        print(response_1_content)

        time.sleep(0.1) # sleep for 0.1 seconds to allow the cache write to complete
        response2 = completion(model="gpt-3.5-turbo", messages=messages, max_tokens=40, temperature=0.2, stream=True)
        response_2_content = ""
        for chunk in response2:
            print(chunk)
            response_2_content += chunk.choices[0].delta.content or ""
        print("\nresponse 1", response_1_content)
        print("\nresponse 2", response_2_content)
        assert response_1_content == response_2_content, f"Response 1 != Response 2. Same params, Response 1: {response_1_content} != Response 2: {response_2_content}"
        litellm.success_callback = []
        litellm._async_success_callback = []
        litellm.cache = None
    except Exception as e:
        print(e)
        litellm.success_callback = []
        raise e
# test_redis_cache_completion_stream()