| | |
| | """ |
| | This module contains implementations for the termui module. To keep the |
| | import time of Click down, some infrequently used functionality is |
| | placed in this module and only imported as needed. |
| | """ |
| | import contextlib |
| | import math |
| | import os |
| | import sys |
| | import time |
| |
|
| | from ._compat import _default_text_stdout |
| | from ._compat import CYGWIN |
| | from ._compat import get_best_encoding |
| | from ._compat import int_types |
| | from ._compat import isatty |
| | from ._compat import open_stream |
| | from ._compat import range_type |
| | from ._compat import strip_ansi |
| | from ._compat import term_len |
| | from ._compat import WIN |
| | from .exceptions import ClickException |
| | from .utils import echo |
| |
|
# Control sequences written before and after the progress bar line.  On
# Windows ("nt") only a carriage return / newline is used; elsewhere the
# bar is additionally wrapped in the ``ESC[?25l`` / ``ESC[?25h`` sequences
# (cursor hide / show on ANSI terminals).
BEFORE_BAR = "\r" if os.name == "nt" else "\r\033[?25l"
AFTER_BAR = "\n" if os.name == "nt" else "\033[?25h\n"
| |
|
| |
|
| | def _length_hint(obj): |
| | """Returns the length hint of an object.""" |
| | try: |
| | return len(obj) |
| | except (AttributeError, TypeError): |
| | try: |
| | get_hint = type(obj).__length_hint__ |
| | except AttributeError: |
| | return None |
| | try: |
| | hint = get_hint(obj) |
| | except TypeError: |
| | return None |
| | if hint is NotImplemented or not isinstance(hint, int_types) or hint < 0: |
| | return None |
| | return hint |
| |
|
| |
|
class ProgressBar(object):
    """Text progress bar that wraps an iterable.

    The bar has to be entered as a context manager before it is iterated.
    Iterating yields the wrapped items and redraws the bar after each
    one.  Nothing is drawn when ``file`` is not a TTY (see ``is_hidden``)
    or while the bar has been running for no longer than ``short_limit``
    seconds.

    :param iterable: the items to iterate over.  May be ``None`` if
        ``length`` is given, in which case a range of that length is used.
    :param length: the total number of items.  If ``None`` it is taken
        from ``_length_hint(iterable)``; if that fails too the length is
        treated as unknown.
    :param fill_char: character used for the completed part of the bar.
    :param empty_char: character used for the remaining part of the bar.
    :param bar_template: ``%``-style template; the keys ``label``, ``bar``
        and ``info`` are available.
    :param info_sep: separator placed between the individual info items.
    :param show_eta: include the estimated remaining time once known.
    :param show_percent: include the percentage.  ``None`` means "show it
        when the length is known and ``show_pos`` is off".
    :param show_pos: include the absolute position.
    :param item_show_func: optional callable that receives the current
        item and returns text for the info section, or ``None``.
    :param label: text made available to the template as ``label``.
    :param file: stream to draw to; defaults to ``_default_text_stdout()``.
    :param color: forwarded to ``echo`` when the bar is written.
    :param width: width of the bar in characters; ``0`` makes the bar
        size itself from the terminal width on every render.
    :raises TypeError: if neither ``iterable`` nor a length is available.
    """

    def __init__(
        self,
        iterable,
        length=None,
        fill_char="#",
        empty_char=" ",
        bar_template="%(bar)s",
        info_sep="  ",
        show_eta=True,
        show_percent=None,
        show_pos=False,
        item_show_func=None,
        label=None,
        file=None,
        color=None,
        width=30,
    ):
        self.fill_char = fill_char
        self.empty_char = empty_char
        self.bar_template = bar_template
        self.info_sep = info_sep
        self.show_eta = show_eta
        self.show_percent = show_percent
        self.show_pos = show_pos
        self.item_show_func = item_show_func
        self.label = label or ""
        if file is None:
            file = _default_text_stdout()
        self.file = file
        self.color = color
        self.width = width
        # A width of 0 requests automatic sizing in render_progress().
        self.autowidth = width == 0

        if length is None:
            length = _length_hint(iterable)
        if iterable is None:
            if length is None:
                raise TypeError("iterable or length is required")
            iterable = range_type(length)
        self.iter = iter(iterable)
        self.length = length
        self.length_known = length is not None
        self.pos = 0
        # Rolling list of seconds-per-item samples, see make_step().
        self.avg = []
        self.start = self.last_eta = time.time()
        self.eta_known = False
        self.finished = False
        # Widest line rendered so far; used to blank out leftovers.
        self.max_width = None
        self.entered = False
        self.current_item = None
        self.is_hidden = not isatty(self.file)
        self._last_line = None
        # Bars that have been running for at most this many seconds are
        # not drawn at all (see is_fast()).
        self.short_limit = 0.5

    def __enter__(self):
        self.entered = True
        self.render_progress()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.render_finish()

    def __iter__(self):
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")
        self.render_progress()
        return self.generator()

    def __next__(self):
        # Every call builds a fresh generator via iter(self).  All of
        # them consume the one shared ``self.iter``, so repeated calls
        # keep advancing through the same underlying items.
        return next(iter(self))

    # Python 2 spelling of the iterator protocol.
    next = __next__

    def is_fast(self):
        """Return ``True`` while the bar has run for at most ``short_limit`` seconds."""
        return time.time() - self.start <= self.short_limit

    def render_finish(self):
        """Write the closing sequence, unless the bar was never drawn."""
        if self.is_hidden or self.is_fast():
            return
        self.file.write(AFTER_BAR)
        self.file.flush()

    @property
    def pct(self):
        """Completed fraction in the range ``0.0`` to ``1.0``."""
        if self.finished:
            return 1.0
        # ``length`` is None when the total is unknown and may be 0 for an
        # empty iterable; both are treated as 1 so the division is always
        # defined instead of raising.
        return min(self.pos / float(self.length or 1), 1.0)

    @property
    def time_per_iteration(self):
        """Mean of the collected seconds-per-item samples (``0.0`` if none)."""
        if not self.avg:
            return 0.0
        return sum(self.avg) / float(len(self.avg))

    @property
    def eta(self):
        """Estimated remaining seconds; ``0.0`` if unknown or finished."""
        if self.length_known and not self.finished:
            return self.time_per_iteration * (self.length - self.pos)
        return 0.0

    def format_eta(self):
        """Format the ETA as ``[Nd ]HH:MM:SS``, or ``""`` if it is not known."""
        if self.eta_known:
            t = int(self.eta)
            seconds = t % 60
            t //= 60
            minutes = t % 60
            t //= 60
            hours = t % 24
            t //= 24
            if t > 0:
                return "{}d {:02}:{:02}:{:02}".format(t, hours, minutes, seconds)
            else:
                return "{:02}:{:02}:{:02}".format(hours, minutes, seconds)
        return ""

    def format_pos(self):
        """Format the position as ``pos`` or, with a known length, ``pos/length``."""
        pos = str(self.pos)
        if self.length_known:
            pos += "/{}".format(self.length)
        return pos

    def format_pct(self):
        """Format the percentage right-aligned in three columns plus ``%``."""
        return "{: 4}%".format(int(self.pct * 100))[1:]

    def format_bar(self):
        """Build the bar itself (without label or info) as a string."""
        if self.length_known:
            bar_length = int(self.pct * self.width)
            bar = self.fill_char * bar_length
            bar += self.empty_char * (self.width - bar_length)
        elif self.finished:
            bar = self.fill_char * self.width
        else:
            # Unknown length: a single marker swings back and forth.
            bar = list(self.empty_char * (self.width or 1))
            if self.time_per_iteration != 0:
                marker = int(
                    (math.cos(self.pos * self.time_per_iteration) / 2.0 + 0.5)
                    * self.width
                )
                # cos() can evaluate to exactly 1.0, which would point one
                # cell past the end of the bar; clamp to the last cell.
                bar[min(marker, len(bar) - 1)] = self.fill_char
            bar = "".join(bar)
        return bar

    def format_progress_line(self):
        """Fill ``bar_template`` with label, bar and the enabled info items."""
        show_percent = self.show_percent

        info_bits = []
        if self.length_known and show_percent is None:
            show_percent = not self.show_pos

        if self.show_pos:
            info_bits.append(self.format_pos())
        if show_percent:
            info_bits.append(self.format_pct())
        if self.show_eta and self.eta_known and not self.finished:
            info_bits.append(self.format_eta())
        if self.item_show_func is not None:
            item_info = self.item_show_func(self.current_item)
            if item_info is not None:
                info_bits.append(item_info)

        return (
            self.bar_template
            % {
                "label": self.label,
                "bar": self.format_bar(),
                "info": self.info_sep.join(info_bits),
            }
        ).rstrip()

    def render_progress(self):
        """Redraw the bar if it is visible and its text has changed."""
        from .termui import get_terminal_size

        if self.is_hidden:
            return

        buf = []
        # With automatic width, recompute the bar width on every render.
        if self.autowidth:
            old_width = self.width
            # Render with an empty bar to measure everything around it.
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, get_terminal_size()[0] - clutter_length)
            if new_width < old_width:
                # The bar shrank: blank out the previously drawn line.
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        # Pad with spaces so remnants of a longer previous line vanish.
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)

        # Skip the write when nothing changed or the bar is still "fast".
        if line != self._last_line and not self.is_fast():
            self._last_line = line
            echo(line, file=self.file, color=self.color, nl=False)
            self.file.flush()

    def make_step(self, n_steps):
        """Advance the position by ``n_steps`` and refresh the ETA data."""
        self.pos += n_steps
        if self.length_known and self.pos >= self.length:
            self.finished = True

        # Take a new timing sample at most once per second.
        if (time.time() - self.last_eta) < 1.0:
            return

        self.last_eta = time.time()

        # A sample is the elapsed time since the start divided by the
        # number of completed steps; at most seven samples are kept.
        if self.pos:
            step = (time.time() - self.start) / self.pos
        else:
            step = time.time() - self.start

        self.avg = self.avg[-6:] + [step]

        self.eta_known = self.length_known

    def update(self, n_steps):
        """Advance by ``n_steps`` and redraw the bar."""
        self.make_step(n_steps)
        self.render_progress()

    def finish(self):
        """Mark the bar as complete and clear the ETA and current item."""
        self.eta_known = False
        self.current_item = None
        self.finished = True

    def generator(self):
        """Return a generator which yields the items added to the bar
        during construction, and updates the progress bar *after* the
        yielded block returns.
        """
        # NOTE: __next__() calls this repeatedly (through iter(self)), so
        # the generator must not keep state of its own beyond the shared
        # ``self.iter``.
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")

        if self.is_hidden:
            for rv in self.iter:
                yield rv
        else:
            for rv in self.iter:
                self.current_item = rv
                yield rv
                self.update(1)
            self.finish()
            self.render_progress()
