| """ |
| Renders the command line on the console. |
| (Redraws parts of the input line that were changed.) |
| """ |
| from __future__ import annotations |
|
|
| from asyncio import FIRST_COMPLETED, Future, ensure_future, sleep, wait |
| from collections import deque |
| from enum import Enum |
| from typing import TYPE_CHECKING, Any, Callable, Dict, Hashable |
|
|
| from prompt_toolkit.application.current import get_app |
| from prompt_toolkit.cursor_shapes import CursorShape |
| from prompt_toolkit.data_structures import Point, Size |
| from prompt_toolkit.filters import FilterOrBool, to_filter |
| from prompt_toolkit.formatted_text import AnyFormattedText, to_formatted_text |
| from prompt_toolkit.layout.mouse_handlers import MouseHandlers |
| from prompt_toolkit.layout.screen import Char, Screen, WritePosition |
| from prompt_toolkit.output import ColorDepth, Output |
| from prompt_toolkit.styles import ( |
| Attrs, |
| BaseStyle, |
| DummyStyleTransformation, |
| StyleTransformation, |
| ) |
|
|
| if TYPE_CHECKING: |
| from prompt_toolkit.application import Application |
| from prompt_toolkit.layout.layout import Layout |
|
|
|
|
# Public names of this module (what `from <module> import *` exposes).
__all__ = [
    "Renderer",
    "print_formatted_text",
]
|
|
|
|
def _output_screen_diff(
    app: Application[Any],
    output: Output,
    screen: Screen,
    current_pos: Point,
    color_depth: ColorDepth,
    previous_screen: Screen | None,
    last_style: str | None,
    is_done: bool,
    full_screen: bool,
    attrs_for_style_string: _StyleStringToAttrsCache,
    style_string_has_style: _StyleStringHasStyleCache,
    size: Size,
    previous_width: int,
) -> tuple[Point, str | None]:
    """
    Render the diff between this screen and the previous screen.

    This takes two `Screen` instances. The one that represents the output like
    it was during the last rendering and one that represents the current
    output raster. Looking at these two `Screen` instances, this function will
    render the difference by calling the appropriate methods of the `Output`
    object that only paint the changes to the terminal.

    This is some performance-critical code which is heavily optimized.
    Don't change things without profiling first.

    :param app: The application; only `app.layout.current_window` is read, to
        look up the final cursor position.
    :param output: `Output` instance that receives all writes.
    :param screen: The `Screen` that has to be displayed now.
    :param current_pos: Current cursor position.
    :param color_depth: Passed through to `Output.set_attributes`.
    :param previous_screen: The `Screen` of the last rendering, or `None`.
        When falsy, everything is erased and drawn from scratch.
    :param last_style: The style string, used for drawing the last drawn
        character. (Color/attributes.)
    :param is_done: When True, erase and redraw everything, leave the cursor
        on the row below the output and show the cursor again.
    :param full_screen: Whether we render full screen; this only influences
        when autowrap is disabled/enabled here.
    :param attrs_for_style_string: :class:`._StyleStringToAttrsCache` instance.
    :param style_string_has_style: :class:`._StyleStringHasStyleCache`
        instance, used to decide which trailing cells of a row can be skipped.
    :param size: The size of the terminal (`columns` and `rows` are used).
    :param previous_width: The width of the terminal during the last rendering.
    :returns: `(cursor_position, last_style)` after painting. `last_style` is
        always `None` here, because attributes are reset at the end.
    """
    width, height = size.columns, size.rows

    # Local aliases for the write functions.
    write = output.write
    write_raw = output.write_raw

    # Bind the most used output methods to local names, so that the loops
    # below don't repeat the attribute lookups.
    _output_set_attributes = output.set_attributes
    _output_reset_attributes = output.reset_attributes
    _output_cursor_forward = output.cursor_forward
    _output_cursor_up = output.cursor_up
    _output_cursor_backward = output.cursor_backward

    # Hide the cursor while painting. It is shown again at the very end when
    # `screen.show_cursor` or `is_done` is set. (Presumably this is done to
    # avoid flickering.)
    output.hide_cursor()

    def reset_attributes() -> None:
        "Wrapper around Output.reset_attributes."
        nonlocal last_style
        _output_reset_attributes()
        # Forget the last style, so that the next `output_char` call sends
        # its attributes again.
        last_style = None

    def move_cursor(new: Point) -> Point:
        "Move cursor to this `new` point. Returns the given Point."
        # `current_pos` is read from the enclosing scope. The caller is
        # responsible for assigning the return value back to it.
        current_x, current_y = current_pos.x, current_pos.y

        if new.y > current_y:
            # Move down by writing "\r\n" instead of using a cursor-down
            # call, and reset the attributes before doing so.
            # NOTE(review): presumably because newlines create new rows at
            # the bottom of the terminal, and the reset prevents the newline
            # from painting a background color -- confirm.
            reset_attributes()
            write("\r\n" * (new.y - current_y))
            # NOTE(review): this assignment has no effect, the function
            # returns before `current_x` is read again.
            current_x = 0
            _output_cursor_forward(new.x)
            return new
        elif new.y < current_y:
            _output_cursor_up(current_y - new.y)

        if current_x >= width - 1:
            # The cursor is at (or beyond) the last column: go to the start
            # of the line and move forward from there, instead of moving
            # relative to the current column.
            write("\r")
            _output_cursor_forward(new.x)
        # NOTE(review): the `current_x >= width - 1` part below can never be
        # true here, because that case is handled by the `if` above.
        elif new.x < current_x or current_x >= width - 1:
            _output_cursor_backward(current_x - new.x)
        elif new.x > current_x:
            _output_cursor_forward(new.x - current_x)

        return new

    def output_char(char: Char) -> None:
        """
        Write the output of this character.
        """
        nonlocal last_style

        # Same style string as the previously written character: no need to
        # send the attributes again.
        if last_style == char.style:
            write(char.char)
        else:
            # Look up the attributes for this style string and only send
            # them when there is no known last style, or when they differ
            # from the attributes of the last style. (Two different style
            # strings can map to equal attributes.)
            new_attrs = attrs_for_style_string[char.style]
            if not last_style or new_attrs != attrs_for_style_string[last_style]:
                _output_set_attributes(new_attrs, color_depth)

            write(char.char)
            last_style = char.style

    def get_max_column_index(row: dict[int, Char]) -> int:
        """
        Return max used column index, ignoring whitespace (without style) at
        the end of the line. This is important for people that copy/paste
        terminal output.

        There are two reasons we are sometimes seeing whitespace at the end:
        - `BufferControl` adds a trailing space to each line, because it's a
          possible cursor position, so that the line wrapping won't change if
          the cursor position moves around.
        - The `Window` adds a style class to the current line for highlighting
          (cursor-line).
        """
        # Indexes of all cells that are not a plain, unstyled space.
        numbers = (
            index
            for index, cell in row.items()
            if cell.char != " " or style_string_has_style[cell.style]
        )
        # Returns 0 for a row without such cells.
        return max(numbers, default=0)

    # No previous screen: start with reset styling.
    if not previous_screen:
        reset_attributes()

    # Disable autowrap when there is no previous screen, and on every
    # rendering when not running full screen. (It is enabled again below when
    # we're done or not full screen.)
    if not previous_screen or not full_screen:
        output.disable_autowrap()

    # Redraw everything when we are done, when there is no previous screen or
    # when the terminal width changed: go to the top left, erase everything
    # below and diff against an empty screen.
    if (
        is_done or not previous_screen or previous_width != width
    ):
        current_pos = move_cursor(Point(x=0, y=0))
        reset_attributes()
        output.erase_down()

        previous_screen = Screen()

    # Height of the new screen, clipped to the height of the terminal.
    # (Read before the loop below starts indexing `screen.data_buffer`.)
    current_height = min(screen.height, height)

    # Loop over the rows of the tallest of both screens, but never beyond the
    # height of the terminal.
    row_count = min(max(screen.height, previous_screen.height), height)

    for y in range(row_count):
        new_row = screen.data_buffer[y]
        previous_row = previous_screen.data_buffer[y]
        zero_width_escapes_row = screen.zero_width_escapes[y]

        # Last column to consider in the new/previous row, clipped to the
        # last column of the terminal.
        new_max_line_len = min(width - 1, get_max_column_index(new_row))
        previous_max_line_len = min(width - 1, get_max_column_index(previous_row))

        # Loop over the columns.
        c = 0
        while c <= new_max_line_len:
            new_char = new_row[c]
            old_char = previous_row[c]
            # Always advance at least one column, also when the character
            # reports a width of zero.
            char_width = new_char.width or 1

            # Only write cells of which the character or the style changed.
            # (The comparison is written out inline.)
            if new_char.char != old_char.char or new_char.style != old_char.style:
                current_pos = move_cursor(Point(x=c, y=y))

                # Send the zero width escape sequence registered for this
                # cell, unescaped, before the character itself.
                if c in zero_width_escapes_row:
                    write_raw(zero_width_escapes_row[c])

                output_char(new_char)
                current_pos = Point(x=current_pos.x + char_width, y=current_pos.y)

            c += char_width

        # If the new line is shorter than the previous one, erase the rest
        # of the line.
        if previous_screen and new_max_line_len < previous_max_line_len:
            current_pos = move_cursor(Point(x=new_max_line_len + 1, y=y))
            reset_attributes()
            output.erase_end_of_line()

    # When the new screen is higher than the previous one, move the cursor
    # once to its last row. `move_cursor` goes down by writing newlines, so
    # this is done even when the lowest rows didn't produce any output above.
    # NOTE(review): presumably this reserves the vertical space required by
    # the layout (making the terminal scroll when needed) -- confirm.
    if current_height > previous_screen.height:
        current_pos = move_cursor(Point(x=0, y=current_height - 1))

    # Final cursor position: on the row below the output when done, otherwise
    # at the cursor position of the current window.
    if is_done:
        current_pos = move_cursor(Point(x=0, y=current_height))
        output.erase_down()
    else:
        current_pos = move_cursor(screen.get_cursor_position(app.layout.current_window))

    if is_done or not full_screen:
        output.enable_autowrap()

    # Always finish with reset attributes. (This also sets `last_style` to
    # None, which is what gets returned.)
    reset_attributes()

    if screen.show_cursor or is_done:
        output.show_cursor()

    return current_pos, last_style
