|
""" |
|
Data structures for the Buffer. |
|
It holds the text, cursor position, history, etc... |
|
""" |
|
from __future__ import annotations |
|
|
|
import asyncio |
|
import logging |
|
import os |
|
import re |
|
import shlex |
|
import shutil |
|
import subprocess |
|
import tempfile |
|
from collections import deque |
|
from enum import Enum |
|
from functools import wraps |
|
from typing import Any, Callable, Coroutine, Iterable, TypeVar, cast |
|
|
|
from .application.current import get_app |
|
from .application.run_in_terminal import run_in_terminal |
|
from .auto_suggest import AutoSuggest, Suggestion |
|
from .cache import FastDictCache |
|
from .clipboard import ClipboardData |
|
from .completion import ( |
|
CompleteEvent, |
|
Completer, |
|
Completion, |
|
DummyCompleter, |
|
get_common_complete_suffix, |
|
) |
|
from .document import Document |
|
from .eventloop import aclosing |
|
from .filters import FilterOrBool, to_filter |
|
from .history import History, InMemoryHistory |
|
from .search import SearchDirection, SearchState |
|
from .selection import PasteMode, SelectionState, SelectionType |
|
from .utils import Event, to_str |
|
from .validation import ValidationError, Validator |
|
|
|
__all__ = [ |
|
"EditReadOnlyBuffer", |
|
"Buffer", |
|
"CompletionState", |
|
"indent", |
|
"unindent", |
|
"reshape_text", |
|
] |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class EditReadOnlyBuffer(Exception): |
|
"Attempt editing of read-only :class:`.Buffer`." |
|
|
|
|
|
class ValidationState(Enum): |
|
"The validation state of a buffer. This is set after the validation." |
|
|
|
VALID = "VALID" |
|
INVALID = "INVALID" |
|
UNKNOWN = "UNKNOWN" |
|
|
|
|
|
class CompletionState: |
|
""" |
|
Immutable class that contains a completion state. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
original_document: Document, |
|
completions: list[Completion] | None = None, |
|
complete_index: int | None = None, |
|
) -> None: |
|
|
|
self.original_document = original_document |
|
|
|
|
|
|
|
self.completions = completions or [] |
|
|
|
|
|
|
|
self.complete_index = complete_index |
|
|
|
def __repr__(self) -> str: |
|
return "{}({!r}, <{!r}> completions, index={!r})".format( |
|
self.__class__.__name__, |
|
self.original_document, |
|
len(self.completions), |
|
self.complete_index, |
|
) |
|
|
|
def go_to_index(self, index: int | None) -> None: |
|
""" |
|
Create a new :class:`.CompletionState` object with the new index. |
|
|
|
When `index` is `None` deselect the completion. |
|
""" |
|
if self.completions: |
|
assert index is None or 0 <= index < len(self.completions) |
|
self.complete_index = index |
|
|
|
def new_text_and_position(self) -> tuple[str, int]: |
|
""" |
|
Return (new_text, new_cursor_position) for this completion. |
|
""" |
|
if self.complete_index is None: |
|
return self.original_document.text, self.original_document.cursor_position |
|
else: |
|
original_text_before_cursor = self.original_document.text_before_cursor |
|
original_text_after_cursor = self.original_document.text_after_cursor |
|
|
|
c = self.completions[self.complete_index] |
|
if c.start_position == 0: |
|
before = original_text_before_cursor |
|
else: |
|
before = original_text_before_cursor[: c.start_position] |
|
|
|
new_text = before + c.text + original_text_after_cursor |
|
new_cursor_position = len(before) + len(c.text) |
|
return new_text, new_cursor_position |
|
|
|
@property |
|
def current_completion(self) -> Completion | None: |
|
""" |
|
Return the current completion, or return `None` when no completion is |
|
selected. |
|
""" |
|
if self.complete_index is not None: |
|
return self.completions[self.complete_index] |
|
return None |
|
|
|
|
|
_QUOTED_WORDS_RE = re.compile(r"""(\s+|".*?"|'.*?')""") |
|
|
|
|
|
class YankNthArgState: |
|
""" |
|
For yank-last-arg/yank-nth-arg: Keep track of where we are in the history. |
|
""" |
|
|
|
def __init__( |
|
self, history_position: int = 0, n: int = -1, previous_inserted_word: str = "" |
|
) -> None: |
|
self.history_position = history_position |
|
self.previous_inserted_word = previous_inserted_word |
|
self.n = n |
|
|
|
def __repr__(self) -> str: |
|
return "{}(history_position={!r}, n={!r}, previous_inserted_word={!r})".format( |
|
self.__class__.__name__, |
|
self.history_position, |
|
self.n, |
|
self.previous_inserted_word, |
|
) |
|
|
|
|
|
BufferEventHandler = Callable[["Buffer"], None] |
|
BufferAcceptHandler = Callable[["Buffer"], bool] |
|
|
|
|
|
class Buffer: |
|
""" |
|
The core data structure that holds the text and cursor position of the |
|
current input line and implements all text manipulations on top of it. It |
|
also implements the history, undo stack and the completion state. |
|
|
|
:param completer: :class:`~prompt_toolkit.completion.Completer` instance. |
|
:param history: :class:`~prompt_toolkit.history.History` instance. |
|
:param tempfile_suffix: The tempfile suffix (extension) to be used for the |
|
"open in editor" function. For a Python REPL, this would be ".py", so |
|
that the editor knows the syntax highlighting to use. This can also be |
|
a callable that returns a string. |
|
:param tempfile: For more advanced tempfile situations where you need |
|
control over the subdirectories and filename. For a Git Commit Message, |
|
this would be ".git/COMMIT_EDITMSG", so that the editor knows the syntax |
|
highlighting to use. This can also be a callable that returns a string. |
|
:param name: Name for this buffer. E.g. DEFAULT_BUFFER. This is mostly |
|
useful for key bindings where we sometimes prefer to refer to a buffer |
|
by their name instead of by reference. |
|
:param accept_handler: Called when the buffer input is accepted. (Usually |
|
when the user presses `enter`.) The accept handler receives this |
|
`Buffer` as input and should return True when the buffer text should be |
|
kept instead of calling reset. |
|
|
|
In case of a `PromptSession` for instance, we want to keep the text, |
|
because we will exit the application, and only reset it during the next |
|
run. |
|
|
|
Events: |
|
|
|
:param on_text_changed: When the buffer text changes. (Callable or None.) |
|
:param on_text_insert: When new text is inserted. (Callable or None.) |
|
:param on_cursor_position_changed: When the cursor moves. (Callable or None.) |
|
:param on_completions_changed: When the completions were changed. (Callable or None.) |
|
:param on_suggestion_set: When an auto-suggestion text has been set. (Callable or None.) |
|
|
|
Filters: |
|
|
|
:param complete_while_typing: :class:`~prompt_toolkit.filters.Filter` |
|
or `bool`. Decide whether or not to do asynchronous autocompleting while |
|
typing. |
|
:param validate_while_typing: :class:`~prompt_toolkit.filters.Filter` |
|
or `bool`. Decide whether or not to do asynchronous validation while |
|
typing. |
|
:param enable_history_search: :class:`~prompt_toolkit.filters.Filter` or |
|
`bool` to indicate when up-arrow partial string matching is enabled. It |
|
is advised to not enable this at the same time as |
|
`complete_while_typing`, because when there is an autocompletion found, |
|
the up arrows usually browse through the completions, rather than |
|
through the history. |
|
:param read_only: :class:`~prompt_toolkit.filters.Filter`. When True, |
|
changes will not be allowed. |
|
:param multiline: :class:`~prompt_toolkit.filters.Filter` or `bool`. When |
|
not set, pressing `Enter` will call the `accept_handler`. Otherwise, |
|
pressing `Esc-Enter` is required. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
completer: Completer | None = None, |
|
auto_suggest: AutoSuggest | None = None, |
|
history: History | None = None, |
|
validator: Validator | None = None, |
|
tempfile_suffix: str | Callable[[], str] = "", |
|
tempfile: str | Callable[[], str] = "", |
|
name: str = "", |
|
complete_while_typing: FilterOrBool = False, |
|
validate_while_typing: FilterOrBool = False, |
|
enable_history_search: FilterOrBool = False, |
|
document: Document | None = None, |
|
accept_handler: BufferAcceptHandler | None = None, |
|
read_only: FilterOrBool = False, |
|
multiline: FilterOrBool = True, |
|
on_text_changed: BufferEventHandler | None = None, |
|
on_text_insert: BufferEventHandler | None = None, |
|
on_cursor_position_changed: BufferEventHandler | None = None, |
|
on_completions_changed: BufferEventHandler | None = None, |
|
on_suggestion_set: BufferEventHandler | None = None, |
|
): |
|
|
|
enable_history_search = to_filter(enable_history_search) |
|
complete_while_typing = to_filter(complete_while_typing) |
|
validate_while_typing = to_filter(validate_while_typing) |
|
read_only = to_filter(read_only) |
|
multiline = to_filter(multiline) |
|
|
|
self.completer = completer or DummyCompleter() |
|
self.auto_suggest = auto_suggest |
|
self.validator = validator |
|
self.tempfile_suffix = tempfile_suffix |
|
self.tempfile = tempfile |
|
self.name = name |
|
self.accept_handler = accept_handler |
|
|
|
|
|
self.complete_while_typing = complete_while_typing |
|
self.validate_while_typing = validate_while_typing |
|
self.enable_history_search = enable_history_search |
|
self.read_only = read_only |
|
self.multiline = multiline |
|
|
|
|
|
self.text_width = 0 |
|
|
|
|
|
|
|
|
|
self.history = InMemoryHistory() if history is None else history |
|
|
|
self.__cursor_position = 0 |
|
|
|
|
|
self.on_text_changed: Event[Buffer] = Event(self, on_text_changed) |
|
self.on_text_insert: Event[Buffer] = Event(self, on_text_insert) |
|
self.on_cursor_position_changed: Event[Buffer] = Event( |
|
self, on_cursor_position_changed |
|
) |
|
self.on_completions_changed: Event[Buffer] = Event(self, on_completions_changed) |
|
self.on_suggestion_set: Event[Buffer] = Event(self, on_suggestion_set) |
|
|
|
|
|
self._document_cache: FastDictCache[ |
|
tuple[str, int, SelectionState | None], Document |
|
] = FastDictCache(Document, size=10) |
|
|
|
|
|
self._async_suggester = self._create_auto_suggest_coroutine() |
|
self._async_completer = self._create_completer_coroutine() |
|
self._async_validator = self._create_auto_validate_coroutine() |
|
|
|
|
|
self._load_history_task: asyncio.Future[None] | None = None |
|
|
|
|
|
self.reset(document=document) |
|
|
|
def __repr__(self) -> str: |
|
if len(self.text) < 15: |
|
text = self.text |
|
else: |
|
text = self.text[:12] + "..." |
|
|
|
return f"<Buffer(name={self.name!r}, text={text!r}) at {id(self)!r}>" |
|
|
|
def reset( |
|
self, document: Document | None = None, append_to_history: bool = False |
|
) -> None: |
|
""" |
|
:param append_to_history: Append current input to history first. |
|
""" |
|
if append_to_history: |
|
self.append_to_history() |
|
|
|
document = document or Document() |
|
|
|
self.__cursor_position = document.cursor_position |
|
|
|
|
|
self.validation_error: ValidationError | None = None |
|
self.validation_state: ValidationState | None = ValidationState.UNKNOWN |
|
|
|
|
|
self.selection_state: SelectionState | None = None |
|
|
|
|
|
|
|
|
|
self.multiple_cursor_positions: list[int] = [] |
|
|
|
|
|
self.preferred_column: int | None = None |
|
|
|
|
|
|
|
self.complete_state: CompletionState | None = None |
|
|
|
|
|
self.yank_nth_arg_state: YankNthArgState | None = None |
|
|
|
|
|
|
|
self.document_before_paste: Document | None = None |
|
|
|
|
|
self.suggestion: Suggestion | None = None |
|
|
|
|
|
|
|
self.history_search_text: str | None = None |
|
|
|
|
|
self._undo_stack: list[tuple[str, int]] = [] |
|
self._redo_stack: list[tuple[str, int]] = [] |
|
|
|
|
|
|
|
|
|
if self._load_history_task is not None: |
|
self._load_history_task.cancel() |
|
self._load_history_task = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
self._working_lines: deque[str] = deque([document.text]) |
|
self.__working_index = 0 |
|
|
|
def load_history_if_not_yet_loaded(self) -> None: |
|
""" |
|
Create task for populating the buffer history (if not yet done). |
|
|
|
Note:: |
|
|
|
This needs to be called from within the event loop of the |
|
application, because history loading is async, and we need to be |
|
sure the right event loop is active. Therefor, we call this method |
|
in the `BufferControl.create_content`. |
|
|
|
There are situations where prompt_toolkit applications are created |
|
in one thread, but will later run in a different thread (Ptpython |
|
is one example. The REPL runs in a separate thread, in order to |
|
prevent interfering with a potential different event loop in the |
|
main thread. The REPL UI however is still created in the main |
|
thread.) We could decide to not support creating prompt_toolkit |
|
objects in one thread and running the application in a different |
|
thread, but history loading is the only place where it matters, and |
|
this solves it. |
|
""" |
|
if self._load_history_task is None: |
|
|
|
async def load_history() -> None: |
|
async for item in self.history.load(): |
|
self._working_lines.appendleft(item) |
|
self.__working_index += 1 |
|
|
|
self._load_history_task = get_app().create_background_task(load_history()) |
|
|
|
def load_history_done(f: asyncio.Future[None]) -> None: |
|
""" |
|
Handle `load_history` result when either done, cancelled, or |
|
when an exception was raised. |
|
""" |
|
try: |
|
f.result() |
|
except asyncio.CancelledError: |
|
|
|
|
|
pass |
|
except GeneratorExit: |
|
|
|
|
|
|
|
pass |
|
except BaseException: |
|
|
|
|
|
logger.exception("Loading history failed") |
|
|
|
self._load_history_task.add_done_callback(load_history_done) |
|
|
|
|
|
|
|
def _set_text(self, value: str) -> bool: |
|
"""set text at current working_index. Return whether it changed.""" |
|
working_index = self.working_index |
|
working_lines = self._working_lines |
|
|
|
original_value = working_lines[working_index] |
|
working_lines[working_index] = value |
|
|
|
|
|
if len(value) != len(original_value): |
|
|
|
|
|
|
|
|
|
|
|
return True |
|
elif value != original_value: |
|
return True |
|
return False |
|
|
|
def _set_cursor_position(self, value: int) -> bool: |
|
"""Set cursor position. Return whether it changed.""" |
|
original_position = self.__cursor_position |
|
self.__cursor_position = max(0, value) |
|
|
|
return self.__cursor_position != original_position |
|
|
|
@property |
|
def text(self) -> str: |
|
return self._working_lines[self.working_index] |
|
|
|
@text.setter |
|
def text(self, value: str) -> None: |
|
""" |
|
Setting text. (When doing this, make sure that the cursor_position is |
|
valid for this text. text/cursor_position should be consistent at any time, |
|
otherwise set a Document instead.) |
|
""" |
|
|
|
if self.cursor_position > len(value): |
|
self.cursor_position = len(value) |
|
|
|
|
|
if self.read_only(): |
|
raise EditReadOnlyBuffer() |
|
|
|
changed = self._set_text(value) |
|
|
|
if changed: |
|
self._text_changed() |
|
|
|
|
|
|
|
|
|
|
|
self.history_search_text = None |
|
|
|
@property |
|
def cursor_position(self) -> int: |
|
return self.__cursor_position |
|
|
|
@cursor_position.setter |
|
def cursor_position(self, value: int) -> None: |
|
""" |
|
Setting cursor position. |
|
""" |
|
assert isinstance(value, int) |
|
|
|
|
|
if value > len(self.text): |
|
value = len(self.text) |
|
if value < 0: |
|
value = 0 |
|
|
|
changed = self._set_cursor_position(value) |
|
|
|
if changed: |
|
self._cursor_position_changed() |
|
|
|
@property |
|
def working_index(self) -> int: |
|
return self.__working_index |
|
|
|
@working_index.setter |
|
def working_index(self, value: int) -> None: |
|
if self.__working_index != value: |
|
self.__working_index = value |
|
|
|
|
|
|
|
self.cursor_position = 0 |
|
self._text_changed() |
|
|
|
def _text_changed(self) -> None: |
|
|
|
self.validation_error = None |
|
self.validation_state = ValidationState.UNKNOWN |
|
self.complete_state = None |
|
self.yank_nth_arg_state = None |
|
self.document_before_paste = None |
|
self.selection_state = None |
|
self.suggestion = None |
|
self.preferred_column = None |
|
|
|
|
|
self.on_text_changed.fire() |
|
|
|
|
|
|
|
|
|
if self.validator and self.validate_while_typing(): |
|
get_app().create_background_task(self._async_validator()) |
|
|
|
def _cursor_position_changed(self) -> None: |
|
|
|
|
|
|
|
self.complete_state = None |
|
self.yank_nth_arg_state = None |
|
self.document_before_paste = None |
|
|
|
|
|
|
|
self.preferred_column = None |
|
|
|
|
|
|
|
|
|
|
|
self.on_cursor_position_changed.fire() |
|
|
|
@property |
|
def document(self) -> Document: |
|
""" |
|
Return :class:`~prompt_toolkit.document.Document` instance from the |
|
current text, cursor position and selection state. |
|
""" |
|
return self._document_cache[ |
|
self.text, self.cursor_position, self.selection_state |
|
] |
|
|
|
@document.setter |
|
def document(self, value: Document) -> None: |
|
""" |
|
Set :class:`~prompt_toolkit.document.Document` instance. |
|
|
|
This will set both the text and cursor position at the same time, but |
|
atomically. (Change events will be triggered only after both have been set.) |
|
""" |
|
self.set_document(value) |
|
|
|
def set_document(self, value: Document, bypass_readonly: bool = False) -> None: |
|
""" |
|
Set :class:`~prompt_toolkit.document.Document` instance. Like the |
|
``document`` property, but accept an ``bypass_readonly`` argument. |
|
|
|
:param bypass_readonly: When True, don't raise an |
|
:class:`.EditReadOnlyBuffer` exception, even |
|
when the buffer is read-only. |
|
|
|
.. warning:: |
|
|
|
When this buffer is read-only and `bypass_readonly` was not passed, |
|
the `EditReadOnlyBuffer` exception will be caught by the |
|
`KeyProcessor` and is silently suppressed. This is important to |
|
keep in mind when writing key bindings, because it won't do what |
|
you expect, and there won't be a stack trace. Use try/finally |
|
around this function if you need some cleanup code. |
|
""" |
|
|
|
if not bypass_readonly and self.read_only(): |
|
raise EditReadOnlyBuffer() |
|
|
|
|
|
text_changed = self._set_text(value.text) |
|
cursor_position_changed = self._set_cursor_position(value.cursor_position) |
|
|
|
|
|
|
|
if text_changed: |
|
self._text_changed() |
|
self.history_search_text = None |
|
|
|
if cursor_position_changed: |
|
self._cursor_position_changed() |
|
|
|
@property |
|
def is_returnable(self) -> bool: |
|
""" |
|
True when there is something handling accept. |
|
""" |
|
return bool(self.accept_handler) |
|
|
|
|
|
|
|
def save_to_undo_stack(self, clear_redo_stack: bool = True) -> None: |
|
""" |
|
Safe current state (input text and cursor position), so that we can |
|
restore it by calling undo. |
|
""" |
|
|
|
|
|
if self._undo_stack and self._undo_stack[-1][0] == self.text: |
|
self._undo_stack[-1] = (self._undo_stack[-1][0], self.cursor_position) |
|
else: |
|
self._undo_stack.append((self.text, self.cursor_position)) |
|
|
|
|
|
if clear_redo_stack: |
|
self._redo_stack = [] |
|
|
|
def transform_lines( |
|
self, |
|
line_index_iterator: Iterable[int], |
|
transform_callback: Callable[[str], str], |
|
) -> str: |
|
""" |
|
Transforms the text on a range of lines. |
|
When the iterator yield an index not in the range of lines that the |
|
document contains, it skips them silently. |
|
|
|
To uppercase some lines:: |
|
|
|
new_text = transform_lines(range(5,10), lambda text: text.upper()) |
|
|
|
:param line_index_iterator: Iterator of line numbers (int) |
|
:param transform_callback: callable that takes the original text of a |
|
line, and return the new text for this line. |
|
|
|
:returns: The new text. |
|
""" |
|
|
|
lines = self.text.split("\n") |
|
|
|
|
|
for index in line_index_iterator: |
|
try: |
|
lines[index] = transform_callback(lines[index]) |
|
except IndexError: |
|
pass |
|
|
|
return "\n".join(lines) |
|
|
|
def transform_current_line(self, transform_callback: Callable[[str], str]) -> None: |
|
""" |
|
Apply the given transformation function to the current line. |
|
|
|
:param transform_callback: callable that takes a string and return a new string. |
|
""" |
|
document = self.document |
|
a = document.cursor_position + document.get_start_of_line_position() |
|
b = document.cursor_position + document.get_end_of_line_position() |
|
self.text = ( |
|
document.text[:a] |
|
+ transform_callback(document.text[a:b]) |
|
+ document.text[b:] |
|
) |
|
|
|
def transform_region( |
|
self, from_: int, to: int, transform_callback: Callable[[str], str] |
|
) -> None: |
|
""" |
|
Transform a part of the input string. |
|
|
|
:param from_: (int) start position. |
|
:param to: (int) end position. |
|
:param transform_callback: Callable which accepts a string and returns |
|
the transformed string. |
|
""" |
|
assert from_ < to |
|
|
|
self.text = "".join( |
|
[ |
|
self.text[:from_] |
|
+ transform_callback(self.text[from_:to]) |
|
+ self.text[to:] |
|
] |
|
) |
|
|
|
def cursor_left(self, count: int = 1) -> None: |
|
self.cursor_position += self.document.get_cursor_left_position(count=count) |
|
|
|
def cursor_right(self, count: int = 1) -> None: |
|
self.cursor_position += self.document.get_cursor_right_position(count=count) |
|
|
|
def cursor_up(self, count: int = 1) -> None: |
|
"""(for multiline edit). Move cursor to the previous line.""" |
|
original_column = self.preferred_column or self.document.cursor_position_col |
|
self.cursor_position += self.document.get_cursor_up_position( |
|
count=count, preferred_column=original_column |
|
) |
|
|
|
|
|
self.preferred_column = original_column |
|
|
|
def cursor_down(self, count: int = 1) -> None: |
|
"""(for multiline edit). Move cursor to the next line.""" |
|
original_column = self.preferred_column or self.document.cursor_position_col |
|
self.cursor_position += self.document.get_cursor_down_position( |
|
count=count, preferred_column=original_column |
|
) |
|
|
|
|
|
self.preferred_column = original_column |
|
|
|
def auto_up( |
|
self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False |
|
) -> None: |
|
""" |
|
If we're not on the first line (of a multiline input) go a line up, |
|
otherwise go back in history. (If nothing is selected.) |
|
""" |
|
if self.complete_state: |
|
self.complete_previous(count=count) |
|
elif self.document.cursor_position_row > 0: |
|
self.cursor_up(count=count) |
|
elif not self.selection_state: |
|
self.history_backward(count=count) |
|
|
|
|
|
if go_to_start_of_line_if_history_changes: |
|
self.cursor_position += self.document.get_start_of_line_position() |
|
|
|
def auto_down( |
|
self, count: int = 1, go_to_start_of_line_if_history_changes: bool = False |
|
) -> None: |
|
""" |
|
If we're not on the last line (of a multiline input) go a line down, |
|
otherwise go forward in history. (If nothing is selected.) |
|
""" |
|
if self.complete_state: |
|
self.complete_next(count=count) |
|
elif self.document.cursor_position_row < self.document.line_count - 1: |
|
self.cursor_down(count=count) |
|
elif not self.selection_state: |
|
self.history_forward(count=count) |
|
|
|
|
|
if go_to_start_of_line_if_history_changes: |
|
self.cursor_position += self.document.get_start_of_line_position() |
|
|
|
def delete_before_cursor(self, count: int = 1) -> str: |
|
""" |
|
Delete specified number of characters before cursor and return the |
|
deleted text. |
|
""" |
|
assert count >= 0 |
|
deleted = "" |
|
|
|
if self.cursor_position > 0: |
|
deleted = self.text[self.cursor_position - count : self.cursor_position] |
|
|
|
new_text = ( |
|
self.text[: self.cursor_position - count] |
|
+ self.text[self.cursor_position :] |
|
) |
|
new_cursor_position = self.cursor_position - len(deleted) |
|
|
|
|
|
self.document = Document(new_text, new_cursor_position) |
|
|
|
return deleted |
|
|
|
def delete(self, count: int = 1) -> str: |
|
""" |
|
Delete specified number of characters and Return the deleted text. |
|
""" |
|
if self.cursor_position < len(self.text): |
|
deleted = self.document.text_after_cursor[:count] |
|
self.text = ( |
|
self.text[: self.cursor_position] |
|
+ self.text[self.cursor_position + len(deleted) :] |
|
) |
|
return deleted |
|
else: |
|
return "" |
|
|
|
def join_next_line(self, separator: str = " ") -> None: |
|
""" |
|
Join the next line to the current one by deleting the line ending after |
|
the current line. |
|
""" |
|
if not self.document.on_last_line: |
|
self.cursor_position += self.document.get_end_of_line_position() |
|
self.delete() |
|
|
|
|
|
self.text = ( |
|
self.document.text_before_cursor |
|
+ separator |
|
+ self.document.text_after_cursor.lstrip(" ") |
|
) |
|
|
|
def join_selected_lines(self, separator: str = " ") -> None: |
|
""" |
|
Join the selected lines. |
|
""" |
|
assert self.selection_state |
|
|
|
|
|
from_, to = sorted( |
|
[self.cursor_position, self.selection_state.original_cursor_position] |
|
) |
|
|
|
before = self.text[:from_] |
|
lines = self.text[from_:to].splitlines() |
|
after = self.text[to:] |
|
|
|
|
|
lines = [l.lstrip(" ") + separator for l in lines] |
|
|
|
|
|
self.document = Document( |
|
text=before + "".join(lines) + after, |
|
cursor_position=len(before + "".join(lines[:-1])) - 1, |
|
) |
|
|
|
def swap_characters_before_cursor(self) -> None: |
|
""" |
|
Swap the last two characters before the cursor. |
|
""" |
|
pos = self.cursor_position |
|
|
|
if pos >= 2: |
|
a = self.text[pos - 2] |
|
b = self.text[pos - 1] |
|
|
|
self.text = self.text[: pos - 2] + b + a + self.text[pos:] |
|
|
|
def go_to_history(self, index: int) -> None: |
|
""" |
|
Go to this item in the history. |
|
""" |
|
if index < len(self._working_lines): |
|
self.working_index = index |
|
self.cursor_position = len(self.text) |
|
|
|
def complete_next(self, count: int = 1, disable_wrap_around: bool = False) -> None: |
|
""" |
|
Browse to the next completions. |
|
(Does nothing if there are no completion.) |
|
""" |
|
index: int | None |
|
|
|
if self.complete_state: |
|
completions_count = len(self.complete_state.completions) |
|
|
|
if self.complete_state.complete_index is None: |
|
index = 0 |
|
elif self.complete_state.complete_index == completions_count - 1: |
|
index = None |
|
|
|
if disable_wrap_around: |
|
return |
|
else: |
|
index = min( |
|
completions_count - 1, self.complete_state.complete_index + count |
|
) |
|
self.go_to_completion(index) |
|
|
|
def complete_previous( |
|
self, count: int = 1, disable_wrap_around: bool = False |
|
) -> None: |
|
""" |
|
Browse to the previous completions. |
|
(Does nothing if there are no completion.) |
|
""" |
|
index: int | None |
|
|
|
if self.complete_state: |
|
if self.complete_state.complete_index == 0: |
|
index = None |
|
|
|
if disable_wrap_around: |
|
return |
|
elif self.complete_state.complete_index is None: |
|
index = len(self.complete_state.completions) - 1 |
|
else: |
|
index = max(0, self.complete_state.complete_index - count) |
|
|
|
self.go_to_completion(index) |
|
|
|
def cancel_completion(self) -> None: |
|
""" |
|
Cancel completion, go back to the original text. |
|
""" |
|
if self.complete_state: |
|
self.go_to_completion(None) |
|
self.complete_state = None |
|
|
|
def _set_completions(self, completions: list[Completion]) -> CompletionState: |
|
""" |
|
Start completions. (Generate list of completions and initialize.) |
|
|
|
By default, no completion will be selected. |
|
""" |
|
self.complete_state = CompletionState( |
|
original_document=self.document, completions=completions |
|
) |
|
|
|
|
|
self.on_completions_changed.fire() |
|
|
|
return self.complete_state |
|
|
|
def start_history_lines_completion(self) -> None: |
|
""" |
|
Start a completion based on all the other lines in the document and the |
|
history. |
|
""" |
|
found_completions: set[str] = set() |
|
completions = [] |
|
|
|
|
|
current_line = self.document.current_line_before_cursor.lstrip() |
|
|
|
for i, string in enumerate(self._working_lines): |
|
for j, l in enumerate(string.split("\n")): |
|
l = l.strip() |
|
if l and l.startswith(current_line): |
|
|
|
if l not in found_completions: |
|
found_completions.add(l) |
|
|
|
|
|
if i == self.working_index: |
|
display_meta = "Current, line %s" % (j + 1) |
|
else: |
|
display_meta = f"History {i + 1}, line {j + 1}" |
|
|
|
completions.append( |
|
Completion( |
|
text=l, |
|
start_position=-len(current_line), |
|
display_meta=display_meta, |
|
) |
|
) |
|
|
|
self._set_completions(completions=completions[::-1]) |
|
self.go_to_completion(0) |
|
|
|
def go_to_completion(self, index: int | None) -> None: |
|
""" |
|
Select a completion from the list of current completions. |
|
""" |
|
assert self.complete_state |
|
|
|
|
|
state = self.complete_state |
|
state.go_to_index(index) |
|
|
|
|
|
new_text, new_cursor_position = state.new_text_and_position() |
|
self.document = Document(new_text, new_cursor_position) |
|
|
|
|
|
self.complete_state = state |
|
|
|
def apply_completion(self, completion: Completion) -> None: |
|
""" |
|
Insert a given completion. |
|
""" |
|
|
|
if self.complete_state: |
|
self.go_to_completion(None) |
|
self.complete_state = None |
|
|
|
|
|
self.delete_before_cursor(-completion.start_position) |
|
self.insert_text(completion.text) |
|
|
|
def _set_history_search(self) -> None: |
|
""" |
|
Set `history_search_text`. |
|
(The text before the cursor will be used for filtering the history.) |
|
""" |
|
if self.enable_history_search(): |
|
if self.history_search_text is None: |
|
self.history_search_text = self.document.text_before_cursor |
|
else: |
|
self.history_search_text = None |
|
|
|
def _history_matches(self, i: int) -> bool: |
|
""" |
|
True when the current entry matches the history search. |
|
(when we don't have history search, it's also True.) |
|
""" |
|
return self.history_search_text is None or self._working_lines[i].startswith( |
|
self.history_search_text |
|
) |
|
|
|
def history_forward(self, count: int = 1) -> None: |
|
""" |
|
Move forwards through the history. |
|
|
|
:param count: Amount of items to move forward. |
|
""" |
|
self._set_history_search() |
|
|
|
|
|
found_something = False |
|
|
|
for i in range(self.working_index + 1, len(self._working_lines)): |
|
if self._history_matches(i): |
|
self.working_index = i |
|
count -= 1 |
|
found_something = True |
|
if count == 0: |
|
break |
|
|
|
|
|
if found_something: |
|
self.cursor_position = 0 |
|
self.cursor_position += self.document.get_end_of_line_position() |
|
|
|
def history_backward(self, count: int = 1) -> None: |
|
""" |
|
Move backwards through history. |
|
""" |
|
self._set_history_search() |
|
|
|
|
|
found_something = False |
|
|
|
for i in range(self.working_index - 1, -1, -1): |
|
if self._history_matches(i): |
|
self.working_index = i |
|
count -= 1 |
|
found_something = True |
|
if count == 0: |
|
break |
|
|
|
|
|
if found_something: |
|
self.cursor_position = len(self.text) |
|
|
|
def yank_nth_arg(self, n: int | None = None, _yank_last_arg: bool = False) -> None: |
|
""" |
|
Pick nth word from previous history entry (depending on current |
|
`yank_nth_arg_state`) and insert it at current position. Rotate through |
|
history if called repeatedly. If no `n` has been given, take the first |
|
argument. (The second word.) |
|
|
|
:param n: (None or int), The index of the word from the previous line |
|
to take. |
|
""" |
|
assert n is None or isinstance(n, int) |
|
history_strings = self.history.get_strings() |
|
|
|
if not len(history_strings): |
|
return |
|
|
|
|
|
if self.yank_nth_arg_state is None: |
|
state = YankNthArgState(n=-1 if _yank_last_arg else 1) |
|
else: |
|
state = self.yank_nth_arg_state |
|
|
|
if n is not None: |
|
state.n = n |
|
|
|
|
|
new_pos = state.history_position - 1 |
|
if -new_pos > len(history_strings): |
|
new_pos = -1 |
|
|
|
|
|
line = history_strings[new_pos] |
|
|
|
words = [w.strip() for w in _QUOTED_WORDS_RE.split(line)] |
|
words = [w for w in words if w] |
|
try: |
|
word = words[state.n] |
|
except IndexError: |
|
word = "" |
|
|
|
|
|
if state.previous_inserted_word: |
|
self.delete_before_cursor(len(state.previous_inserted_word)) |
|
self.insert_text(word) |
|
|
|
|
|
|
|
state.previous_inserted_word = word |
|
state.history_position = new_pos |
|
self.yank_nth_arg_state = state |
|
|
|
def yank_last_arg(self, n: int | None = None) -> None: |
|
""" |
|
Like `yank_nth_arg`, but if no argument has been given, yank the last |
|
word by default. |
|
""" |
|
self.yank_nth_arg(n=n, _yank_last_arg=True) |
|
|
|
def start_selection( |
|
self, selection_type: SelectionType = SelectionType.CHARACTERS |
|
) -> None: |
|
""" |
|
Take the current cursor position as the start of this selection. |
|
""" |
|
self.selection_state = SelectionState(self.cursor_position, selection_type) |
|
|
|
def copy_selection(self, _cut: bool = False) -> ClipboardData: |
|
""" |
|
Copy selected text and return :class:`.ClipboardData` instance. |
|
|
|
Notice that this doesn't store the copied data on the clipboard yet. |
|
You can store it like this: |
|
|
|
.. code:: python |
|
|
|
data = buffer.copy_selection() |
|
get_app().clipboard.set_data(data) |
|
""" |
|
new_document, clipboard_data = self.document.cut_selection() |
|
if _cut: |
|
self.document = new_document |
|
|
|
self.selection_state = None |
|
return clipboard_data |
|
|
|
def cut_selection(self) -> ClipboardData: |
|
""" |
|
Delete selected text and return :class:`.ClipboardData` instance. |
|
""" |
|
return self.copy_selection(_cut=True) |
|
|
|
def paste_clipboard_data( |
|
self, |
|
data: ClipboardData, |
|
paste_mode: PasteMode = PasteMode.EMACS, |
|
count: int = 1, |
|
) -> None: |
|
""" |
|
Insert the data from the clipboard. |
|
""" |
|
assert isinstance(data, ClipboardData) |
|
assert paste_mode in (PasteMode.VI_BEFORE, PasteMode.VI_AFTER, PasteMode.EMACS) |
|
|
|
original_document = self.document |
|
self.document = self.document.paste_clipboard_data( |
|
data, paste_mode=paste_mode, count=count |
|
) |
|
|
|
|
|
|
|
self.document_before_paste = original_document |
|
|
|
def newline(self, copy_margin: bool = True) -> None: |
|
""" |
|
Insert a line ending at the current position. |
|
""" |
|
if copy_margin: |
|
self.insert_text("\n" + self.document.leading_whitespace_in_current_line) |
|
else: |
|
self.insert_text("\n") |
|
|
|
def insert_line_above(self, copy_margin: bool = True) -> None: |
|
""" |
|
Insert a new line above the current one. |
|
""" |
|
if copy_margin: |
|
insert = self.document.leading_whitespace_in_current_line + "\n" |
|
else: |
|
insert = "\n" |
|
|
|
self.cursor_position += self.document.get_start_of_line_position() |
|
self.insert_text(insert) |
|
self.cursor_position -= 1 |
|
|
|
def insert_line_below(self, copy_margin: bool = True) -> None: |
|
""" |
|
Insert a new line below the current one. |
|
""" |
|
if copy_margin: |
|
insert = "\n" + self.document.leading_whitespace_in_current_line |
|
else: |
|
insert = "\n" |
|
|
|
self.cursor_position += self.document.get_end_of_line_position() |
|
self.insert_text(insert) |
|
|
|
def insert_text( |
|
self, |
|
data: str, |
|
overwrite: bool = False, |
|
move_cursor: bool = True, |
|
fire_event: bool = True, |
|
) -> None: |
|
""" |
|
Insert characters at cursor position. |
|
|
|
:param fire_event: Fire `on_text_insert` event. This is mainly used to |
|
trigger autocompletion while typing. |
|
""" |
|
|
|
otext = self.text |
|
ocpos = self.cursor_position |
|
|
|
|
|
if overwrite: |
|
|
|
|
|
overwritten_text = otext[ocpos : ocpos + len(data)] |
|
if "\n" in overwritten_text: |
|
overwritten_text = overwritten_text[: overwritten_text.find("\n")] |
|
|
|
text = otext[:ocpos] + data + otext[ocpos + len(overwritten_text) :] |
|
else: |
|
text = otext[:ocpos] + data + otext[ocpos:] |
|
|
|
if move_cursor: |
|
cpos = self.cursor_position + len(data) |
|
else: |
|
cpos = self.cursor_position |
|
|
|
|
|
|
|
|
|
|
|
self.document = Document(text, cpos) |
|
|
|
|
|
if fire_event: |
|
self.on_text_insert.fire() |
|
|
|
|
|
if self.completer and self.complete_while_typing(): |
|
get_app().create_background_task(self._async_completer()) |
|
|
|
|
|
if self.auto_suggest: |
|
get_app().create_background_task(self._async_suggester()) |
|
|
|
def undo(self) -> None: |
|
|
|
|
|
|
|
|
|
while self._undo_stack: |
|
text, pos = self._undo_stack.pop() |
|
|
|
if text != self.text: |
|
|
|
self._redo_stack.append((self.text, self.cursor_position)) |
|
|
|
|
|
self.document = Document(text, cursor_position=pos) |
|
break |
|
|
|
def redo(self) -> None: |
|
if self._redo_stack: |
|
|
|
self.save_to_undo_stack(clear_redo_stack=False) |
|
|
|
|
|
text, pos = self._redo_stack.pop() |
|
self.document = Document(text, cursor_position=pos) |
|
|
|
def validate(self, set_cursor: bool = False) -> bool: |
|
""" |
|
Returns `True` if valid. |
|
|
|
:param set_cursor: Set the cursor position, if an error was found. |
|
""" |
|
|
|
|
|
if self.validation_state != ValidationState.UNKNOWN: |
|
return self.validation_state == ValidationState.VALID |
|
|
|
|
|
if self.validator: |
|
try: |
|
self.validator.validate(self.document) |
|
except ValidationError as e: |
|
|
|
if set_cursor: |
|
self.cursor_position = min( |
|
max(0, e.cursor_position), len(self.text) |
|
) |
|
|
|
self.validation_state = ValidationState.INVALID |
|
self.validation_error = e |
|
return False |
|
|
|
|
|
self.validation_state = ValidationState.VALID |
|
self.validation_error = None |
|
return True |
|
|
|
async def _validate_async(self) -> None: |
|
""" |
|
Asynchronous version of `validate()`. |
|
This one doesn't set the cursor position. |
|
|
|
We have both variants, because a synchronous version is required. |
|
Handling the ENTER key needs to be completely synchronous, otherwise |
|
stuff like type-ahead is going to give very weird results. (People |
|
could type input while the ENTER key is still processed.) |
|
|
|
An asynchronous version is required if we have `validate_while_typing` |
|
enabled. |
|
""" |
|
while True: |
|
|
|
|
|
if self.validation_state != ValidationState.UNKNOWN: |
|
return |
|
|
|
|
|
error = None |
|
document = self.document |
|
|
|
if self.validator: |
|
try: |
|
await self.validator.validate_async(self.document) |
|
except ValidationError as e: |
|
error = e |
|
|
|
|
|
if self.document != document: |
|
continue |
|
|
|
|
|
if error: |
|
self.validation_state = ValidationState.INVALID |
|
else: |
|
self.validation_state = ValidationState.VALID |
|
|
|
self.validation_error = error |
|
get_app().invalidate() |
|
|
|
def append_to_history(self) -> None: |
|
""" |
|
Append the current input to the history. |
|
""" |
|
|
|
|
|
if self.text: |
|
history_strings = self.history.get_strings() |
|
if not len(history_strings) or history_strings[-1] != self.text: |
|
self.history.append_string(self.text) |
|
|
|
def _search( |
|
self, |
|
search_state: SearchState, |
|
include_current_position: bool = False, |
|
count: int = 1, |
|
) -> tuple[int, int] | None: |
|
""" |
|
Execute search. Return (working_index, cursor_position) tuple when this |
|
search is applied. Returns `None` when this text cannot be found. |
|
""" |
|
assert count > 0 |
|
|
|
text = search_state.text |
|
direction = search_state.direction |
|
ignore_case = search_state.ignore_case() |
|
|
|
def search_once( |
|
working_index: int, document: Document |
|
) -> tuple[int, Document] | None: |
|
""" |
|
Do search one time. |
|
Return (working_index, document) or `None` |
|
""" |
|
if direction == SearchDirection.FORWARD: |
|
|
|
new_index = document.find( |
|
text, |
|
include_current_position=include_current_position, |
|
ignore_case=ignore_case, |
|
) |
|
|
|
if new_index is not None: |
|
return ( |
|
working_index, |
|
Document(document.text, document.cursor_position + new_index), |
|
) |
|
else: |
|
|
|
|
|
|
|
for i in range(working_index + 1, len(self._working_lines) + 1): |
|
i %= len(self._working_lines) |
|
|
|
document = Document(self._working_lines[i], 0) |
|
new_index = document.find( |
|
text, include_current_position=True, ignore_case=ignore_case |
|
) |
|
if new_index is not None: |
|
return (i, Document(document.text, new_index)) |
|
else: |
|
|
|
new_index = document.find_backwards(text, ignore_case=ignore_case) |
|
|
|
if new_index is not None: |
|
return ( |
|
working_index, |
|
Document(document.text, document.cursor_position + new_index), |
|
) |
|
else: |
|
|
|
for i in range(working_index - 1, -2, -1): |
|
i %= len(self._working_lines) |
|
|
|
document = Document( |
|
self._working_lines[i], len(self._working_lines[i]) |
|
) |
|
new_index = document.find_backwards( |
|
text, ignore_case=ignore_case |
|
) |
|
if new_index is not None: |
|
return ( |
|
i, |
|
Document(document.text, len(document.text) + new_index), |
|
) |
|
return None |
|
|
|
|
|
working_index = self.working_index |
|
document = self.document |
|
for _ in range(count): |
|
result = search_once(working_index, document) |
|
if result is None: |
|
return None |
|
else: |
|
working_index, document = result |
|
|
|
return (working_index, document.cursor_position) |
|
|
|
def document_for_search(self, search_state: SearchState) -> Document: |
|
""" |
|
Return a :class:`~prompt_toolkit.document.Document` instance that has |
|
the text/cursor position for this search, if we would apply it. This |
|
will be used in the |
|
:class:`~prompt_toolkit.layout.BufferControl` to display feedback while |
|
searching. |
|
""" |
|
search_result = self._search(search_state, include_current_position=True) |
|
|
|
if search_result is None: |
|
return self.document |
|
else: |
|
working_index, cursor_position = search_result |
|
|
|
|
|
if working_index == self.working_index: |
|
selection = self.selection_state |
|
else: |
|
selection = None |
|
|
|
return Document( |
|
self._working_lines[working_index], cursor_position, selection=selection |
|
) |
|
|
|
def get_search_position( |
|
self, |
|
search_state: SearchState, |
|
include_current_position: bool = True, |
|
count: int = 1, |
|
) -> int: |
|
""" |
|
Get the cursor position for this search. |
|
(This operation won't change the `working_index`. It's won't go through |
|
the history. Vi text objects can't span multiple items.) |
|
""" |
|
search_result = self._search( |
|
search_state, include_current_position=include_current_position, count=count |
|
) |
|
|
|
if search_result is None: |
|
return self.cursor_position |
|
else: |
|
working_index, cursor_position = search_result |
|
return cursor_position |
|
|
|
def apply_search( |
|
self, |
|
search_state: SearchState, |
|
include_current_position: bool = True, |
|
count: int = 1, |
|
) -> None: |
|
""" |
|
Apply search. If something is found, set `working_index` and |
|
`cursor_position`. |
|
""" |
|
search_result = self._search( |
|
search_state, include_current_position=include_current_position, count=count |
|
) |
|
|
|
if search_result is not None: |
|
working_index, cursor_position = search_result |
|
self.working_index = working_index |
|
self.cursor_position = cursor_position |
|
|
|
def exit_selection(self) -> None: |
|
self.selection_state = None |
|
|
|
def _editor_simple_tempfile(self) -> tuple[str, Callable[[], None]]: |
|
""" |
|
Simple (file) tempfile implementation. |
|
Return (tempfile, cleanup_func). |
|
""" |
|
suffix = to_str(self.tempfile_suffix) |
|
descriptor, filename = tempfile.mkstemp(suffix) |
|
|
|
os.write(descriptor, self.text.encode("utf-8")) |
|
os.close(descriptor) |
|
|
|
def cleanup() -> None: |
|
os.unlink(filename) |
|
|
|
return filename, cleanup |
|
|
|
def _editor_complex_tempfile(self) -> tuple[str, Callable[[], None]]: |
|
|
|
headtail = to_str(self.tempfile) |
|
if not headtail: |
|
|
|
return self._editor_simple_tempfile() |
|
headtail = str(headtail) |
|
|
|
|
|
head, tail = os.path.split(headtail) |
|
if os.path.isabs(head): |
|
head = head[1:] |
|
|
|
dirpath = tempfile.mkdtemp() |
|
if head: |
|
dirpath = os.path.join(dirpath, head) |
|
|
|
os.makedirs(dirpath) |
|
|
|
|
|
filename = os.path.join(dirpath, tail) |
|
with open(filename, "w", encoding="utf-8") as fh: |
|
fh.write(self.text) |
|
|
|
def cleanup() -> None: |
|
shutil.rmtree(dirpath) |
|
|
|
return filename, cleanup |
|
|
|
def open_in_editor(self, validate_and_handle: bool = False) -> asyncio.Task[None]: |
|
""" |
|
Open code in editor. |
|
|
|
This returns a future, and runs in a thread executor. |
|
""" |
|
if self.read_only(): |
|
raise EditReadOnlyBuffer() |
|
|
|
|
|
if self.tempfile: |
|
filename, cleanup_func = self._editor_complex_tempfile() |
|
else: |
|
filename, cleanup_func = self._editor_simple_tempfile() |
|
|
|
async def run() -> None: |
|
try: |
|
|
|
|
|
|
|
|
|
success = await run_in_terminal( |
|
lambda: self._open_file_in_editor(filename), in_executor=True |
|
) |
|
|
|
|
|
if success: |
|
with open(filename, "rb") as f: |
|
text = f.read().decode("utf-8") |
|
|
|
|
|
|
|
if text.endswith("\n"): |
|
text = text[:-1] |
|
|
|
self.document = Document(text=text, cursor_position=len(text)) |
|
|
|
|
|
if validate_and_handle: |
|
self.validate_and_handle() |
|
|
|
finally: |
|
|
|
cleanup_func() |
|
|
|
return get_app().create_background_task(run()) |
|
|
|
def _open_file_in_editor(self, filename: str) -> bool: |
|
""" |
|
Call editor executable. |
|
|
|
Return True when we received a zero return code. |
|
""" |
|
|
|
|
|
visual = os.environ.get("VISUAL") |
|
editor = os.environ.get("EDITOR") |
|
|
|
editors = [ |
|
visual, |
|
editor, |
|
|
|
"/usr/bin/editor", |
|
"/usr/bin/nano", |
|
"/usr/bin/pico", |
|
"/usr/bin/vi", |
|
"/usr/bin/emacs", |
|
] |
|
|
|
for e in editors: |
|
if e: |
|
try: |
|
|
|
|
|
returncode = subprocess.call(shlex.split(e) + [filename]) |
|
return returncode == 0 |
|
|
|
except OSError: |
|
|
|
pass |
|
|
|
return False |
|
|
|
def start_completion( |
|
self, |
|
select_first: bool = False, |
|
select_last: bool = False, |
|
insert_common_part: bool = False, |
|
complete_event: CompleteEvent | None = None, |
|
) -> None: |
|
""" |
|
Start asynchronous autocompletion of this buffer. |
|
(This will do nothing if a previous completion was still in progress.) |
|
""" |
|
|
|
assert select_first + select_last + insert_common_part <= 1 |
|
|
|
get_app().create_background_task( |
|
self._async_completer( |
|
select_first=select_first, |
|
select_last=select_last, |
|
insert_common_part=insert_common_part, |
|
complete_event=complete_event |
|
or CompleteEvent(completion_requested=True), |
|
) |
|
) |
|
|
|
def _create_completer_coroutine(self) -> Callable[..., Coroutine[Any, Any, None]]: |
|
""" |
|
Create function for asynchronous autocompletion. |
|
|
|
(This consumes the asynchronous completer generator, which possibly |
|
runs the completion algorithm in another thread.) |
|
""" |
|
|
|
def completion_does_nothing(document: Document, completion: Completion) -> bool: |
|
""" |
|
Return `True` if applying this completion doesn't have any effect. |
|
(When it doesn't insert any new text. |
|
""" |
|
text_before_cursor = document.text_before_cursor |
|
replaced_text = text_before_cursor[ |
|
len(text_before_cursor) + completion.start_position : |
|
] |
|
return replaced_text == completion.text |
|
|
|
@_only_one_at_a_time |
|
async def async_completer( |
|
select_first: bool = False, |
|
select_last: bool = False, |
|
insert_common_part: bool = False, |
|
complete_event: CompleteEvent | None = None, |
|
) -> None: |
|
document = self.document |
|
complete_event = complete_event or CompleteEvent(text_inserted=True) |
|
|
|
|
|
if self.complete_state or not self.completer: |
|
return |
|
|
|
|
|
complete_state = CompletionState(original_document=self.document) |
|
self.complete_state = complete_state |
|
|
|
def proceed() -> bool: |
|
"""Keep retrieving completions. Input text has not yet changed |
|
while generating completions.""" |
|
return self.complete_state == complete_state |
|
|
|
refresh_needed = asyncio.Event() |
|
|
|
async def refresh_while_loading() -> None: |
|
"""Background loop to refresh the UI at most 3 times a second |
|
while the completion are loading. Calling |
|
`on_completions_changed.fire()` for every completion that we |
|
receive is too expensive when there are many completions. (We |
|
could tune `Application.max_render_postpone_time` and |
|
`Application.min_redraw_interval`, but having this here is a |
|
better approach.) |
|
""" |
|
while True: |
|
self.on_completions_changed.fire() |
|
refresh_needed.clear() |
|
await asyncio.sleep(0.3) |
|
await refresh_needed.wait() |
|
|
|
refresh_task = asyncio.ensure_future(refresh_while_loading()) |
|
try: |
|
|
|
async with aclosing( |
|
self.completer.get_completions_async(document, complete_event) |
|
) as async_generator: |
|
async for completion in async_generator: |
|
complete_state.completions.append(completion) |
|
refresh_needed.set() |
|
|
|
|
|
if not proceed(): |
|
break |
|
finally: |
|
refresh_task.cancel() |
|
|
|
|
|
self.on_completions_changed.fire() |
|
|
|
completions = complete_state.completions |
|
|
|
|
|
if len(completions) == 1 and completion_does_nothing( |
|
document, completions[0] |
|
): |
|
del completions[:] |
|
|
|
|
|
if proceed(): |
|
|
|
|
|
if ( |
|
not self.complete_state |
|
or self.complete_state.complete_index is not None |
|
): |
|
return |
|
|
|
|
|
if not completions: |
|
self.complete_state = None |
|
|
|
|
|
self.on_completions_changed.fire() |
|
return |
|
|
|
|
|
|
|
|
|
|
|
if select_first: |
|
self.go_to_completion(0) |
|
|
|
elif select_last: |
|
self.go_to_completion(len(completions) - 1) |
|
|
|
elif insert_common_part: |
|
common_part = get_common_complete_suffix(document, completions) |
|
if common_part: |
|
|
|
self.insert_text(common_part) |
|
if len(completions) > 1: |
|
|
|
|
|
|
|
completions[:] = [ |
|
c.new_completion_from_position(len(common_part)) |
|
for c in completions |
|
] |
|
|
|
self._set_completions(completions=completions) |
|
else: |
|
self.complete_state = None |
|
else: |
|
|
|
|
|
|
|
|
|
|
|
|
|
if len(completions) == 1: |
|
self.go_to_completion(0) |
|
|
|
else: |
|
|
|
|
|
|
|
if self.document.text_before_cursor == document.text_before_cursor: |
|
return |
|
|
|
if self.document.text_before_cursor.startswith( |
|
document.text_before_cursor |
|
): |
|
raise _Retry |
|
|
|
return async_completer |
|
|
|
def _create_auto_suggest_coroutine(self) -> Callable[[], Coroutine[Any, Any, None]]: |
|
""" |
|
Create function for asynchronous auto suggestion. |
|
(This can be in another thread.) |
|
""" |
|
|
|
@_only_one_at_a_time |
|
async def async_suggestor() -> None: |
|
document = self.document |
|
|
|
|
|
if self.suggestion or not self.auto_suggest: |
|
return |
|
|
|
suggestion = await self.auto_suggest.get_suggestion_async(self, document) |
|
|
|
|
|
if self.document == document: |
|
|
|
self.suggestion = suggestion |
|
self.on_suggestion_set.fire() |
|
else: |
|
|
|
raise _Retry |
|
|
|
return async_suggestor |
|
|
|
def _create_auto_validate_coroutine( |
|
self, |
|
) -> Callable[[], Coroutine[Any, Any, None]]: |
|
""" |
|
Create a function for asynchronous validation while typing. |
|
(This can be in another thread.) |
|
""" |
|
|
|
@_only_one_at_a_time |
|
async def async_validator() -> None: |
|
await self._validate_async() |
|
|
|
return async_validator |
|
|
|
def validate_and_handle(self) -> None: |
|
""" |
|
Validate buffer and handle the accept action. |
|
""" |
|
valid = self.validate(set_cursor=True) |
|
|
|
|
|
if valid: |
|
if self.accept_handler: |
|
keep_text = self.accept_handler(self) |
|
else: |
|
keep_text = False |
|
|
|
self.append_to_history() |
|
|
|
if not keep_text: |
|
self.reset() |
|
|
|
|
|
_T = TypeVar("_T", bound=Callable[..., Coroutine[Any, Any, None]]) |
|
|
|
|
|
def _only_one_at_a_time(coroutine: _T) -> _T: |
|
""" |
|
Decorator that only starts the coroutine only if the previous call has |
|
finished. (Used to make sure that we have only one autocompleter, auto |
|
suggestor and validator running at a time.) |
|
|
|
When the coroutine raises `_Retry`, it is restarted. |
|
""" |
|
running = False |
|
|
|
@wraps(coroutine) |
|
async def new_coroutine(*a: Any, **kw: Any) -> Any: |
|
nonlocal running |
|
|
|
|
|
if running: |
|
return |
|
|
|
running = True |
|
|
|
try: |
|
while True: |
|
try: |
|
await coroutine(*a, **kw) |
|
except _Retry: |
|
continue |
|
else: |
|
return None |
|
finally: |
|
running = False |
|
|
|
return cast(_T, new_coroutine) |
|
|
|
|
|
class _Retry(Exception): |
|
"Retry in `_only_one_at_a_time`." |
|
|
|
|
|
def indent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: |
|
""" |
|
Indent text of a :class:`.Buffer` object. |
|
""" |
|
current_row = buffer.document.cursor_position_row |
|
current_col = buffer.document.cursor_position_col |
|
line_range = range(from_row, to_row) |
|
|
|
|
|
indent_content = " " * count |
|
new_text = buffer.transform_lines(line_range, lambda l: indent_content + l) |
|
buffer.document = Document( |
|
new_text, Document(new_text).translate_row_col_to_index(current_row, 0) |
|
) |
|
|
|
|
|
buffer.cursor_position += current_col + len(indent_content) |
|
|
|
|
|
def unindent(buffer: Buffer, from_row: int, to_row: int, count: int = 1) -> None: |
|
""" |
|
Unindent text of a :class:`.Buffer` object. |
|
""" |
|
current_row = buffer.document.cursor_position_row |
|
current_col = buffer.document.cursor_position_col |
|
line_range = range(from_row, to_row) |
|
|
|
indent_content = " " * count |
|
|
|
def transform(text: str) -> str: |
|
remove = indent_content |
|
if text.startswith(remove): |
|
return text[len(remove) :] |
|
else: |
|
return text.lstrip() |
|
|
|
|
|
new_text = buffer.transform_lines(line_range, transform) |
|
buffer.document = Document( |
|
new_text, Document(new_text).translate_row_col_to_index(current_row, 0) |
|
) |
|
|
|
|
|
buffer.cursor_position += current_col - len(indent_content) |
|
|
|
|
|
def reshape_text(buffer: Buffer, from_row: int, to_row: int) -> None: |
|
""" |
|
Reformat text, taking the width into account. |
|
`to_row` is included. |
|
(Vi 'gq' operator.) |
|
""" |
|
lines = buffer.text.splitlines(True) |
|
lines_before = lines[:from_row] |
|
lines_after = lines[to_row + 1 :] |
|
lines_to_reformat = lines[from_row : to_row + 1] |
|
|
|
if lines_to_reformat: |
|
|
|
match = re.search(r"^\s*", lines_to_reformat[0]) |
|
length = match.end() if match else 0 |
|
|
|
indent = lines_to_reformat[0][:length].replace("\n", "") |
|
|
|
|
|
words = "".join(lines_to_reformat).split() |
|
|
|
|
|
width = (buffer.text_width or 80) - len(indent) |
|
reshaped_text = [indent] |
|
current_width = 0 |
|
for w in words: |
|
if current_width: |
|
if len(w) + current_width + 1 > width: |
|
reshaped_text.append("\n") |
|
reshaped_text.append(indent) |
|
current_width = 0 |
|
else: |
|
reshaped_text.append(" ") |
|
current_width += 1 |
|
|
|
reshaped_text.append(w) |
|
current_width += len(w) |
|
|
|
if reshaped_text[-1] != "\n": |
|
reshaped_text.append("\n") |
|
|
|
|
|
buffer.document = Document( |
|
text="".join(lines_before + reshaped_text + lines_after), |
|
cursor_position=len("".join(lines_before + reshaped_text)), |
|
) |
|
|