|
""" |
|
Output for vt100 terminals. |
|
|
|
A lot of thanks, regarding outputting of colors, goes to the Pygments project: |
|
(We don't rely on Pygments anymore, because many things are very custom, and |
|
everything has been highly optimized.) |
|
http://pygments.org/ |
|
""" |
|
from __future__ import annotations |
|
|
|
import io |
|
import os |
|
import sys |
|
from typing import Callable, Dict, Hashable, Iterable, Sequence, TextIO, Tuple |
|
|
|
from prompt_toolkit.cursor_shapes import CursorShape |
|
from prompt_toolkit.data_structures import Size |
|
from prompt_toolkit.output import Output |
|
from prompt_toolkit.styles import ANSI_COLOR_NAMES, Attrs |
|
from prompt_toolkit.utils import is_dumb_terminal |
|
|
|
from .color_depth import ColorDepth |
|
from .flush_stdout import flush_stdout |
|
|
|
# Public API of this module (what ``from ... import *`` exposes).
__all__ = ["Vt100_Output"]
|
|
|
|
|
# Maps every ANSI color name to the SGR parameter that selects it as the
# foreground color.
FG_ANSI_COLORS: dict[str, int] = {
    "ansidefault": 39,
    # Low intensity.
    "ansiblack": 30,
    "ansired": 31,
    "ansigreen": 32,
    "ansiyellow": 33,
    "ansiblue": 34,
    "ansimagenta": 35,
    "ansicyan": 36,
    "ansigray": 37,
    # High intensity.
    "ansibrightblack": 90,
    "ansibrightred": 91,
    "ansibrightgreen": 92,
    "ansibrightyellow": 93,
    "ansibrightblue": 94,
    "ansibrightmagenta": 95,
    "ansibrightcyan": 96,
    "ansiwhite": 97,
}
|
|
|
# Maps every ANSI color name to the SGR parameter that selects it as the
# background color.
BG_ANSI_COLORS: dict[str, int] = {
    "ansidefault": 49,
    # Low intensity.
    "ansiblack": 40,
    "ansired": 41,
    "ansigreen": 42,
    "ansiyellow": 43,
    "ansiblue": 44,
    "ansimagenta": 45,
    "ansicyan": 46,
    "ansigray": 47,
    # High intensity.
    "ansibrightblack": 100,
    "ansibrightred": 101,
    "ansibrightgreen": 102,
    "ansibrightyellow": 103,
    "ansibrightblue": 104,
    "ansibrightmagenta": 105,
    "ansibrightcyan": 106,
    "ansiwhite": 107,
}
|
|
|
|
|
# Reference (r, g, b) value for every ANSI color name; used to find the ANSI
# color nearest to an arbitrary RGB color.
# NOTE: keep the entry order. The nearest-color search iterates this dict and
# keeps the first of several equally distant candidates.
ANSI_COLORS_TO_RGB: dict[str, tuple[int, int, int]] = {
    # Placeholder only: the nearest-color search skips "ansidefault".
    "ansidefault": (0x00, 0x00, 0x00),
    # Gray scale.
    "ansiblack": (0x00, 0x00, 0x00),
    "ansigray": (0xE5, 0xE5, 0xE5),
    "ansibrightblack": (0x7F, 0x7F, 0x7F),
    "ansiwhite": (0xFF, 0xFF, 0xFF),
    # Low intensity.
    "ansired": (0xCD, 0x00, 0x00),
    "ansigreen": (0x00, 0xCD, 0x00),
    "ansiyellow": (0xCD, 0xCD, 0x00),
    "ansiblue": (0x00, 0x00, 0xCD),
    "ansimagenta": (0xCD, 0x00, 0xCD),
    "ansicyan": (0x00, 0xCD, 0xCD),
    # High intensity.
    "ansibrightred": (0xFF, 0x00, 0x00),
    "ansibrightgreen": (0x00, 0xFF, 0x00),
    "ansibrightyellow": (0xFF, 0xFF, 0x00),
    "ansibrightblue": (0x00, 0x00, 0xFF),
    "ansibrightmagenta": (0xFF, 0x00, 0xFF),
    "ansibrightcyan": (0x00, 0xFF, 0xFF),
}
|
|
|
|
|
# Import-time sanity check: each table must define exactly the known ANSI
# color names, no more and no fewer.
assert FG_ANSI_COLORS.keys() == set(ANSI_COLOR_NAMES)
assert BG_ANSI_COLORS.keys() == set(ANSI_COLOR_NAMES)
assert ANSI_COLORS_TO_RGB.keys() == set(ANSI_COLOR_NAMES)
|
|
|
|
|
def _get_closest_ansi_color(r: int, g: int, b: int, exclude: Sequence[str] = ()) -> str:
    """
    Find closest ANSI color. Return it by name.

    :param r: Red (Between 0 and 255.)
    :param g: Green (Between 0 and 255.)
    :param b: Blue (Between 0 and 255.)
    :param exclude: A tuple of color names to exclude. (E.g. ``('ansired', )``.)
    :returns: The name of the nearest candidate in `ANSI_COLORS_TO_RGB`, or
        ``"ansidefault"`` when every candidate was excluded.
    """
    excluded = set(exclude)

    # When we have a bit of saturation, avoid the gray-like colors, otherwise,
    # too often the distance to the gray color is less.
    saturation = abs(r - g) + abs(g - b) + abs(b - r)  # Between 0..510

    if saturation > 30:
        # These must be names that actually occur in `ANSI_COLORS_TO_RGB`;
        # the previous "ansilightgray"/"ansidarkgray" entries matched nothing,
        # so the two middle grays were never excluded.
        excluded.update(("ansigray", "ansibrightblack", "ansiwhite", "ansiblack"))

    # Take the closest color by squared Euclidean distance in RGB space.
    distance = 257 * 257 * 3  # "infinity" (> distance from #000000 to #ffffff)
    match = "ansidefault"

    for name, (r2, g2, b2) in ANSI_COLORS_TO_RGB.items():
        # "ansidefault" has no real color value, so it is never a candidate.
        if name == "ansidefault" or name in excluded:
            continue

        d = (r - r2) ** 2 + (g - g2) ** 2 + (b - b2) ** 2

        # Strict comparison: on a tie the earlier table entry wins.
        if d < distance:
            match = name
            distance = d

    return match
