# #### What this tests ####
# #    Verifies that the cost tracking function works across consecutive calls (~10 consecutive calls)

# import sys, os, asyncio
# import traceback
# import pytest
# sys.path.insert(
#     0, os.path.abspath("../..")
# )  # Adds the project root (two levels up) to the system path
# import dotenv
# dotenv.load_dotenv()
# import litellm
# from fastapi.testclient import TestClient
# from fastapi import FastAPI
# from litellm.proxy.proxy_server import router, save_worker_config, startup_event  # Replace with the actual module where your FastAPI router is defined
# filepath = os.path.dirname(os.path.abspath(__file__))
# config_fp = f"{filepath}/test_config.yaml"
# save_worker_config(
#     config=config_fp,
#     model=None,
#     alias=None,
#     api_base=None,
#     api_version=None,
#     debug=True,
#     temperature=None,
#     max_tokens=None,
#     request_timeout=600,
#     max_budget=None,
#     telemetry=False,
#     drop_params=True,
#     add_function_to_prompt=False,
#     headers=None,
#     save=False,
#     use_queue=False,
# )
# app = FastAPI()
# app.include_router(router)  # Include your router in the test app
# @app.on_event("startup")
# async def wrapper_startup_event():
#     await startup_event()
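
# # NOTE: @app.on_event("startup") is deprecated in newer FastAPI releases; a
# # lifespan handler is the modern equivalent (a sketch, assuming FastAPI >= 0.93):
# #     from contextlib import asynccontextmanager
# #     @asynccontextmanager
# #     async def lifespan(app: FastAPI):
# #         await startup_event()
# #         yield
# #     app = FastAPI(lifespan=lifespan)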

# # Here you create a fixture that will be used by your tests
# # Make sure the fixture returns TestClient(app)
# @pytest.fixture(autouse=True)
# def client():
#     with TestClient(app) as client:
#         yield client

# @pytest.mark.asyncio
# async def test_proxy_cost_tracking(client): 
#     """
#     Get min cost. 
#     Create new key.
#     Run 10 parallel calls. 
#     Check cost for key at the end. 
#     assert it's > min cost. 
#     """
#     model = "gpt-3.5-turbo"
#     messages = [{"role": "user", "content": "Hey, how's it going?"}]
#     number_of_calls = 1
#     min_cost = litellm.completion_cost(model=model, messages=messages) * number_of_calls
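#     # NOTE: with only model + messages (no completion text passed),
#     # completion_cost reflects the prompt-side cost, so any completed call
#     # (prompt + completion tokens) should cost strictly more -- which is what
#     # the final spend assertion relies on.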
#     try: 
#         ### CREATE NEW KEY ###
#         test_data = {
#             "models": ["azure-model"], 
#         }
#         # Your bearer token
#         token = os.getenv("PROXY_MASTER_KEY")

#         headers = {
#             "Authorization": f"Bearer {token}"
#         }
#         create_new_key = client.post("/key/generate", json=test_data, headers=headers)
#         key = create_new_key.json()["key"]
#         print(f"received key: {key}")
#         ### MAKE PARALLEL CALLS ###
#         async def test_chat_completions(): 
#             # Your test data
#             test_data = {
#                 "model": "azure-model",
#                 "messages": messages
#             }

#             tmp_headers = {
#                 "Authorization": f"Bearer {key}"
#             }

#             response = client.post("/v1/chat/completions", json=test_data, headers=tmp_headers)

#             assert response.status_code == 200
#             result = response.json()
#             print(f"Received response: {result}")
#         tasks = [test_chat_completions() for _ in range(number_of_calls)]
#         chat_completions = await asyncio.gather(*tasks)
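#         # NOTE: TestClient.post is blocking, so the gathered coroutines above
#         # run one after another. A sketch of true parallelism (assumes Python
#         # 3.9+ for asyncio.to_thread; `sync_chat_completion` is a hypothetical
#         # synchronous variant of test_chat_completions above):
#         #     tasks = [asyncio.to_thread(sync_chat_completion) for _ in range(number_of_calls)]
#         #     chat_completions = await asyncio.gather(*tasks)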
#         ### CHECK SPEND ###
#         get_key_spend = client.get(f"/key/info?key={key}", headers=headers)

#         assert get_key_spend.json()["info"]["spend"] > min_cost
#         print(f"chat_completions: {chat_completions}")
#     except Exception as e:
#         pytest.fail(f"LiteLLM Proxy test failed. Exception - {str(e)}")

# #### JUST TEST LOCAL PROXY SERVER

# import requests, os
# from concurrent.futures import ThreadPoolExecutor
# import dotenv
# dotenv.load_dotenv()

# api_url = "http://0.0.0.0:8000/chat/completions"

# def make_api_call(api_url):
#     # Your test data
#     test_data = {
#         "model": "azure-model",
#         "messages": [
#             {
#                 "role": "user",
#                 "content": "hi"
#             },
#         ],
#         "max_tokens": 10,
#     }
#     # Your bearer token
#     token = os.getenv("PROXY_MASTER_KEY")

#     headers = {
#         "Authorization": f"Bearer {token}"
#     }
#     print("testing proxy server")
#     response = requests.post(api_url, json=test_data, headers=headers)
#     return response.json()

# # Number of parallel API calls
# num_parallel_calls = 3

# # List to store results
# results = []

# # Create a ThreadPoolExecutor
# with ThreadPoolExecutor() as executor:
#     # Submit the API calls concurrently
#     futures = [executor.submit(make_api_call, api_url) for _ in range(num_parallel_calls)]

#     # Collect the results in submission order (future.result() blocks until each call finishes)
#     for future in futures:
#         try:
#             result = future.result()
#             results.append(result)
#         except Exception as e:
#             print(f"Error: {e}")

# # Print the results
# for idx, result in enumerate(results, start=1):
#     print(f"Result {idx}: {result}")