|
from __future__ import annotations |
|
|
|
import os |
|
import signal |
|
import sys |
|
import threading |
|
from collections import deque |
|
from typing import ( |
|
Callable, |
|
ContextManager, |
|
Dict, |
|
Generator, |
|
Generic, |
|
TypeVar, |
|
Union, |
|
) |
|
|
|
from wcwidth import wcwidth |
|
|
|
# Names exported by a star-import of this module.
__all__ = [
    "Event",
    "DummyContext",
    "get_cwidth",
    "suspend_to_background_supported",
    "is_conemu_ansi",
    "is_windows",
    "in_main_thread",
    "get_bell_environment_variable",
    "get_term_environment_variable",
    "take_using_weights",
    "to_str",
    "to_int",
    "AnyFloat",
    "to_float",
    "is_dumb_terminal",
]


# True when the `sphinx.ext.autodoc` module is already present in
# `sys.modules` at the moment this module is imported (evaluated only once).
SPHINX_AUTODOC_RUNNING = "sphinx.ext.autodoc" in sys.modules
|
|
|
# Type of the object that owns an event; it is handed to every handler.
_Sender = TypeVar("_Sender", covariant=True)


class Event(Generic[_Sender]):
    """
    Simple event to which event handlers can be attached. For instance::

        class Cls:
            def __init__(self):
                # Define event. The first parameter is the sender.
                self.event = Event(self)

        obj = Cls()

        def handler(sender):
            pass

        # Add event handler by using the += operator.
        obj.event += handler

        # Fire event.
        obj.event()

    Handlers are called in the order in which they were added. Handlers that
    are added or removed while the event is being fired only take effect from
    the next time the event fires.
    """

    def __init__(
        self, sender: _Sender, handler: Callable[[_Sender], None] | None = None
    ) -> None:
        """
        :param sender: Object that is passed as the only argument to every
            handler.
        :param handler: Optional first handler to attach right away.
        """
        self.sender = sender
        self._handlers: list[Callable[[_Sender], None]] = []

        if handler is not None:
            self += handler

    def __call__(self) -> None:
        "Fire event."
        # Iterate over a snapshot: a handler may add or remove handlers
        # (including itself) while we are firing. Mutating the list that is
        # being iterated would silently skip the handler that follows.
        for handler in list(self._handlers):
            handler(self.sender)

    def fire(self) -> None:
        "Alias for just calling the event."
        self()

    def add_handler(self, handler: Callable[[_Sender], None]) -> None:
        """
        Add another handler to this callback.
        (Handler should be a callable that takes exactly one parameter: the
        sender object.)
        """
        self._handlers.append(handler)

    def remove_handler(self, handler: Callable[[_Sender], None]) -> None:
        """
        Remove a handler from this callback.

        Removing a handler that is not attached is a no-op. When the same
        handler was added several times, only the first occurrence is removed.
        """
        try:
            self._handlers.remove(handler)
        except ValueError:
            # Handler was not registered: nothing to do.
            pass

    def __iadd__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]:
        """
        `event += handler` notation for adding a handler.
        """
        self.add_handler(handler)
        return self

    def __isub__(self, handler: Callable[[_Sender], None]) -> Event[_Sender]:
        """
        `event -= handler` notation for removing a handler.
        """
        self.remove_handler(handler)
        return self
|
|
|
|
|
class DummyContext(ContextManager[None]):
    """
    Context manager that does nothing when entered and nothing when left.

    (contextlib.nested is not available on Py3)
    """

    def __enter__(self) -> None:
        # Nothing to set up; the `as` target is bound to None.
        return None

    def __exit__(self, *a: object) -> None:
        # Returning None (falsy) lets any exception from the body propagate.
        return None
|
|
|
|
|
class _CharSizesCache(Dict[str, int]): |
|
""" |
|
Cache for wcwidth sizes. |
|
""" |
|
|
|
LONG_STRING_MIN_LEN = 64 |
|
MAX_LONG_STRINGS = 16 |
|
|
|
def __init__(self) -> None: |
|
super().__init__() |
|
|
|
self._long_strings: deque[str] = deque() |
|
|
|
def __missing__(self, string: str) -> int: |
|
|
|
|
|
|
|
|
|
result: int |
|
if len(string) == 1: |
|
result = max(0, wcwidth(string)) |
|
else: |
|
result = sum(self[c] for c in string) |
|
|
|
|
|
self[string] = result |
|
|
|
|
|
|
|
if len(string) > self.LONG_STRING_MIN_LEN: |
|
long_strings = self._long_strings |
|
long_strings.append(string) |
|
|
|
if len(long_strings) > self.MAX_LONG_STRINGS: |
|
key_to_remove = long_strings.popleft() |
|
if key_to_remove in self: |
|
del self[key_to_remove] |
|
|
|
return result |
|
|
|
|
|
# Single module-level cache instance that backs every `get_cwidth` call.
_CHAR_SIZES_CACHE = _CharSizesCache()


def get_cwidth(string: str) -> int:
    """
    Return width of a string. Wrapper around ``wcwidth``; the result is
    looked up through the module-level `_CHAR_SIZES_CACHE`.
    """
    width = _CHAR_SIZES_CACHE[string]
    return width
