|
|
# Public names of this module: what ``from <module> import *`` exports.
__all__ = ('Runner', 'run')
|
|
|
|
|
import contextvars |
|
|
import enum |
|
|
import functools |
|
|
import threading |
|
|
import signal |
|
|
import sys |
|
|
from . import coroutines |
|
|
from . import events |
|
|
from . import exceptions |
|
|
from . import tasks |
|
|
|
|
|
|
|
|
@enum.unique
class _State(enum.Enum):
    """Life-cycle phases a Runner moves through."""

    # Constructed, but no event loop has been created yet.
    CREATED = "created"
    # The event loop exists and coroutines can be run in it.
    INITIALIZED = "initialized"
    # The event loop has been shut down; the runner can no longer be used.
    CLOSED = "closed"
|
|
|
|
|
|
|
|
class Runner: |
|
|
"""A context manager that controls event loop life cycle. |
|
|
|
|
|
The context manager always creates a new event loop, |
|
|
allows to run async functions inside it, |
|
|
and properly finalizes the loop at the context manager exit. |
|
|
|
|
|
If debug is True, the event loop will be run in debug mode. |
|
|
If loop_factory is passed, it is used for new event loop creation. |
|
|
|
|
|
asyncio.run(main(), debug=True) |
|
|
|
|
|
is a shortcut for |
|
|
|
|
|
with asyncio.Runner(debug=True) as runner: |
|
|
runner.run(main()) |
|
|
|
|
|
The run() method can be called multiple times within the runner's context. |
|
|
|
|
|
This can be useful for interactive console (e.g. IPython), |
|
|
unittest runners, console tools, -- everywhere when async code |
|
|
is called from existing sync framework and where the preferred single |
|
|
asyncio.run() call doesn't work. |
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, *, debug=None, loop_factory=None): |
|
|
self._state = _State.CREATED |
|
|
self._debug = debug |
|
|
self._loop_factory = loop_factory |
|
|
self._loop = None |
|
|
self._context = None |
|
|
self._interrupt_count = 0 |
|
|
self._set_event_loop = False |
|
|
|
|
|
def __enter__(self): |
|
|
self._lazy_init() |
|
|
return self |
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
|
self.close() |
|
|
|
|
|
def close(self): |
|
|
"""Shutdown and close event loop.""" |
|
|
if self._state is not _State.INITIALIZED: |
|
|
return |
|
|
try: |
|
|
loop = self._loop |
|
|
_cancel_all_tasks(loop) |
|
|
loop.run_until_complete(loop.shutdown_asyncgens()) |
|
|
loop.run_until_complete(loop.shutdown_default_executor()) |
|
|
finally: |
|
|
if self._set_event_loop: |
|
|
events.set_event_loop(None) |
|
|
loop.close() |
|
|
self._loop = None |
|
|
self._state = _State.CLOSED |
|
|
|
|
|
def get_loop(self): |
|
|
"""Return embedded event loop.""" |
|
|
self._lazy_init() |
|
|
return self._loop |
|
|
|
|
|
def run(self, coro, *, context=None): |
|
|
"""Run a coroutine inside the embedded event loop.""" |
|
|
if not coroutines.iscoroutine(coro): |
|
|
raise ValueError("a coroutine was expected, got {!r}".format(coro)) |
|
|
|
|
|
if events._get_running_loop() is not None: |
|
|
|
|
|
raise RuntimeError( |
|
|
"Runner.run() cannot be called from a running event loop") |
|
|
|
|
|
self._lazy_init() |
|
|
|
|
|
if context is None: |
|
|
context = self._context |
|
|
task = self._loop.create_task(coro, context=context) |
|
|
|
|
|
if (threading.current_thread() is threading.main_thread() |
|
|
and signal.getsignal(signal.SIGINT) is signal.default_int_handler |
|
|
): |
|
|
sigint_handler = functools.partial(self._on_sigint, main_task=task) |
|
|
try: |
|
|
signal.signal(signal.SIGINT, sigint_handler) |
|
|
except ValueError: |
|
|
|
|
|
|
|
|
|
|
|
sigint_handler = None |
|
|
else: |
|
|
sigint_handler = None |
|
|
|
|
|
self._interrupt_count = 0 |
|
|
try: |
|
|
return self._loop.run_until_complete(task) |
|
|
except exceptions.CancelledError: |
|
|
if self._interrupt_count > 0: |
|
|
uncancel = getattr(task, "uncancel", None) |
|
|
if uncancel is not None and uncancel() == 0: |
|
|
raise KeyboardInterrupt() |
|
|
raise |
|
|
finally: |
|
|
if (sigint_handler is not None |
|
|
and signal.getsignal(signal.SIGINT) is sigint_handler |
|
|
): |
|
|
signal.signal(signal.SIGINT, signal.default_int_handler) |
|
|
|
|
|
def _lazy_init(self): |
|
|
if self._state is _State.CLOSED: |
|
|
raise RuntimeError("Runner is closed") |
|
|
if self._state is _State.INITIALIZED: |
|
|
return |
|
|
if self._loop_factory is None: |
|
|
self._loop = events.new_event_loop() |
|
|
if not self._set_event_loop: |
|
|
|
|
|
|
|
|
events.set_event_loop(self._loop) |
|
|
self._set_event_loop = True |
|
|
else: |
|
|
self._loop = self._loop_factory() |
|
|
if self._debug is not None: |
|
|
self._loop.set_debug(self._debug) |
|
|
self._context = contextvars.copy_context() |
|
|
self._state = _State.INITIALIZED |
|
|
|
|
|
def _on_sigint(self, signum, frame, main_task): |
|
|
self._interrupt_count += 1 |
|
|
if self._interrupt_count == 1 and not main_task.done(): |
|
|
main_task.cancel() |
|
|
|
|
|
self._loop.call_soon_threadsafe(lambda: None) |
|
|
return |
|
|
raise KeyboardInterrupt() |
|
|
|
|
|
|
|
|
def run(main, *, debug=None): |
|
|
"""Execute the coroutine and return the result. |
|
|
|
|
|
This function runs the passed coroutine, taking care of |
|
|
managing the asyncio event loop and finalizing asynchronous |
|
|
generators. |
|
|
|
|
|
This function cannot be called when another asyncio event loop is |
|
|
running in the same thread. |
|
|
|
|
|
If debug is True, the event loop will be run in debug mode. |
|
|
|
|
|
This function always creates a new event loop and closes it at the end. |
|
|
It should be used as a main entry point for asyncio programs, and should |
|
|
ideally only be called once. |
|
|
|
|
|
Example: |
|
|
|
|
|
async def main(): |
|
|
await asyncio.sleep(1) |
|
|
print('hello') |
|
|
|
|
|
asyncio.run(main()) |
|
|
""" |
|
|
if events._get_running_loop() is not None: |
|
|
|
|
|
raise RuntimeError( |
|
|
"asyncio.run() cannot be called from a running event loop") |
|
|
|
|
|
with Runner(debug=debug) as runner: |
|
|
return runner.run(main) |
|
|
|
|
|
|
|
|
def _cancel_all_tasks(loop):
    """Cancel the tasks of *loop* and run the loop until they finish.

    Every task returned by tasks.all_tasks(loop) is cancelled and then
    awaited.  A task that ended with an exception instead of being
    cancelled is reported through the loop's exception handler.
    """
    pending = tasks.all_tasks(loop)
    if pending:
        for item in pending:
            item.cancel()

        # return_exceptions=True: wait for all of them, whatever the
        # individual outcomes are.
        waiter = tasks.gather(*pending, return_exceptions=True)
        loop.run_until_complete(waiter)

        for item in pending:
            if item.cancelled():
                continue
            error = item.exception()
            if error is not None:
                loop.call_exception_handler({
                    'message': 'unhandled exception during asyncio.run() shutdown',
                    'exception': error,
                    'task': item,
                })
|
|
|