File size: 2,352 Bytes
ed1e314
 
1aa1aec
ed1e314
 
 
 
1aa1aec
 
 
 
 
 
 
ed1e314
 
 
 
 
 
 
 
 
 
1aa1aec
ed1e314
 
 
 
 
 
 
 
 
 
 
1aa1aec
 
 
ed1e314
1aa1aec
ed1e314
 
1aa1aec
ed1e314
 
 
 
1aa1aec
 
 
 
 
 
ed1e314
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import ray
from ray import serve
import time
import asyncio

# Upper bound on simultaneously in-flight requests: send_text_request holds
# this semaphore for the whole duration of each remote call.
# NOTE(review): on Python < 3.10 asyncio primitives bind to the event loop that
# is current when they are constructed, which is not the loop asyncio.run()
# creates later -- confirm the target interpreter if more than 10 requests
# are ever issued at once.
semaphore = asyncio.Semaphore(value=10)

# Sample image URL; nothing in the visible part of this file reads it.
test_image_url = "https://static.wixstatic.com/media/4d6b49_42b9435ce1104008b1b5f7a3c9bfcd69~mv2.jpg/v1/fill/w_454,h_333,fp_0.50_0.50,q_90/4d6b49_42b9435ce1104008b1b5f7a3c9bfcd69~mv2.jpg"

# Fixed sentence passed to the deployment's text_to_embeddings method.
english_text = (
    "It was the best of times, it was the worst of times, "
    "it was the age of wisdom, it was the age of foolishness, "
    "it was the epoch of belief"
)

async def send_text_request(serve_client, number):
    """Request embeddings for ``english_text`` from the ``CLIPTransform`` deployment.

    The call runs while holding the module-level ``semaphore``, which bounds
    how many requests are in flight at the same time.

    Args:
        serve_client: Not used by this function; kept in the signature so
            existing callers keep working. The handle is obtained through
            ``serve.get_deployment`` instead.
        number: Caller-supplied tag returned unchanged, so results that
            complete out of order can be matched to their request.

    Returns:
        A ``(number, embeddings)`` tuple, where ``embeddings`` is the value
        produced by the deployment's ``text_to_embeddings`` method.
    """
    async with semaphore:
        async_handle = serve.get_deployment("CLIPTransform").get_handle(sync=False)
        # Awaiting .remote() on the async handle yields an ObjectRef (the
        # original code handed it to ray.get).
        object_ref = await async_handle.text_to_embeddings.remote(english_text)
        # Await the ObjectRef instead of calling ray.get(): ray.get() blocks
        # the thread running the event loop, which would serialise every
        # request and defeat the concurrency the semaphore is meant to allow.
        embeddings = await object_ref
        return number, embeddings

# def process_text(server_client, numbers, max_workers=10):
#     with ThreadPoolExecutor(max_workers=max_workers) as executor:
#         futures = [executor.submit(send_text_request, server_client, number) for number in numbers]
#         for future in as_completed(futures):
#             n_result, result = future.result()
#             print (f"{n_result} : {len(result[0])}")
async def process_text(server_client, numbers):
    """Issue one text request per entry of ``numbers`` and report each result.

    Results are handled in completion order, not submission order. For every
    finished request a line ``"<number> : <length of first embedding>"`` is
    printed.

    Args:
        server_client: Forwarded unchanged to ``send_text_request``.
        numbers: Iterable of tags, one request is created per tag.
    """
    pending = []
    for number in numbers:
        pending.append(send_text_request(server_client, number))

    # as_completed yields awaitables in the order the requests finish.
    for finished in asyncio.as_completed(pending):
        outcome = await finished
        n_result, result = outcome
        print (f"{n_result} : {len(result[0])}")

if __name__ == "__main__":
    # Benchmark driver: send n_calls text requests and report the average
    # latency and the throughput.
    # n_calls = 100000
    n_calls = 1
    numbers = list(range(n_calls))
    ray.init()
    try:
        # NOTE(review): detached=True presumably lets the Serve instance
        # outlive this script -- confirm against the Ray Serve docs for the
        # installed version.
        server_client = serve.start(detached=True)
        start_time = time.monotonic()

        # Run the async function
        asyncio.run(process_text(server_client, numbers))

        end_time = time.monotonic()
        total_time = end_time - start_time
        avg_time_ms = total_time / n_calls * 1000
        calls_per_sec = n_calls / total_time
        print(f"Average time taken: {avg_time_ms:.2f} ms")
        print(f"Number of calls per second: {calls_per_sec:.2f}")
    finally:
        # Disconnect from Ray even when startup or a request raised, instead
        # of only on the success path.
        ray.shutdown()