# #### What this tests ####
# # This tests that the cost tracking function works with consecutive calls (~10 consecutive calls)
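# # Flow: create a key via /key/generate, send chat completion requests through
# # /v1/chat/completions with that key, then verify via /key/info that the key's
# # spend exceeds the minimum cost computed with litellm.completion_cost().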
# import sys, os, asyncio
# import traceback
# import pytest
# sys.path.insert(
#     0, os.path.abspath("../..")
# )  # Adds the parent directory to the system path
# import dotenv
# dotenv.load_dotenv()
# import litellm
# from fastapi.testclient import TestClient
# from fastapi import FastAPI
# from litellm.proxy.proxy_server import router, save_worker_config, startup_event # Replace with the actual module where your FastAPI router is defined
# filepath = os.path.dirname(os.path.abspath(__file__))
# config_fp = f"{filepath}/test_config.yaml"
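# # Save the worker settings (test config path, timeouts, flags) that the proxy reads on startup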
# save_worker_config(config=config_fp, model=None, alias=None, api_base=None, api_version=None, debug=True, temperature=None, max_tokens=None, request_timeout=600, max_budget=None, telemetry=False, drop_params=True, add_function_to_prompt=False, headers=None, save=False, use_queue=False)
# app = FastAPI()
# app.include_router(router) # Include your router in the test app
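# # Re-register the proxy's startup hook on this test app so proxy initialization
# # runs when the TestClient starts the app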
# @app.on_event("startup")
# async def wrapper_startup_event():
#     await startup_event()
# # Here you create a fixture that will be used by your tests
# # Make sure the fixture returns TestClient(app)
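# # Entering TestClient as a context manager runs the app's startup events before each test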
# @pytest.fixture(autouse=True)
# def client():
#     with TestClient(app) as client:
#         yield client
# @pytest.mark.asyncio
# async def test_proxy_cost_tracking(client):
#     """
#     Get min cost.
#     Create new key.
#     Run 10 parallel calls.
#     Check cost for key at the end.
#     assert it's > min cost.
#     """
#     model = "gpt-3.5-turbo"
#     messages = [{"role": "user", "content": "Hey, how's it going?"}]
#     number_of_calls = 1
#     min_cost = litellm.completion_cost(model=model, messages=messages) * number_of_calls
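#     # completion_cost() prices only the prompt here, so this is a lower bound on
#     # the spend the proxy should record for the key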
#     try:
#         ### CREATE NEW KEY ###
#         test_data = {
#             "models": ["azure-model"],
#         }
#         # Your bearer token
#         token = os.getenv("PROXY_MASTER_KEY")
#         headers = {
#             "Authorization": f"Bearer {token}"
#         }
#         create_new_key = client.post("/key/generate", json=test_data, headers=headers)
#         key = create_new_key.json()["key"]
#         print(f"received key: {key}")
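#         # requests below authenticate with the newly generated key (not the master
#         # key), so spend accrues against this key's record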
#         ### MAKE PARALLEL CALLS ###
#         async def test_chat_completions():
#             # Your test data
#             test_data = {
#                 "model": "azure-model",
#                 "messages": messages
#             }
#             tmp_headers = {
#                 "Authorization": f"Bearer {key}"
#             }
#             response = client.post("/v1/chat/completions", json=test_data, headers=tmp_headers)
#             assert response.status_code == 200
#             result = response.json()
#             print(f"Received response: {result}")
#         tasks = [test_chat_completions() for _ in range(number_of_calls)]
#         chat_completions = await asyncio.gather(*tasks)
#         ### CHECK SPEND ###
#         get_key_spend = client.get(f"/key/info?key={key}", headers=headers)
#         assert get_key_spend.json()["info"]["spend"] > min_cost
#         # print(f"chat_completions: {chat_completions}")
#     except Exception as e:
#         pytest.fail(f"LiteLLM Proxy test failed. Exception - {str(e)}")
# #### JUST TEST LOCAL PROXY SERVER
# import requests, os
# from concurrent.futures import ThreadPoolExecutor
# import dotenv
# dotenv.load_dotenv()
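# # Endpoint of a proxy server already running locally (start it separately before running this block)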
# api_url = "http://0.0.0.0:8000/chat/completions"
# def make_api_call(api_url):
#     # Your test data
#     test_data = {
#         "model": "azure-model",
#         "messages": [
#             {
#                 "role": "user",
#                 "content": "hi"
#             },
#         ],
#         "max_tokens": 10,
#     }
#     # Your bearer token
#     token = os.getenv("PROXY_MASTER_KEY")
#     headers = {
#         "Authorization": f"Bearer {token}"
#     }
#     print("testing proxy server")
#     response = requests.post(api_url, json=test_data, headers=headers)
#     return response.json()
# # Number of parallel API calls
# num_parallel_calls = 3
# # List to store results
# results = []
# # Create a ThreadPoolExecutor
# with ThreadPoolExecutor() as executor:
#     # Submit the API calls concurrently
#     futures = [executor.submit(make_api_call, api_url) for _ in range(num_parallel_calls)]
#     # Gather the results as they become available
#     for future in futures:
#         try:
#             result = future.result()
#             results.append(result)
#         except Exception as e:
#             print(f"Error: {e}")
# # Print the results
# for idx, result in enumerate(results, start=1):
#     print(f"Result {idx}: {result}")
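# # Optional follow-up (sketch, not part of the original script): recorded spend
# # could be checked the same way as in the pytest test above, via the proxy's
# # /key/info endpoint, assuming the proxy is still running on 0.0.0.0:8000:
# # spend_info = requests.get(
# #     "http://0.0.0.0:8000/key/info",
# #     params={"key": os.getenv("PROXY_MASTER_KEY")},
# #     headers={"Authorization": f"Bearer {os.getenv('PROXY_MASTER_KEY')}"},
# # )
# # print(f"spend: {spend_info.json()['info']['spend']}")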