Spaces:
Running
Running
from __future__ import annotations | |
import math | |
import sys | |
from abc import ABCMeta, abstractmethod | |
from collections.abc import AsyncIterator, Awaitable, Mapping | |
from os import PathLike | |
from signal import Signals | |
from socket import AddressFamily, SocketKind, socket | |
from typing import ( | |
IO, | |
TYPE_CHECKING, | |
Any, | |
Callable, | |
ContextManager, | |
Sequence, | |
TypeVar, | |
overload, | |
) | |
if sys.version_info >= (3, 11): | |
from typing import TypeVarTuple, Unpack | |
else: | |
from typing_extensions import TypeVarTuple, Unpack | |
if TYPE_CHECKING: | |
from typing import Literal | |
from .._core._synchronization import CapacityLimiter, Event | |
from .._core._tasks import CancelScope | |
from .._core._testing import TaskInfo | |
from ..from_thread import BlockingPortal | |
from ._sockets import ( | |
ConnectedUDPSocket, | |
ConnectedUNIXDatagramSocket, | |
IPSockAddrType, | |
SocketListener, | |
SocketStream, | |
UDPSocket, | |
UNIXDatagramSocket, | |
UNIXSocketStream, | |
) | |
from ._subprocesses import Process | |
from ._tasks import TaskGroup | |
from ._testing import TestRunner | |
T_Retval = TypeVar("T_Retval") | |
PosArgsT = TypeVarTuple("PosArgsT") | |
class AsyncBackend(metaclass=ABCMeta): | |
def run( | |
cls, | |
func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], | |
args: tuple[Unpack[PosArgsT]], | |
kwargs: dict[str, Any], | |
options: dict[str, Any], | |
) -> T_Retval: | |
""" | |
Run the given coroutine function in an asynchronous event loop. | |
The current thread must not be already running an event loop. | |
:param func: a coroutine function | |
:param args: positional arguments to ``func`` | |
:param kwargs: positional arguments to ``func`` | |
:param options: keyword arguments to call the backend ``run()`` implementation | |
with | |
:return: the return value of the coroutine function | |
""" | |
def current_token(cls) -> object: | |
""" | |
:return: | |
""" | |
def current_time(cls) -> float: | |
""" | |
Return the current value of the event loop's internal clock. | |
:return: the clock value (seconds) | |
""" | |
def cancelled_exception_class(cls) -> type[BaseException]: | |
"""Return the exception class that is raised in a task if it's cancelled.""" | |
async def checkpoint(cls) -> None: | |
""" | |
Check if the task has been cancelled, and allow rescheduling of other tasks. | |
This is effectively the same as running :meth:`checkpoint_if_cancelled` and then | |
:meth:`cancel_shielded_checkpoint`. | |
""" | |
async def checkpoint_if_cancelled(cls) -> None: | |
""" | |
Check if the current task group has been cancelled. | |
This will check if the task has been cancelled, but will not allow other tasks | |
to be scheduled if not. | |
""" | |
if cls.current_effective_deadline() == -math.inf: | |
await cls.checkpoint() | |
async def cancel_shielded_checkpoint(cls) -> None: | |
""" | |
Allow the rescheduling of other tasks. | |
This will give other tasks the opportunity to run, but without checking if the | |
current task group has been cancelled, unlike with :meth:`checkpoint`. | |
""" | |
with cls.create_cancel_scope(shield=True): | |
await cls.sleep(0) | |
async def sleep(cls, delay: float) -> None: | |
""" | |
Pause the current task for the specified duration. | |
:param delay: the duration, in seconds | |
""" | |
def create_cancel_scope( | |
cls, *, deadline: float = math.inf, shield: bool = False | |
) -> CancelScope: | |
pass | |
def current_effective_deadline(cls) -> float: | |
""" | |
Return the nearest deadline among all the cancel scopes effective for the | |
current task. | |
:return: | |
- a clock value from the event loop's internal clock | |
- ``inf`` if there is no deadline in effect | |
- ``-inf`` if the current scope has been cancelled | |
:rtype: float | |
""" | |
def create_task_group(cls) -> TaskGroup: | |
pass | |
def create_event(cls) -> Event: | |
pass | |
def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter: | |
pass | |
async def run_sync_in_worker_thread( | |
cls, | |
func: Callable[[Unpack[PosArgsT]], T_Retval], | |
args: tuple[Unpack[PosArgsT]], | |
abandon_on_cancel: bool = False, | |
limiter: CapacityLimiter | None = None, | |
) -> T_Retval: | |
pass | |
def check_cancelled(cls) -> None: | |
pass | |
def run_async_from_thread( | |
cls, | |
func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]], | |
args: tuple[Unpack[PosArgsT]], | |
token: object, | |
) -> T_Retval: | |
pass | |
def run_sync_from_thread( | |
cls, | |
func: Callable[[Unpack[PosArgsT]], T_Retval], | |
args: tuple[Unpack[PosArgsT]], | |
token: object, | |
) -> T_Retval: | |
pass | |
def create_blocking_portal(cls) -> BlockingPortal: | |
pass | |
async def open_process( | |
cls, | |
command: str | bytes, | |
*, | |
shell: Literal[True], | |
stdin: int | IO[Any] | None, | |
stdout: int | IO[Any] | None, | |
stderr: int | IO[Any] | None, | |
cwd: str | bytes | PathLike[str] | None = None, | |
env: Mapping[str, str] | None = None, | |
start_new_session: bool = False, | |
) -> Process: | |
pass | |
async def open_process( | |
cls, | |
command: Sequence[str | bytes], | |
*, | |
shell: Literal[False], | |
stdin: int | IO[Any] | None, | |
stdout: int | IO[Any] | None, | |
stderr: int | IO[Any] | None, | |
cwd: str | bytes | PathLike[str] | None = None, | |
env: Mapping[str, str] | None = None, | |
start_new_session: bool = False, | |
) -> Process: | |
pass | |
async def open_process( | |
cls, | |
command: str | bytes | Sequence[str | bytes], | |
*, | |
shell: bool, | |
stdin: int | IO[Any] | None, | |
stdout: int | IO[Any] | None, | |
stderr: int | IO[Any] | None, | |
cwd: str | bytes | PathLike[str] | None = None, | |
env: Mapping[str, str] | None = None, | |
start_new_session: bool = False, | |
) -> Process: | |
pass | |
def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None: | |
pass | |
async def connect_tcp( | |
cls, host: str, port: int, local_address: IPSockAddrType | None = None | |
) -> SocketStream: | |
pass | |
async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream: | |
pass | |
def create_tcp_listener(cls, sock: socket) -> SocketListener: | |
pass | |
def create_unix_listener(cls, sock: socket) -> SocketListener: | |
pass | |
async def create_udp_socket( | |
cls, | |
family: AddressFamily, | |
local_address: IPSockAddrType | None, | |
remote_address: IPSockAddrType | None, | |
reuse_port: bool, | |
) -> UDPSocket | ConnectedUDPSocket: | |
pass | |
async def create_unix_datagram_socket( | |
cls, raw_socket: socket, remote_path: None | |
) -> UNIXDatagramSocket: | |
... | |
async def create_unix_datagram_socket( | |
cls, raw_socket: socket, remote_path: str | bytes | |
) -> ConnectedUNIXDatagramSocket: | |
... | |
async def create_unix_datagram_socket( | |
cls, raw_socket: socket, remote_path: str | bytes | None | |
) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket: | |
pass | |
async def getaddrinfo( | |
cls, | |
host: bytes | str | None, | |
port: str | int | None, | |
*, | |
family: int | AddressFamily = 0, | |
type: int | SocketKind = 0, | |
proto: int = 0, | |
flags: int = 0, | |
) -> list[ | |
tuple[ | |
AddressFamily, | |
SocketKind, | |
int, | |
str, | |
tuple[str, int] | tuple[str, int, int, int], | |
] | |
]: | |
pass | |
async def getnameinfo( | |
cls, sockaddr: IPSockAddrType, flags: int = 0 | |
) -> tuple[str, str]: | |
pass | |
async def wait_socket_readable(cls, sock: socket) -> None: | |
pass | |
async def wait_socket_writable(cls, sock: socket) -> None: | |
pass | |
def current_default_thread_limiter(cls) -> CapacityLimiter: | |
pass | |
def open_signal_receiver( | |
cls, *signals: Signals | |
) -> ContextManager[AsyncIterator[Signals]]: | |
pass | |
def get_current_task(cls) -> TaskInfo: | |
pass | |
def get_running_tasks(cls) -> list[TaskInfo]: | |
pass | |
async def wait_all_tasks_blocked(cls) -> None: | |
pass | |
def create_test_runner(cls, options: dict[str, Any]) -> TestRunner: | |
pass | |