| |
|
| |
|
def pager(generator, color=None):
    """Pick a paging strategy for ``generator`` and run it.

    The checks are tried in order: non-interactive streams, the ``PAGER``
    environment variable, "dumb"/"emacs" terminals, Windows/OS2, ``less``
    and finally ``more``.  If none applies, the text is written straight
    to stdout.
    """
    out = _default_text_stdout()

    # Paging is only attempted when both stdin and stdout are terminals.
    if not (isatty(sys.stdin) and isatty(out)):
        return _nullpager(out, generator, color)

    configured = (os.environ.get("PAGER", None) or "").strip()
    if configured:
        run_pager = _tempfilepager if WIN else _pipepager
        return run_pager(generator, configured, color)

    if os.environ.get("TERM") in ("dumb", "emacs"):
        return _nullpager(out, generator, color)

    if WIN or sys.platform.startswith("os2"):
        return _tempfilepager(generator, "more <", color)

    can_shell = hasattr(os, "system")
    if can_shell and os.system("(less) 2>/dev/null") == 0:
        return _pipepager(generator, "less", color)

    import tempfile

    # Probe for a working ``more`` by running it on an empty scratch file.
    handle, probe_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell and os.system('more "{}"'.format(probe_path)) == 0:
            return _pipepager(generator, "more", color)
        return _nullpager(out, generator, color)
    finally:
        os.unlink(probe_path)
