|
|
import functools |
|
|
import itertools |
|
|
import sys |
|
|
from signal import SIGINT, default_int_handler, signal |
|
|
from typing import Any, Callable, Iterator, Optional, Tuple |
|
|
|
|
|
from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar |
|
|
from pip._vendor.progress.spinner import Spinner |
|
|
from pip._vendor.rich.progress import ( |
|
|
BarColumn, |
|
|
DownloadColumn, |
|
|
FileSizeColumn, |
|
|
Progress, |
|
|
ProgressColumn, |
|
|
SpinnerColumn, |
|
|
TextColumn, |
|
|
TimeElapsedColumn, |
|
|
TimeRemainingColumn, |
|
|
TransferSpeedColumn, |
|
|
) |
|
|
|
|
|
from pip._internal.utils.compat import WINDOWS |
|
|
from pip._internal.utils.logging import get_indentation |
|
|
from pip._internal.utils.misc import format_size |
|
|
|
|
|
try: |
|
|
from pip._vendor import colorama |
|
|
|
|
|
|
|
|
except Exception: |
|
|
colorama = None |
|
|
|
|
|
# Type of the callables handed out by get_download_progress_renderer(): each
# one takes an iterator of byte chunks and returns an iterator of byte chunks.
DownloadProgressRenderer = Callable[[Iterator[bytes]], Iterator[bytes]]
|
|
|
|
|
|
|
|
def _select_progress_class(preferred: Bar, fallback: Bar) -> Bar:
    """Pick the progress bar whose glyphs the output stream can encode.

    Both arguments are bar *classes* (this module calls it with
    ``IncrementalBar`` and ``Bar``); only class-level attributes are read.

    :param preferred: Bar to use when its ``empty_fill``, ``fill`` and
        ``phases`` characters can all be encoded with ``preferred.file``'s
        encoding.
    :param fallback: Bar returned when the encoding is unknown, is not a
        codec Python recognises, or cannot represent those characters.
    :return: Either ``preferred`` or ``fallback``, unchanged.
    """
    encoding = getattr(preferred.file, "encoding", None)

    # Without a declared encoding there is nothing to test against, so take
    # the safe option.
    if not encoding:
        return fallback

    # Every character the preferred bar may end up writing.
    characters = [
        getattr(preferred, "empty_fill", ""),
        getattr(preferred, "fill", ""),
    ]
    characters += list(getattr(preferred, "phases", []))

    # Attempt the encoding up front. A LookupError means the stream reports
    # a codec name Python does not know; treat that like "cannot encode"
    # rather than letting it escape at import time.
    try:
        "".join(characters).encode(encoding)
    except (UnicodeEncodeError, LookupError):
        return fallback
    else:
        return preferred
|
|
|
|
|
|
|
|
# Base class for the default bar: IncrementalBar when the output stream can
# encode its characters, plain Bar otherwise.
_BaseBar: Any = _select_progress_class(preferred=IncrementalBar, fallback=Bar)
|
|
|
|
|
|
|
|
class InterruptibleMixin:
    """
    Helper to ensure that self.finish() gets called on keyboard interrupt.

    This allows downloads to be interrupted without leaving temporary state
    (like hidden cursors) behind.

    This class is similar to the progress library's existing SigIntMixin
    helper, but as of version 1.2, that helper has the following problems:

    1. It calls sys.exit().
    2. It discards the existing SIGINT handler completely.
    3. It leaves its own handler in place even after an uninterrupted finish,
       which will have unexpected delayed effects if the user triggers an
       unrelated keyboard interrupt some time after a progress-displaying
       download has already completed, for example.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Save the original SIGINT handler for later.

        All arguments are forwarded untouched to the next class in the MRO.
        """
        super().__init__(*args, **kwargs)

        self.original_handler = signal(SIGINT, self.handle_sigint)

        # signal() hands back None when the previous handler was not
        # installed from Python. There is nothing to restore or call in that
        # case, so substitute Python's default SIGINT handler.
        if self.original_handler is None:
            self.original_handler = default_int_handler

    def finish(self) -> None:
        """
        Restore the original SIGINT handler after finishing.

        This should happen regardless of whether the progress display finishes
        normally, or gets interrupted.
        """
        try:
            super().finish()
        finally:
            # Put the old handler back even if the display's own finish()
            # raised, so this hook never outlives the progress display.
            signal(SIGINT, self.original_handler)

    def handle_sigint(self, signum: int, frame: Any) -> None:
        """
        Call self.finish() before delegating to the original SIGINT handler.

        This handler should only be in place while the progress display is
        active.

        :param signum: Signal number passed in by the signal machinery.
        :param frame: Interrupted stack frame (may be None).
        """
        from signal import SIG_DFL

        self.finish()

        previous = self.original_handler
        if callable(previous):
            previous(signum, frame)
        elif previous == SIG_DFL:
            # SIG_DFL is a marker value, not a function. Calling
            # default_int_handler (raises KeyboardInterrupt) is the closest
            # in-process stand-in for the default action.
            default_int_handler(signum, frame)
        # Otherwise the previous disposition was SIG_IGN: the interrupt was
        # meant to be ignored, and finish() has already restored that state.
