Spaces:
Sleeping
Sleeping
import logging | |
import shutil | |
import sys | |
import textwrap | |
import xmlrpc.client | |
from collections import OrderedDict | |
from optparse import Values | |
from typing import TYPE_CHECKING, Dict, List, Optional | |
from pip._vendor.packaging.version import parse as parse_version | |
from pip._internal.cli.base_command import Command | |
from pip._internal.cli.req_command import SessionCommandMixin | |
from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS | |
from pip._internal.exceptions import CommandError | |
from pip._internal.metadata import get_default_environment | |
from pip._internal.models.index import PyPI | |
from pip._internal.network.xmlrpc import PipXmlrpcTransport | |
from pip._internal.utils.logging import indent_log | |
from pip._internal.utils.misc import write_output | |
if TYPE_CHECKING: | |
from typing import TypedDict | |
class TransformedHit(TypedDict): | |
name: str | |
summary: str | |
versions: List[str] | |
logger = logging.getLogger(__name__) | |
class SearchCommand(Command, SessionCommandMixin): | |
"""Search for PyPI packages whose name or summary contains <query>.""" | |
usage = """ | |
%prog [options] <query>""" | |
ignore_require_venv = True | |
def add_options(self) -> None: | |
self.cmd_opts.add_option( | |
"-i", | |
"--index", | |
dest="index", | |
metavar="URL", | |
default=PyPI.pypi_url, | |
help="Base URL of Python Package Index (default %default)", | |
) | |
self.parser.insert_option_group(0, self.cmd_opts) | |
def run(self, options: Values, args: List[str]) -> int: | |
if not args: | |
raise CommandError("Missing required argument (search query).") | |
query = args | |
pypi_hits = self.search(query, options) | |
hits = transform_hits(pypi_hits) | |
terminal_width = None | |
if sys.stdout.isatty(): | |
terminal_width = shutil.get_terminal_size()[0] | |
print_results(hits, terminal_width=terminal_width) | |
if pypi_hits: | |
return SUCCESS | |
return NO_MATCHES_FOUND | |
def search(self, query: List[str], options: Values) -> List[Dict[str, str]]: | |
index_url = options.index | |
session = self.get_default_session(options) | |
transport = PipXmlrpcTransport(index_url, session) | |
pypi = xmlrpc.client.ServerProxy(index_url, transport) | |
try: | |
hits = pypi.search({"name": query, "summary": query}, "or") | |
except xmlrpc.client.Fault as fault: | |
message = "XMLRPC request failed [code: {code}]\n{string}".format( | |
code=fault.faultCode, | |
string=fault.faultString, | |
) | |
raise CommandError(message) | |
assert isinstance(hits, list) | |
return hits | |
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]: | |
""" | |
The list from pypi is really a list of versions. We want a list of | |
packages with the list of versions stored inline. This converts the | |
list from pypi into one we can use. | |
""" | |
packages: Dict[str, "TransformedHit"] = OrderedDict() | |
for hit in hits: | |
name = hit["name"] | |
summary = hit["summary"] | |
version = hit["version"] | |
if name not in packages.keys(): | |
packages[name] = { | |
"name": name, | |
"summary": summary, | |
"versions": [version], | |
} | |
else: | |
packages[name]["versions"].append(version) | |
# if this is the highest version, replace summary and score | |
if version == highest_version(packages[name]["versions"]): | |
packages[name]["summary"] = summary | |
return list(packages.values()) | |
def print_dist_installation_info(name: str, latest: str) -> None: | |
env = get_default_environment() | |
dist = env.get_distribution(name) | |
if dist is not None: | |
with indent_log(): | |
if dist.version == latest: | |
write_output("INSTALLED: %s (latest)", dist.version) | |
else: | |
write_output("INSTALLED: %s", dist.version) | |
if parse_version(latest).pre: | |
write_output( | |
"LATEST: %s (pre-release; install" | |
" with `pip install --pre`)", | |
latest, | |
) | |
else: | |
write_output("LATEST: %s", latest) | |
def print_results( | |
hits: List["TransformedHit"], | |
name_column_width: Optional[int] = None, | |
terminal_width: Optional[int] = None, | |
) -> None: | |
if not hits: | |
return | |
if name_column_width is None: | |
name_column_width = ( | |
max( | |
[ | |
len(hit["name"]) + len(highest_version(hit.get("versions", ["-"]))) | |
for hit in hits | |
] | |
) | |
+ 4 | |
) | |
for hit in hits: | |
name = hit["name"] | |
summary = hit["summary"] or "" | |
latest = highest_version(hit.get("versions", ["-"])) | |
if terminal_width is not None: | |
target_width = terminal_width - name_column_width - 5 | |
if target_width > 10: | |
# wrap and indent summary to fit terminal | |
summary_lines = textwrap.wrap(summary, target_width) | |
summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines) | |
name_latest = f"{name} ({latest})" | |
line = f"{name_latest:{name_column_width}} - {summary}" | |
try: | |
write_output(line) | |
print_dist_installation_info(name, latest) | |
except UnicodeEncodeError: | |
pass | |
def highest_version(versions: List[str]) -> str: | |
return max(versions, key=parse_version) | |