Spaces:
Build error
Build error
import ast | |
import asyncio | |
import code | |
import concurrent.futures | |
import inspect | |
import sys | |
import threading | |
import types | |
import warnings | |
from . import futures | |
class AsyncIOInteractiveConsole(code.InteractiveConsole): | |
def __init__(self, locals, loop): | |
super().__init__(locals) | |
self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT | |
self.loop = loop | |
def runcode(self, code): | |
future = concurrent.futures.Future() | |
def callback(): | |
global repl_future | |
global repl_future_interrupted | |
repl_future = None | |
repl_future_interrupted = False | |
func = types.FunctionType(code, self.locals) | |
try: | |
coro = func() | |
except SystemExit: | |
raise | |
except KeyboardInterrupt as ex: | |
repl_future_interrupted = True | |
future.set_exception(ex) | |
return | |
except BaseException as ex: | |
future.set_exception(ex) | |
return | |
if not inspect.iscoroutine(coro): | |
future.set_result(coro) | |
return | |
try: | |
repl_future = self.loop.create_task(coro) | |
futures._chain_future(repl_future, future) | |
except BaseException as exc: | |
future.set_exception(exc) | |
loop.call_soon_threadsafe(callback) | |
try: | |
return future.result() | |
except SystemExit: | |
raise | |
except BaseException: | |
if repl_future_interrupted: | |
self.write("\nKeyboardInterrupt\n") | |
else: | |
self.showtraceback() | |
class REPLThread(threading.Thread): | |
def run(self): | |
try: | |
banner = ( | |
f'asyncio REPL {sys.version} on {sys.platform}\n' | |
f'Use "await" directly instead of "asyncio.run()".\n' | |
f'Type "help", "copyright", "credits" or "license" ' | |
f'for more information.\n' | |
f'{getattr(sys, "ps1", ">>> ")}import asyncio' | |
) | |
console.interact( | |
banner=banner, | |
exitmsg='exiting asyncio REPL...') | |
finally: | |
warnings.filterwarnings( | |
'ignore', | |
message=r'^coroutine .* was never awaited$', | |
category=RuntimeWarning) | |
loop.call_soon_threadsafe(loop.stop) | |
if __name__ == '__main__': | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
repl_locals = {'asyncio': asyncio} | |
for key in {'__name__', '__package__', | |
'__loader__', '__spec__', | |
'__builtins__', '__file__'}: | |
repl_locals[key] = locals()[key] | |
console = AsyncIOInteractiveConsole(repl_locals, loop) | |
repl_future = None | |
repl_future_interrupted = False | |
try: | |
import readline # NoQA | |
except ImportError: | |
pass | |
repl_thread = REPLThread() | |
repl_thread.daemon = True | |
repl_thread.start() | |
while True: | |
try: | |
loop.run_forever() | |
except KeyboardInterrupt: | |
if repl_future and not repl_future.done(): | |
repl_future.cancel() | |
repl_future_interrupted = True | |
continue | |
else: | |
break | |