|
""" |
|
Based on sacred/stdout_capturing.py in project Sacred |
|
https://github.com/IDSIA/sacred |
|
|
|
Author: Paul-Edouard Sarlin (skydes) |
|
""" |
|
|
|
from __future__ import division, print_function, unicode_literals |
|
|
|
import os |
|
import subprocess |
|
import sys |
|
from contextlib import contextmanager |
|
from threading import Timer |
|
|
|
|
|
|
|
|
|
|
|
def apply_backspaces_and_linefeeds(text):
    """
    Render backspaces and carriage returns in text like a terminal would.

    The text is processed one "\n"-separated line at a time with a write
    cursor: a carriage return moves the cursor back to the start of the
    line, a backspace moves it one column to the left (never before column
    0), and any other character is written at the cursor, overwriting what
    was there. Overwritten characters are replaced, not erased, so a shorter
    rewrite leaves the tail of the previous content in place.

    A carriage return that is the very last character of the whole text is
    kept verbatim at the end of the output so the result can be concatenated
    with the next output chunk.
    """
    segments = text.split("\n")
    final_segment_no = len(segments) - 1
    rendered = []
    for segment_no, segment in enumerate(segments):
        buffer = []
        pos = 0
        final_char_no = len(segment) - 1
        for char_no, char in enumerate(segment):
            if char == "\b":
                # Move left without deleting; clamp at the line start.
                pos = pos - 1 if pos > 0 else 0
                continue
            if char == "\r":
                ends_whole_text = (
                    segment_no == final_segment_no and char_no == final_char_no
                )
                if not ends_whole_text:
                    # Ordinary carriage return: rewind to the line start.
                    pos = 0
                    continue
                # Trailing carriage return of the text: keep it, appended
                # after everything already written on this line.
                pos = len(buffer)
            # The cursor never moves past the end of the buffer, so it either
            # points at an existing cell (overwrite) or just past it (append).
            if pos < len(buffer):
                buffer[pos] = char
            else:
                buffer.append(char)
            pos += 1
        rendered.append("".join(buffer))
    return "\n".join(rendered)
|
|
|
|
|
def flush():
    """Flush Python's ``sys.stdout`` and ``sys.stderr`` buffers, best effort.

    Only the Python-level stream objects are flushed; C stdio buffers are
    not touched by this function.

    Each stream is flushed independently, so a stream that is missing,
    closed or otherwise not flushable does not prevent the other one from
    being flushed. Such failures are deliberately ignored: callers use this
    right before and after swapping file descriptors, where a failed flush
    must not abort the redirection.
    """
    for stream_name in ("stdout", "stderr"):
        try:
            getattr(sys, stream_name).flush()
        except (AttributeError, ValueError, IOError):
            # AttributeError: stream is None or has no flush();
            # ValueError: stream already closed; IOError: the write failed.
            pass
|
|
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
def capture_outputs(filename):
    """Duplicate stdout and stderr to a file on the file descriptor level.

    Context manager. While the ``with`` body runs, file descriptors 1 and 2
    are pointed at the stdin pipes of two external ``tee`` processes; each
    ``tee`` forwards what it reads to the original stream and appends a copy
    to ``filename``. Because the redirection happens on the descriptors
    themselves, it is not limited to writes made through ``sys.stdout`` /
    ``sys.stderr``.

    On exit the original descriptors are restored and the ``tee`` processes
    are waited for (and killed after one second if still running). When the
    body finishes without raising, the file is then rewritten with
    backspaces and carriage returns resolved by
    ``apply_backspaces_and_linefeeds``.

    Requires a ``tee`` executable and a ``/dev/stderr`` device.

    Args:
        filename: Path of the log file; converted with ``str()`` and opened
            in append mode, so existing content is kept.
    """
    with open(str(filename), "a+") as target:
        original_stdout_fd = 1
        original_stderr_fd = 2
        target_fd = target.fileno()

        # Keep duplicates of the real stdout/stderr descriptors so they can
        # be put back once capturing ends.
        saved_stdout_fd = os.dup(original_stdout_fd)
        saved_stderr_fd = os.dup(original_stderr_fd)

        # Each tee copies its stdin to its own stdout (the real stream, fd 1
        # or 2) and to /dev/stderr, which for the child is the log file since
        # its stderr is target_fd. "-a" appends, "-i" ignores interrupts.
        # start_new_session=True runs tee in its own session -- presumably so
        # terminal signals sent to the parent do not reach it; TODO confirm.
        tee_stdout = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=1,
        )
        tee_stderr = subprocess.Popen(
            ["tee", "-a", "-i", "/dev/stderr"],
            start_new_session=True,
            stdin=subprocess.PIPE,
            stderr=target_fd,
            stdout=2,
        )

        # Push out anything still buffered before the descriptors change, so
        # output written before capturing goes to its original destination.
        flush()
        # From here on, writes to fd 1 / fd 2 land in the tee pipes.
        os.dup2(tee_stdout.stdin.fileno(), original_stdout_fd)
        os.dup2(tee_stderr.stdin.fileno(), original_stderr_fd)

        try:
            yield
        finally:
            # Make sure buffered output of the body reaches the pipes.
            flush()

            # Close the Python-side handles of the pipe write ends ...
            tee_stdout.stdin.close()
            tee_stderr.stdin.close()

            # ... and restore the original descriptors. This also drops the
            # copies of the write ends held by fd 1 and fd 2, so each tee
            # should see end-of-file on its stdin.
            os.dup2(saved_stdout_fd, original_stdout_fd)
            os.dup2(saved_stderr_fd, original_stderr_fd)

            # Wait for both tee processes, but do not block indefinitely: a
            # timer kills them if they are still running after one second.
            def kill_tees():
                tee_stdout.kill()
                tee_stderr.kill()

            tee_timer = Timer(1, kill_tees)
            try:
                tee_timer.start()
                tee_stdout.wait()
                tee_stderr.wait()
            finally:
                tee_timer.cancel()

            # The saved duplicates are no longer needed.
            os.close(saved_stdout_fd)
            os.close(saved_stderr_fd)

    # Clean up the log file: re-read it and resolve backspaces and carriage
    # returns (e.g. progress-bar style output) into the final visible text.
    # NOTE(review): this stage is only reached when the with-body exits
    # without raising; after an exception the file keeps the raw output.
    with open(str(filename), "r") as target:
        text = target.read()
    text = apply_backspaces_and_linefeeds(text)
    with open(str(filename), "w") as target:
        target.write(text)
|
|