gMAS / src /utils /async_utils.py
Артём Боярских
chore: initial commit
3193174
import asyncio
from collections.abc import Awaitable, Coroutine
from typing import Any
# Names exported by a star-import of this module. ``timeout_wrapper`` is a
# public helper defined in this file, so it is listed alongside the others.
__all__ = ["gather_with_concurrency", "run_sync", "timeout_wrapper"]
def run_sync[T](coro: Awaitable[T] | Coroutine[Any, Any, T], *, context: str = "run_sync") -> T:
"""
Run a coroutine synchronously when there is no active event loop.
Raises RuntimeError if called from within an already-running loop.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
if not asyncio.iscoroutine(coro):
async def _wrap() -> T:
return await coro
return asyncio.run(_wrap())
return asyncio.run(coro) # type: ignore[arg-type]
msg = f"{context} cannot be called while an event loop is running. Use the asynchronous variant instead."
raise RuntimeError(msg)
async def gather_with_concurrency[T](
n: int,
*coros: Awaitable[T],
) -> list[T]:
"""Run coroutines with a concurrency limit of n simultaneous tasks."""
semaphore = asyncio.Semaphore(n)
async def bounded(coro: Awaitable[T]) -> T:
async with semaphore:
return await coro
return await asyncio.gather(*[bounded(c) for c in coros])
async def timeout_wrapper[T](
coro: Awaitable[T],
timeout_seconds: float,
error_message: str = "Operation timed out",
) -> T:
"""Wrap a coroutine with a timeout, raising TimeoutError with the given message."""
try:
return await asyncio.wait_for(coro, timeout=timeout_seconds)
except TimeoutError as exc:
raise TimeoutError(error_message) from exc