| |
|
| |
|
def _pipepager(generator, cmd, color):
    """Page through text by feeding it to another program.  Invoking a
    pager through this might support colors.

    :param generator: iterable of text chunks to send to the pager.
    :param cmd: shell command line that starts the pager.
    :param color: ``True``/``False`` to keep/strip ANSI codes; ``None``
        enables them only for ``less`` when its flags allow raw output.
    """
    import subprocess

    child_env = dict(os.environ)

    # Inspect only what follows the last "/" of the command: the first
    # word is taken as the program name, the rest as its arguments.
    program_and_args = cmd.rsplit("/", 1)[-1].split()
    if color is None and program_and_args[0] == "less":
        less_flags = "{}{}".format(
            os.environ.get("LESS", ""), " ".join(program_and_args[1:])
        )
        if not less_flags:
            # No flags configured at all: ask less for raw control chars.
            child_env["LESS"] = "-R"
            color = True
        elif any(flag in less_flags for flag in ("r", "R")):
            color = True

    proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=child_env)
    encoding = get_best_encoding(proc.stdin)
    try:
        for text in generator:
            payload = text if color else strip_ansi(text)
            proc.stdin.write(payload.encode(encoding, "replace"))
    except (IOError, KeyboardInterrupt):
        pass
    else:
        proc.stdin.close()

    # Keep waiting for the pager even if Ctrl+C is pressed, so this
    # function does not return while the child is still running.
    exited = False
    while not exited:
        try:
            proc.wait()
        except KeyboardInterrupt:
            continue
        exited = True
