|
""" |
|
Helper to run a script in a pseudo-terminal. |
|
""" |
|
import os |
|
import selectors |
|
import subprocess |
|
import sys |
|
from contextlib import ExitStack |
|
from errno import EIO |
|
|
|
from test.support.import_helper import import_module |
|
|
|
def run_pty(script, input=b"dummy input\r", env=None): |
|
pty = import_module('pty') |
|
output = bytearray() |
|
[master, slave] = pty.openpty() |
|
args = (sys.executable, '-c', script) |
|
proc = subprocess.Popen(args, stdin=slave, stdout=slave, stderr=slave, env=env) |
|
os.close(slave) |
|
with ExitStack() as cleanup: |
|
cleanup.enter_context(proc) |
|
def terminate(proc): |
|
try: |
|
proc.terminate() |
|
except ProcessLookupError: |
|
|
|
pass |
|
cleanup.callback(terminate, proc) |
|
cleanup.callback(os.close, master) |
|
|
|
|
|
|
|
|
|
|
|
sel = cleanup.enter_context(selectors.SelectSelector()) |
|
sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE) |
|
os.set_blocking(master, False) |
|
while True: |
|
for [_, events] in sel.select(): |
|
if events & selectors.EVENT_READ: |
|
try: |
|
chunk = os.read(master, 0x10000) |
|
except OSError as err: |
|
|
|
if err.errno != EIO: |
|
raise |
|
chunk = b"" |
|
if not chunk: |
|
return output |
|
output.extend(chunk) |
|
if events & selectors.EVENT_WRITE: |
|
try: |
|
input = input[os.write(master, input):] |
|
except OSError as err: |
|
|
|
if err.errno != EIO: |
|
raise |
|
input = b"" |
|
if not input: |
|
sel.modify(master, selectors.EVENT_READ) |
|
|
|
|
|
|
|
|
|
|
|
|
|
class FakeInput: |
|
""" |
|
A fake input stream for pdb's interactive debugger. Whenever a |
|
line is read, print it (to simulate the user typing it), and then |
|
return it. The set of lines to return is specified in the |
|
constructor; they should not have trailing newlines. |
|
""" |
|
def __init__(self, lines): |
|
self.lines = lines |
|
|
|
def readline(self): |
|
line = self.lines.pop(0) |
|
print(line) |
|
return line + '\n' |
|
|