# #### What this tests #### # # This tests that the cost-tracking function works across consecutive calls (~10 consecutive calls) # import sys, os, asyncio # import traceback # import pytest # sys.path.insert( # 0, os.path.abspath("../..") # ) # Adds the parent directory to the system path # import dotenv # dotenv.load_dotenv() # import litellm # from fastapi.testclient import TestClient # from fastapi import FastAPI # from litellm.proxy.proxy_server import router, save_worker_config, startup_event # Replace with the actual module where your FastAPI router is defined # filepath = os.path.dirname(os.path.abspath(__file__)) # config_fp = f"{filepath}/test_config.yaml" # save_worker_config(config=config_fp, model=None, alias=None, api_base=None, api_version=None, debug=True, temperature=None, max_tokens=None, request_timeout=600, max_budget=None, telemetry=False, drop_params=True, add_function_to_prompt=False, headers=None, save=False, use_queue=False) # app = FastAPI() # app.include_router(router) # Include your router in the test app # @app.on_event("startup") # async def wrapper_startup_event(): # await startup_event() # # Here you create a fixture that will be used by your tests # # Make sure the fixture returns TestClient(app) # @pytest.fixture(autouse=True) # def client(): # with TestClient(app) as client: # yield client # @pytest.mark.asyncio # async def test_proxy_cost_tracking(client): # """ # Get min cost. # Create new key. # Run 10 parallel calls. # Check cost for key at the end. # assert it's > min cost. 
# """ # model = "gpt-3.5-turbo" # messages = [{"role": "user", "content": "Hey, how's it going?"}] # number_of_calls = 1 # min_cost = litellm.completion_cost(model=model, messages=messages) * number_of_calls # try: # ### CREATE NEW KEY ### # test_data = { # "models": ["azure-model"], # } # # Your bearer token # token = os.getenv("PROXY_MASTER_KEY") # headers = { # "Authorization": f"Bearer {token}" # } # create_new_key = client.post("/key/generate", json=test_data, headers=headers) # key = create_new_key.json()["key"] # print(f"received key: {key}") # ### MAKE PARALLEL CALLS ### # async def test_chat_completions(): # # Your test data # test_data = { # "model": "azure-model", # "messages": messages # } # tmp_headers = { # "Authorization": f"Bearer {key}" # } # response = client.post("/v1/chat/completions", json=test_data, headers=tmp_headers) # assert response.status_code == 200 # result = response.json() # print(f"Received response: {result}") # tasks = [test_chat_completions() for _ in range(number_of_calls)] # chat_completions = await asyncio.gather(*tasks) # ### CHECK SPEND ### # get_key_spend = client.get(f"/key/info?key={key}", headers=headers) # assert get_key_spend.json()["info"]["spend"] > min_cost # # print(f"chat_completions: {chat_completions}") # # except Exception as e: # # pytest.fail(f"LiteLLM Proxy test failed. 
# Exception - {str(e)}") # #### JUST TEST LOCAL PROXY SERVER # import requests, os # from concurrent.futures import ThreadPoolExecutor # import dotenv # dotenv.load_dotenv() # api_url = "" # def make_api_call(api_url): # # Your test data # test_data = { # "model": "azure-model", # "messages": [ # { # "role": "user", # "content": "hi" # }, # ], # "max_tokens": 10, # } # # Your bearer token # token = os.getenv("PROXY_MASTER_KEY") # headers = { # "Authorization": f"Bearer {token}" # } # print("testing proxy server") # response = requests.post(api_url, json=test_data, headers=headers) # return response.json() # # Number of parallel API calls # num_parallel_calls = 3 # # List to store results # results = [] # # Create a ThreadPoolExecutor # with ThreadPoolExecutor() as executor: # # Submit the API calls concurrently # futures = [executor.submit(make_api_call, api_url) for _ in range(num_parallel_calls)] # # Gather the results as they become available # for future in futures: # try: # result = future.result() # results.append(result) # except Exception as e: # print(f"Error: {e}") # # Print the results # for idx, result in enumerate(results, start=1): # print(f"Result {idx}: {result}")