Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
import math | |
from collections.abc import Generator | |
from contextlib import contextmanager | |
from types import TracebackType | |
from ..abc._tasks import TaskGroup, TaskStatus | |
from ._eventloop import get_async_backend | |
class _IgnoredTaskStatus(TaskStatus[object]):
    """Task status implementation whose ``started()`` call does nothing."""

    def started(self, value: object = None) -> None:
        """Accept the start notification and discard ``value``."""
        return None


# Shared module-level instance of the no-op task status.
TASK_STATUS_IGNORED = _IgnoredTaskStatus()
class CancelScope:
    """
    Wraps a unit of work that can be made separately cancellable.

    Instantiating this class does not produce an instance of this base class
    directly: ``__new__`` delegates to the active async backend's
    ``create_cancel_scope()``. Every member defined here raises
    :exc:`NotImplementedError` and only describes the interface.

    :param deadline: The time (clock value) when this scope is cancelled automatically
    :param shield: ``True`` to shield the cancel scope from external cancellation
    """

    def __new__(
        cls, *, deadline: float = math.inf, shield: bool = False
    ) -> CancelScope:
        # Construction is handed over to the current backend.
        return get_async_backend().create_cancel_scope(shield=shield, deadline=deadline)

    def cancel(self) -> None:
        """Cancel this scope immediately."""
        raise NotImplementedError

    @property
    def deadline(self) -> float:
        """
        The time (clock value) when this scope is cancelled automatically.

        Will be ``float('inf')`` if no timeout has been set.
        """
        raise NotImplementedError

    @deadline.setter
    def deadline(self, value: float) -> None:
        raise NotImplementedError

    @property
    def cancel_called(self) -> bool:
        """``True`` if :meth:`cancel` has been called."""
        raise NotImplementedError

    @property
    def cancelled_caught(self) -> bool:
        """
        ``True`` if this scope suppressed a cancellation exception it itself raised.

        This is typically used to check if any work was interrupted, or to see if the
        scope was cancelled due to its deadline being reached. The value will, however,
        only be ``True`` if the cancellation was triggered by the scope itself (and not
        an outer scope).
        """
        raise NotImplementedError

    @property
    def shield(self) -> bool:
        """
        ``True`` if this scope is shielded from external cancellation.

        While a scope is shielded, it will not receive cancellations from outside.
        """
        raise NotImplementedError

    @shield.setter
    def shield(self, value: bool) -> None:
        raise NotImplementedError

    def __enter__(self) -> CancelScope:
        """Enter the scope; not implemented on this base class."""
        raise NotImplementedError

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """Exit the scope; not implemented on this base class."""
        raise NotImplementedError
@contextmanager
def fail_after(
    delay: float | None, shield: bool = False
) -> Generator[CancelScope, None, None]:
    """
    Create a context manager which raises a :class:`TimeoutError` if the enclosed
    block does not finish in time.

    :param delay: maximum allowed time (in seconds) before raising the exception, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a context manager that yields a cancel scope
    :rtype: :class:`~typing.ContextManager`\\[:class:`~anyio.CancelScope`\\]
    :raises TimeoutError: if the scope caught its own cancellation and the backend
        clock has reached the scope's deadline

    """
    current_time = get_async_backend().current_time
    deadline = (current_time() + delay) if delay is not None else math.inf
    with get_async_backend().create_cancel_scope(
        deadline=deadline, shield=shield
    ) as cancel_scope:
        yield cancel_scope

    # Checked only after the scope has exited. The scope's current deadline is
    # re-read here rather than reusing the local value computed above.
    if cancel_scope.cancelled_caught and current_time() >= cancel_scope.deadline:
        raise TimeoutError
def move_on_after(delay: float | None, shield: bool = False) -> CancelScope:
    """
    Build a cancel scope whose deadline lies ``delay`` seconds past the backend's
    current time.

    :param delay: maximum allowed time (in seconds) before exiting the context block, or
        ``None`` to disable the timeout
    :param shield: ``True`` to shield the cancel scope from external cancellation
    :return: a cancel scope

    """
    if delay is None:
        # No timeout requested: the scope gets an infinite deadline.
        expires_at = math.inf
    else:
        expires_at = get_async_backend().current_time() + delay

    return get_async_backend().create_cancel_scope(deadline=expires_at, shield=shield)
def current_effective_deadline() -> float:
    """
    Report the nearest deadline among all the cancel scopes in effect for the
    current task, as provided by the active async backend.

    :return: a clock value from the event loop's internal clock (or ``float('inf')`` if
        there is no deadline in effect, or ``float('-inf')`` if the current scope has
        been cancelled)
    :rtype: float

    """
    backend = get_async_backend()
    return backend.current_effective_deadline()
def create_task_group() -> TaskGroup:
    """
    Ask the active async backend for a new task group.

    :return: a task group

    """
    backend = get_async_backend()
    return backend.create_task_group()