|
|
|
|
|
# An (SGR color code, ANSI color name) pair, e.g. ``(44, 'ansiblue')``.
_ColorCodeAndName = Tuple[int, str]


class _16ColorCache:
    """
    Memoizes the translation of (r, g, b) tuples into one of the 16 ANSI
    colors.

    :param bg: When True, hand out background color codes instead of
        foreground color codes.
    """

    def __init__(self, bg: bool = False) -> None:
        self.bg = bg
        # Keyed on ((r, g, b), tuple(exclude)).
        self._cache: dict[Hashable, _ColorCodeAndName] = {}

    def get_code(
        self, value: tuple[int, int, int], exclude: Sequence[str] = ()
    ) -> _ColorCodeAndName:
        """
        Look up the (ansi_code, ansi_name) pair for an (r, g, b) value,
        e.g. ``(44, 'ansiblue')``, computing and storing it on first use.

        :param value: The (r, g, b) tuple to translate.
        :param exclude: Color names that may not be chosen.
        """
        # `exclude` is part of the key: the same RGB value can map to a
        # different ANSI color depending on what is excluded.
        key: Hashable = (value, tuple(exclude))

        found = self._cache.get(key)
        if found is None:
            found = self._cache[key] = self._get(value, exclude)
        return found

    def _get(
        self, value: tuple[int, int, int], exclude: Sequence[str] = ()
    ) -> _ColorCodeAndName:
        "Compute the (ansi_code, ansi_name) pair without consulting the cache."
        r, g, b = value
        name = _get_closest_ansi_color(r, g, b, exclude=exclude)

        # Turn the color name into a vt100 code for the configured layer.
        table = BG_ANSI_COLORS if self.bg else FG_ANSI_COLORS
        return table[name], name
