Spaces:
				
			
			
	
			
			
		Build error
		
	
	
	
			
			
	
	
	
	
		
		
		Build error
		
	| # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import asyncio | |
| from io import BytesIO | |
| import pickle | |
| import time | |
| from typing import Callable | |
| import httpx | |
| from tqdm import tqdm | |
def content_with_progress(content, chunk_size=1024, desc="Upload",
                          progress_callback: Callable[[str, float, tqdm], None] | None = None):
    """Yield *content* slice by slice while driving a tqdm progress bar.

    Args:
        content: Sized, sliceable payload (``len()`` and ``content[a:b]`` are used).
        chunk_size: Maximum length of each yielded slice.
        desc: Label shown on the progress bar.
        progress_callback: Optional hook forwarded to ``report_progress`` for
            every chunk.

    Yields:
        Consecutive slices of *content*, each at most *chunk_size* long.
    """
    size = len(content)
    bar_options = dict(total=size, unit_scale=True, unit_divisor=1024, unit="B", desc=desc)
    with tqdm(**bar_options) as bar:
        for start in range(0, size, chunk_size):
            piece = content[start:start + chunk_size]
            yield piece
            # Progress is recorded only once the consumer asks for the next
            # chunk, i.e. after the previous one has been handed off.
            report_progress("upload", len(piece), bar, callback=progress_callback)
async def async_content_with_progress(*args, **kwargs):
    """Async-generator variant of ``content_with_progress``.

    All arguments are forwarded unchanged to ``content_with_progress``; its
    chunks are re-yielded one by one from an async generator.
    """
    chunks = content_with_progress(*args, **kwargs)
    for piece in chunks:
        yield piece
def streaming_response_to_response(response: httpx.Response, content_bytes: BytesIO) -> httpx.Response:
    """
    Build a regular (non-streaming) response from a consumed streaming one.

    The public instance attributes of *response* are reused as constructor
    arguments for a new ``httpx.Response``; stream-state attributes and every
    attribute whose name starts with an underscore are left out. The body is
    taken from *content_bytes*, which is rewound and read in full.
    """
    # TODO: is there a nicer way to get a non-streaming-style Response object, despite
    # having used the streaming API above? (for uniform consumption by the caller).
    excluded = {"is_stream_consumed", "next_request", "is_closed", "content", "stream"}

    init_kwargs = {}
    for name, value in response.__dict__.items():
        if name.startswith("_") or name in excluded:
            continue
        init_kwargs[name] = value

    content_bytes.seek(0)
    init_kwargs["content"] = content_bytes.read()
    return httpx.Response(**init_kwargs)
def report_progress(direction: str, progress_absolute: int | float,
                    bar: tqdm, callback: Callable[[str, float, tqdm], None] | None = None):
    """Advance *bar* and optionally notify *callback* of the completed fraction.

    Args:
        direction: Label passed through to the callback (callers in this
            module use ``"upload"`` and ``"download"``).
        progress_absolute: Increment passed to ``bar.update``.
        bar: Progress bar; ``update``, ``n`` and ``total`` are used.
        callback: Optional hook invoked with the keyword arguments
            ``direction``, ``progress`` (``bar.n / bar.total``) and ``bar``.

    The bar is always updated. The callback is skipped when ``bar.total`` is
    ``None`` or ``0``, because no fraction can be computed in that case
    (dividing would raise ``TypeError`` / ``ZeroDivisionError`` mid-transfer).
    """
    bar.update(progress_absolute)
    if callback is None:
        return

    total = bar.total
    if not total:
        # Unknown or empty total: there is no meaningful fraction to report.
        return

    callback(direction=direction, progress=bar.n / total, bar=bar)
def _announced_length(response: httpx.Response) -> int | None:
    """Return the integer ``Content-Length`` of *response*, or ``None`` if the header is absent."""
    raw = response.headers.get("Content-Length")
    return int(raw) if raw is not None else None


