Spaces:
Build error
Build error
# Public API of this module: only run() is exported.
__all__ = ('run',)

# Sibling modules of the enclosing package that the functions below rely on.
from . import coroutines
from . import events
from . import tasks
def run(main, *, debug=None):
    """Execute the coroutine and return the result.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Args:
        main: The coroutine object to run to completion.
        debug: Keyword-only.  When not None, the value is passed to
            ``loop.set_debug()`` on the freshly created loop; when None
            the loop's debug setting is left untouched.

    Returns:
        Whatever ``loop.run_until_complete(main)`` returns.

    Raises:
        RuntimeError: If an event loop is already running in this thread.
        ValueError: If *main* is not a coroutine object.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """
    # Refuse to nest: a loop that is already running cannot be re-entered.
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        if debug is not None:
            loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        # Shutdown runs whether main succeeded or raised.  The inner
        # try/finally guarantees the loop is unset and closed even if one
        # of the shutdown steps itself raises.
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            events.set_event_loop(None)
            loop.close()
def _cancel_all_tasks(loop): | |
to_cancel = tasks.all_tasks(loop) | |
if not to_cancel: | |
return | |
for task in to_cancel: | |
task.cancel() | |
loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True)) | |
for task in to_cancel: | |
if task.cancelled(): | |
continue | |
if task.exception() is not None: | |
loop.call_exception_handler({ | |
'message': 'unhandled exception during asyncio.run() shutdown', | |
'exception': task.exception(), | |
'task': task, | |
}) | |