Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
from collections import deque | |
from dataclasses import dataclass | |
from types import TracebackType | |
from ..lowlevel import cancel_shielded_checkpoint, checkpoint, checkpoint_if_cancelled | |
from ._eventloop import get_async_backend | |
from ._exceptions import BusyResourceError, WouldBlock | |
from ._tasks import CancelScope | |
from ._testing import TaskInfo, get_current_task | |
@dataclass(frozen=True)
class EventStatistics:
    """
    Snapshot of the state of an :class:`Event`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Event.wait`
    """

    tasks_waiting: int
@dataclass(frozen=True)
class CapacityLimiterStatistics:
    """
    Snapshot of the state of a :class:`CapacityLimiter`.

    :ivar int borrowed_tokens: number of tokens currently borrowed by tasks
    :ivar float total_tokens: total number of available tokens
    :ivar tuple borrowers: tasks or other objects currently holding tokens borrowed from
        this limiter
    :ivar int tasks_waiting: number of tasks waiting on
        :meth:`~.CapacityLimiter.acquire` or
        :meth:`~.CapacityLimiter.acquire_on_behalf_of`
    """

    borrowed_tokens: int
    total_tokens: float
    borrowers: tuple[object, ...]
    tasks_waiting: int
@dataclass(frozen=True)
class LockStatistics:
    """
    Snapshot of the state of a :class:`Lock`.

    :ivar bool locked: flag indicating if this lock is locked or not
    :ivar ~anyio.TaskInfo owner: task currently holding the lock (or ``None`` if the
        lock is not held by any task)
    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Lock.acquire`
    """

    locked: bool
    owner: TaskInfo | None
    tasks_waiting: int
@dataclass(frozen=True)
class ConditionStatistics:
    """
    Snapshot of the state of a :class:`Condition`.

    :ivar int tasks_waiting: number of tasks blocked on :meth:`~.Condition.wait`
    :ivar ~anyio.LockStatistics lock_statistics: statistics of the underlying
        :class:`~.Lock`
    """

    tasks_waiting: int
    lock_statistics: LockStatistics
@dataclass(frozen=True)
class SemaphoreStatistics:
    """
    Snapshot of the state of a :class:`Semaphore`.

    :ivar int tasks_waiting: number of tasks waiting on :meth:`~.Semaphore.acquire`
    """

    tasks_waiting: int
class Event:
    """
    Interface of an event flag that tasks can wait on.

    Instantiating this class does not create an ``Event`` directly: ``__new__``
    delegates to ``create_event()`` of the currently running async backend. Every
    method defined here raises :exc:`NotImplementedError`.
    """

    def __new__(cls) -> Event:
        # The object actually returned is whatever the active backend provides.
        return get_async_backend().create_event()

    def set(self) -> None:
        """Set the flag, notifying all listeners."""
        raise NotImplementedError

    def is_set(self) -> bool:
        """Return ``True`` if the flag is set, ``False`` if not."""
        raise NotImplementedError

    async def wait(self) -> None:
        """
        Wait until the flag has been set.

        If the flag has already been set when this method is called, it returns
        immediately.

        """
        raise NotImplementedError

    def statistics(self) -> EventStatistics:
        """Return statistics about the current state of this event."""
        raise NotImplementedError
class Lock:
    """
    A mutual exclusion lock owned by a single task at a time.

    Tasks that cannot take the lock immediately are queued in arrival order; on
    release, ownership is handed directly to the task at the head of the queue.
    Usable as an async context manager (acquire on enter, release on exit).
    """

    # Class-level default: an instance without its own ``_owner_task`` attribute is
    # unlocked. ``release()`` relies on this by deleting the instance attribute.
    _owner_task: TaskInfo | None = None

    def __init__(self) -> None:
        # FIFO queue of (waiting task, event that is set when it becomes the owner)
        self._waiters: deque[tuple[TaskInfo, Event]] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Acquire the lock."""
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            # The lock is held by another task: queue up and wait to be handed
            # ownership by release().
            task = get_current_task()
            event = Event()
            token = task, event
            self._waiters.append(token)
            try:
                await event.wait()
            except BaseException:
                if not event.is_set():
                    # Still queued; withdraw so release() never picks this task
                    self._waiters.remove(token)
                elif self._owner_task == task:
                    # Ownership was already handed over; pass it on instead of
                    # keeping the lock held by a task that is bailing out
                    self.release()

                raise

            assert self._owner_task == task
        else:
            # Acquired without waiting; if the checkpoint raises, give the lock
            # back before propagating.
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Acquire the lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block
        :raises RuntimeError: if the current task already holds this lock

        """
        task = get_current_task()
        if self._owner_task == task:
            raise RuntimeError("Attempted to acquire an already held Lock")

        if self._owner_task is not None:
            raise WouldBlock

        self._owner_task = task

    def release(self) -> None:
        """
        Release the lock.

        :raises RuntimeError: if the current task is not the owner of this lock

        """
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding this lock")

        if self._waiters:
            # Hand ownership straight to the longest-waiting task and wake it
            self._owner_task, event = self._waiters.popleft()
            event.set()
        else:
            # Drop the instance attribute; lookups fall back to the class-level None
            del self._owner_task

    def locked(self) -> bool:
        """Return True if the lock is currently held."""
        return self._owner_task is not None

    def statistics(self) -> LockStatistics:
        """
        Return statistics about the current state of this lock.

        .. versionadded:: 3.0
        """
        return LockStatistics(self.locked(), self._owner_task, len(self._waiters))