|
|
|
|
|
|
|
|
class SilentBar(Bar):
    """A Bar whose update() is a no-op, so nothing is drawn from there."""

    def update(self) -> None:
        """Intentionally do nothing."""
|
|
|
|
|
|
|
|
class BlueEmojiBar(IncrementalBar):
    """IncrementalBar variant that uses three blue emoji as its phases."""

    # U+1F539, U+1F537, U+1F535, in that order.
    phases = ("\U0001F539", "\U0001F537", "\U0001F535")

    # A single space on either side of the bar.
    bar_prefix = " "
    bar_suffix = " "

    # Trailing text: the integer percentage followed by a literal "%".
    suffix = "%(percent)d%%"
|
|
|
|
|
|
|
|
class DownloadProgressMixin:
    """Mixin adding download-oriented fields and a chunk-wrapping iterator.

    It relies on attributes supplied by the class it is mixed into:
    ``message``, ``index``, ``avg``, ``eta``, ``eta_td``, ``next()`` and
    ``finish()``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Forward all arguments up the MRO, then indent the message.

        The message is prefixed with ``get_indentation() + 2`` spaces.
        """
        super().__init__(*args, **kwargs)
        self.message: str = (" " * (get_indentation() + 2)) + self.message

    @property
    def downloaded(self) -> str:
        """``self.index`` rendered by ``format_size``."""
        return format_size(self.index)

    @property
    def download_speed(self) -> str:
        """Speed as ``format_size(1 / self.avg) + "/s"``, or ``"..."``."""
        # avg is used as a divisor below; a zero value means there is no
        # usable rate yet, so show a placeholder instead of dividing.
        if self.avg == 0.0:
            return "..."
        return format_size(1 / self.avg) + "/s"

    @property
    def pretty_eta(self) -> str:
        """``"eta <eta_td>"`` while ``self.eta`` is truthy, else ``""``."""
        if self.eta:
            return f"eta {self.eta_td}"
        return ""

    def iter(self, it: Iterator[bytes]) -> Iterator[bytes]:
        """Yield every chunk of *it*, advancing the display by its length.

        ``finish()`` runs when the source is exhausted, and also when
        iteration stops early: the source raised, the consumer raised, or the
        generator was closed. Without that, a failed or abandoned download
        would skip whatever cleanup ``finish()`` performs.

        :param it: Iterable of sized chunks (``len(chunk)`` is the amount of
            progress reported).
        """
        try:
            for x in it:
                yield x
                self.next(len(x))
        finally:
            self.finish()
|
|
|
|
|
|
|
|
class WindowsMixin:
    """Mixin applying Windows-specific adjustments to a progress display."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Disable cursor hiding and wrap the output stream on Windows.

        All arguments are forwarded untouched to the next class in the MRO.
        Nothing is changed when ``WINDOWS`` is false.
        """
        # Cursor hiding is switched off *before* the parent initialiser runs.
        # NOTE(review): presumably the parent reads hide_cursor during
        # __init__, so the flag has to be in place first - confirm.
        if WINDOWS and self.hide_cursor:
            self.hide_cursor = False

        super().__init__(*args, **kwargs)

        if not (WINDOWS and colorama):
            return

        # Route output through colorama's ANSI-to-Win32 translator.
        self.file = colorama.AnsiToWin32(self.file)

        # Give the wrapper isatty()/flush() that defer to the wrapped stream.
        # NOTE(review): presumably AnsiToWin32 does not expose these itself.
        self.file.isatty = lambda: self.file.wrapped.isatty()
        self.file.flush = lambda: self.file.wrapped.flush()
|
|
|
|
|
|
|
|
class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin, DownloadProgressMixin):
    """Mixin stack and shared defaults for the download bars in this module."""

    # Leading text: the integer percentage followed by a literal "%".
    message = "%(percent)d%%"

    # Trailing text, built from DownloadProgressMixin's properties.
    suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"

    # Output stream the bar is drawn on.
    file = sys.stdout
|
|
|
|
|
|
|
|
class DefaultDownloadProgressBar(BaseDownloadProgressBar, _BaseBar):
    """Download bar drawn with the module-selected ``_BaseBar`` class."""
|
|
|
|
|
|
|
|
class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):
    """Download bar built on ``SilentBar``, whose update() does nothing."""
|
|
|
|
|
|
|
|
class DownloadBar(BaseDownloadProgressBar, Bar):
    """Download bar drawn with the plain ``Bar`` class."""
