| from __future__ import annotations |
|
|
| import math |
| import sys |
| import threading |
| from collections.abc import Awaitable, Callable, Generator |
| from contextlib import contextmanager |
| from contextvars import Token |
| from importlib import import_module |
| from typing import TYPE_CHECKING, Any, TypeVar |
|
|
| from ._exceptions import NoEventLoopError |
|
|
| if sys.version_info >= (3, 11): |
| from typing import TypeVarTuple, Unpack |
| else: |
| from typing_extensions import TypeVarTuple, Unpack |
|
|
| sniffio: Any |
| try: |
| import sniffio |
| except ModuleNotFoundError: |
| sniffio = None |
|
|
| if TYPE_CHECKING: |
| from ..abc import AsyncBackend |
|
|
| |
| BACKENDS = "asyncio", "trio" |
|
|
| T_Retval = TypeVar("T_Retval") |
| PosArgsT = TypeVarTuple("PosArgsT") |
|
|
| threadlocals = threading.local() |
| loaded_backends: dict[str, type[AsyncBackend]] = {} |
|
|
|
|
| def run( |
| func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], |
| *args: Unpack[PosArgsT], |
| backend: str = "asyncio", |
| backend_options: dict[str, Any] | None = None, |
| ) -> T_Retval: |
| """ |
| Run the given coroutine function in an asynchronous event loop. |
| |
| The current thread must not be already running an event loop. |
| |
| :param func: a coroutine function |
| :param args: positional arguments to ``func`` |
| :param backend: name of the asynchronous event loop implementation – currently |
| either ``asyncio`` or ``trio`` |
| :param backend_options: keyword arguments to call the backend ``run()`` |
| implementation with (documented :ref:`here <backend options>`) |
| :return: the return value of the coroutine function |
| :raises RuntimeError: if an asynchronous event loop is already running in this |
| thread |
| :raises LookupError: if the named backend is not found |
| |
| """ |
| if asynclib_name := current_async_library(): |
| raise RuntimeError(f"Already running {asynclib_name} in this thread") |
|
|
| try: |
| async_backend = get_async_backend(backend) |
| except ImportError as exc: |
| raise LookupError(f"No such backend: {backend}") from exc |
|
|
| token = None |
| if asynclib_name is None: |
| |
| |
| token = set_current_async_library(backend) |
|
|
| try: |
| backend_options = backend_options or {} |
| return async_backend.run(func, args, {}, backend_options) |
| finally: |
| reset_current_async_library(token) |
|
|
|
|
| async def sleep(delay: float) -> None: |
| """ |
| Pause the current task for the specified duration. |
| |
| :param delay: the duration, in seconds |
| |
| """ |
| return await get_async_backend().sleep(delay) |
|
|
|
|
| async def sleep_forever() -> None: |
| """ |
| Pause the current task until it's cancelled. |
| |
| This is a shortcut for ``sleep(math.inf)``. |
| |
| .. versionadded:: 3.1 |
| |
| """ |
| await sleep(math.inf) |
|
|
|
|
| async def sleep_until(deadline: float) -> None: |
| """ |
| Pause the current task until the given time. |
| |
| :param deadline: the absolute time to wake up at (according to the internal |
| monotonic clock of the event loop) |
| |
| .. versionadded:: 3.1 |
| |
| """ |
| now = current_time() |
| await sleep(max(deadline - now, 0)) |
|
|
|
|
| def current_time() -> float: |
| """ |
| Return the current value of the event loop's internal clock. |
| |
| :return: the clock value (seconds) |
| :raises NoEventLoopError: if no supported asynchronous event loop is running in the |
| current thread |
| |
| """ |
| return get_async_backend().current_time() |
|
|
|
|
| def get_all_backends() -> tuple[str, ...]: |
| """Return a tuple of the names of all built-in backends.""" |
| return BACKENDS |
|
|
|
|
| def get_available_backends() -> tuple[str, ...]: |
| """ |
| Test for the availability of built-in backends. |
| |
| :return a tuple of the built-in backend names that were successfully imported |
| |
| .. versionadded:: 4.12 |
| |
| """ |
| available_backends: list[str] = [] |
| for backend_name in get_all_backends(): |
| try: |
| get_async_backend(backend_name) |
| except ImportError: |
| continue |
|
|
| available_backends.append(backend_name) |
|
|
| return tuple(available_backends) |
|
|
|
|
| def get_cancelled_exc_class() -> type[BaseException]: |
| """ |
| Return the current async library's cancellation exception class. |
| |
| :raises NoEventLoopError: if no supported asynchronous event loop is running in the |
| current thread |
| |
| """ |
| return get_async_backend().cancelled_exception_class() |
|
|
|
|
| |
| |
| |
|
|
|
|
| @contextmanager |
| def claim_worker_thread( |
| backend_class: type[AsyncBackend], token: object |
| ) -> Generator[Any, None, None]: |
| from ..lowlevel import EventLoopToken |
|
|
| threadlocals.current_token = EventLoopToken(backend_class, token) |
| try: |
| yield |
| finally: |
| del threadlocals.current_token |
|
|
|
|
| def get_async_backend(asynclib_name: str | None = None) -> type[AsyncBackend]: |
| if asynclib_name is None: |
| asynclib_name = current_async_library() |
| if not asynclib_name: |
| raise NoEventLoopError( |
| f"Not currently running on any asynchronous event loop. " |
| f"Available async backends: {', '.join(get_all_backends())}" |
| ) |
|
|
| |
| |
| |
| try: |
| return loaded_backends[asynclib_name] |
| except KeyError: |
| module = import_module(f"anyio._backends._{asynclib_name}") |
| loaded_backends[asynclib_name] = module.backend_class |
| return module.backend_class |
|
|
|
|
| def current_async_library() -> str | None: |
| if sniffio is None: |
| |
| import asyncio |
|
|
| try: |
| asyncio.get_running_loop() |
| return "asyncio" |
| except RuntimeError: |
| pass |
| else: |
| try: |
| return sniffio.current_async_library() |
| except sniffio.AsyncLibraryNotFoundError: |
| pass |
|
|
| return None |
|
|
|
|
| def set_current_async_library(asynclib_name: str | None) -> Token | None: |
| |
| if sniffio is None: |
| return None |
|
|
| return sniffio.current_async_library_cvar.set(asynclib_name) |
|
|
|
|
| def reset_current_async_library(token: Token | None) -> None: |
| if token is not None: |
| sniffio.current_async_library_cvar.reset(token) |
|
|