# File size: 3,867 Bytes
# 395201c
import requests
import time
import os
from dotenv import load_dotenv
load_dotenv()
# Base URL of the proxy under test.
base_url = "https://api.litellm.ai"
# Uncomment the line below if you want to switch to the local server
# base_url = "http://0.0.0.0:8000"

# Step 1: Add a config to the proxy, generate a temp key.
# Two deployments are registered under the same model name "gpt-3.5-turbo":
# one OpenAI-backed and one Azure-backed.
# NOTE(review): presumably the proxy routes between them — confirm.
# os.environ[...] raises KeyError when a variable is unset, so the script
# stops here instead of sending an incomplete config.
config = {
    "model_list": [
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "gpt-3.5-turbo",
                "api_key": os.environ["OPENAI_API_KEY"],
            },
        },
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "azure/chatgpt-v-2",
                "api_key": os.environ["AZURE_API_KEY"],
                "api_base": "https://openai-gpt-4-test-v-1.openai.azure.com/",
                "api_version": "2023-07-01-preview",
            },
        },
    ]
}

print("STARTING LOAD TEST Q")
# The Azure API key is intentionally NOT printed: provider secrets must not
# end up in terminal scrollback or CI logs.

response = requests.post(
    url=f"{base_url}/key/generate",
    json={
        "config": config,
        "duration": "30d",  # default to 30d, set it to 30m if you want a temp key
    },
    headers={"Authorization": "Bearer sk-hosted-litellm"},
    # Without a timeout, requests can block forever on an unresponsive server.
    timeout=60,
)
print("\nresponse from generating key", response.text)
# Surface HTTP errors directly rather than as a KeyError on "key" below.
response.raise_for_status()
key_payload = response.json()  # parse the body once and reuse it
print("\n json response from gen key", key_payload)
generated_key = key_payload["key"]
print("\ngenerated key for proxy", generated_key)
# Step 2: Queue requests to the proxy (the count is set by num_requests below), using your generated_key
import concurrent.futures
def create_job_and_poll(request_num, poll_interval=0.5, max_wait=None, request_timeout=60):
    """Queue one chat request on the proxy and poll until it is finished.

    Posts a job to ``{base_url}/queue/request`` using the module-level
    ``generated_key``, then repeatedly GETs the polling URL returned by the
    proxy. When the polled JSON reports ``status == "finished"``, its
    ``result`` is printed and appended to ``load_test_log.txt``.

    Args:
        request_num: Identifier of this request, used only in log output.
        poll_interval: Seconds to wait between polling attempts.
        max_wait: Maximum number of seconds to keep polling, or ``None``
            (the default) to poll until the job finishes.
        request_timeout: Timeout in seconds for each individual HTTP call.

    Raises:
        requests.RequestException: If creating the job fails (connection
            error, timeout, or a non-2xx status).
        KeyError: If the job-creation response has no ``"url"`` field.
    """
    auth_headers = {"Authorization": f"Bearer {generated_key}"}

    print(f"Creating a job on the proxy for request {request_num}")
    job_response = requests.post(
        url=f"{base_url}/queue/request",
        json={
            "model": "gpt-3.5-turbo",
            "messages": [
                {"role": "system", "content": "write a short poem"},
            ],
        },
        headers=auth_headers,
        timeout=request_timeout,
    )
    print(job_response.status_code)
    print("\nResponse from creating job", job_response.text)
    # Fail with the HTTP error itself instead of a confusing KeyError below.
    job_response.raise_for_status()

    # The proxy returns a path; prefix it with the base URL to poll it.
    polling_url = f"{base_url}{job_response.json()['url']}"
    print(f"\nCreated Job {request_num}, Polling Url {polling_url}")

    deadline = None if max_wait is None else time.monotonic() + max_wait

    # Poll each request
    while True:
        try:
            print(f"\nPolling URL for request {request_num}", polling_url)
            polling_response = requests.get(
                url=polling_url,
                headers=auth_headers,
                timeout=request_timeout,
            )
            print(
                f"\nResponse from polling url for request {request_num}",
                polling_response.text,
            )
            poll_payload = polling_response.json()

            if poll_payload.get("status", None) == "finished":
                llm_response = poll_payload["result"]
                print(f"LLM Response for request {request_num}")
                print(llm_response)
                # Write the llm_response to load_test_log.txt
                try:
                    # Explicit UTF-8 so non-ASCII model output is written the
                    # same way regardless of the platform's default encoding.
                    with open("load_test_log.txt", "a", encoding="utf-8") as response_file:
                        response_file.write(
                            f"Response for request: {request_num}\n{llm_response}\n\n"
                        )
                except (OSError, UnicodeError) as e:
                    # Logging to disk is best-effort; the job itself succeeded.
                    print("GOT EXCEPTION", e)
                return
        except Exception as e:
            # Best-effort polling: any transient failure (network error,
            # non-JSON body, missing "result") is reported and retried.
            print("got exception when polling", e)

        if deadline is not None and time.monotonic() >= deadline:
            print(
                f"Giving up on request {request_num}: "
                f"not finished after {max_wait} seconds"
            )
            return

        # Sleep on every unfinished iteration, including after an exception,
        # so a failing endpoint is not hammered in a tight loop.
        time.sleep(poll_interval)
# Number of requests
num_requests = 100

# Use ThreadPoolExecutor for parallel execution; one worker per request so
# every job is created and polled concurrently.
with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as executor:
    # Create and poll each request in parallel
    futures = [executor.submit(create_job_and_poll, i) for i in range(num_requests)]

    # Wait for all futures to complete and report worker failures.
    # Future.exception() blocks until the future is done; without inspecting
    # it, an exception raised inside a worker would be silently discarded.
    failed_requests = 0
    for request_num, future in enumerate(futures):
        error = future.exception()
        if error is not None:
            failed_requests += 1
            print(f"Request {request_num} raised an exception:", error)

print(
    f"\nLoad test done: {num_requests - failed_requests}/{num_requests} "
    "requests finished without raising"
)