Spaces:
Sleeping
Sleeping
import json | |
import logging | |
from optparse import Values | |
from typing import TYPE_CHECKING, Generator, List, Optional, Sequence, Tuple, cast | |
from pip._vendor.packaging.utils import canonicalize_name | |
from pip._internal.cli import cmdoptions | |
from pip._internal.cli.req_command import IndexGroupCommand | |
from pip._internal.cli.status_codes import SUCCESS | |
from pip._internal.exceptions import CommandError | |
from pip._internal.index.collector import LinkCollector | |
from pip._internal.index.package_finder import PackageFinder | |
from pip._internal.metadata import BaseDistribution, get_environment | |
from pip._internal.models.selection_prefs import SelectionPreferences | |
from pip._internal.network.session import PipSession | |
from pip._internal.utils.compat import stdlib_pkgs | |
from pip._internal.utils.misc import tabulate, write_output | |
if TYPE_CHECKING: | |
from pip._internal.metadata.base import DistributionVersion | |
class _DistWithLatestInfo(BaseDistribution): | |
"""Give the distribution object a couple of extra fields. | |
These will be populated during ``get_outdated()``. This is dirty but | |
makes the rest of the code much cleaner. | |
""" | |
latest_version: DistributionVersion | |
latest_filetype: str | |
_ProcessedDists = Sequence[_DistWithLatestInfo] | |
logger = logging.getLogger(__name__) | |
class ListCommand(IndexGroupCommand): | |
""" | |
List installed packages, including editables. | |
Packages are listed in a case-insensitive sorted order. | |
""" | |
ignore_require_venv = True | |
usage = """ | |
%prog [options]""" | |
def add_options(self) -> None: | |
self.cmd_opts.add_option( | |
"-o", | |
"--outdated", | |
action="store_true", | |
default=False, | |
help="List outdated packages", | |
) | |
self.cmd_opts.add_option( | |
"-u", | |
"--uptodate", | |
action="store_true", | |
default=False, | |
help="List uptodate packages", | |
) | |
self.cmd_opts.add_option( | |
"-e", | |
"--editable", | |
action="store_true", | |
default=False, | |
help="List editable projects.", | |
) | |
self.cmd_opts.add_option( | |
"-l", | |
"--local", | |
action="store_true", | |
default=False, | |
help=( | |
"If in a virtualenv that has global access, do not list " | |
"globally-installed packages." | |
), | |
) | |
self.cmd_opts.add_option( | |
"--user", | |
dest="user", | |
action="store_true", | |
default=False, | |
help="Only output packages installed in user-site.", | |
) | |
self.cmd_opts.add_option(cmdoptions.list_path()) | |
self.cmd_opts.add_option( | |
"--pre", | |
action="store_true", | |
default=False, | |
help=( | |
"Include pre-release and development versions. By default, " | |
"pip only finds stable versions." | |
), | |
) | |
self.cmd_opts.add_option( | |
"--format", | |
action="store", | |
dest="list_format", | |
default="columns", | |
choices=("columns", "freeze", "json"), | |
help="Select the output format among: columns (default), freeze, or json", | |
) | |
self.cmd_opts.add_option( | |
"--not-required", | |
action="store_true", | |
dest="not_required", | |
help="List packages that are not dependencies of installed packages.", | |
) | |
self.cmd_opts.add_option( | |
"--exclude-editable", | |
action="store_false", | |
dest="include_editable", | |
help="Exclude editable package from output.", | |
) | |
self.cmd_opts.add_option( | |
"--include-editable", | |
action="store_true", | |
dest="include_editable", | |
help="Include editable package from output.", | |
default=True, | |
) | |
self.cmd_opts.add_option(cmdoptions.list_exclude()) | |
index_opts = cmdoptions.make_option_group(cmdoptions.index_group, self.parser) | |
self.parser.insert_option_group(0, index_opts) | |
self.parser.insert_option_group(0, self.cmd_opts) | |
def _build_package_finder( | |
self, options: Values, session: PipSession | |
) -> PackageFinder: | |
""" | |
Create a package finder appropriate to this list command. | |
""" | |
link_collector = LinkCollector.create(session, options=options) | |
# Pass allow_yanked=False to ignore yanked versions. | |
selection_prefs = SelectionPreferences( | |
allow_yanked=False, | |
allow_all_prereleases=options.pre, | |
) | |
return PackageFinder.create( | |
link_collector=link_collector, | |
selection_prefs=selection_prefs, | |
) | |
def run(self, options: Values, args: List[str]) -> int: | |
if options.outdated and options.uptodate: | |
raise CommandError("Options --outdated and --uptodate cannot be combined.") | |
if options.outdated and options.list_format == "freeze": | |
raise CommandError( | |
"List format 'freeze' can not be used with the --outdated option." | |
) | |
cmdoptions.check_list_path_option(options) | |
skip = set(stdlib_pkgs) | |
if options.excludes: | |
skip.update(canonicalize_name(n) for n in options.excludes) | |
packages: "_ProcessedDists" = [ | |
cast("_DistWithLatestInfo", d) | |
for d in get_environment(options.path).iter_installed_distributions( | |
local_only=options.local, | |
user_only=options.user, | |
editables_only=options.editable, | |
include_editables=options.include_editable, | |
skip=skip, | |
) | |
] | |
# get_not_required must be called firstly in order to find and | |
# filter out all dependencies correctly. Otherwise a package | |
# can't be identified as requirement because some parent packages | |
# could be filtered out before. | |
if options.not_required: | |
packages = self.get_not_required(packages, options) | |
if options.outdated: | |
packages = self.get_outdated(packages, options) | |
elif options.uptodate: | |
packages = self.get_uptodate(packages, options) | |
self.output_package_listing(packages, options) | |
return SUCCESS | |
def get_outdated( | |
self, packages: "_ProcessedDists", options: Values | |
) -> "_ProcessedDists": | |
return [ | |
dist | |
for dist in self.iter_packages_latest_infos(packages, options) | |
if dist.latest_version > dist.version | |
] | |
def get_uptodate( | |
self, packages: "_ProcessedDists", options: Values | |
) -> "_ProcessedDists": | |
return [ | |
dist | |
for dist in self.iter_packages_latest_infos(packages, options) | |
if dist.latest_version == dist.version | |
] | |
def get_not_required( | |
self, packages: "_ProcessedDists", options: Values | |
) -> "_ProcessedDists": | |
dep_keys = { | |
canonicalize_name(dep.name) | |
for dist in packages | |
for dep in (dist.iter_dependencies() or ()) | |
} | |
# Create a set to remove duplicate packages, and cast it to a list | |
# to keep the return type consistent with get_outdated and | |
# get_uptodate | |
return list({pkg for pkg in packages if pkg.canonical_name not in dep_keys}) | |
def iter_packages_latest_infos( | |
self, packages: "_ProcessedDists", options: Values | |
) -> Generator["_DistWithLatestInfo", None, None]: | |
with self._build_session(options) as session: | |
finder = self._build_package_finder(options, session) | |
def latest_info( | |
dist: "_DistWithLatestInfo", | |
) -> Optional["_DistWithLatestInfo"]: | |
all_candidates = finder.find_all_candidates(dist.canonical_name) | |
if not options.pre: | |
# Remove prereleases | |
all_candidates = [ | |
candidate | |
for candidate in all_candidates | |
if not candidate.version.is_prerelease | |
] | |
evaluator = finder.make_candidate_evaluator( | |
project_name=dist.canonical_name, | |
) | |
best_candidate = evaluator.sort_best_candidate(all_candidates) | |
if best_candidate is None: | |
return None | |
remote_version = best_candidate.version | |
if best_candidate.link.is_wheel: | |
typ = "wheel" | |
else: | |
typ = "sdist" | |
dist.latest_version = remote_version | |
dist.latest_filetype = typ | |
return dist | |
for dist in map(latest_info, packages): | |
if dist is not None: | |
yield dist | |
def output_package_listing( | |
self, packages: "_ProcessedDists", options: Values | |
) -> None: | |
packages = sorted( | |
packages, | |
key=lambda dist: dist.canonical_name, | |
) | |
if options.list_format == "columns" and packages: | |
data, header = format_for_columns(packages, options) | |
self.output_package_listing_columns(data, header) | |
elif options.list_format == "freeze": | |
for dist in packages: | |
if options.verbose >= 1: | |
write_output( | |
"%s==%s (%s)", dist.raw_name, dist.version, dist.location | |
) | |
else: | |
write_output("%s==%s", dist.raw_name, dist.version) | |
elif options.list_format == "json": | |
write_output(format_for_json(packages, options)) | |
def output_package_listing_columns( | |
self, data: List[List[str]], header: List[str] | |
) -> None: | |
# insert the header first: we need to know the size of column names | |
if len(data) > 0: | |
data.insert(0, header) | |
pkg_strings, sizes = tabulate(data) | |
# Create and add a separator. | |
if len(data) > 0: | |
pkg_strings.insert(1, " ".join(map(lambda x: "-" * x, sizes))) | |
for val in pkg_strings: | |
write_output(val) | |
def format_for_columns( | |
pkgs: "_ProcessedDists", options: Values | |
) -> Tuple[List[List[str]], List[str]]: | |
""" | |
Convert the package data into something usable | |
by output_package_listing_columns. | |
""" | |
header = ["Package", "Version"] | |
running_outdated = options.outdated | |
if running_outdated: | |
header.extend(["Latest", "Type"]) | |
has_editables = any(x.editable for x in pkgs) | |
if has_editables: | |
header.append("Editable project location") | |
if options.verbose >= 1: | |
header.append("Location") | |
if options.verbose >= 1: | |
header.append("Installer") | |
data = [] | |
for proj in pkgs: | |
# if we're working on the 'outdated' list, separate out the | |
# latest_version and type | |
row = [proj.raw_name, str(proj.version)] | |
if running_outdated: | |
row.append(str(proj.latest_version)) | |
row.append(proj.latest_filetype) | |
if has_editables: | |
row.append(proj.editable_project_location or "") | |
if options.verbose >= 1: | |
row.append(proj.location or "") | |
if options.verbose >= 1: | |
row.append(proj.installer) | |
data.append(row) | |
return data, header | |
def format_for_json(packages: "_ProcessedDists", options: Values) -> str: | |
data = [] | |
for dist in packages: | |
info = { | |
"name": dist.raw_name, | |
"version": str(dist.version), | |
} | |
if options.verbose >= 1: | |
info["location"] = dist.location or "" | |
info["installer"] = dist.installer | |
if options.outdated: | |
info["latest_version"] = str(dist.latest_version) | |
info["latest_filetype"] = dist.latest_filetype | |
editable_project_location = dist.editable_project_location | |
if editable_project_location: | |
info["editable_project_location"] = editable_project_location | |
data.append(info) | |
return json.dumps(data) | |