Spaces:
Running
Running
| import asyncio | |
| from collections.abc import Awaitable, Coroutine | |
| from typing import Any | |
| __all__ = ["gather_with_concurrency", "run_sync"] | |
| def run_sync[T](coro: Awaitable[T] | Coroutine[Any, Any, T], *, context: str = "run_sync") -> T: | |
| """ | |
| Run a coroutine synchronously when there is no active event loop. | |
| Raises RuntimeError if called from within an already-running loop. | |
| """ | |
| try: | |
| asyncio.get_running_loop() | |
| except RuntimeError: | |
| if not asyncio.iscoroutine(coro): | |
| async def _wrap() -> T: | |
| return await coro | |
| return asyncio.run(_wrap()) | |
| return asyncio.run(coro) # type: ignore[arg-type] | |
| msg = f"{context} cannot be called while an event loop is running. Use the asynchronous variant instead." | |
| raise RuntimeError(msg) | |
| async def gather_with_concurrency[T]( | |
| n: int, | |
| *coros: Awaitable[T], | |
| ) -> list[T]: | |
| """Run coroutines with a concurrency limit of n simultaneous tasks.""" | |
| semaphore = asyncio.Semaphore(n) | |
| async def bounded(coro: Awaitable[T]) -> T: | |
| async with semaphore: | |
| return await coro | |
| return await asyncio.gather(*[bounded(c) for c in coros]) | |
| async def timeout_wrapper[T]( | |
| coro: Awaitable[T], | |
| timeout_seconds: float, | |
| error_message: str = "Operation timed out", | |
| ) -> T: | |
| """Wrap a coroutine with a timeout, raising TimeoutError with the given message.""" | |
| try: | |
| return await asyncio.wait_for(coro, timeout=timeout_seconds) | |
| except TimeoutError as exc: | |
| raise TimeoutError(error_message) from exc | |