|
|
|
|
class HeightIsUnknownError(Exception):
    """
    Raised when the number of rows above the layout is requested while the
    available height is not known yet. (The CPR response did not arrive yet.)
    """
|
|
|
|
class _StyleStringToAttrsCache(Dict[str, Attrs]):
    """
    Dictionary that lazily maps style strings to their (transformed)
    :class:`.Attrs`. Every style string is resolved only once; after that the
    result is served straight from the dictionary.

    :param get_attrs_for_style_str: Callable that turns a style string into
        `Attrs`.
    :param style_transformation: Object of which `transform_attrs` is applied
        to each result before it is stored.
    """

    def __init__(
        self,
        get_attrs_for_style_str: Callable[[str], Attrs],
        style_transformation: StyleTransformation,
    ) -> None:
        self.style_transformation = style_transformation
        self.get_attrs_for_style_str = get_attrs_for_style_str

    def __missing__(self, style_str: str) -> Attrs:
        # Called by `dict` on a lookup of an unknown key: compute the value,
        # remember it and return it.
        untransformed = self.get_attrs_for_style_str(style_str)
        transformed = self.style_transformation.transform_attrs(untransformed)
        self[style_str] = transformed
        return transformed
|
|
|
|
class _StyleStringHasStyleCache(Dict[str, bool]):
    """
    Cache that remembers, per style string, whether it renders something
    other than the default output style: a foreground or background color,
    underline, strike, blink or reverse. Cells with such a style have to be
    rendered, even when they are empty (when they contain a space).

    Note: bold/italic/hidden are not taken into account here.

    :param style_string_to_attrs: Mapping from style string to `Attrs`, used
        to look up the attributes of a style string.
    """

    def __init__(self, style_string_to_attrs: dict[str, Attrs]) -> None:
        self.style_string_to_attrs = style_string_to_attrs

    def __missing__(self, style_str: str) -> bool:
        attrs = self.style_string_to_attrs[style_str]

        # True as soon as one of these attributes is set. The attributes are
        # inspected lazily, in this order.
        has_style = any(
            getattr(attrs, name)
            for name in ("color", "bgcolor", "underline", "strike", "blink", "reverse")
        )

        self[style_str] = has_style
        return has_style
