| """ |
| Parser for VT100 input stream. |
| """ |
| from __future__ import annotations |
|
|
| import re |
| from typing import Callable, Dict, Generator |
|
|
| from ..key_binding.key_processor import KeyPress |
| from ..keys import Keys |
| from .ansi_escape_sequences import ANSI_SEQUENCES |
|
|
# Public API of this module: only the parser class is exported.
__all__ = ["Vt100Parser"]
|
|
|
|
| |
| |
| |
# Every pattern below starts with the literal two-character introducer
# ESC "[" (escaped for use inside a regular expression).
_ESC_BRACKET = re.escape("\x1b[")

# Complete CPR response: ESC "[" <digits> ";" <digits> "R".
# (`\Z` is used instead of `$` because `$` would also match just before a
# trailing newline.)
_cpr_response_re = re.compile("^" + _ESC_BRACKET + r"\d+;\d+R\Z")

# Complete mouse event. Two shapes are accepted after ESC "[":
#   - an optional "<", one or more digits/semicolons, then "m" or "M";
#   - "M" followed by exactly three characters.
_mouse_event_re = re.compile("^" + _ESC_BRACKET + r"(<?[\d;]+[mM]|M...)\Z")

# Anything that can still grow into a CPR response: ESC "[" followed by zero
# or more digits/semicolons. The terminating "R" is deliberately absent, so a
# complete response never matches this pattern.
_cpr_response_prefix_re = re.compile("^" + _ESC_BRACKET + r"[\d;]*\Z")

# Anything that can still grow into a mouse event: ESC "[" followed by either
# an optional "<" plus digits/semicolons, or "M" plus at most two characters.
_mouse_event_prefix_re = re.compile("^" + _ESC_BRACKET + r"(<?[\d;]*|M.{0,2})\Z")
|
|
|
|
class _Flush:
    """
    Marker type: an instance is sent to the parser coroutine instead of a
    character to request a flush of the buffered input.
    """
|
|
|
|
class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
    """
    Memoising dictionary that tells, for a given input string, whether more
    input could still turn it into a longer recognised sequence.

    Answers are computed on first lookup by `__missing__` and stored in the
    dictionary itself, so each distinct prefix is examined only once.
    """

    def __missing__(self, prefix: str) -> bool:
        """
        Compute, store and return the answer for a prefix not cached yet.

        :param prefix: The input received so far.
        :return: True when `prefix` matches the CPR-response or mouse-event
            prefix pattern, or when at least one key of `ANSI_SEQUENCES` that
            starts with `prefix` (and is not equal to it) maps to a truthy
            value; False otherwise.
        """
        if (
            _cpr_response_prefix_re.match(prefix) is not None
            or _mouse_event_prefix_re.match(prefix) is not None
        ):
            # These sequences carry variable numbers/characters, so they
            # cannot be listed in the lookup table and are checked by pattern.
            answer = True
        else:
            # Values of all table entries that are strictly longer than
            # `prefix` and begin with it. `any` tests the truthiness of those
            # values, exactly like the lookup done for complete matches.
            longer_sequence_values = (
                keys
                for sequence, keys in ANSI_SEQUENCES.items()
                if sequence != prefix and sequence.startswith(prefix)
            )
            answer = any(longer_sequence_values)

        self[prefix] = answer
        return answer