def httpx_request(method: str,
                  *args,
                  progress: bool = False,
                  progress_direction: str = "auto",
                  desc: str | None = None,
                  async_client: httpx.AsyncClient | None = None,
                  callback: Callable | None = None,
                  progress_callback: Callable[[str, float, tqdm], None] | None = None,
                  **kwargs) -> httpx.Response | asyncio.Task[httpx.Response]:
    """Send an HTTP request through httpx, optionally with tqdm progress bars.

    Args:
        method: HTTP method name.
        *args: Positional arguments forwarded to httpx (e.g. the URL).
        progress: Enable progress bars.
        progress_direction: ``"upload"``, ``"download"``, ``"both"`` or
            ``"auto"``. ``"auto"`` tracks the download for GET requests and
            the upload for POST requests.
        desc: Label for the progress bars.
        async_client: When given, the request is scheduled on this client with
            ``asyncio.create_task`` instead of being executed synchronously.
        callback: Synchronous mode: called with the finished response.
            Asynchronous mode: registered via ``Task.add_done_callback``.
        progress_callback: Forwarded to ``report_progress`` for every chunk.
            For downloads it is only used when the response announces a
            non-zero ``Content-Length``; otherwise no fraction is available.
        **kwargs: Remaining keyword arguments forwarded to httpx.

    Returns:
        The ``httpx.Response`` in synchronous mode, otherwise the
        ``asyncio.Task`` that resolves to it.
    """
    is_async = async_client is not None
    verb = method.lower()
    progress_download = progress and (
        progress_direction in ("both", "download")
        or (progress_direction == "auto" and verb == "get")
    )
    progress_upload = progress and (
        progress_direction in ("both", "upload")
        or (progress_direction == "auto" and verb == "post")
    )

    if progress_upload:
        upload_desc = f"{desc} (upload)" if desc else "Upload"
        wrapper = async_content_with_progress if is_async else content_with_progress
        for key in ("content", "data"):
            payload = kwargs.get(key)
            if isinstance(payload, str):
                # Encode up front so the bar counts bytes and the chunks are bytes.
                payload = payload.encode("utf-8")
            # Only raw byte payloads can be sliced into chunks; form dicts,
            # iterators, file objects and None are passed to httpx untouched.
            if isinstance(payload, (bytes, bytearray, memoryview)):
                kwargs[key] = wrapper(payload, desc=upload_desc, progress_callback=progress_callback)

    if not progress_download:
        # No download progress bar needed, use standard httpx methods
        if not is_async:
            res = httpx.request(method, *args, **kwargs)
            if callback is not None:
                callback(res)
            return res
        task = asyncio.create_task(
            async_client.request(method, *args, **kwargs)
        )
        if callback is not None:
            task.add_done_callback(callback)
        return task

    # Progress bar requested for download, need to use streaming API
    if not is_async:
        content_bytes = BytesIO()
        with httpx.stream(method, *args, **kwargs) as response:
            # total is None when the server sends no Content-Length; tqdm then
            # shows a plain byte counter instead of a percentage.
            total = _announced_length(response)
            chunk_callback = progress_callback if total else None
            with tqdm(total=total, unit_scale=True, unit_divisor=1024, unit="B", desc=desc) as bar:
                seen = response.num_bytes_downloaded
                for chunk in response.iter_bytes():
                    # Count raw bytes received, not the size of the yielded chunk.
                    report_progress("download", response.num_bytes_downloaded - seen,
                                    bar, callback=chunk_callback)
                    seen = response.num_bytes_downloaded
                    content_bytes.write(chunk)
            response = streaming_response_to_response(response, content_bytes)
        if callback is not None:
            callback(response)
        return response

    async def inner():
        content_bytes = BytesIO()
        async with async_client.stream(method, *args, **kwargs) as response:
            total = _announced_length(response)
            chunk_callback = progress_callback if total else None
            with tqdm(total=total, unit_scale=True, unit_divisor=1024, unit="B", desc=desc) as bar:
                seen = response.num_bytes_downloaded
                async for chunk in response.aiter_bytes():
                    report_progress("download", response.num_bytes_downloaded - seen,
                                    bar, callback=chunk_callback)
                    seen = response.num_bytes_downloaded
                    content_bytes.write(chunk)
            response = streaming_response_to_response(response, content_bytes)
        return response

    task = asyncio.create_task(inner())
    if callback is not None:
        task.add_done_callback(callback)
    return task
def benchmark_requests(host, port, n=100):
    """Time *n* sequential GET requests against ``http://{host}:{port}/image``.

    Each response body is unpickled and checked for an ``"image"`` entry; the
    total and per-request wall time are printed afterwards.

    Args:
        host: Server host name or address.
        port: Server port.
        n: Number of requests to issue; must be positive.

    Raises:
        ValueError: If *n* is not positive (there would be nothing to average).
        AssertionError: If a response payload has no ``"image"`` entry.
    """
    if n <= 0:
        raise ValueError(f"n must be a positive integer, got {n!r}")

    url = f"http://{host}:{port}/image"
    # perf_counter is monotonic, so clock adjustments cannot skew the timing.
    t0 = time.perf_counter()
    for _ in range(n):
        res = httpx.get(url)
        # SECURITY: pickle.loads can execute arbitrary code from the payload.
        # Only point this benchmark at a server you fully trust.
        loaded = pickle.loads(res.content)
        if "image" not in loaded:
            # Explicit raise so the check is not stripped under ``python -O``.
            raise AssertionError('response payload has no "image" entry')
    elapsed = time.perf_counter() - t0
    print(f"Took {elapsed} s = {1000 * elapsed / n} ms/it")