|
|
|
|
|
def suspend_to_background_supported() -> bool:
    """
    Tell whether the Python implementation supports suspend-to-background,
    i.e. whether the `signal` module defines `SIGTSTP`. This is typically
    `False` on Windows systems.
    """
    has_sigtstp = hasattr(signal, "SIGTSTP")
    return has_sigtstp
|
|
|
|
|
def is_windows() -> bool:
    """
    Return True when `sys.platform` reports Windows ("win32").
    """
    platform_name = sys.platform
    return platform_name == "win32"
|
|
|
|
|
def is_windows_vt100_supported() -> bool:
    """
    True when we are using Windows, but VT100 escape sequences are supported.

    On every other platform this is always False.
    """
    if sys.platform != "win32":
        return False

    # Imported here rather than at module level, so the import only happens
    # on the win32 branch.
    from prompt_toolkit.output.windows10 import is_win_vt100_enabled

    return is_win_vt100_enabled()
|
|
|
|
|
def is_conemu_ansi() -> bool:
    """
    True when the ConEmu Windows console is used: we are on Windows and the
    `ConEmuANSI` environment variable is exactly "ON".
    """
    if sys.platform != "win32":
        return False

    conemu_flag = os.environ.get("ConEmuANSI", "OFF")
    return conemu_flag == "ON"
|
|
|
|
|
def in_main_thread() -> bool:
    """
    True when the current thread is the main thread.
    """
    # Compare against the public `threading.main_thread()` object instead of
    # matching on the name of a private class of the `threading` module.
    return threading.current_thread() is threading.main_thread()
|
|
|
|
|
def get_bell_environment_variable() -> bool:
    """
    True if the `PROMPT_TOOLKIT_BELL` env variable is set to true
    (true, TRUE, True, 1). An unset variable counts as "true".
    """
    return os.environ.get("PROMPT_TOOLKIT_BELL", "true").lower() in ("1", "true")
|
|
|
|
|
def get_term_environment_variable() -> str:
    "Return the $TERM environment variable (empty string when it is not set)."
    term = os.environ.get("TERM", "")
    return term
|
|
|
|
|
_T = TypeVar("_T")


def take_using_weights(
    items: list[_T], weights: list[int]
) -> Generator[_T, None, None]:
    """
    Generator that keeps yielding items from the items list, in proportion to
    their weight. For instance::

        # Getting the first 70 items from this generator should have yielded 10
        # times A, 20 times B and 40 times C, all distributed equally..
        take_using_weights(['A', 'B', 'C'], [5, 10, 20])

    Items whose weight is not positive are never yielded. The generator is
    infinite. Because this is a generator function, the arguments are only
    checked when the first item is requested.

    :param items: List of items to take from.
    :param weights: Integers representing the weight. (Numbers have to be
                    integers, not floats.)
    :raises ValueError: When none of the items has a positive weight.
    """
    assert len(items) == len(weights)
    assert len(items) > 0

    # Remove items with zero-weight.
    weighted = [(item, weight) for item, weight in zip(items, weights) if weight > 0]

    # Make sure that we have some items left.
    if not weighted:
        raise ValueError("Didn't get any items with a positive weight.")

    # How many times each (remaining) item has been yielded so far.
    already_taken = [0] * len(weighted)
    max_weight = max(weight for _, weight in weighted)

    i = 0
    while True:
        # Each round, starting with the first, an item may have been yielded
        # up to `i * weight / max_weight` times. Keep passing over the items
        # until nobody is below their share anymore.
        adding = True
        while adding:
            adding = False

            for item_i, (item, weight) in enumerate(weighted):
                # Same test as `already_taken < i * weight / max_weight`, but
                # cross-multiplied so it stays in exact integer arithmetic
                # (no float rounding, no overflow for very large weights).
                if already_taken[item_i] * max_weight < i * weight:
                    yield item
                    already_taken[item_i] += 1
                    adding = True

        i += 1
|
|
|
|
|
def to_str(value: Callable[[], str] | str) -> str:
    "Turn callable or string into string (callables are called until a non-callable comes out)."
    if not callable(value):
        return str(value)

    return to_str(value())
|
|
|
|
|
def to_int(value: Callable[[], int] | int) -> int:
    "Turn callable or int into int (callables are called until a non-callable comes out)."
    if not callable(value):
        return int(value)

    return to_int(value())
|
|
|
|
|
# Either a float, or a zero-argument callable that produces one.
AnyFloat = Union[Callable[[], float], float]


def to_float(value: AnyFloat) -> float:
    "Turn callable or float into float (callables are called until a non-callable comes out)."
    if not callable(value):
        return float(value)

    return to_float(value())
|
|
|
|
|
def is_dumb_terminal(term: str | None = None) -> bool:
    """
    True if this terminal type is considered "dumb".

    If so, we should fall back to the simplest possible form of line editing,
    without cursor positioning and color support.

    :param term: Terminal type to check (compared case-insensitively). When
        `None`, the `TERM` environment variable is used, with an empty string
        as fallback when it is not set.
    """
    if term is None:
        term = os.environ.get("TERM", "")

    return term.lower() in ("dumb", "unknown")
|
|