"""Benchmark concurrent text-embedding requests against a Ray Serve deployment.

The script sends ``n_calls`` requests to the ``text_to_embeddings`` method of
the ``CLIPTransform`` deployment, keeps at most ``MAX_CONCURRENT_REQUESTS`` of
them in flight at once, and prints the average latency and the throughput.
"""

import asyncio
import time

import ray
from ray import serve

# Upper bound on the number of requests in flight at any one time.
MAX_CONCURRENT_REQUESTS = 10

# Created lazily by ``send_text_request`` inside the running event loop.
# On Python < 3.10 asyncio primitives bind to the loop that is current when
# they are constructed, so building the semaphore at import time (before
# ``asyncio.run`` creates its own loop) breaks as soon as a waiter blocks.
semaphore = None

test_image_url = "https://static.wixstatic.com/media/4d6b49_42b9435ce1104008b1b5f7a3c9bfcd69~mv2.jpg/v1/fill/w_454,h_333,fp_0.50_0.50,q_90/4d6b49_42b9435ce1104008b1b5f7a3c9bfcd69~mv2.jpg"
english_text = (
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)


def _get_async_handle():
    """Return an asyncio-compatible handle to the ``CLIPTransform`` deployment."""
    return serve.get_deployment("CLIPTransform").get_handle(sync=False)


async def send_text_request(serve_client, number, handle=None):
    """Request embeddings for ``english_text`` and tag the result with ``number``.

    Args:
        serve_client: Serve client returned by ``serve.start``. Currently
            unused; kept so existing callers keep working.
        number: Caller-chosen identifier returned alongside the embeddings.
        handle: Optional async deployment handle to reuse. When omitted, the
            handle is looked up for this call.

    Returns:
        A ``(number, embeddings)`` tuple.
    """
    global semaphore
    if semaphore is None:
        # No await between the check and the assignment, so concurrent
        # coroutines on the same loop cannot create two semaphores.
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    if handle is None:
        handle = _get_async_handle()

    async with semaphore:
        # With an async handle, ``.remote()`` is awaited to obtain an
        # ObjectRef; awaiting the ObjectRef then yields the result. Calling
        # ``ray.get`` here instead would block the event loop and serialise
        # the requests the semaphore is supposed to run concurrently.
        object_ref = await handle.text_to_embeddings.remote(english_text)
        embeddings = await object_ref
    return number, embeddings


async def process_text(server_client, numbers):
    """Send one request per entry in ``numbers`` and print each result as it completes.

    Args:
        server_client: Serve client forwarded to ``send_text_request``.
        numbers: Iterable of identifiers, one per request to send.
    """
    # Resolve the handle once so the deployment lookup is not repeated for
    # every request.
    handle = _get_async_handle()
    pending = [
        send_text_request(server_client, number, handle=handle)
        for number in numbers
    ]
    for finished in asyncio.as_completed(pending):
        n_result, result = await finished
        print(f"{n_result} : {len(result[0])}")


def main():
    """Run the benchmark and print latency and throughput statistics."""
    # Raise this (e.g. to 100000) for a meaningful throughput measurement.
    n_calls = 1
    numbers = list(range(n_calls))

    # NOTE(review): "CLIPTransform" must already be deployed on the cluster
    # this call connects to -- confirm that a bare ``ray.init()`` reaches it.
    ray.init()
    try:
        server_client = serve.start(detached=True)

        start_time = time.monotonic()
        asyncio.run(process_text(server_client, numbers))
        total_time = time.monotonic() - start_time

        if n_calls > 0 and total_time > 0:
            avg_time_ms = total_time / n_calls * 1000
            calls_per_sec = n_calls / total_time
            print(f"Average time taken: {avg_time_ms:.2f} ms")
            print(f"Number of calls per second: {calls_per_sec:.2f}")
    finally:
        # Disconnect from Ray even if the benchmark raised.
        ray.shutdown()


if __name__ == "__main__":
    main()