|
|
|
|
|
|
|
|
class DownloadFillingCirclesBar(BaseDownloadProgressBar, FillingCirclesBar):
    """Download bar drawn with ``FillingCirclesBar``."""
|
|
|
|
|
|
|
|
class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, BlueEmojiBar):
    """Download bar drawn with ``BlueEmojiBar``."""
|
|
|
|
|
|
|
|
class DownloadProgressSpinner(
    WindowsMixin, InterruptibleMixin, DownloadProgressMixin, Spinner
):
    """Spinner-based download display (no percentage, no eta)."""

    # Trailing text, built from DownloadProgressMixin's properties.
    suffix = "%(downloaded)s %(download_speed)s"

    # Output stream the spinner is drawn on.
    file = sys.stdout

    def next_phase(self) -> str:
        """Return the next entry of ``self.phases``, cycling endlessly.

        The cycling iterator is created on first use and cached on the
        instance as ``_phaser``.
        """
        try:
            phaser = self._phaser
        except AttributeError:
            phaser = self._phaser = itertools.cycle(self.phases)
        return next(phaser)

    def update(self) -> None:
        """Write one line: message, spinner phase, then suffix.

        A single space separates the phase from the message and from the
        suffix, and is omitted on whichever side is empty.
        """
        message = self.message % self
        phase = self.next_phase()
        suffix = self.suffix % self

        gap_before = " " if message else ""
        gap_after = " " if suffix else ""

        self.writeln(message + gap_before + phase + gap_after + suffix)
|
|
|
|
|
|
|
|
# Progress styles by name. Each entry is a pair of classes:
#   [0] is instantiated with ``max=...`` when the download size is known,
#   [1] is instantiated without arguments when it is not.
BAR_TYPES = {
    "off": (DownloadSilentBar, DownloadSilentBar),
    # Every visible style shares the same spinner for unknown sizes.
    "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
    "ascii": (DownloadBar, DownloadProgressSpinner),
    "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
    "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner),
}
|
|
|
|
|
|
|
|
def _legacy_progress_bar(
    progress_bar: str, max: Optional[int]
) -> DownloadProgressRenderer:
    """Build a renderer from the ``BAR_TYPES`` entry named *progress_bar*.

    :param progress_bar: Key into ``BAR_TYPES``; an unknown key raises
        ``KeyError``.
    :param max: Total size, or ``None``/``0`` when it is not known.
    :return: The bound ``iter`` method of a new bar (size known) or of a new
        spinner (size unknown).
    """
    bar_cls, spinner_cls = BAR_TYPES[progress_bar]

    size_known = max is not None and max != 0
    if size_known:
        return bar_cls(max=max).iter
    return spinner_cls().iter
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _rich_progress_bar(
    iterable: Iterator[bytes],
    *,
    bar_type: str,
    size: int,
) -> Iterator[bytes]:
    """Yield the chunks of *iterable* while drawing a rich progress display.

    :param iterable: Source of downloaded chunks.
    :param bar_type: Must be ``"on"``; anything else fails the assertion.
    :param size: Total size. A falsy value selects the spinner layout with an
        infinite total; otherwise a bar with an eta is shown.
    """
    assert bar_type == "on", "This should only be used in the default mode."

    description = TextColumn("[progress.description]{task.description}")

    columns: Tuple[ProgressColumn, ...]
    if size:
        # Known size: bar, transferred/total, speed and remaining time.
        total = size
        columns = (
            description,
            BarColumn(),
            DownloadColumn(),
            TransferSpeedColumn(),
            TextColumn("eta"),
            TimeRemainingColumn(),
        )
    else:
        # Unknown size: spinner, bytes so far, speed and elapsed time.
        total = float("inf")
        columns = (
            description,
            SpinnerColumn("line", speed=1.5),
            FileSizeColumn(),
            TransferSpeedColumn(),
            TimeElapsedColumn(),
        )

    # The task description is whitespace only; it serves as indentation.
    label = " " * (get_indentation() + 2)

    progress = Progress(*columns, refresh_per_second=30)
    task_id = progress.add_task(label, total=total)
    with progress:
        for chunk in iterable:
            yield chunk
            progress.update(task_id, advance=len(chunk))
|
|
|
|
|
|
|
|
def get_download_progress_renderer(
    *, bar_type: str, size: Optional[int] = None
) -> DownloadProgressRenderer:
    """Get an object that can be used to render the download progress.

    Returns a callable, that takes an iterable to "wrap".

    :param bar_type: ``"off"`` returns the builtin ``iter``; ``"on"`` returns
        the rich renderer; any other value is passed to the legacy renderer.
    :param size: Total download size if known.
    """
    if bar_type == "off":
        # No display: the plain builtin already has the renderer signature.
        return iter

    if bar_type == "on":
        return functools.partial(_rich_progress_bar, bar_type=bar_type, size=size)

    return _legacy_progress_bar(bar_type, size)
|
|
|