Spaces:
Sleeping
Sleeping
File size: 2,144 Bytes
5d7b3b3 2ce9cdd 5d7b3b3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
import requests
import time
from concurrent.futures import ThreadPoolExecutor
import csv
# Default total number of requests submitted by one run_stress_test() call.
# The script entry point rebinds this for each concurrency level it sweeps.
NUM_REQUESTS = 5
# Default thread-pool size (max_workers) used by run_stress_test().
# Also rebound per sweep step by the script entry point.
CONCURRENT_THREADS = 10
# Endpoint that send_request() POSTs its form data to.
URL = "https://mo01018-Deployment-Trial.hf.space"
def send_request(timeout=60):
    """Send one POST request to ``URL`` and measure how long it took.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely, which would stall a
            worker thread and therefore the whole stress run.

    Returns:
        A ``(status_code, elapsed_seconds)`` tuple. If the request raises,
        the status is reported as 500 and ``elapsed_seconds`` is the time
        spent before the failure (not 0, which would skew min/avg latency).
    """
    data = {
        'search': "A MRI, magnetic resonance imaging, scan is a very useful diagnosis tool.",
        'pipeline_select': '1'
    }
    # perf_counter is monotonic, so the interval cannot go negative or jump
    # if the system clock is adjusted while the request is in flight.
    start_time = time.perf_counter()
    try:
        response = requests.post(URL, data=data, timeout=timeout)
    except Exception as e:
        # Deliberately broad: this runs inside a worker thread and a single
        # failed request must be counted as a failure, not abort the run.
        print("Request failed:", e)
        return 500, time.perf_counter() - start_time  # Treat exceptions as failures

    elapsed = time.perf_counter() - start_time
    if response.status_code != 200:
        print(f"Error {response.status_code}: {response.text[:100]}")
    return response.status_code, elapsed
def run_stress_test(num_requests=None, concurrent_threads=None, request_fn=None):
    """Fire a batch of concurrent requests and print/return summary statistics.

    Args:
        num_requests: Total number of requests to submit. Defaults to the
            module-level ``NUM_REQUESTS``.
        concurrent_threads: Thread-pool size. Defaults to the module-level
            ``CONCURRENT_THREADS``. Must be positive; ``ThreadPoolExecutor``
            raises ``ValueError`` otherwise.
        request_fn: Zero-argument callable returning a
            ``(status_code, elapsed_seconds)`` tuple. Defaults to
            ``send_request``.

    Returns:
        ``[num_requests, concurrent_threads, avg_time, max_time]``, where the
        timings are computed over every request (successful or not). When no
        requests were made the timings are reported as 0.0.
    """
    # Defaults are resolved at call time so that rebinding the module
    # constants (as the script entry point does) still takes effect.
    if num_requests is None:
        num_requests = NUM_REQUESTS
    if concurrent_threads is None:
        concurrent_threads = CONCURRENT_THREADS
    if request_fn is None:
        request_fn = send_request

    with ThreadPoolExecutor(max_workers=concurrent_threads) as executor:
        futures = [executor.submit(request_fn) for _ in range(num_requests)]
        # Collect in submission order; result() blocks until each finishes.
        results = [future.result() for future in futures]

    successes = sum(1 for status, _ in results if status == 200)
    failures = len(results) - successes

    timings = [elapsed for _, elapsed in results]
    if timings:
        avg_time = sum(timings) / len(timings)
        max_time = max(timings)
        min_time = min(timings)
    else:
        # An empty run would otherwise divide by zero / call max() on nothing.
        avg_time = max_time = min_time = 0.0

    print("\n=== Stress Test Results ===")
    print(f"Total Requests: {num_requests}")
    print(f"Concurrency Level: {concurrent_threads}")
    print(f"Successes: {successes}")
    print(f"Failures: {failures}")
    print(f"Avg Time: {avg_time:.3f}s")
    print(f"Min Time: {min_time:.3f}s")
    print(f"Max Time: {max_time:.3f}s")
    return [num_requests, concurrent_threads, avg_time, max_time]
if __name__ == "__main__":
    # Sweep increasing concurrency levels and record one summary row per
    # level in a CSV file.
    header = ['Total Requests', 'Concurrency Level', 'Avg Time', 'Max Time']
    with open('stress_test_results.csv', 'w', newline='') as out_file:
        summary_writer = csv.writer(out_file)
        summary_writer.writerow(header)
        for level in (1, 5, 10, 20, 50, 100):
            # run_stress_test() reads these module-level names, so rebind
            # them before each run: five requests per simulated user.
            CONCURRENT_THREADS = level
            NUM_REQUESTS = level * 5
            summary_writer.writerow(run_stress_test())
|