| |
|
| |
|
def _tempfilepager(generator, cmd, color):
    """Page through text by invoking a program on a temporary file.

    :param generator: iterable of text chunks; it is consumed completely
        before the pager is started.
    :param cmd: shell command; the quoted file name is appended to it.
    :param color: if falsy, ANSI codes are stripped from the text.
    """
    import tempfile

    # NOTE: joining reads the whole generator up front, so this never
    # returns for a generator that never terminates.
    text = "".join(generator)
    if not color:
        text = strip_ansi(text)
    encoding = get_best_encoding(sys.stdout)

    # mkstemp() creates the file atomically, unlike the deprecated
    # mktemp(), which only returns a name that another process could
    # claim first.  Only the path is needed, so close the descriptor.
    fd, filename = tempfile.mkstemp()
    os.close(fd)
    try:
        with open_stream(filename, "wb")[0] as f:
            f.write(text.encode(encoding))
        os.system('{} "{}"'.format(cmd, filename))
    finally:
        # Remove the file even if writing or the pager failed.
        os.unlink(filename)
| |
|
| |
|
| | def _nullpager(stream, generator, color): |
| | """Simply print unformatted text. This is the ultimate fallback.""" |
| | for text in generator: |
| | if not color: |
| | text = strip_ansi(text) |
| | stream.write(text) |
| |
|
| |
|
class Editor(object):
    """Launch an external text editor on a file or on a piece of text.

    :param editor: editor command to use; if ``None`` it is looked up by
        :meth:`get_editor`.
    :param env: optional mapping of extra environment variables for the
        editor process.
    :param require_save: if true, :meth:`edit` returns ``None`` when the
        file's modification time did not change.
    :param extension: suffix of the temporary file used by :meth:`edit`.
    """

    def __init__(self, editor=None, env=None, require_save=True, extension=".txt"):
        self.editor = editor
        self.env = env
        self.require_save = require_save
        self.extension = extension

    def get_editor(self):
        """Return the editor command to run.

        Order of preference: the explicit ``editor``, the ``VISUAL`` and
        ``EDITOR`` environment variables, ``notepad`` on Windows, the
        first of ``sensible-editor``/``vim``/``nano`` found by ``which``,
        and finally ``vi``.
        """
        if self.editor is not None:
            return self.editor
        for key in "VISUAL", "EDITOR":
            rv = os.environ.get(key)
            if rv:
                return rv
        if WIN:
            return "notepad"
        for editor in "sensible-editor", "vim", "nano":
            if os.system("which {} >/dev/null 2>&1".format(editor)) == 0:
                return editor
        return "vi"

    def edit_file(self, filename):
        """Run the editor on ``filename`` and wait for it to exit.

        :raises ClickException: if the editor cannot be started or exits
            with a non-zero status.
        """
        import subprocess

        editor = self.get_editor()
        if self.env:
            environ = os.environ.copy()
            environ.update(self.env)
        else:
            environ = None
        try:
            c = subprocess.Popen(
                '{} "{}"'.format(editor, filename), env=environ, shell=True,
            )
            exit_code = c.wait()
            if exit_code != 0:
                raise ClickException("{}: Editing failed!".format(editor))
        except OSError as e:
            raise ClickException("{}: Editing failed: {}".format(editor, e))

    def edit(self, text):
        """Let the user edit ``text`` in a temporary file.

        :param text: initial content; ``None`` or ``""`` starts empty.
        :return: the edited text with ``\\r\\n`` normalised to ``\\n``, or
            ``None`` if ``require_save`` is set and the file was not
            modified.
        """
        import tempfile

        text = text or ""
        if text and not text.endswith("\n"):
            text += "\n"

        # Encode before the file exists, so an encoding error cannot
        # leave a descriptor or a stray temporary file behind.
        if WIN:
            encoding = "utf-8-sig"
            text = text.replace("\n", "\r\n")
        else:
            encoding = "utf-8"
        data = text.encode(encoding)

        fd, name = tempfile.mkstemp(prefix="editor-", suffix=self.extension)
        try:
            # The ``with`` block closes the descriptor even if the write
            # fails.
            with os.fdopen(fd, "wb") as f:
                f.write(data)

            # Some filesystems record modification times with a
            # resolution of a second or more.  An editor that saves and
            # exits quickly would then leave the timestamp unchanged and
            # be mistaken for "not saved".  Backdate the file by two
            # seconds, then read back the value that was actually stored.
            os.utime(name, (os.path.getatime(name), os.path.getmtime(name) - 2))
            timestamp = os.path.getmtime(name)

            self.edit_file(name)

            if self.require_save and os.path.getmtime(name) == timestamp:
                return None

            with open(name, "rb") as f:
                rv = f.read()
            # "utf-8-sig" also accepts files without a BOM.
            return rv.decode("utf-8-sig").replace("\r\n", "\n")
        finally:
            os.unlink(name)