|
|
|
|
class CPR_Support(Enum):
    """
    Enum: whether or not CPR (cursor position requests) is supported.
    """

    # A response to a cursor position request has been received.
    SUPPORTED = "SUPPORTED"
    # The output doesn't respond to CPR, or a request timed out.
    NOT_SUPPORTED = "NOT_SUPPORTED"
    # Not determined yet.
    UNKNOWN = "UNKNOWN"
|
|
|
|
class Renderer:
    """
    Draws a layout on an `Output` and remembers, between renderings, what
    was drawn and which terminal modes were enabled.

    Typical usage:

    ::

        output = Vt100_Output.from_pty(sys.stdout)
        r = Renderer(style, output)
        r.render(app, layout=...)

    :param style: Style that resolves style strings to attributes.
    :param output: `Output` instance that everything is written to.
    :param full_screen: When True, render in the alternate screen and use
        all rows of the terminal.
    :param mouse_support: Bool or filter, evaluated on every `render` call
        to turn mouse support on or off.
    :param cpr_not_supported_callback: Called without arguments when a cursor
        position request was not answered within `CPR_TIMEOUT` seconds.
    """

    # Number of seconds to wait for a CPR response, before CPR is considered
    # to be not supported.
    CPR_TIMEOUT = 2

    def __init__(
        self,
        style: BaseStyle,
        output: Output,
        full_screen: bool = False,
        mouse_support: FilterOrBool = False,
        cpr_not_supported_callback: Callable[[], None] | None = None,
    ) -> None:
        self.style = style
        self.output = output
        self.full_screen = full_screen
        self.mouse_support = to_filter(mouse_support)
        self.cpr_not_supported_callback = cpr_not_supported_callback

        # Terminal modes that this renderer turned on.
        self._in_alternate_screen = False
        self._mouse_support_enabled = False
        self._bracketed_paste_enabled = False
        self._cursor_key_mode_reset = False

        # One future for every CPR request that wasn't answered yet.
        self._waiting_for_cpr_futures: deque[Future[None]] = deque()
        self.cpr_support = (
            CPR_Support.UNKNOWN
            if output.responds_to_cpr
            else CPR_Support.NOT_SUPPORTED
        )

        # Style caches, and the hashes/color depth they were created for.
        self._attrs_for_style: _StyleStringToAttrsCache | None = None
        self._style_string_has_style: _StyleStringHasStyleCache | None = None
        self._last_style_hash: Hashable | None = None
        self._last_transformation_hash: Hashable | None = None
        self._last_color_depth: ColorDepth | None = None

        self.reset(_scroll=True)

    def reset(self, _scroll: bool = False, leave_alternate_screen: bool = True) -> None:
        """
        Forget everything about the previous rendering and turn off the
        terminal modes that were enabled by this renderer.

        :param _scroll: When True, call `output.scroll_buffer_to_prompt()`.
        :param leave_alternate_screen: When True, and when inside the
            alternate screen, quit the alternate screen.
        """
        output = self.output

        # The cursor is considered to be at the top left of the output again.
        self._cursor_pos = Point(x=0, y=0)

        # Nothing has been drawn: the next rendering can't diff against a
        # previous screen.
        self._last_screen: Screen | None = None
        self._last_size: Size | None = None
        self._last_style: str | None = None
        self._last_cursor_shape: CursorShape | None = None

        # Start with empty mouse handlers.
        self.mouse_handlers = MouseHandlers()

        # Available height is unknown (0) until it gets reported or
        # requested again.
        self._min_available_height = 0

        if _scroll:
            output.scroll_buffer_to_prompt()

        # Undo the terminal modes, as far as they were enabled.
        if leave_alternate_screen and self._in_alternate_screen:
            output.quit_alternate_screen()
            self._in_alternate_screen = False

        if self._mouse_support_enabled:
            output.disable_mouse_support()
            self._mouse_support_enabled = False

        if self._bracketed_paste_enabled:
            output.disable_bracketed_paste()
            self._bracketed_paste_enabled = False

        output.reset_cursor_shape()

        # Make sure that everything above is actually written out.
        output.flush()

    @property
    def last_rendered_screen(self) -> Screen | None:
        """
        The `Screen` that was produced by the last `render` call, or `None`
        when nothing was rendered since the last reset.
        """
        return self._last_screen

    @property
    def height_is_known(self) -> bool:
        """
        True when the height from the cursor until the bottom of the terminal
        is known. (It's often nicer to draw bottom toolbars only if the height
        is known, in order to avoid flickering when the CPR response arrives.)

        When the height is not known yet, the output is asked for it
        directly; if it can tell, the answer is remembered.
        """
        if self.full_screen or self._min_available_height > 0:
            return True

        try:
            rows_below_cursor = self.output.get_rows_below_cursor_position()
        except NotImplementedError:
            return False

        self._min_available_height = rows_below_cursor
        return True

    @property
    def rows_above_layout(self) -> int:
        """
        Return the number of rows visible in the terminal above the layout.

        :raises HeightIsUnknownError: when not in the alternate screen and
            the available height is not known.
        """
        if self._in_alternate_screen:
            return 0

        if self._min_available_height <= 0:
            raise HeightIsUnknownError("Rows above layout is unknown.")

        terminal_rows = self.output.get_size().rows
        drawn_rows = self._last_screen.height if self._last_screen else 0
        return terminal_rows - max(self._min_available_height, drawn_rows)

    def request_absolute_cursor_position(self) -> None:
        """
        Get current cursor position.

        We do this to calculate the minimum available height that we can
        consume for rendering the prompt. This is the available space below
        the cursor.

        Either the answer is available right away (full screen, or an output
        that can report the rows below the cursor), or a CPR request is sent
        and the answer arrives later through `report_absolute_cursor_row`.
        """
        # This is only valid while the cursor is on the first row of the
        # output.
        assert self._cursor_pos.y == 0

        # Full screen: the whole terminal height is available.
        if self.full_screen:
            self._min_available_height = self.output.get_size().rows
            return

        # Outputs that know the answer immediately.
        try:
            rows_below_cursor = self.output.get_rows_below_cursor_position()
        except NotImplementedError:
            pass
        else:
            self._min_available_height = rows_below_cursor
            return

        # Otherwise we depend on CPR.
        if self.cpr_support == CPR_Support.NOT_SUPPORTED:
            return

        def send_cpr_request() -> None:
            # Register the future that gets resolved by the response, then
            # send the request itself.
            self._waiting_for_cpr_futures.append(Future())
            self.output.ask_for_cpr()

        if self.cpr_support == CPR_Support.SUPPORTED:
            send_cpr_request()
            return

        # Support is still unknown: don't send a second request while the
        # first one wasn't answered.
        if self.waiting_for_cpr:
            return

        send_cpr_request()

        async def timer() -> None:
            await sleep(self.CPR_TIMEOUT)

            # A response in the meantime would have changed `cpr_support`.
            if self.cpr_support != CPR_Support.UNKNOWN:
                return

            self.cpr_support = CPR_Support.NOT_SUPPORTED
            if self.cpr_not_supported_callback:
                self.cpr_not_supported_callback()

        get_app().create_background_task(timer())

    def report_absolute_cursor_row(self, row: int) -> None:
        """
        To be called when we know the absolute cursor position.
        (As an answer of a "Cursor Position Request" response.)

        :param row: The reported row of the cursor.
        """
        self.cpr_support = CPR_Support.SUPPORTED

        # The available height is everything from the cursor row (included)
        # until the bottom of the terminal.
        self._min_available_height = self.output.get_size().rows - row + 1

        # Resolve the oldest request that is still waiting, if any.
        waiting = self._waiting_for_cpr_futures
        if waiting:
            waiting.popleft().set_result(None)

    @property
    def waiting_for_cpr(self) -> bool:
        """
        Waiting for CPR flag. True when we sent a request, but didn't get a
        response yet.
        """
        return len(self._waiting_for_cpr_futures) > 0

    async def wait_for_cpr_responses(self, timeout: int = 1) -> None:
        """
        Wait until all CPR requests that are waiting right now got a
        response, or until the timeout passed.

        :param timeout: Maximum number of seconds to wait. When it passes,
            the waiting futures are cancelled and the queue is emptied.
        """
        # Snapshot of the requests that are waiting at this point.
        cpr_futures = list(self._waiting_for_cpr_futures)

        # Nothing to wait for.
        if self.cpr_support == CPR_Support.NOT_SUPPORTED or not cpr_futures:
            return

        async def wait_for_responses() -> None:
            for future in cpr_futures:
                await future

        async def wait_for_timeout() -> None:
            await sleep(timeout)

            # Timeout: give up on these requests and start with a fresh queue.
            for future in cpr_futures:
                future.cancel()
            self._waiting_for_cpr_futures = deque()

        responses_task = ensure_future(wait_for_responses())
        timeout_task = ensure_future(wait_for_timeout())

        # Whichever finishes first wins, the other one gets cancelled.
        _, unfinished = await wait(
            {responses_task, timeout_task}, return_when=FIRST_COMPLETED
        )
        for unfinished_task in unfinished:
            unfinished_task.cancel()

    def render(
        self, app: Application[Any], layout: Layout, is_done: bool = False
    ) -> None:
        """
        Render the current interface to the output.

        :param app: The application that is rendered; its style
            transformation, color depth, exit style and cursor shape are used.
        :param layout: The layout of which the container is drawn.
        :param is_done: When True, put the cursor at the end of the interface. We
            won't print any changes to this part. The renderer is reset
            afterwards.
        """
        output = self.output

        # Turn on the terminal modes that we need, as far as they are not
        # enabled yet.
        if self.full_screen and not self._in_alternate_screen:
            self._in_alternate_screen = True
            output.enter_alternate_screen()

        if not self._bracketed_paste_enabled:
            output.enable_bracketed_paste()
            self._bracketed_paste_enabled = True

        if not self._cursor_key_mode_reset:
            output.reset_cursor_key_mode()
            self._cursor_key_mode_reset = True

        # Bring mouse support in line with the current value of the filter.
        wants_mouse_support = self.mouse_support()
        if wants_mouse_support:
            if not self._mouse_support_enabled:
                output.enable_mouse_support()
                self._mouse_support_enabled = True
        elif self._mouse_support_enabled:
            output.disable_mouse_support()
            self._mouse_support_enabled = False

        # Fresh screen and mouse handlers for this rendering. The cursor is
        # hidden, unless something that is drawn turns it on.
        size = output.get_size()
        screen = Screen()
        screen.show_cursor = False
        mouse_handlers = MouseHandlers()

        # Decide how many rows to draw.
        if self.full_screen:
            wanted_height = size.rows
        elif is_done:
            # Only the height that the layout asks for.
            wanted_height = layout.container.preferred_height(
                size.columns, size.rows
            ).preferred
        else:
            # Never shrink below the previous rendering or the height that
            # is known to be available.
            previous_height = self._last_screen.height if self._last_screen else 0
            wanted_height = max(
                self._min_available_height,
                previous_height,
                layout.container.preferred_height(size.columns, size.rows).preferred,
            )

        height = min(wanted_height, size.rows)

        # A previous screen of another size can't be used for diffing.
        if size != self._last_size:
            self._last_screen = None

        # Another style, style transformation or color depth: repaint
        # everything and throw away the style caches.
        if (
            self.style.invalidation_hash() != self._last_style_hash
            or app.style_transformation.invalidation_hash()
            != self._last_transformation_hash
            or app.color_depth != self._last_color_depth
        ):
            self._last_screen = None
            self._attrs_for_style = None
            self._style_string_has_style = None

        # (Re)create the caches when needed.
        if self._attrs_for_style is None:
            self._attrs_for_style = _StyleStringToAttrsCache(
                self.style.get_attrs_for_style_str, app.style_transformation
            )
        if self._style_string_has_style is None:
            self._style_string_has_style = _StyleStringHasStyleCache(
                self._attrs_for_style
            )

        # Remember what the caches belong to.
        self._last_style_hash = self.style.invalidation_hash()
        self._last_transformation_hash = app.style_transformation.invalidation_hash()
        self._last_color_depth = app.color_depth

        # Let the layout draw itself on the new screen.
        layout.container.write_to_screen(
            screen,
            mouse_handlers,
            WritePosition(xpos=0, ypos=0, width=size.columns, height=height),
            parent_style="",
            erase_bg=False,
            z_index=None,
        )
        screen.draw_all_floats()

        # Apply the exit style to everything that was drawn, if there is one.
        if app.exit_style:
            screen.append_style_to_content(app.exit_style)

        # Write the difference with the previous screen to the output.
        self._cursor_pos, self._last_style = _output_screen_diff(
            app=app,
            output=output,
            screen=screen,
            current_pos=self._cursor_pos,
            color_depth=app.color_depth,
            previous_screen=self._last_screen,
            last_style=self._last_style,
            is_done=is_done,
            full_screen=self.full_screen,
            attrs_for_style_string=self._attrs_for_style,
            style_string_has_style=self._style_string_has_style,
            size=size,
            previous_width=(self._last_size.columns if self._last_size else 0),
        )
        self._last_screen = screen
        self._last_size = size
        self.mouse_handlers = mouse_handlers

        # Send the cursor shape, but only when it's the first time or when
        # it changed.
        cursor_shape = app.cursor.get_cursor_shape(app)
        if (
            self._last_cursor_shape is None
            or self._last_cursor_shape != cursor_shape
        ):
            output.set_cursor_shape(cursor_shape)
            self._last_cursor_shape = cursor_shape

        output.flush()

        # Tell the layout which windows ended up on the screen.
        app.layout.visible_windows = screen.visible_windows

        if is_done:
            self.reset()

    def erase(self, leave_alternate_screen: bool = True) -> None:
        """
        Hide all output and put the cursor back at the first line. This is for
        instance used for running a system command (while hiding the CLI) and
        later resuming the same CLI.

        :param leave_alternate_screen: When True, and when inside an alternate
            screen buffer, quit the alternate screen.
        """
        output = self.output
        cursor = self._cursor_pos

        # Go back to the top left of what we have drawn and wipe everything
        # from there.
        output.cursor_backward(cursor.x)
        output.cursor_up(cursor.y)
        output.erase_down()
        output.reset_attributes()
        output.enable_autowrap()

        output.flush()

        self.reset(leave_alternate_screen=leave_alternate_screen)

    def clear(self) -> None:
        """
        Clear screen and go to 0,0
        """
        # Remove what we have drawn ourselves first.
        self.erase()

        # Then wipe the whole screen and go to the top left.
        output = self.output
        output.erase_screen()
        output.cursor_goto(0, 0)
        output.flush()

        # The cursor moved, so the available height has to be found out again.
        self.request_absolute_cursor_position()
