|
""" |
|
Renders the command line on the console. |
|
(Redraws parts of the input line that were changed.) |
|
""" |
|
from __future__ import annotations |
|
|
|
from asyncio import FIRST_COMPLETED, Future, ensure_future, sleep, wait |
|
from collections import deque |
|
from enum import Enum |
|
from typing import TYPE_CHECKING, Any, Callable, Dict, Hashable |
|
|
|
from prompt_toolkit.application.current import get_app |
|
from prompt_toolkit.cursor_shapes import CursorShape |
|
from prompt_toolkit.data_structures import Point, Size |
|
from prompt_toolkit.filters import FilterOrBool, to_filter |
|
from prompt_toolkit.formatted_text import AnyFormattedText, to_formatted_text |
|
from prompt_toolkit.layout.mouse_handlers import MouseHandlers |
|
from prompt_toolkit.layout.screen import Char, Screen, WritePosition |
|
from prompt_toolkit.output import ColorDepth, Output |
|
from prompt_toolkit.styles import ( |
|
Attrs, |
|
BaseStyle, |
|
DummyStyleTransformation, |
|
StyleTransformation, |
|
) |
|
|
|
if TYPE_CHECKING: |
|
from prompt_toolkit.application import Application |
|
from prompt_toolkit.layout.layout import Layout |
|
|
|
|
|
__all__ = [ |
|
"Renderer", |
|
"print_formatted_text", |
|
] |
|
|
|
|
|
def _output_screen_diff( |
|
app: Application[Any], |
|
output: Output, |
|
screen: Screen, |
|
current_pos: Point, |
|
color_depth: ColorDepth, |
|
previous_screen: Screen | None, |
|
last_style: str | None, |
|
is_done: bool, |
|
full_screen: bool, |
|
attrs_for_style_string: _StyleStringToAttrsCache, |
|
style_string_has_style: _StyleStringHasStyleCache, |
|
size: Size, |
|
previous_width: int, |
|
) -> tuple[Point, str | None]: |
|
""" |
|
Render the diff between this screen and the previous screen. |
|
|
|
This takes two `Screen` instances. The one that represents the output like |
|
it was during the last rendering and one that represents the current |
|
output raster. Looking at these two `Screen` instances, this function will |
|
render the difference by calling the appropriate methods of the `Output` |
|
object that only paint the changes to the terminal. |
|
|
|
This is some performance-critical code which is heavily optimized. |
|
Don't change things without profiling first. |
|
|
|
:param current_pos: Current cursor position. |
|
:param last_style: The style string, used for drawing the last drawn |
|
character. (Color/attributes.) |
|
:param attrs_for_style_string: :class:`._StyleStringToAttrsCache` instance. |
|
:param width: The width of the terminal. |
|
:param previous_width: The width of the terminal during the last rendering. |
|
""" |
|
width, height = size.columns, size.rows |
|
|
|
|
|
write = output.write |
|
write_raw = output.write_raw |
|
|
|
|
|
|
|
_output_set_attributes = output.set_attributes |
|
_output_reset_attributes = output.reset_attributes |
|
_output_cursor_forward = output.cursor_forward |
|
_output_cursor_up = output.cursor_up |
|
_output_cursor_backward = output.cursor_backward |
|
|
|
|
|
output.hide_cursor() |
|
|
|
def reset_attributes() -> None: |
|
"Wrapper around Output.reset_attributes." |
|
nonlocal last_style |
|
_output_reset_attributes() |
|
last_style = None |
|
|
|
def move_cursor(new: Point) -> Point: |
|
"Move cursor to this `new` point. Returns the given Point." |
|
current_x, current_y = current_pos.x, current_pos.y |
|
|
|
if new.y > current_y: |
|
|
|
|
|
|
|
|
|
reset_attributes() |
|
write("\r\n" * (new.y - current_y)) |
|
current_x = 0 |
|
_output_cursor_forward(new.x) |
|
return new |
|
elif new.y < current_y: |
|
_output_cursor_up(current_y - new.y) |
|
|
|
if current_x >= width - 1: |
|
write("\r") |
|
_output_cursor_forward(new.x) |
|
elif new.x < current_x or current_x >= width - 1: |
|
_output_cursor_backward(current_x - new.x) |
|
elif new.x > current_x: |
|
_output_cursor_forward(new.x - current_x) |
|
|
|
return new |
|
|
|
def output_char(char: Char) -> None: |
|
""" |
|
Write the output of this character. |
|
""" |
|
nonlocal last_style |
|
|
|
|
|
|
|
if last_style == char.style: |
|
write(char.char) |
|
else: |
|
|
|
|
|
|
|
|
|
new_attrs = attrs_for_style_string[char.style] |
|
if not last_style or new_attrs != attrs_for_style_string[last_style]: |
|
_output_set_attributes(new_attrs, color_depth) |
|
|
|
write(char.char) |
|
last_style = char.style |
|
|
|
def get_max_column_index(row: dict[int, Char]) -> int: |
|
""" |
|
Return max used column index, ignoring whitespace (without style) at |
|
the end of the line. This is important for people that copy/paste |
|
terminal output. |
|
|
|
There are two reasons we are sometimes seeing whitespace at the end: |
|
- `BufferControl` adds a trailing space to each line, because it's a |
|
possible cursor position, so that the line wrapping won't change if |
|
the cursor position moves around. |
|
- The `Window` adds a style class to the current line for highlighting |
|
(cursor-line). |
|
""" |
|
numbers = ( |
|
index |
|
for index, cell in row.items() |
|
if cell.char != " " or style_string_has_style[cell.style] |
|
) |
|
return max(numbers, default=0) |
|
|
|
|
|
if not previous_screen: |
|
reset_attributes() |
|
|
|
|
|
|
|
|
|
|
|
if not previous_screen or not full_screen: |
|
output.disable_autowrap() |
|
|
|
|
|
|
|
if ( |
|
is_done or not previous_screen or previous_width != width |
|
): |
|
current_pos = move_cursor(Point(x=0, y=0)) |
|
reset_attributes() |
|
output.erase_down() |
|
|
|
previous_screen = Screen() |
|
|
|
|
|
|
|
|
|
current_height = min(screen.height, height) |
|
|
|
|
|
row_count = min(max(screen.height, previous_screen.height), height) |
|
|
|
for y in range(row_count): |
|
new_row = screen.data_buffer[y] |
|
previous_row = previous_screen.data_buffer[y] |
|
zero_width_escapes_row = screen.zero_width_escapes[y] |
|
|
|
new_max_line_len = min(width - 1, get_max_column_index(new_row)) |
|
previous_max_line_len = min(width - 1, get_max_column_index(previous_row)) |
|
|
|
|
|
c = 0 |
|
while c <= new_max_line_len: |
|
new_char = new_row[c] |
|
old_char = previous_row[c] |
|
char_width = new_char.width or 1 |
|
|
|
|
|
|
|
|
|
if new_char.char != old_char.char or new_char.style != old_char.style: |
|
current_pos = move_cursor(Point(x=c, y=y)) |
|
|
|
|
|
if c in zero_width_escapes_row: |
|
write_raw(zero_width_escapes_row[c]) |
|
|
|
output_char(new_char) |
|
current_pos = Point(x=current_pos.x + char_width, y=current_pos.y) |
|
|
|
c += char_width |
|
|
|
|
|
if previous_screen and new_max_line_len < previous_max_line_len: |
|
current_pos = move_cursor(Point(x=new_max_line_len + 1, y=y)) |
|
reset_attributes() |
|
output.erase_end_of_line() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if current_height > previous_screen.height: |
|
current_pos = move_cursor(Point(x=0, y=current_height - 1)) |
|
|
|
|
|
if is_done: |
|
current_pos = move_cursor(Point(x=0, y=current_height)) |
|
output.erase_down() |
|
else: |
|
current_pos = move_cursor(screen.get_cursor_position(app.layout.current_window)) |
|
|
|
if is_done or not full_screen: |
|
output.enable_autowrap() |
|
|
|
|
|
|
|
|
|
|
|
reset_attributes() |
|
|
|
if screen.show_cursor or is_done: |
|
output.show_cursor() |
|
|
|
return current_pos, last_style |
|
|
|
|
|
class HeightIsUnknownError(Exception): |
|
"Information unavailable. Did not yet receive the CPR response." |
|
|
|
|
|
class _StyleStringToAttrsCache(Dict[str, Attrs]): |
|
""" |
|
A cache structure that maps style strings to :class:`.Attr`. |
|
(This is an important speed up.) |
|
""" |
|
|
|
def __init__( |
|
self, |
|
get_attrs_for_style_str: Callable[[str], Attrs], |
|
style_transformation: StyleTransformation, |
|
) -> None: |
|
self.get_attrs_for_style_str = get_attrs_for_style_str |
|
self.style_transformation = style_transformation |
|
|
|
def __missing__(self, style_str: str) -> Attrs: |
|
attrs = self.get_attrs_for_style_str(style_str) |
|
attrs = self.style_transformation.transform_attrs(attrs) |
|
|
|
self[style_str] = attrs |
|
return attrs |
|
|
|
|
|
class _StyleStringHasStyleCache(Dict[str, bool]): |
|
""" |
|
Cache for remember which style strings don't render the default output |
|
style (default fg/bg, no underline and no reverse and no blink). That way |
|
we know that we should render these cells, even when they're empty (when |
|
they contain a space). |
|
|
|
Note: we don't consider bold/italic/hidden because they don't change the |
|
output if there's no text in the cell. |
|
""" |
|
|
|
def __init__(self, style_string_to_attrs: dict[str, Attrs]) -> None: |
|
self.style_string_to_attrs = style_string_to_attrs |
|
|
|
def __missing__(self, style_str: str) -> bool: |
|
attrs = self.style_string_to_attrs[style_str] |
|
is_default = bool( |
|
attrs.color |
|
or attrs.bgcolor |
|
or attrs.underline |
|
or attrs.strike |
|
or attrs.blink |
|
or attrs.reverse |
|
) |
|
|
|
self[style_str] = is_default |
|
return is_default |
|
|
|
|
|
class CPR_Support(Enum): |
|
"Enum: whether or not CPR is supported." |
|
|
|
SUPPORTED = "SUPPORTED" |
|
NOT_SUPPORTED = "NOT_SUPPORTED" |
|
UNKNOWN = "UNKNOWN" |
|
|
|
|
|
class Renderer: |
|
""" |
|
Typical usage: |
|
|
|
:: |
|
|
|
output = Vt100_Output.from_pty(sys.stdout) |
|
r = Renderer(style, output) |
|
r.render(app, layout=...) |
|
""" |
|
|
|
CPR_TIMEOUT = 2 |
|
|
|
def __init__( |
|
self, |
|
style: BaseStyle, |
|
output: Output, |
|
full_screen: bool = False, |
|
mouse_support: FilterOrBool = False, |
|
cpr_not_supported_callback: Callable[[], None] | None = None, |
|
) -> None: |
|
self.style = style |
|
self.output = output |
|
self.full_screen = full_screen |
|
self.mouse_support = to_filter(mouse_support) |
|
self.cpr_not_supported_callback = cpr_not_supported_callback |
|
|
|
self._in_alternate_screen = False |
|
self._mouse_support_enabled = False |
|
self._bracketed_paste_enabled = False |
|
self._cursor_key_mode_reset = False |
|
|
|
|
|
self._waiting_for_cpr_futures: deque[Future[None]] = deque() |
|
self.cpr_support = CPR_Support.UNKNOWN |
|
|
|
if not output.responds_to_cpr: |
|
self.cpr_support = CPR_Support.NOT_SUPPORTED |
|
|
|
|
|
self._attrs_for_style: _StyleStringToAttrsCache | None = None |
|
self._style_string_has_style: _StyleStringHasStyleCache | None = None |
|
self._last_style_hash: Hashable | None = None |
|
self._last_transformation_hash: Hashable | None = None |
|
self._last_color_depth: ColorDepth | None = None |
|
|
|
self.reset(_scroll=True) |
|
|
|
def reset(self, _scroll: bool = False, leave_alternate_screen: bool = True) -> None: |
|
|
|
self._cursor_pos = Point(x=0, y=0) |
|
|
|
|
|
|
|
|
|
|
|
self._last_screen: Screen | None = None |
|
self._last_size: Size | None = None |
|
self._last_style: str | None = None |
|
self._last_cursor_shape: CursorShape | None = None |
|
|
|
|
|
self.mouse_handlers = MouseHandlers() |
|
|
|
|
|
|
|
self._min_available_height = 0 |
|
|
|
|
|
|
|
|
|
if _scroll: |
|
self.output.scroll_buffer_to_prompt() |
|
|
|
|
|
if self._in_alternate_screen and leave_alternate_screen: |
|
self.output.quit_alternate_screen() |
|
self._in_alternate_screen = False |
|
|
|
|
|
if self._mouse_support_enabled: |
|
self.output.disable_mouse_support() |
|
self._mouse_support_enabled = False |
|
|
|
|
|
if self._bracketed_paste_enabled: |
|
self.output.disable_bracketed_paste() |
|
self._bracketed_paste_enabled = False |
|
|
|
self.output.reset_cursor_shape() |
|
|
|
|
|
|
|
|
|
self.output.flush() |
|
|
|
@property |
|
def last_rendered_screen(self) -> Screen | None: |
|
""" |
|
The `Screen` class that was generated during the last rendering. |
|
This can be `None`. |
|
""" |
|
return self._last_screen |
|
|
|
@property |
|
def height_is_known(self) -> bool: |
|
""" |
|
True when the height from the cursor until the bottom of the terminal |
|
is known. (It's often nicer to draw bottom toolbars only if the height |
|
is known, in order to avoid flickering when the CPR response arrives.) |
|
""" |
|
if self.full_screen or self._min_available_height > 0: |
|
return True |
|
try: |
|
self._min_available_height = self.output.get_rows_below_cursor_position() |
|
return True |
|
except NotImplementedError: |
|
return False |
|
|
|
@property |
|
def rows_above_layout(self) -> int: |
|
""" |
|
Return the number of rows visible in the terminal above the layout. |
|
""" |
|
if self._in_alternate_screen: |
|
return 0 |
|
elif self._min_available_height > 0: |
|
total_rows = self.output.get_size().rows |
|
last_screen_height = self._last_screen.height if self._last_screen else 0 |
|
return total_rows - max(self._min_available_height, last_screen_height) |
|
else: |
|
raise HeightIsUnknownError("Rows above layout is unknown.") |
|
|
|
def request_absolute_cursor_position(self) -> None: |
|
""" |
|
Get current cursor position. |
|
|
|
We do this to calculate the minimum available height that we can |
|
consume for rendering the prompt. This is the available space below te |
|
cursor. |
|
|
|
For vt100: Do CPR request. (answer will arrive later.) |
|
For win32: Do API call. (Answer comes immediately.) |
|
""" |
|
|
|
|
|
assert self._cursor_pos.y == 0 |
|
|
|
|
|
if self.full_screen: |
|
self._min_available_height = self.output.get_size().rows |
|
return |
|
|
|
|
|
|
|
try: |
|
self._min_available_height = self.output.get_rows_below_cursor_position() |
|
return |
|
except NotImplementedError: |
|
pass |
|
|
|
|
|
if self.cpr_support == CPR_Support.NOT_SUPPORTED: |
|
return |
|
|
|
def do_cpr() -> None: |
|
|
|
self._waiting_for_cpr_futures.append(Future()) |
|
self.output.ask_for_cpr() |
|
|
|
if self.cpr_support == CPR_Support.SUPPORTED: |
|
do_cpr() |
|
return |
|
|
|
|
|
|
|
if self.waiting_for_cpr: |
|
return |
|
|
|
do_cpr() |
|
|
|
async def timer() -> None: |
|
await sleep(self.CPR_TIMEOUT) |
|
|
|
|
|
if self.cpr_support == CPR_Support.UNKNOWN: |
|
self.cpr_support = CPR_Support.NOT_SUPPORTED |
|
|
|
if self.cpr_not_supported_callback: |
|
|
|
self.cpr_not_supported_callback() |
|
|
|
get_app().create_background_task(timer()) |
|
|
|
def report_absolute_cursor_row(self, row: int) -> None: |
|
""" |
|
To be called when we know the absolute cursor position. |
|
(As an answer of a "Cursor Position Request" response.) |
|
""" |
|
self.cpr_support = CPR_Support.SUPPORTED |
|
|
|
|
|
|
|
total_rows = self.output.get_size().rows |
|
rows_below_cursor = total_rows - row + 1 |
|
|
|
|
|
self._min_available_height = rows_below_cursor |
|
|
|
|
|
try: |
|
f = self._waiting_for_cpr_futures.popleft() |
|
except IndexError: |
|
pass |
|
else: |
|
f.set_result(None) |
|
|
|
@property |
|
def waiting_for_cpr(self) -> bool: |
|
""" |
|
Waiting for CPR flag. True when we send the request, but didn't got a |
|
response. |
|
""" |
|
return bool(self._waiting_for_cpr_futures) |
|
|
|
async def wait_for_cpr_responses(self, timeout: int = 1) -> None: |
|
""" |
|
Wait for a CPR response. |
|
""" |
|
cpr_futures = list(self._waiting_for_cpr_futures) |
|
|
|
|
|
if not cpr_futures or self.cpr_support == CPR_Support.NOT_SUPPORTED: |
|
return None |
|
|
|
async def wait_for_responses() -> None: |
|
for response_f in cpr_futures: |
|
await response_f |
|
|
|
async def wait_for_timeout() -> None: |
|
await sleep(timeout) |
|
|
|
|
|
for response_f in cpr_futures: |
|
response_f.cancel() |
|
self._waiting_for_cpr_futures = deque() |
|
|
|
tasks = { |
|
ensure_future(wait_for_responses()), |
|
ensure_future(wait_for_timeout()), |
|
} |
|
_, pending = await wait(tasks, return_when=FIRST_COMPLETED) |
|
for task in pending: |
|
task.cancel() |
|
|
|
def render( |
|
self, app: Application[Any], layout: Layout, is_done: bool = False |
|
) -> None: |
|
""" |
|
Render the current interface to the output. |
|
|
|
:param is_done: When True, put the cursor at the end of the interface. We |
|
won't print any changes to this part. |
|
""" |
|
output = self.output |
|
|
|
|
|
if self.full_screen and not self._in_alternate_screen: |
|
self._in_alternate_screen = True |
|
output.enter_alternate_screen() |
|
|
|
|
|
if not self._bracketed_paste_enabled: |
|
self.output.enable_bracketed_paste() |
|
self._bracketed_paste_enabled = True |
|
|
|
|
|
if not self._cursor_key_mode_reset: |
|
self.output.reset_cursor_key_mode() |
|
self._cursor_key_mode_reset = True |
|
|
|
|
|
needs_mouse_support = self.mouse_support() |
|
|
|
if needs_mouse_support and not self._mouse_support_enabled: |
|
output.enable_mouse_support() |
|
self._mouse_support_enabled = True |
|
|
|
elif not needs_mouse_support and self._mouse_support_enabled: |
|
output.disable_mouse_support() |
|
self._mouse_support_enabled = False |
|
|
|
|
|
size = output.get_size() |
|
screen = Screen() |
|
screen.show_cursor = False |
|
|
|
mouse_handlers = MouseHandlers() |
|
|
|
|
|
if self.full_screen: |
|
height = size.rows |
|
elif is_done: |
|
|
|
height = layout.container.preferred_height( |
|
size.columns, size.rows |
|
).preferred |
|
else: |
|
last_height = self._last_screen.height if self._last_screen else 0 |
|
height = max( |
|
self._min_available_height, |
|
last_height, |
|
layout.container.preferred_height(size.columns, size.rows).preferred, |
|
) |
|
|
|
height = min(height, size.rows) |
|
|
|
|
|
if self._last_size != size: |
|
self._last_screen = None |
|
|
|
|
|
|
|
|
|
if ( |
|
self.style.invalidation_hash() != self._last_style_hash |
|
or app.style_transformation.invalidation_hash() |
|
!= self._last_transformation_hash |
|
or app.color_depth != self._last_color_depth |
|
): |
|
self._last_screen = None |
|
self._attrs_for_style = None |
|
self._style_string_has_style = None |
|
|
|
if self._attrs_for_style is None: |
|
self._attrs_for_style = _StyleStringToAttrsCache( |
|
self.style.get_attrs_for_style_str, app.style_transformation |
|
) |
|
if self._style_string_has_style is None: |
|
self._style_string_has_style = _StyleStringHasStyleCache( |
|
self._attrs_for_style |
|
) |
|
|
|
self._last_style_hash = self.style.invalidation_hash() |
|
self._last_transformation_hash = app.style_transformation.invalidation_hash() |
|
self._last_color_depth = app.color_depth |
|
|
|
layout.container.write_to_screen( |
|
screen, |
|
mouse_handlers, |
|
WritePosition(xpos=0, ypos=0, width=size.columns, height=height), |
|
parent_style="", |
|
erase_bg=False, |
|
z_index=None, |
|
) |
|
screen.draw_all_floats() |
|
|
|
|
|
if app.exit_style: |
|
screen.append_style_to_content(app.exit_style) |
|
|
|
|
|
self._cursor_pos, self._last_style = _output_screen_diff( |
|
app, |
|
output, |
|
screen, |
|
self._cursor_pos, |
|
app.color_depth, |
|
self._last_screen, |
|
self._last_style, |
|
is_done, |
|
full_screen=self.full_screen, |
|
attrs_for_style_string=self._attrs_for_style, |
|
style_string_has_style=self._style_string_has_style, |
|
size=size, |
|
previous_width=(self._last_size.columns if self._last_size else 0), |
|
) |
|
self._last_screen = screen |
|
self._last_size = size |
|
self.mouse_handlers = mouse_handlers |
|
|
|
|
|
new_cursor_shape = app.cursor.get_cursor_shape(app) |
|
if ( |
|
self._last_cursor_shape is None |
|
or self._last_cursor_shape != new_cursor_shape |
|
): |
|
output.set_cursor_shape(new_cursor_shape) |
|
self._last_cursor_shape = new_cursor_shape |
|
|
|
|
|
output.flush() |
|
|
|
|
|
app.layout.visible_windows = screen.visible_windows |
|
|
|
if is_done: |
|
self.reset() |
|
|
|
def erase(self, leave_alternate_screen: bool = True) -> None: |
|
""" |
|
Hide all output and put the cursor back at the first line. This is for |
|
instance used for running a system command (while hiding the CLI) and |
|
later resuming the same CLI.) |
|
|
|
:param leave_alternate_screen: When True, and when inside an alternate |
|
screen buffer, quit the alternate screen. |
|
""" |
|
output = self.output |
|
|
|
output.cursor_backward(self._cursor_pos.x) |
|
output.cursor_up(self._cursor_pos.y) |
|
output.erase_down() |
|
output.reset_attributes() |
|
output.enable_autowrap() |
|
|
|
output.flush() |
|
|
|
self.reset(leave_alternate_screen=leave_alternate_screen) |
|
|
|
def clear(self) -> None: |
|
""" |
|
Clear screen and go to 0,0 |
|
""" |
|
|
|
self.erase() |
|
|
|
|
|
output = self.output |
|
|
|
output.erase_screen() |
|
output.cursor_goto(0, 0) |
|
output.flush() |
|
|
|
self.request_absolute_cursor_position() |
|
|
|
|
|
def print_formatted_text( |
|
output: Output, |
|
formatted_text: AnyFormattedText, |
|
style: BaseStyle, |
|
style_transformation: StyleTransformation | None = None, |
|
color_depth: ColorDepth | None = None, |
|
) -> None: |
|
""" |
|
Print a list of (style_str, text) tuples in the given style to the output. |
|
""" |
|
fragments = to_formatted_text(formatted_text) |
|
style_transformation = style_transformation or DummyStyleTransformation() |
|
color_depth = color_depth or output.get_default_color_depth() |
|
|
|
|
|
output.reset_attributes() |
|
output.enable_autowrap() |
|
last_attrs: Attrs | None = None |
|
|
|
|
|
attrs_for_style_string = _StyleStringToAttrsCache( |
|
style.get_attrs_for_style_str, style_transformation |
|
) |
|
|
|
for style_str, text, *_ in fragments: |
|
attrs = attrs_for_style_string[style_str] |
|
|
|
|
|
if attrs != last_attrs: |
|
if attrs: |
|
output.set_attributes(attrs, color_depth) |
|
else: |
|
output.reset_attributes() |
|
last_attrs = attrs |
|
|
|
|
|
if "[ZeroWidthEscape]" in style_str: |
|
output.write_raw(text) |
|
else: |
|
|
|
text = text.replace("\r", "") |
|
|
|
|
|
text = text.replace("\n", "\r\n") |
|
output.write(text) |
|
|
|
|
|
output.reset_attributes() |
|
output.flush() |
|
|