# #### What this tests #### # # This profiles a router call to find where calls are taking the most time. # import sys, os, time, logging # import traceback, asyncio, uuid # import pytest # import cProfile # from pstats import Stats # sys.path.insert( # 0, os.path.abspath("../..") # ) # Adds the parent directory to the system path # import litellm # from litellm import Router # from concurrent.futures import ThreadPoolExecutor # from dotenv import load_dotenv # from aiodebug import log_slow_callbacks # Import the aiodebug utility for logging slow callbacks # # litellm.telemetry = False # load_dotenv() # logging.basicConfig( # level=logging.DEBUG, # format='%(asctime)s %(levelname)s: %(message)s', # datefmt='%I:%M:%S %p', # filename='aiologs.log', # Name of the log file where logs will be written # filemode='w' # 'w' to overwrite the log file on each run, use 'a' to append # ) # # Dictionary to store exception types and their counts # exception_counts = {} # exception_data = [] # litellm.telemetry = False # num_task_cancelled_errors = 0 # model_list = [{ # "model_name": "azure-model", # "litellm_params": { # "model": "azure/gpt-turbo", # "api_key": "os.environ/AZURE_FRANCE_API_KEY", # "api_base": "https://openai-france-1234.openai.azure.com", # "rpm": 1440, # } # }, { # "model_name": "azure-model", # "litellm_params": { # "model": "azure/gpt-35-turbo", # "api_key": "os.environ/AZURE_EUROPE_API_KEY", # "api_base": "https://my-endpoint-europe-berri-992.openai.azure.com", # "rpm": 6 # } # }, { # "model_name": "azure-model", # "litellm_params": { # "model": "azure/gpt-35-turbo", # "api_key": "os.environ/AZURE_CANADA_API_KEY", # "api_base": "https://my-endpoint-canada-berri992.openai.azure.com", # "rpm": 6 # } # }] # router = Router(model_list=model_list, set_verbose=False, num_retries=3) # async def router_completion(): # global num_task_cancelled_errors, exception_counts # try: # messages=[{"role": "user", "content": f"This is a test: {uuid.uuid4()}"}] # response = await router.acompletion(model="azure-model", messages=messages) # return response # except asyncio.exceptions.CancelledError: # exception_type = "CancelledError" # exception_counts[exception_type] = exception_counts.get(exception_type, 0) + 1 # print("Task was cancelled") # num_task_cancelled_errors += 1 # exception_data.append({ # "type": exception_type, # "traceback": None # }) # return None # except Exception as e: # exception_type = type(e).__name__ # exception_counts[exception_type] = exception_counts.get(exception_type, 0) + 1 # exception_data.append({ # "type": exception_type, # "traceback": traceback.format_exc() # }) # return None # async def loadtest_fn(n = 1452): # global num_task_cancelled_errors, exception_counts # start = time.time() # tasks = [router_completion() for _ in range(n)] # chat_completions = await asyncio.gather(*tasks) # successful_completions = [c for c in chat_completions if c is not None] # print(n, time.time() - start, len(successful_completions)) # # Print exception breakdown # print("Exception Breakdown:") # for exception_type, count in exception_counts.items(): # print(f"{exception_type}: {count}") # # Store exception_data in a file # with open('exception_data.txt', 'w') as file: # for data in exception_data: # file.write(f"Type: {data['type']}\n") # if data['traceback']: # file.write(f"Traceback:\n{data['traceback']}\n\n") # loop = asyncio.get_event_loop() # loop.set_debug(True) # log_slow_callbacks.enable(0.05) # Log callbacks slower than 0.05 seconds # # Excute the load testing function within the asyncio event loop # loop.run_until_complete(loadtest_fn()) # # ### SUSTAINED LOAD TESTS ### # # import time, asyncio # # async def make_requests(n): # # tasks = [router_completion() for _ in range(n)] # # print(f"num tasks: {len(tasks)}") # # chat_completions = await asyncio.gather(*tasks) # # successful_completions = [c for c in chat_completions if c is not None] # # print(f"successful_completions: {len(successful_completions)}") # # return successful_completions # # async def main(): # # start_time = time.time() # # total_successful_requests = 0 # # request_limit = 1000 # # batches = 2 # batches of 1k requests # # start = time.time() # # tasks = [] # list to hold all tasks # # async def request_loop(): # # nonlocal tasks # # for _ in range(batches): # # # Make 1,000 requests # # task = asyncio.create_task(make_requests(request_limit)) # # tasks.append(task) # # # Introduce a delay to achieve 1,000 requests per second # # await asyncio.sleep(1) # # await request_loop() # # results = await asyncio.gather(*tasks) # # total_successful_requests = sum(len(res) for res in results) # # print(request_limit*batches, time.time() - start, total_successful_requests) # # asyncio.run(main())