|
|
|
|
|
class _256ColorCache(Dict[Tuple[int, int, int], int]):
    """
    Cache which maps (r, g, b) tuples to 256 colors.

    The instance is a dict from (r, g, b) to a palette index; entries are
    computed on first access through `__missing__`. The reference palette is
    available as the ``colors`` attribute, indexed by palette number.
    """

    def __init__(self) -> None:
        # Build color table.
        colors: list[tuple[int, int, int]] = []

        # colors 0..15: 16 basic colors
        colors.append((0x00, 0x00, 0x00))  # 0
        colors.append((0xCD, 0x00, 0x00))  # 1
        colors.append((0x00, 0xCD, 0x00))  # 2
        colors.append((0xCD, 0xCD, 0x00))  # 3
        colors.append((0x00, 0x00, 0xEE))  # 4
        colors.append((0xCD, 0x00, 0xCD))  # 5
        colors.append((0x00, 0xCD, 0xCD))  # 6
        colors.append((0xE5, 0xE5, 0xE5))  # 7
        colors.append((0x7F, 0x7F, 0x7F))  # 8
        colors.append((0xFF, 0x00, 0x00))  # 9
        colors.append((0x00, 0xFF, 0x00))  # 10
        colors.append((0xFF, 0xFF, 0x00))  # 11
        colors.append((0x5C, 0x5C, 0xFF))  # 12
        colors.append((0xFF, 0x00, 0xFF))  # 13
        colors.append((0x00, 0xFF, 0xFF))  # 14
        colors.append((0xFF, 0xFF, 0xFF))  # 15

        # colors 16..231: the 6x6x6 color cube (216 entries).
        valuerange = (0x00, 0x5F, 0x87, 0xAF, 0xD7, 0xFF)

        for i in range(216):
            r = valuerange[(i // 36) % 6]
            g = valuerange[(i // 6) % 6]
            b = valuerange[i % 6]
            colors.append((r, g, b))

        # colors 232..255: grayscale ramp (24 entries, 0x08 up to 0xEE).
        # Building 217 cube entries and only 21 grays left index 232 as a
        # duplicate black and dropped the two lightest grays (254 and 255).
        for i in range(24):
            v = 8 + i * 10
            colors.append((v, v, v))

        self.colors = colors

    def __missing__(self, value: tuple[int, int, int]) -> int:
        """
        Find the palette index closest to `value`, store it and return it.

        :param value: The (r, g, b) tuple to translate.
        """
        r, g, b = value

        # Find closest color by squared Euclidean distance in RGB space.
        distance = 257 * 257 * 3  # "infinity" (> distance from #000000 to #ffffff)
        match = 0

        # The 16 basic colors are skipped on purpose: what they look like
        # depends on the color scheme of the terminal.
        for i, (r2, g2, b2) in enumerate(self.colors[16:], start=16):
            d = (r - r2) ** 2 + (g - g2) ** 2 + (b - b2) ** 2

            # Strict comparison: on a tie the lowest index wins.
            if d < distance:
                match = i
                distance = d

        # Turn color name into code.
        self[value] = match
        return match
|
|
|
|
|
# Module-wide color lookup caches, shared by every `_EscapeCodeCache`.
_256_colors = _256ColorCache()
_16_bg_colors = _16ColorCache(bg=True)
_16_fg_colors = _16ColorCache(bg=False)
|
|
|
|
|
class _EscapeCodeCache(Dict[Attrs, str]):
    """
    Cache for VT100 escape codes. It maps
    (fgcolor, bgcolor, bold, underline, strike, italic, blink, reverse,
    hidden) tuples to VT100 escape sequences.

    Entries are computed on first access through `__missing__`.

    :param color_depth: The `ColorDepth` that decides how colors are encoded
        (no colors, 16 ANSI colors, 256 colors or 24bit colors).
    """

    def __init__(self, color_depth: ColorDepth) -> None:
        self.color_depth = color_depth

    def __missing__(self, attrs: Attrs) -> str:
        """
        Build the escape sequence for `attrs`, store it and return it.
        """
        (
            fgcolor,
            bgcolor,
            bold,
            underline,
            strike,
            italic,
            blink,
            reverse,
            hidden,
        ) = attrs

        parts: list[str] = list(self._colors_to_code(fgcolor or "", bgcolor or ""))

        # (flag, SGR parameter) pairs, in the order they are emitted.
        flags = (
            (bold, "1"),
            (italic, "3"),
            (blink, "5"),
            (underline, "4"),
            (reverse, "7"),
            (hidden, "8"),
            (strike, "9"),
        )
        parts.extend(code for enabled, code in flags if enabled)

        # The sequence always begins with "0" (reset), so the result does not
        # depend on whatever attributes were active before.
        if parts:
            result = "\x1b[0;" + ";".join(parts) + "m"
        else:
            result = "\x1b[0m"

        self[attrs] = result
        return result

    def _color_name_to_rgb(self, color: str) -> tuple[int, int, int]:
        """
        Turn 'ffffff', into (0xff, 0xff, 0xff).

        :raises ValueError: When `color` is not a hexadecimal number.
        """
        rgb = int(color, 16)
        return (rgb >> 16) & 0xFF, (rgb >> 8) & 0xFF, rgb & 0xFF

    def _colors_to_code(self, fg_color: str, bg_color: str) -> Iterable[str]:
        """
        Return an iterable with the vt100 SGR parameters (as strings) that
        represent this foreground/background color combination.

        :param fg_color: ANSI color name, 'rrggbb' hex string, or '' for none.
        :param bg_color: Same, for the background.
        """
        # When requesting ANSI colors only, and both fg/bg color were converted
        # to ANSI, ensure that the foreground and background color are not the
        # same. (Unless they were explicitly defined to be the same color.)
        fg_ansi = ""

        def get(color: str, bg: bool) -> list[int]:
            nonlocal fg_ansi

            # No color given, or a depth that does not render colors.
            if not color or self.color_depth == ColorDepth.DEPTH_1_BIT:
                return []

            # 16 ANSI colors. (Given by name.)
            table = BG_ANSI_COLORS if bg else FG_ANSI_COLORS
            if color in table:
                return [table[color]]

            # RGB colors. (Defined as 'ffffff'.) Unparsable names are ignored.
            try:
                rgb = self._color_name_to_rgb(color)
            except ValueError:
                return []

            # When only 16 colors are supported, use that.
            if self.color_depth == ColorDepth.DEPTH_4_BIT:
                if bg:
                    # Keep the background from collapsing onto the ANSI color
                    # that was picked for a different foreground color.
                    exclude = [fg_ansi] if fg_color != bg_color else []
                    code, _ = _16_bg_colors.get_code(rgb, exclude=exclude)
                else:
                    # Remember the chosen name for the background lookup.
                    code, fg_ansi = _16_fg_colors.get_code(rgb)
                return [code]

            layer = 48 if bg else 38

            # True colors.
            if self.color_depth == ColorDepth.DEPTH_24_BIT:
                r, g, b = rgb
                return [layer, 2, r, g, b]

            # 256 RGB colors.
            return [layer, 5, _256_colors[rgb]]

        # The foreground must be resolved first: it sets `fg_ansi`, which the
        # background lookup reads.
        result: list[int] = []
        result.extend(get(fg_color, False))
        result.extend(get(bg_color, True))

        return map(str, result)
|
|
|
|
|
def _get_size(fileno: int) -> tuple[int, int]:
    """
    Query the size of the terminal attached to a file descriptor.

    :param fileno: stdout.fileno()
    :returns: A (rows, cols) tuple.
    :raises OSError: Propagated from `os.get_terminal_size` when the size
        cannot be queried.
    """
    # `os.terminal_size` unpacks as (columns, lines); callers expect rows first.
    columns, lines = os.get_terminal_size(fileno)
    return lines, columns
|
|
|
|
|
class Vt100_Output(Output):
    """
    `Output` implementation that emits vt100 escape sequences.

    Everything that is written gets collected in an internal buffer and is
    only sent to `stdout` when `flush` is called.

    :param stdout: Any object which has a `write` and `flush` method + an
        'encoding' property.
    :param get_size: A callable which returns the `Size` of the output terminal.
    :param term: The terminal environment variable. (xterm, xterm-256color, linux, ...)
    :param default_color_depth: When not `None`, the value that
        `get_default_color_depth` returns, regardless of `term`.
    :param enable_bell: When `False`, `bell` does nothing.
    :param enable_cpr: When `True` (the default), send "cursor position
        request" escape sequences to the output in order to detect the cursor
        position. That way, we can properly determine how much space there is
        available for the UI (especially for drop down menus) to render. The
        `Renderer` will still try to figure out whether the current terminal
        does respond to CPR escapes. When `False`, never attempt to send CPR
        requests.
    """

    # File descriptors for which `from_pty` already printed the "Output is
    # not a terminal" warning, so that it is shown only once per descriptor.
    _fds_not_a_terminal: set[int] = set()

    def __init__(
        self,
        stdout: TextIO,
        get_size: Callable[[], Size],
        term: str | None = None,
        default_color_depth: ColorDepth | None = None,
        enable_bell: bool = True,
        enable_cpr: bool = True,
    ) -> None:
        assert hasattr(stdout, "write") and hasattr(stdout, "flush")

        self.stdout: TextIO = stdout
        self.term = term
        self.default_color_depth = default_color_depth
        self.enable_bell = enable_bell
        self.enable_cpr = enable_cpr
        self._get_size = get_size

        # Pending output; sent to `stdout` by `flush`.
        self._buffer: list[str] = []

        # One escape code cache per color depth.
        self._escape_code_caches: dict[ColorDepth, _EscapeCodeCache] = {
            depth: _EscapeCodeCache(depth)
            for depth in (
                ColorDepth.DEPTH_1_BIT,
                ColorDepth.DEPTH_4_BIT,
                ColorDepth.DEPTH_8_BIT,
                ColorDepth.DEPTH_24_BIT,
            )
        }

        # Whether `set_cursor_shape` ever emitted a shape. `reset_cursor_shape`
        # only writes a reset sequence when this is set.
        self._cursor_shape_changed = False

    @classmethod
    def from_pty(
        cls,
        stdout: TextIO,
        term: str | None = None,
        default_color_depth: ColorDepth | None = None,
        enable_bell: bool = True,
    ) -> Vt100_Output:
        """
        Create an Output class from a pseudo terminal.
        (This will take the dimensions by reading the pseudo
        terminal attributes.)

        A warning is written to stderr when `stdout` is not a terminal.
        """
        fd: int | None

        # Not every file-like object is backed by a file descriptor.
        try:
            fd = stdout.fileno()
        except io.UnsupportedOperation:
            fd = None

        if not stdout.isatty():
            warned_before = fd is not None and fd in cls._fds_not_a_terminal
            if not warned_before:
                msg = "Warning: Output is not a terminal (fd=%r).\n"
                sys.stderr.write(msg % fd)
                sys.stderr.flush()
                if fd is not None:
                    cls._fds_not_a_terminal.add(fd)

        def get_size() -> Size:
            rows: int | None
            columns: int | None

            # The size is queried on every call, so a resized terminal is
            # picked up.
            try:
                rows, columns = _get_size(stdout.fileno())
            except OSError:
                rows, columns = None, None

            # Fall back to 24x80 when the size is unavailable or reported as 0.
            return Size(rows=rows or 24, columns=columns or 80)

        return cls(
            stdout,
            get_size,
            term=term,
            default_color_depth=default_color_depth,
            enable_bell=enable_bell,
        )

    def get_size(self) -> Size:
        "Return the terminal size, as reported by the `get_size` callable."
        return self._get_size()

    def fileno(self) -> int:
        "Return file descriptor."
        return self.stdout.fileno()

    def encoding(self) -> str:
        "Return encoding used for stdout."
        return self.stdout.encoding

    def write_raw(self, data: str) -> None:
        """
        Append `data` to the output buffer as is, escape sequences included.
        """
        self._buffer.append(data)

    def write(self, data: str) -> None:
        """
        Append text to the output buffer.
        (Every escape character is replaced by a question mark, so the text
        cannot contain vt100 escape codes. -- used for safely writing text.)
        """
        self._buffer.append(data.replace("\x1b", "?"))

    def set_title(self, title: str) -> None:
        """
        Set terminal title.

        Nothing is written when `term` is "linux" or "eterm-color".
        """
        if self.term in ("linux", "eterm-color"):
            return

        # Drop ESC and BEL, so the title cannot end the sequence early.
        sanitized = title.replace("\x1b", "").replace("\x07", "")
        self.write_raw("\x1b]2;%s\x07" % sanitized)

    def clear_title(self) -> None:
        "Set an empty terminal title."
        self.set_title("")

    def erase_screen(self) -> None:
        """
        Erases the screen with the background color and moves the cursor to
        home.
        """
        self.write_raw("\x1b[2J")

    def enter_alternate_screen(self) -> None:
        "Switch to the alternate screen buffer and move the cursor home."
        self.write_raw("\x1b[?1049h\x1b[H")

    def quit_alternate_screen(self) -> None:
        "Leave the alternate screen buffer."
        self.write_raw("\x1b[?1049l")

    def enable_mouse_support(self) -> None:
        "Turn on mouse reporting (private modes 1000, 1003, 1015 and 1006)."
        # Basic button press/release reporting.
        self.write_raw("\x1b[?1000h")

        # Report all mouse motion as well.
        self.write_raw("\x1b[?1003h")

        # urxvt style coordinates. (For terminals that understand this.)
        self.write_raw("\x1b[?1015h")

        # SGR style coordinates. (For terminals that understand this.)
        self.write_raw("\x1b[?1006h")

    def disable_mouse_support(self) -> None:
        "Turn off every mouse reporting mode that `enable_mouse_support` set."
        self.write_raw("\x1b[?1000l")
        self.write_raw("\x1b[?1015l")
        self.write_raw("\x1b[?1006l")
        self.write_raw("\x1b[?1003l")

    def erase_end_of_line(self) -> None:
        """
        Erases from the current cursor position to the end of the current line.
        """
        self.write_raw("\x1b[K")

    def erase_down(self) -> None:
        """
        Erases the screen from the current line down to the bottom of the
        screen.
        """
        self.write_raw("\x1b[J")

    def reset_attributes(self) -> None:
        "Reset all text attributes and colors."
        self.write_raw("\x1b[0m")

    def set_attributes(self, attrs: Attrs, color_depth: ColorDepth) -> None:
        """
        Write the escape sequence that activates the given style.

        :param attrs: `Attrs` instance.
        :param color_depth: Selects the escape code cache to use, and thereby
            how the colors in `attrs` get encoded.
        """
        # The cache computes the escape sequence on first use.
        self.write_raw(self._escape_code_caches[color_depth][attrs])

    def disable_autowrap(self) -> None:
        "Turn off automatic line wrapping."
        self.write_raw("\x1b[?7l")

    def enable_autowrap(self) -> None:
        "Turn on automatic line wrapping."
        self.write_raw("\x1b[?7h")

    def enable_bracketed_paste(self) -> None:
        "Turn on bracketed paste mode."
        self.write_raw("\x1b[?2004h")

    def disable_bracketed_paste(self) -> None:
        "Turn off bracketed paste mode."
        self.write_raw("\x1b[?2004l")

    def reset_cursor_key_mode(self) -> None:
        """
        For vt100 only.
        Put the terminal in cursor mode (instead of application mode).
        """
        self.write_raw("\x1b[?1l")

    def cursor_goto(self, row: int = 0, column: int = 0) -> None:
        """
        Move cursor position.
        """
        self.write_raw("\x1b[%i;%iH" % (row, column))

    def cursor_up(self, amount: int) -> None:
        "Move the cursor `amount` rows up; zero writes nothing."
        if amount == 0:
            return
        self.write_raw("\x1b[A" if amount == 1 else "\x1b[%iA" % amount)

    def cursor_down(self, amount: int) -> None:
        "Move the cursor `amount` rows down; zero writes nothing."
        if amount == 0:
            return
        self.write_raw("\x1b[B" if amount == 1 else "\x1b[%iB" % amount)

    def cursor_forward(self, amount: int) -> None:
        "Move the cursor `amount` columns forward; zero writes nothing."
        if amount == 0:
            return
        self.write_raw("\x1b[C" if amount == 1 else "\x1b[%iC" % amount)

    def cursor_backward(self, amount: int) -> None:
        "Move the cursor `amount` columns backward; zero writes nothing."
        if amount == 0:
            return
        # A single step is written as a plain backspace character.
        self.write_raw("\b" if amount == 1 else "\x1b[%iD" % amount)

    def hide_cursor(self) -> None:
        "Make the cursor invisible."
        self.write_raw("\x1b[?25l")

    def show_cursor(self) -> None:
        "Stop the cursor from blinking and make it visible."
        self.write_raw("\x1b[?12l\x1b[?25h")

    def set_cursor_shape(self, cursor_shape: CursorShape) -> None:
        """
        Change the shape of the cursor.

        `CursorShape._NEVER_CHANGE` is ignored completely; for a shape that
        has no escape sequence, an empty string is written.
        """
        if cursor_shape == CursorShape._NEVER_CHANGE:
            return

        sequences = {
            CursorShape.BLOCK: "\x1b[2 q",
            CursorShape.BEAM: "\x1b[6 q",
            CursorShape.UNDERLINE: "\x1b[4 q",
            CursorShape.BLINKING_BLOCK: "\x1b[1 q",
            CursorShape.BLINKING_BEAM: "\x1b[5 q",
            CursorShape.BLINKING_UNDERLINE: "\x1b[3 q",
        }

        self._cursor_shape_changed = True
        self.write_raw(sequences.get(cursor_shape, ""))

    def reset_cursor_shape(self) -> None:
        "Reset cursor shape."
        # Only reset when `set_cursor_shape` changed the shape before.
        if not self._cursor_shape_changed:
            return

        self._cursor_shape_changed = False
        self.write_raw("\x1b[0 q")

    def flush(self) -> None:
        """
        Send everything that was buffered to the output stream and flush it.
        Does nothing when the buffer is empty.
        """
        if self._buffer:
            data = "".join(self._buffer)
            self._buffer = []

            flush_stdout(self.stdout, data)

    def ask_for_cpr(self) -> None:
        """
        Asks for a cursor position report (CPR).
        """
        self.write_raw("\x1b[6n")
        self.flush()

    @property
    def responds_to_cpr(self) -> bool:
        """
        Whether cursor position requests should be sent to this output.

        False when CPR was disabled in the constructor, when the
        ``PROMPT_TOOLKIT_NO_CPR`` environment variable equals "1", when
        `term` is a dumb terminal, or when `stdout` is not a tty.
        """
        if not self.enable_cpr:
            return False

        # Opt-out through the environment.
        if os.environ.get("PROMPT_TOOLKIT_NO_CPR", "") == "1":
            return False

        if is_dumb_terminal(self.term):
            return False

        try:
            return self.stdout.isatty()
        except ValueError:
            # `isatty` raised, e.g. because the stream was closed.
            return False

    def bell(self) -> None:
        "Sound bell."
        if not self.enable_bell:
            return

        self.write_raw("\a")
        self.flush()

    def get_default_color_depth(self) -> ColorDepth:
        """
        Return the default color depth for a vt100 terminal, according to
        our term value.

        An explicitly given `default_color_depth` always wins. Otherwise, dumb
        terminals get `DEPTH_1_BIT`, "linux" and "eterm-color" get
        `DEPTH_4_BIT`, and anything else (including an unknown `term`) gets
        `ColorDepth.DEFAULT`.
        """
        if self.default_color_depth is not None:
            return self.default_color_depth

        term = self.term

        if term is None:
            return ColorDepth.DEFAULT

        if is_dumb_terminal(term):
            return ColorDepth.DEPTH_1_BIT

        if term in ("linux", "eterm-color"):
            return ColorDepth.DEPTH_4_BIT

        return ColorDepth.DEFAULT
|
|