Spaces:
Build error
Build error
"""Support for running coroutines in parallel with staggered start times.""" | |
__all__ = 'staggered_race', | |
import contextlib | |
import typing | |
from . import events | |
from . import exceptions as exceptions_mod | |
from . import locks | |
from . import tasks | |
async def staggered_race( | |
coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]], | |
delay: typing.Optional[float], | |
*, | |
loop: events.AbstractEventLoop = None, | |
) -> typing.Tuple[ | |
typing.Any, | |
typing.Optional[int], | |
typing.List[typing.Optional[Exception]] | |
]: | |
"""Run coroutines with staggered start times and take the first to finish. | |
This method takes an iterable of coroutine functions. The first one is | |
started immediately. From then on, whenever the immediately preceding one | |
fails (raises an exception), or when *delay* seconds has passed, the next | |
coroutine is started. This continues until one of the coroutines complete | |
successfully, in which case all others are cancelled, or until all | |
coroutines fail. | |
The coroutines provided should be well-behaved in the following way: | |
* They should only ``return`` if completed successfully. | |
* They should always raise an exception if they did not complete | |
successfully. In particular, if they handle cancellation, they should | |
probably reraise, like this:: | |
try: | |
# do work | |
except asyncio.CancelledError: | |
# undo partially completed work | |
raise | |
Args: | |
coro_fns: an iterable of coroutine functions, i.e. callables that | |
return a coroutine object when called. Use ``functools.partial`` or | |
lambdas to pass arguments. | |
delay: amount of time, in seconds, between starting coroutines. If | |
``None``, the coroutines will run sequentially. | |
loop: the event loop to use. | |
Returns: | |
tuple *(winner_result, winner_index, exceptions)* where | |
- *winner_result*: the result of the winning coroutine, or ``None`` | |
if no coroutines won. | |
- *winner_index*: the index of the winning coroutine in | |
``coro_fns``, or ``None`` if no coroutines won. If the winning | |
coroutine may return None on success, *winner_index* can be used | |
to definitively determine whether any coroutine won. | |
- *exceptions*: list of exceptions returned by the coroutines. | |
``len(exceptions)`` is equal to the number of coroutines actually | |
started, and the order is the same as in ``coro_fns``. The winning | |
coroutine's entry is ``None``. | |
""" | |
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns. | |
loop = loop or events.get_running_loop() | |
enum_coro_fns = enumerate(coro_fns) | |
winner_result = None | |
winner_index = None | |
exceptions = [] | |
running_tasks = [] | |
async def run_one_coro( | |
previous_failed: typing.Optional[locks.Event]) -> None: | |
# Wait for the previous task to finish, or for delay seconds | |
if previous_failed is not None: | |
with contextlib.suppress(exceptions_mod.TimeoutError): | |
# Use asyncio.wait_for() instead of asyncio.wait() here, so | |
# that if we get cancelled at this point, Event.wait() is also | |
# cancelled, otherwise there will be a "Task destroyed but it is | |
# pending" later. | |
await tasks.wait_for(previous_failed.wait(), delay) | |
# Get the next coroutine to run | |
try: | |
this_index, coro_fn = next(enum_coro_fns) | |
except StopIteration: | |
return | |
# Start task that will run the next coroutine | |
this_failed = locks.Event() | |
next_task = loop.create_task(run_one_coro(this_failed)) | |
running_tasks.append(next_task) | |
assert len(running_tasks) == this_index + 2 | |
# Prepare place to put this coroutine's exceptions if not won | |
exceptions.append(None) | |
assert len(exceptions) == this_index + 1 | |
try: | |
result = await coro_fn() | |
except (SystemExit, KeyboardInterrupt): | |
raise | |
except BaseException as e: | |
exceptions[this_index] = e | |
this_failed.set() # Kickstart the next coroutine | |
else: | |
# Store winner's results | |
nonlocal winner_index, winner_result | |
assert winner_index is None | |
winner_index = this_index | |
winner_result = result | |
# Cancel all other tasks. We take care to not cancel the current | |
# task as well. If we do so, then since there is no `await` after | |
# here and CancelledError are usually thrown at one, we will | |
# encounter a curious corner case where the current task will end | |
# up as done() == True, cancelled() == False, exception() == | |
# asyncio.CancelledError. This behavior is specified in | |
# https://bugs.python.org/issue30048 | |
for i, t in enumerate(running_tasks): | |
if i != this_index: | |
t.cancel() | |
first_task = loop.create_task(run_one_coro(None)) | |
running_tasks.append(first_task) | |
try: | |
# Wait for a growing list of tasks to all finish: poor man's version of | |
# curio's TaskGroup or trio's nursery | |
done_count = 0 | |
while done_count != len(running_tasks): | |
done, _ = await tasks.wait(running_tasks) | |
done_count = len(done) | |
# If run_one_coro raises an unhandled exception, it's probably a | |
# programming error, and I want to see it. | |
if __debug__: | |
for d in done: | |
if d.done() and not d.cancelled() and d.exception(): | |
raise d.exception() | |
return winner_result, winner_index, exceptions | |
finally: | |
# Make sure no tasks are left running if we leave this function | |
for t in running_tasks: | |
t.cancel() | |