| |
|
| |
|
def open_url(url, wait=False, locate=False):
    """Open ``url`` (or a file) with the platform's default application.

    Uses ``open`` on macOS, ``start``/``explorer`` on Windows,
    ``cygstart`` on Cygwin and ``xdg-open`` elsewhere.  If ``xdg-open``
    cannot be started, HTTP(S) URLs fall back to :mod:`webbrowser`.

    :param url: URL or path to open; ``file://`` URLs are unquoted.
    :param wait: wait for the launched program to exit where supported.
    :param locate: show the containing location of the file instead of
        opening it.
    :return: the exit code of the launcher; ``0`` when started without
        waiting, ``1`` if nothing could be launched.
    """
    import subprocess

    def _unquote_file(url):
        # ``unquote`` lives in ``urllib.parse`` on Python 3 and directly
        # in ``urllib`` on Python 2.
        try:
            from urllib.parse import unquote
        except ImportError:
            from urllib import unquote
        if url.startswith("file://"):
            url = unquote(url[7:])
        return url

    if sys.platform == "darwin":
        args = ["open"]
        if wait:
            args.append("-W")
        if locate:
            args.append("-R")
        args.append(_unquote_file(url))
        # Discard anything ``open`` prints on stderr.
        with open("/dev/null", "w") as null:
            return subprocess.Popen(args, stderr=null).wait()
    elif WIN:
        if locate:
            url = _unquote_file(url)
            args = 'explorer /select,"{}"'.format(_unquote_file(url.replace('"', "")))
        else:
            args = 'start {} "" "{}"'.format(
                "/WAIT" if wait else "", url.replace('"', "")
            )
        return os.system(args)
    elif CYGWIN:
        if locate:
            url = _unquote_file(url)
            args = 'cygstart "{}"'.format(os.path.dirname(url).replace('"', ""))
        else:
            args = 'cygstart {} "{}"'.format("-w" if wait else "", url.replace('"', ""))
        return os.system(args)

    try:
        if locate:
            url = os.path.dirname(_unquote_file(url)) or "."
        else:
            url = _unquote_file(url)
        c = subprocess.Popen(["xdg-open", url])
        if wait:
            return c.wait()
        return 0
    except OSError:
        # xdg-open is unavailable; plain web URLs can still be opened
        # through the standard library.
        if url.startswith(("http://", "https://")) and not locate and not wait:
            import webbrowser

            webbrowser.open(url)
            return 0
        return 1