|
|
|
|
def print_formatted_text(
    output: Output,
    formatted_text: AnyFormattedText,
    style: BaseStyle,
    style_transformation: StyleTransformation | None = None,
    color_depth: ColorDepth | None = None,
) -> None:
    """
    Print a list of (style_str, text) tuples in the given style to the output.

    :param output: `Output` instance to write to.
    :param formatted_text: Anything that `to_formatted_text` accepts.
    :param style: Style that resolves style strings to attributes.
    :param style_transformation: Transformation applied to the attributes;
        a `DummyStyleTransformation` is used when not given.
    :param color_depth: Color depth for setting attributes; the default color
        depth of the output is used when not given.
    """
    fragments = to_formatted_text(formatted_text)
    if not style_transformation:
        style_transformation = DummyStyleTransformation()
    if not color_depth:
        color_depth = output.get_default_color_depth()

    # Start from default attributes, with line wrapping enabled.
    output.reset_attributes()
    output.enable_autowrap()
    previous_attrs: Attrs | None = None

    # Resolve every style string only once.
    attrs_cache = _StyleStringToAttrsCache(
        style.get_attrs_for_style_str, style_transformation
    )

    for style_str, text, *_ in fragments:
        attrs = attrs_cache[style_str]

        # Only touch the attributes of the output when they changed.
        if attrs != previous_attrs:
            if attrs:
                output.set_attributes(attrs, color_depth)
            else:
                output.reset_attributes()
        previous_attrs = attrs

        if "[ZeroWidthEscape]" in style_str:
            # Escape sequences are passed on unescaped.
            output.write_raw(text)
            continue

        # Drop the carriage returns that are present, then turn every
        # newline into carriage return + newline.
        output.write(text.replace("\r", "").replace("\n", "\r\n"))

    # Leave the output with default attributes and write everything out.
    output.reset_attributes()
    output.flush()
|
|