# Single module-wide cache instance used by the parser.
_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
|
|
|
|
class Vt100Parser:
    """
    Parser for VT100 input stream.

    Data can be fed through the `feed` method and the given callback will be
    called with KeyPress objects.

    ::

        def callback(key):
            pass
        i = Vt100Parser(callback)
        i.feed('data\\x01...')

    :attr feed_key_callback: Function that will be called when a key is parsed.
    """

    def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
        """
        :param feed_key_callback: Called with a `KeyPress` for every key that
            is recognised in the input.
        """
        self.feed_key_callback = feed_key_callback
        self.reset()

    def reset(self, request: bool = False) -> None:
        """
        Discard all buffered input and restart the parser from scratch.

        :param request: Not used by this method; kept so that callers which
            pass it keep working.
        """
        self._in_bracketed_paste = False
        # Always (re)create the paste buffer here, so that the attribute
        # exists right after construction and a reset in the middle of a
        # bracketed paste does not keep the partial paste around.
        self._paste_buffer = ""
        self._start_parser()

    def _start_parser(self) -> None:
        """
        Start the parser coroutine.
        """
        self._input_parser = self._input_parser_generator()
        # Run the coroutine up to its first `yield`, so that the following
        # `send()` calls can deliver input to it.
        next(self._input_parser)

    def _get_match(self, prefix: str) -> None | Keys | tuple[Keys, ...]:
        """
        Return the key (or keys) that maps to this prefix.

        :param prefix: The exact input sequence to look up.
        :return: `Keys.CPRResponse` or `Keys.Vt100MouseEvent` when the
            corresponding pattern matches, otherwise the `ANSI_SEQUENCES`
            entry for `prefix`, or None when there is no such entry.
        """
        # CPR responses and mouse events contain variable parts, so they are
        # recognised by pattern instead of through the lookup table.
        if _cpr_response_re.match(prefix):
            return Keys.CPRResponse

        if _mouse_event_re.match(prefix):
            return Keys.Vt100MouseEvent

        # Everything else comes from the lookup table.
        try:
            return ANSI_SEQUENCES[prefix]
        except KeyError:
            return None

    def _input_parser_generator(self) -> Generator[None, str | _Flush, None]:
        """
        Coroutine (state machine) for the input parser.

        It receives either one piece of text or a `_Flush` instance per
        `send()`, accumulates the text in a buffer and calls `_call_handler`
        as soon as the buffer can be interpreted.
        """
        prefix = ""
        retry = False

        while True:
            flush = False

            if retry:
                # Re-examine what is left in the buffer after the previous
                # round, without waiting for new input.
                retry = False
            else:
                # Wait for the next piece of input.
                c = yield

                if isinstance(c, _Flush):
                    flush = True
                else:
                    prefix += c

            if not prefix:
                continue

            is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
            match = self._get_match(prefix)

            # More input could still extend the buffer into a longer
            # sequence and nobody asked for a flush: keep waiting.
            if is_prefix_of_longer_match and not flush:
                continue

            if match:
                # The whole buffer is one recognised sequence.
                self._call_handler(match, prefix)
                prefix = ""
            else:
                # The buffer as a whole is not a sequence. Consume a leading
                # part of it and look at the remainder in the next round.
                found = False
                retry = True

                # Try the longest leading slice first and shift the buffer
                # on a match. There is no `break` after a match: the
                # remaining (shorter) slice lengths are then tested against
                # the already shifted buffer.
                for i in range(len(prefix), 0, -1):
                    match = self._get_match(prefix[:i])
                    if match:
                        self._call_handler(match, prefix[:i])
                        prefix = prefix[i:]
                        found = True

                if not found:
                    # Nothing matched: pass the first character on as-is.
                    self._call_handler(prefix[0], prefix[0])
                    prefix = prefix[1:]

    def _call_handler(
        self, key: str | Keys | tuple[Keys, ...], insert_text: str
    ) -> None:
        """
        Callback to handler.

        :param key: A single key, or a tuple of keys that one input sequence
            maps to.
        :param insert_text: The input text that produced `key`.
        """
        if isinstance(key, tuple):
            # One sequence that stands for several keys: handle them one by
            # one, but attach the text to the first key only, so that it is
            # not passed on more than once.
            for i, k in enumerate(key):
                self._call_handler(k, insert_text if i == 0 else "")
            return

        if key == Keys.BracketedPaste:
            # Start of a bracketed paste: no key press is reported yet.
            # `feed` collects everything up to the end mark and reports it
            # as one key press.
            self._in_bracketed_paste = True
            self._paste_buffer = ""
        else:
            self.feed_key_callback(KeyPress(key, insert_text))

    def feed(self, data: str) -> None:
        """
        Feed the input stream.

        :param data: Input string (unicode).
        """
        end_mark = "\x1b[201~"

        # `data` always holds the input that still has to be processed. The
        # loop switches between paste mode and normal mode as often as
        # needed, so the call depth does not grow with the number of
        # bracketed pastes contained in one chunk of input.
        while True:
            if self._in_bracketed_paste:
                # In paste mode, nothing is parsed: collect everything until
                # the end mark shows up.
                self._paste_buffer += data
                end_index = self._paste_buffer.find(end_mark)

                if end_index == -1:
                    # End of the paste not received yet.
                    return

                # Report the whole paste as one key press.
                paste_content = self._paste_buffer[:end_index]
                self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))

                # Leave paste mode and continue with what follows the mark.
                self._in_bracketed_paste = False
                data = self._paste_buffer[end_index + len(end_mark) :]
                self._paste_buffer = ""
            else:
                for i, c in enumerate(data):
                    if self._in_bracketed_paste:
                        # The previous character completed the start of a
                        # bracketed paste: the rest of the input belongs to
                        # the paste.
                        data = data[i:]
                        break
                    self._input_parser.send(c)
                else:
                    # All input has been handed to the parser.
                    return

    def flush(self) -> None:
        """
        Flush the buffer of the input stream.

        This will allow us to handle the escape key (or maybe meta) sooner.
        The input received by the escape key is actually the same as the first
        characters of e.g. Arrow-Up, so without knowing what follows the escape
        sequence, we don't know whether escape has been pressed, or whether
        it's something else. This flush function should be called after a
        timeout, and processes everything that's still in the buffer as-is, so
        without assuming any characters will follow.
        """
        self._input_parser.send(_Flush())

    def feed_and_flush(self, data: str) -> None:
        """
        Wrapper around ``feed`` and ``flush``.

        :param data: Input string, passed on to ``feed``.
        """
        self.feed(data)
        self.flush()
|
|