class Condition:
    """
    A condition variable built on top of a :class:`Lock`.

    Usable as an async context manager (acquires the underlying lock on enter,
    releases it on exit).

    :param lock: the lock to use; a new :class:`Lock` is created if omitted
    """

    # Task that most recently acquired the lock *through this Condition*.
    # NOTE(review): this is never reset by release(), so _check_acquired() compares
    # against the last acquirer rather than the lock's current owner.
    _owner_task: TaskInfo | None = None

    def __init__(self, lock: Lock | None = None):
        self._lock = lock or Lock()
        # One event per task currently blocked in wait(), in arrival order
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    def _check_acquired(self) -> None:
        """Raise :exc:`RuntimeError` unless the current task is the recorded owner."""
        if self._owner_task != get_current_task():
            raise RuntimeError("The current task is not holding the underlying lock")

    async def acquire(self) -> None:
        """Acquire the underlying lock."""
        await self._lock.acquire()
        self._owner_task = get_current_task()

    def acquire_nowait(self) -> None:
        """
        Acquire the underlying lock, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        self._lock.acquire_nowait()
        self._owner_task = get_current_task()

    def release(self) -> None:
        """Release the underlying lock."""
        self._lock.release()

    def locked(self) -> bool:
        """Return True if the lock is set."""
        return self._lock.locked()

    def notify(self, n: int = 1) -> None:
        """
        Notify at most ``n`` listeners (fewer if not that many are waiting).

        :raises RuntimeError: if the current task is not the recorded owner

        """
        self._check_acquired()
        for _ in range(n):
            try:
                event = self._waiters.popleft()
            except IndexError:
                # Ran out of waiters before reaching n
                break

            event.set()

    def notify_all(self) -> None:
        """
        Notify all the listeners.

        :raises RuntimeError: if the current task is not the recorded owner

        """
        self._check_acquired()
        for event in self._waiters:
            event.set()

        self._waiters.clear()

    async def wait(self) -> None:
        """Wait for a notification."""
        await checkpoint()
        event = Event()
        self._waiters.append(event)
        self.release()
        try:
            await event.wait()
        except BaseException:
            if not event.is_set():
                # Not notified yet: withdraw from the queue so a later notify()
                # is not spent on a task that is no longer waiting
                self._waiters.remove(event)

            raise
        finally:
            # Always return to the caller holding the lock again; the
            # re-acquisition runs inside a shielded cancel scope.
            with CancelScope(shield=True):
                await self.acquire()

    def statistics(self) -> ConditionStatistics:
        """
        Return statistics about the current state of this condition.

        .. versionadded:: 3.0
        """
        return ConditionStatistics(len(self._waiters), self._lock.statistics())
class Semaphore:
    """
    A counting semaphore.

    Usable as an async context manager (acquire on enter, release on exit).

    :param initial_value: the starting value of the semaphore (must be >= 0)
    :param max_value: optional upper bound for the value; releasing while the value
        is already at this bound raises :exc:`ValueError`
    :raises TypeError: if ``initial_value`` is not an integer, or ``max_value`` is
        neither an integer nor ``None``
    :raises ValueError: if ``initial_value`` is negative or exceeds ``max_value``
    """

    def __init__(self, initial_value: int, *, max_value: int | None = None):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an integer")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be an integer or None")
            if max_value < initial_value:
                raise ValueError(
                    "max_value must be equal to or higher than initial_value"
                )

        self._value = initial_value
        self._max_value = max_value
        # One event per task blocked in acquire(), in arrival order
        self._waiters: deque[Event] = deque()

    async def __aenter__(self) -> Semaphore:
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.release()

    async def acquire(self) -> None:
        """Decrement the semaphore value, blocking if necessary."""
        await checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except WouldBlock:
            # Value is zero: queue up and wait for a release() to wake this task
            event = Event()
            self._waiters.append(event)
            try:
                await event.wait()
            except BaseException:
                if not event.is_set():
                    # Still queued; withdraw
                    self._waiters.remove(event)
                else:
                    # A release() was already spent on this task; pass it on
                    self.release()

                raise
        else:
            # Acquired without waiting; undo the decrement if the checkpoint raises
            try:
                await cancel_shielded_checkpoint()
            except BaseException:
                self.release()
                raise

    def acquire_nowait(self) -> None:
        """
        Decrement the semaphore value, without blocking.

        :raises ~anyio.WouldBlock: if the operation would block

        """
        if self._value == 0:
            raise WouldBlock

        self._value -= 1

    def release(self) -> None:
        """
        Increment the semaphore value.

        If tasks are waiting, the longest-waiting one is woken instead and the
        value is left unchanged.

        :raises ValueError: if the value is already at ``max_value``

        """
        if self._max_value is not None and self._value == self._max_value:
            raise ValueError("semaphore released too many times")

        if self._waiters:
            self._waiters.popleft().set()
        else:
            self._value += 1

    @property
    def value(self) -> int:
        """The current value of the semaphore."""
        return self._value

    @property
    def max_value(self) -> int | None:
        """The maximum value of the semaphore."""
        return self._max_value

    def statistics(self) -> SemaphoreStatistics:
        """
        Return statistics about the current state of this semaphore.

        .. versionadded:: 3.0
        """
        return SemaphoreStatistics(len(self._waiters))
class CapacityLimiter:
    """
    Interface of a limiter that hands out a bounded number of tokens to borrowers.

    Instantiating this class does not create a ``CapacityLimiter`` directly:
    ``__new__`` delegates to ``create_capacity_limiter()`` of the currently running
    async backend. Every method and property defined here raises
    :exc:`NotImplementedError`.

    :param total_tokens: the total number of tokens, passed through to the backend
    """

    def __new__(cls, total_tokens: float) -> CapacityLimiter:
        # The object actually returned is whatever the active backend provides.
        return get_async_backend().create_capacity_limiter(total_tokens)

    async def __aenter__(self) -> None:
        raise NotImplementedError

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        raise NotImplementedError

    @property
    def total_tokens(self) -> float:
        """
        The total number of tokens available for borrowing.

        This is a read-write property. If the total number of tokens is increased, the
        proportionate number of tasks waiting on this limiter will be granted their
        tokens.

        .. versionchanged:: 3.0
            The property is now writable.

        """
        raise NotImplementedError

    @total_tokens.setter
    def total_tokens(self, value: float) -> None:
        raise NotImplementedError

    @property
    def borrowed_tokens(self) -> int:
        """The number of tokens that have currently been borrowed."""
        raise NotImplementedError

    @property
    def available_tokens(self) -> float:
        """The number of tokens currently available to be borrowed"""
        raise NotImplementedError

    def acquire_nowait(self) -> None:
        """
        Acquire a token for the current task without waiting for one to become
        available.

        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
        """
        Acquire a token without waiting for one to become available.

        :param borrower: the entity borrowing a token
        :raises ~anyio.WouldBlock: if there are no tokens available for borrowing

        """
        raise NotImplementedError

    async def acquire(self) -> None:
        """
        Acquire a token for the current task, waiting if necessary for one to become
        available.

        """
        raise NotImplementedError

    async def acquire_on_behalf_of(self, borrower: object) -> None:
        """
        Acquire a token, waiting if necessary for one to become available.

        :param borrower: the entity borrowing a token

        """
        raise NotImplementedError

    def release(self) -> None:
        """
        Release the token held by the current task.

        :raises RuntimeError: if the current task has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def release_on_behalf_of(self, borrower: object) -> None:
        """
        Release the token held by the given borrower.

        :raises RuntimeError: if the borrower has not borrowed a token from this
            limiter.

        """
        raise NotImplementedError

    def statistics(self) -> CapacityLimiterStatistics:
        """
        Return statistics about the current state of this limiter.

        .. versionadded:: 3.0

        """
        raise NotImplementedError
class ResourceGuard:
    """
    A context manager for ensuring that a resource is only used by a single task at a
    time.

    Entering this context manager while the previous has not exited it yet will trigger
    :exc:`BusyResourceError`.

    :param action: the action to guard against (visible in the :exc:`BusyResourceError`
        when triggered, e.g. "Another task is already {action} this resource")
    """

    __slots__ = "action", "_guarded"

    def __init__(self, action: str = "using"):
        self.action: str = action
        # True while some caller is inside the ``with`` block
        self._guarded = False

    def __enter__(self) -> None:
        if self._guarded:
            raise BusyResourceError(self.action)

        self._guarded = True

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        # Clear the flag unconditionally; returning None lets any exception propagate
        self._guarded = False
        return None