Spaces:
Sleeping
Sleeping
import functools | |
import sys | |
from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple, TypeVar | |
from pip._vendor.rich.progress import ( | |
BarColumn, | |
DownloadColumn, | |
FileSizeColumn, | |
MofNCompleteColumn, | |
Progress, | |
ProgressColumn, | |
SpinnerColumn, | |
TextColumn, | |
TimeElapsedColumn, | |
TimeRemainingColumn, | |
TransferSpeedColumn, | |
) | |
from pip._internal.cli.spinners import RateLimiter | |
from pip._internal.req.req_install import InstallRequirement | |
from pip._internal.utils.logging import get_console, get_indentation | |
# Type of the items flowing through a progress renderer (download chunks,
# install requirements, ...).
T = TypeVar("T")

# A progress renderer is a callable that wraps an iterable and hands back an
# iterator over the very same items; any progress reporting happens as a side
# effect of consuming that iterator.
ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
def _rich_download_progress_bar(
    iterable: Iterable[bytes],
    *,
    bar_type: str,
    size: Optional[int],
    initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
    """Yield the chunks of ``iterable`` while drawing a rich download display.

    When ``size`` is a non-zero number, a bar with transferred bytes, speed
    and remaining time is shown against that total.  When ``size`` is ``None``
    or ``0`` the total is treated as infinite and a spinner with the
    transferred size, speed and elapsed time is shown instead.

    :param iterable: Source of byte chunks; each chunk is yielded unchanged.
    :param bar_type: Must be ``"on"``; anything else trips the assertion.
    :param size: Expected total number of bytes, if known.
    :param initial_progress: Amount to advance the task by before any chunk
        is consumed; ignored when ``None``.
    """
    assert bar_type == "on", "This should only be used in the default mode."

    total: float
    columns: Tuple[ProgressColumn, ...]
    if size:
        # Known, non-zero size: a real bar with byte counts and an ETA.
        total = size
        columns = (
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            DownloadColumn(),
            TransferSpeedColumn(),
            TextColumn("eta"),
            TimeRemainingColumn(),
        )
    else:
        # Size missing or zero: there is nothing to measure against, so show
        # a spinner plus running totals.
        total = float("inf")
        columns = (
            TextColumn("[progress.description]{task.description}"),
            SpinnerColumn("line", speed=1.5),
            FileSizeColumn(),
            TransferSpeedColumn(),
            TimeElapsedColumn(),
        )

    bar = Progress(*columns, refresh_per_second=5)
    # The task description is only padding: current log indentation plus two.
    padding = " " * (get_indentation() + 2)
    task = bar.add_task(padding, total=total)
    if initial_progress is not None:
        bar.update(task, advance=initial_progress)

    with bar:
        for chunk in iterable:
            yield chunk
            # Counted only once the consumer comes back for the next chunk.
            bar.update(task, advance=len(chunk))
def _rich_install_progress_bar(
    iterable: Iterable[InstallRequirement], *, total: int
) -> Iterator[InstallRequirement]:
    """Yield each requirement from ``iterable`` while drawing an install bar.

    The bar shows an "M of N" counter against ``total`` and, as its
    description, the name of the requirement currently being handled.  It is
    rendered on pip's shared console and created with ``transient=True``.

    :param iterable: Requirements to pass through unchanged.
    :param total: Number of steps the bar counts towards.
    """
    layout = (
        TextColumn("{task.fields[indent]}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("{task.description}"),
    )
    progress = Progress(
        *layout, refresh_per_second=6, console=get_console(), transient=True
    )
    # The task starts out invisible so that a refresh cycle has to happen
    # before the bar is drawn, which avoids very short flashes.
    task_id = progress.add_task(
        "", total=total, indent=" " * get_indentation(), visible=False
    )

    with progress:
        for requirement in iterable:
            # The backslash escapes the opening bracket in the description.
            label = rf"\[{requirement.name}]"
            progress.update(task_id, description=label, visible=True)
            yield requirement
            # One step is counted after the consumer is done with the item.
            progress.advance(task_id)
def _raw_progress_bar(
    iterable: Iterable[bytes],
    *,
    size: Optional[int],
    initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
    """Yield the chunks of ``iterable`` while printing plain progress lines.

    Lines of the form ``Progress <current> of <total>`` are written to
    ``sys.stdout`` and flushed immediately: once before the first chunk, and
    afterwards whenever the rate limiter is ready or the running count equals
    the total.

    :param iterable: Source of byte chunks; each chunk is yielded unchanged.
    :param size: Expected total; ``None`` is reported as ``0``.
    :param initial_progress: Starting value of the running count; ``None`` is
        treated as ``0``.
    """
    done = initial_progress if initial_progress else 0
    expected = size if size else 0
    limiter = RateLimiter(0.25)

    def emit() -> None:
        # Reads the enclosing counters at call time, so it always reports the
        # latest values.
        sys.stdout.write(f"Progress {done} of {expected}\n")
        sys.stdout.flush()

    emit()
    for chunk in iterable:
        done += len(chunk)
        # The limiter is consulted first, exactly as the condition is written.
        should_report = limiter.ready() or done == expected
        if should_report:
            emit()
            limiter.reset()
        yield chunk
def get_download_progress_renderer(
    *, bar_type: str, size: Optional[int] = None, initial_progress: Optional[int] = None
) -> ProgressRenderer[bytes]:
    """Pick the callable used to render download progress.

    The returned callable takes an iterable of byte chunks to "wrap" and
    gives back an iterator over the same chunks.

    :param bar_type: ``"on"`` selects the rich progress bar, ``"raw"`` the
        plain-text reporter; any other value disables progress reporting.
    :param size: Expected total size, forwarded to the chosen renderer.
    :param initial_progress: Starting progress, forwarded to the chosen
        renderer.
    """
    if bar_type == "on":
        return functools.partial(
            _rich_download_progress_bar,
            bar_type=bar_type,
            size=size,
            initial_progress=initial_progress,
        )

    if bar_type == "raw":
        return functools.partial(
            _raw_progress_bar,
            size=size,
            initial_progress=initial_progress,
        )

    # Nothing to draw: ``iter`` is a no-op when handed an iterator.
    return iter
def get_install_progress_renderer(
    *, bar_type: str, total: int
) -> ProgressRenderer[InstallRequirement]:
    """Pick the callable used to render install progress.

    The returned callable takes an iterable of requirements to "wrap" and
    gives back an iterator over the same requirements.

    :param bar_type: ``"on"`` selects the rich install bar; any other value
        disables progress reporting.
    :param total: Number of steps, forwarded to the rich install bar.
    """
    if bar_type != "on":
        # No bar requested: plain iteration, no rendering.
        return iter
    return functools.partial(_rich_install_progress_bar, total=total)