| |
|
| |
|
| | def _translate_ch_to_exc(ch): |
| | if ch == u"\x03": |
| | raise KeyboardInterrupt() |
| | if ch == u"\x04" and not WIN: |
| | raise EOFError() |
| | if ch == u"\x1a" and WIN: |
| | raise EOFError() |
| |
|
| |
|
# Platform-specific implementations of ``raw_terminal`` and ``getchar``.
if WIN:
    import msvcrt

    @contextlib.contextmanager
    def raw_terminal():
        """No-op context manager; yields ``None`` on Windows."""
        yield

    def getchar(echo):
        """Read one key press from the console and return it as text.

        :param echo: if true, the character is also echoed
            (``msvcrt.getwche`` instead of ``msvcrt.getwch``).
        :raises KeyboardInterrupt, EOFError: via ``_translate_ch_to_exc``.
        """
        if echo:
            func = msvcrt.getwche
        else:
            func = msvcrt.getwch

        rv = func()
        if rv in (u"\x00", u"\xe0"):
            # These two values are a prefix for special keys (see the
            # msvcrt documentation); a second read delivers the key code,
            # which is appended so both parts are returned together.
            rv += func()
        _translate_ch_to_exc(rv)
        return rv


else:
    import tty
    import termios

    @contextlib.contextmanager
    def raw_terminal():
        """Switch the terminal to raw mode and yield its file descriptor.

        ``sys.stdin`` is used when it is a TTY, otherwise ``/dev/tty`` is
        opened.  The previous terminal settings are restored on exit and
        ``termios.error`` is suppressed.
        """
        if not isatty(sys.stdin):
            f = open("/dev/tty")
            fd = f.fileno()
        else:
            fd = sys.stdin.fileno()
            f = None
        try:
            try:
                old_settings = termios.tcgetattr(fd)
                try:
                    tty.setraw(fd)
                    yield fd
                finally:
                    termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                    sys.stdout.flush()
            except termios.error:
                pass
        finally:
            # Close /dev/tty on every path, including when querying or
            # restoring the terminal settings fails.
            if f is not None:
                f.close()

    def getchar(echo):
        """Read one key press (up to 32 bytes) from the terminal as text.

        :param echo: if true and ``sys.stdout`` is a TTY, the decoded
            input is written to ``sys.stdout``.
        :raises KeyboardInterrupt, EOFError: via ``_translate_ch_to_exc``.
        """
        with raw_terminal() as fd:
            # A single key can produce several bytes, so read a small
            # chunk rather than one byte.
            ch = os.read(fd, 32)
            ch = ch.decode(get_best_encoding(sys.stdin), "replace")
            if echo and isatty(sys.stdout):
                sys.stdout.write(ch)
            _translate_ch_to_exc(ch)
            return ch
| |
|