|
import os |
|
import re |
|
import abc |
|
import csv |
|
import sys |
|
import email |
|
import pathlib |
|
import zipfile |
|
import operator |
|
import textwrap |
|
import warnings |
|
import functools |
|
import itertools |
|
import posixpath |
|
import collections |
|
|
|
from . import _adapters, _meta |
|
from ._collections import FreezableDefaultDict, Pair |
|
from ._functools import method_cache, pass_none |
|
from ._itertools import always_iterable, unique_everseen |
|
from ._meta import PackageMetadata, SimplePath |
|
|
|
from contextlib import suppress |
|
from importlib import import_module |
|
from importlib.abc import MetaPathFinder |
|
from itertools import starmap |
|
from typing import List, Mapping, Optional, Union |
|
|
|
|
|
__all__ = [ |
|
'Distribution', |
|
'DistributionFinder', |
|
'PackageMetadata', |
|
'PackageNotFoundError', |
|
'distribution', |
|
'distributions', |
|
'entry_points', |
|
'files', |
|
'metadata', |
|
'packages_distributions', |
|
'requires', |
|
'version', |
|
] |
|
|
|
|
|
class PackageNotFoundError(ModuleNotFoundError): |
|
"""The package was not found.""" |
|
|
|
def __str__(self): |
|
return f"No package metadata was found for {self.name}" |
|
|
|
@property |
|
def name(self): |
|
(name,) = self.args |
|
return name |
|
|
|
|
|
class Sectioned: |
|
""" |
|
A simple entry point config parser for performance |
|
|
|
>>> for item in Sectioned.read(Sectioned._sample): |
|
... print(item) |
|
Pair(name='sec1', value='# comments ignored') |
|
Pair(name='sec1', value='a = 1') |
|
Pair(name='sec1', value='b = 2') |
|
Pair(name='sec2', value='a = 2') |
|
|
|
>>> res = Sectioned.section_pairs(Sectioned._sample) |
|
>>> item = next(res) |
|
>>> item.name |
|
'sec1' |
|
>>> item.value |
|
Pair(name='a', value='1') |
|
>>> item = next(res) |
|
>>> item.value |
|
Pair(name='b', value='2') |
|
>>> item = next(res) |
|
>>> item.name |
|
'sec2' |
|
>>> item.value |
|
Pair(name='a', value='2') |
|
>>> list(res) |
|
[] |
|
""" |
|
|
|
_sample = textwrap.dedent( |
|
""" |
|
[sec1] |
|
# comments ignored |
|
a = 1 |
|
b = 2 |
|
|
|
[sec2] |
|
a = 2 |
|
""" |
|
).lstrip() |
|
|
|
@classmethod |
|
def section_pairs(cls, text): |
|
return ( |
|
section._replace(value=Pair.parse(section.value)) |
|
for section in cls.read(text, filter_=cls.valid) |
|
if section.name is not None |
|
) |
|
|
|
@staticmethod |
|
def read(text, filter_=None): |
|
lines = filter(filter_, map(str.strip, text.splitlines())) |
|
name = None |
|
for value in lines: |
|
section_match = value.startswith('[') and value.endswith(']') |
|
if section_match: |
|
name = value.strip('[]') |
|
continue |
|
yield Pair(name, value) |
|
|
|
@staticmethod |
|
def valid(line): |
|
return line and not line.startswith('#') |
|
|
|
|
|
class DeprecatedTuple: |
|
""" |
|
Provide subscript item access for backward compatibility. |
|
|
|
>>> recwarn = getfixture('recwarn') |
|
>>> ep = EntryPoint(name='name', value='value', group='group') |
|
>>> ep[:] |
|
('name', 'value', 'group') |
|
>>> ep[0] |
|
'name' |
|
>>> len(recwarn) |
|
1 |
|
""" |
|
|
|
_warn = functools.partial( |
|
warnings.warn, |
|
"EntryPoint tuple interface is deprecated. Access members by name.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
|
|
def __getitem__(self, item): |
|
self._warn() |
|
return self._key()[item] |
|
|
|
|
|
class EntryPoint(DeprecatedTuple): |
|
"""An entry point as defined by Python packaging conventions. |
|
|
|
See `the packaging docs on entry points |
|
<https://packaging.python.org/specifications/entry-points/>`_ |
|
for more information. |
|
|
|
>>> ep = EntryPoint( |
|
... name=None, group=None, value='package.module:attr [extra1, extra2]') |
|
>>> ep.module |
|
'package.module' |
|
>>> ep.attr |
|
'attr' |
|
>>> ep.extras |
|
['extra1', 'extra2'] |
|
""" |
|
|
|
pattern = re.compile( |
|
r'(?P<module>[\w.]+)\s*' |
|
r'(:\s*(?P<attr>[\w.]+)\s*)?' |
|
r'((?P<extras>\[.*\])\s*)?$' |
|
) |
|
""" |
|
A regular expression describing the syntax for an entry point, |
|
which might look like: |
|
|
|
- module |
|
- package.module |
|
- package.module:attribute |
|
- package.module:object.attribute |
|
- package.module:attr [extra1, extra2] |
|
|
|
Other combinations are possible as well. |
|
|
|
The expression is lenient about whitespace around the ':', |
|
following the attr, and following any extras. |
|
""" |
|
|
|
name: str |
|
value: str |
|
group: str |
|
|
|
dist: Optional['Distribution'] = None |
|
|
|
def __init__(self, name, value, group): |
|
vars(self).update(name=name, value=value, group=group) |
|
|
|
def load(self): |
|
"""Load the entry point from its definition. If only a module |
|
is indicated by the value, return that module. Otherwise, |
|
return the named object. |
|
""" |
|
match = self.pattern.match(self.value) |
|
module = import_module(match.group('module')) |
|
attrs = filter(None, (match.group('attr') or '').split('.')) |
|
return functools.reduce(getattr, attrs, module) |
|
|
|
@property |
|
def module(self): |
|
match = self.pattern.match(self.value) |
|
return match.group('module') |
|
|
|
@property |
|
def attr(self): |
|
match = self.pattern.match(self.value) |
|
return match.group('attr') |
|
|
|
@property |
|
def extras(self): |
|
match = self.pattern.match(self.value) |
|
return re.findall(r'\w+', match.group('extras') or '') |
|
|
|
def _for(self, dist): |
|
vars(self).update(dist=dist) |
|
return self |
|
|
|
def __iter__(self): |
|
""" |
|
Supply iter so one may construct dicts of EntryPoints by name. |
|
""" |
|
msg = ( |
|
"Construction of dict of EntryPoints is deprecated in " |
|
"favor of EntryPoints." |
|
) |
|
warnings.warn(msg, DeprecationWarning) |
|
return iter((self.name, self)) |
|
|
|
def matches(self, **params): |
|
""" |
|
EntryPoint matches the given parameters. |
|
|
|
>>> ep = EntryPoint(group='foo', name='bar', value='bing:bong [extra1, extra2]') |
|
>>> ep.matches(group='foo') |
|
True |
|
>>> ep.matches(name='bar', value='bing:bong [extra1, extra2]') |
|
True |
|
>>> ep.matches(group='foo', name='other') |
|
False |
|
>>> ep.matches() |
|
True |
|
>>> ep.matches(extras=['extra1', 'extra2']) |
|
True |
|
>>> ep.matches(module='bing') |
|
True |
|
>>> ep.matches(attr='bong') |
|
True |
|
""" |
|
attrs = (getattr(self, param) for param in params) |
|
return all(map(operator.eq, params.values(), attrs)) |
|
|
|
def _key(self): |
|
return self.name, self.value, self.group |
|
|
|
def __lt__(self, other): |
|
return self._key() < other._key() |
|
|
|
def __eq__(self, other): |
|
return self._key() == other._key() |
|
|
|
def __setattr__(self, name, value): |
|
raise AttributeError("EntryPoint objects are immutable.") |
|
|
|
def __repr__(self): |
|
return ( |
|
f'EntryPoint(name={self.name!r}, value={self.value!r}, ' |
|
f'group={self.group!r})' |
|
) |
|
|
|
def __hash__(self): |
|
return hash(self._key()) |
|
|
|
|
|
class DeprecatedList(list): |
|
""" |
|
Allow an otherwise immutable object to implement mutability |
|
for compatibility. |
|
|
|
>>> recwarn = getfixture('recwarn') |
|
>>> dl = DeprecatedList(range(3)) |
|
>>> dl[0] = 1 |
|
>>> dl.append(3) |
|
>>> del dl[3] |
|
>>> dl.reverse() |
|
>>> dl.sort() |
|
>>> dl.extend([4]) |
|
>>> dl.pop(-1) |
|
4 |
|
>>> dl.remove(1) |
|
>>> dl += [5] |
|
>>> dl + [6] |
|
[1, 2, 5, 6] |
|
>>> dl + (6,) |
|
[1, 2, 5, 6] |
|
>>> dl.insert(0, 0) |
|
>>> dl |
|
[0, 1, 2, 5] |
|
>>> dl == [0, 1, 2, 5] |
|
True |
|
>>> dl == (0, 1, 2, 5) |
|
True |
|
>>> len(recwarn) |
|
1 |
|
""" |
|
|
|
__slots__ = () |
|
|
|
_warn = functools.partial( |
|
warnings.warn, |
|
"EntryPoints list interface is deprecated. Cast to list if needed.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
|
|
def _wrap_deprecated_method(method_name: str): |
|
def wrapped(self, *args, **kwargs): |
|
self._warn() |
|
return getattr(super(), method_name)(*args, **kwargs) |
|
|
|
return method_name, wrapped |
|
|
|
locals().update( |
|
map( |
|
_wrap_deprecated_method, |
|
'__setitem__ __delitem__ append reverse extend pop remove ' |
|
'__iadd__ insert sort'.split(), |
|
) |
|
) |
|
|
|
def __add__(self, other): |
|
if not isinstance(other, tuple): |
|
self._warn() |
|
other = tuple(other) |
|
return self.__class__(tuple(self) + other) |
|
|
|
def __eq__(self, other): |
|
if not isinstance(other, tuple): |
|
self._warn() |
|
other = tuple(other) |
|
|
|
return tuple(self).__eq__(other) |
|
|
|
|
|
class EntryPoints(DeprecatedList): |
|
""" |
|
An immutable collection of selectable EntryPoint objects. |
|
""" |
|
|
|
__slots__ = () |
|
|
|
def __getitem__(self, name): |
|
""" |
|
Get the EntryPoint in self matching name. |
|
""" |
|
if isinstance(name, int): |
|
warnings.warn( |
|
"Accessing entry points by index is deprecated. " |
|
"Cast to tuple if needed.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
return super().__getitem__(name) |
|
try: |
|
return next(iter(self.select(name=name))) |
|
except StopIteration: |
|
raise KeyError(name) |
|
|
|
def select(self, **params): |
|
""" |
|
Select entry points from self that match the |
|
given parameters (typically group and/or name). |
|
""" |
|
return EntryPoints(ep for ep in self if ep.matches(**params)) |
|
|
|
@property |
|
def names(self): |
|
""" |
|
Return the set of all names of all entry points. |
|
""" |
|
return {ep.name for ep in self} |
|
|
|
@property |
|
def groups(self): |
|
""" |
|
Return the set of all groups of all entry points. |
|
|
|
For coverage while SelectableGroups is present. |
|
>>> EntryPoints().groups |
|
set() |
|
""" |
|
return {ep.group for ep in self} |
|
|
|
@classmethod |
|
def _from_text_for(cls, text, dist): |
|
return cls(ep._for(dist) for ep in cls._from_text(text)) |
|
|
|
@staticmethod |
|
def _from_text(text): |
|
return ( |
|
EntryPoint(name=item.value.name, value=item.value.value, group=item.name) |
|
for item in Sectioned.section_pairs(text or '') |
|
) |
|
|
|
|
|
class Deprecated: |
|
""" |
|
Compatibility add-in for mapping to indicate that |
|
mapping behavior is deprecated. |
|
|
|
>>> recwarn = getfixture('recwarn') |
|
>>> class DeprecatedDict(Deprecated, dict): pass |
|
>>> dd = DeprecatedDict(foo='bar') |
|
>>> dd.get('baz', None) |
|
>>> dd['foo'] |
|
'bar' |
|
>>> list(dd) |
|
['foo'] |
|
>>> list(dd.keys()) |
|
['foo'] |
|
>>> 'foo' in dd |
|
True |
|
>>> list(dd.values()) |
|
['bar'] |
|
>>> len(recwarn) |
|
1 |
|
""" |
|
|
|
_warn = functools.partial( |
|
warnings.warn, |
|
"SelectableGroups dict interface is deprecated. Use select.", |
|
DeprecationWarning, |
|
stacklevel=2, |
|
) |
|
|
|
def __getitem__(self, name): |
|
self._warn() |
|
return super().__getitem__(name) |
|
|
|
def get(self, name, default=None): |
|
self._warn() |
|
return super().get(name, default) |
|
|
|
def __iter__(self): |
|
self._warn() |
|
return super().__iter__() |
|
|
|
def __contains__(self, *args): |
|
self._warn() |
|
return super().__contains__(*args) |
|
|
|
def keys(self): |
|
self._warn() |
|
return super().keys() |
|
|
|
def values(self): |
|
self._warn() |
|
return super().values() |
|
|
|
|
|
class SelectableGroups(Deprecated, dict): |
|
""" |
|
A backward- and forward-compatible result from |
|
entry_points that fully implements the dict interface. |
|
""" |
|
|
|
@classmethod |
|
def load(cls, eps): |
|
by_group = operator.attrgetter('group') |
|
ordered = sorted(eps, key=by_group) |
|
grouped = itertools.groupby(ordered, by_group) |
|
return cls((group, EntryPoints(eps)) for group, eps in grouped) |
|
|
|
@property |
|
def _all(self): |
|
""" |
|
Reconstruct a list of all entrypoints from the groups. |
|
""" |
|
groups = super(Deprecated, self).values() |
|
return EntryPoints(itertools.chain.from_iterable(groups)) |
|
|
|
@property |
|
def groups(self): |
|
return self._all.groups |
|
|
|
@property |
|
def names(self): |
|
""" |
|
for coverage: |
|
>>> SelectableGroups().names |
|
set() |
|
""" |
|
return self._all.names |
|
|
|
def select(self, **params): |
|
if not params: |
|
return self |
|
return self._all.select(**params) |
|
|
|
|
|
class PackagePath(pathlib.PurePosixPath): |
|
"""A reference to a path in a package""" |
|
|
|
def read_text(self, encoding='utf-8'): |
|
with self.locate().open(encoding=encoding) as stream: |
|
return stream.read() |
|
|
|
def read_binary(self): |
|
with self.locate().open('rb') as stream: |
|
return stream.read() |
|
|
|
def locate(self): |
|
"""Return a path-like object for this path""" |
|
return self.dist.locate_file(self) |
|
|
|
|
|
class FileHash: |
|
def __init__(self, spec): |
|
self.mode, _, self.value = spec.partition('=') |
|
|
|
def __repr__(self): |
|
return f'<FileHash mode: {self.mode} value: {self.value}>' |
|
|
|
|
|
class Distribution: |
|
"""A Python distribution package.""" |
|
|
|
@abc.abstractmethod |
|
def read_text(self, filename): |
|
"""Attempt to load metadata file given by the name. |
|
|
|
:param filename: The name of the file in the distribution info. |
|
:return: The text if found, otherwise None. |
|
""" |
|
|
|
@abc.abstractmethod |
|
def locate_file(self, path): |
|
""" |
|
Given a path to a file in this distribution, return a path |
|
to it. |
|
""" |
|
|
|
@classmethod |
|
def from_name(cls, name: str): |
|
"""Return the Distribution for the given package name. |
|
|
|
:param name: The name of the distribution package to search for. |
|
:return: The Distribution instance (or subclass thereof) for the named |
|
package, if found. |
|
:raises PackageNotFoundError: When the named package's distribution |
|
metadata cannot be found. |
|
:raises ValueError: When an invalid value is supplied for name. |
|
""" |
|
if not name: |
|
raise ValueError("A distribution name is required.") |
|
try: |
|
return next(cls.discover(name=name)) |
|
except StopIteration: |
|
raise PackageNotFoundError(name) |
|
|
|
@classmethod |
|
def discover(cls, **kwargs): |
|
"""Return an iterable of Distribution objects for all packages. |
|
|
|
Pass a ``context`` or pass keyword arguments for constructing |
|
a context. |
|
|
|
:context: A ``DistributionFinder.Context`` object. |
|
:return: Iterable of Distribution objects for all packages. |
|
""" |
|
context = kwargs.pop('context', None) |
|
if context and kwargs: |
|
raise ValueError("cannot accept context and kwargs") |
|
context = context or DistributionFinder.Context(**kwargs) |
|
return itertools.chain.from_iterable( |
|
resolver(context) for resolver in cls._discover_resolvers() |
|
) |
|
|
|
@staticmethod |
|
def at(path): |
|
"""Return a Distribution for the indicated metadata path |
|
|
|
:param path: a string or path-like object |
|
:return: a concrete Distribution instance for the path |
|
""" |
|
return PathDistribution(pathlib.Path(path)) |
|
|
|
@staticmethod |
|
def _discover_resolvers(): |
|
"""Search the meta_path for resolvers.""" |
|
declared = ( |
|
getattr(finder, 'find_distributions', None) for finder in sys.meta_path |
|
) |
|
return filter(None, declared) |
|
|
|
@property |
|
def metadata(self) -> _meta.PackageMetadata: |
|
"""Return the parsed metadata for this Distribution. |
|
|
|
The returned object will have keys that name the various bits of |
|
metadata. See PEP 566 for details. |
|
""" |
|
text = ( |
|
self.read_text('METADATA') |
|
or self.read_text('PKG-INFO') |
|
|
|
|
|
|
|
or self.read_text('') |
|
) |
|
return _adapters.Message(email.message_from_string(text)) |
|
|
|
@property |
|
def name(self): |
|
"""Return the 'Name' metadata for the distribution package.""" |
|
return self.metadata['Name'] |
|
|
|
@property |
|
def _normalized_name(self): |
|
"""Return a normalized version of the name.""" |
|
return Prepared.normalize(self.name) |
|
|
|
@property |
|
def version(self): |
|
"""Return the 'Version' metadata for the distribution package.""" |
|
return self.metadata['Version'] |
|
|
|
@property |
|
def entry_points(self): |
|
return EntryPoints._from_text_for(self.read_text('entry_points.txt'), self) |
|
|
|
@property |
|
def files(self): |
|
"""Files in this distribution. |
|
|
|
:return: List of PackagePath for this distribution or None |
|
|
|
Result is `None` if the metadata file that enumerates files |
|
(i.e. RECORD for dist-info or SOURCES.txt for egg-info) is |
|
missing. |
|
Result may be empty if the metadata exists but is empty. |
|
""" |
|
|
|
def make_file(name, hash=None, size_str=None): |
|
result = PackagePath(name) |
|
result.hash = FileHash(hash) if hash else None |
|
result.size = int(size_str) if size_str else None |
|
result.dist = self |
|
return result |
|
|
|
@pass_none |
|
def make_files(lines): |
|
return list(starmap(make_file, csv.reader(lines))) |
|
|
|
return make_files(self._read_files_distinfo() or self._read_files_egginfo()) |
|
|
|
def _read_files_distinfo(self): |
|
""" |
|
Read the lines of RECORD |
|
""" |
|
text = self.read_text('RECORD') |
|
return text and text.splitlines() |
|
|
|
def _read_files_egginfo(self): |
|
""" |
|
SOURCES.txt might contain literal commas, so wrap each line |
|
in quotes. |
|
""" |
|
text = self.read_text('SOURCES.txt') |
|
return text and map('"{}"'.format, text.splitlines()) |
|
|
|
@property |
|
def requires(self): |
|
"""Generated requirements specified for this Distribution""" |
|
reqs = self._read_dist_info_reqs() or self._read_egg_info_reqs() |
|
return reqs and list(reqs) |
|
|
|
def _read_dist_info_reqs(self): |
|
return self.metadata.get_all('Requires-Dist') |
|
|
|
def _read_egg_info_reqs(self): |
|
source = self.read_text('requires.txt') |
|
return pass_none(self._deps_from_requires_text)(source) |
|
|
|
@classmethod |
|
def _deps_from_requires_text(cls, source): |
|
return cls._convert_egg_info_reqs_to_simple_reqs(Sectioned.read(source)) |
|
|
|
@staticmethod |
|
def _convert_egg_info_reqs_to_simple_reqs(sections): |
|
""" |
|
Historically, setuptools would solicit and store 'extra' |
|
requirements, including those with environment markers, |
|
in separate sections. More modern tools expect each |
|
dependency to be defined separately, with any relevant |
|
extras and environment markers attached directly to that |
|
requirement. This method converts the former to the |
|
latter. See _test_deps_from_requires_text for an example. |
|
""" |
|
|
|
def make_condition(name): |
|
return name and f'extra == "{name}"' |
|
|
|
def quoted_marker(section): |
|
section = section or '' |
|
extra, sep, markers = section.partition(':') |
|
if extra and markers: |
|
markers = f'({markers})' |
|
conditions = list(filter(None, [markers, make_condition(extra)])) |
|
return '; ' + ' and '.join(conditions) if conditions else '' |
|
|
|
def url_req_space(req): |
|
""" |
|
PEP 508 requires a space between the url_spec and the quoted_marker. |
|
Ref python/importlib_metadata#357. |
|
""" |
|
|
|
return ' ' * ('@' in req) |
|
|
|
for section in sections: |
|
space = url_req_space(section.value) |
|
yield section.value + space + quoted_marker(section.name) |
|
|
|
|
|
class DistributionFinder(MetaPathFinder): |
|
""" |
|
A MetaPathFinder capable of discovering installed distributions. |
|
""" |
|
|
|
class Context: |
|
""" |
|
Keyword arguments presented by the caller to |
|
``distributions()`` or ``Distribution.discover()`` |
|
to narrow the scope of a search for distributions |
|
in all DistributionFinders. |
|
|
|
Each DistributionFinder may expect any parameters |
|
and should attempt to honor the canonical |
|
parameters defined below when appropriate. |
|
""" |
|
|
|
name = None |
|
""" |
|
Specific name for which a distribution finder should match. |
|
A name of ``None`` matches all distributions. |
|
""" |
|
|
|
def __init__(self, **kwargs): |
|
vars(self).update(kwargs) |
|
|
|
@property |
|
def path(self): |
|
""" |
|
The sequence of directory path that a distribution finder |
|
should search. |
|
|
|
Typically refers to Python installed package paths such as |
|
"site-packages" directories and defaults to ``sys.path``. |
|
""" |
|
return vars(self).get('path', sys.path) |
|
|
|
@abc.abstractmethod |
|
def find_distributions(self, context=Context()): |
|
""" |
|
Find distributions. |
|
|
|
Return an iterable of all Distribution instances capable of |
|
loading the metadata for packages matching the ``context``, |
|
a DistributionFinder.Context instance. |
|
""" |
|
|
|
|
|
class FastPath: |
|
""" |
|
Micro-optimized class for searching a path for |
|
children. |
|
|
|
>>> FastPath('').children() |
|
['...'] |
|
""" |
|
|
|
@functools.lru_cache() |
|
def __new__(cls, root): |
|
return super().__new__(cls) |
|
|
|
def __init__(self, root): |
|
self.root = root |
|
|
|
def joinpath(self, child): |
|
return pathlib.Path(self.root, child) |
|
|
|
def children(self): |
|
with suppress(Exception): |
|
return os.listdir(self.root or '.') |
|
with suppress(Exception): |
|
return self.zip_children() |
|
return [] |
|
|
|
def zip_children(self): |
|
zip_path = zipfile.Path(self.root) |
|
names = zip_path.root.namelist() |
|
self.joinpath = zip_path.joinpath |
|
|
|
return dict.fromkeys(child.split(posixpath.sep, 1)[0] for child in names) |
|
|
|
def search(self, name): |
|
return self.lookup(self.mtime).search(name) |
|
|
|
@property |
|
def mtime(self): |
|
with suppress(OSError): |
|
return os.stat(self.root).st_mtime |
|
self.lookup.cache_clear() |
|
|
|
@method_cache |
|
def lookup(self, mtime): |
|
return Lookup(self) |
|
|
|
|
|
class Lookup: |
|
def __init__(self, path: FastPath): |
|
base = os.path.basename(path.root).lower() |
|
base_is_egg = base.endswith(".egg") |
|
self.infos = FreezableDefaultDict(list) |
|
self.eggs = FreezableDefaultDict(list) |
|
|
|
for child in path.children(): |
|
low = child.lower() |
|
if low.endswith((".dist-info", ".egg-info")): |
|
|
|
name = low.rpartition(".")[0].partition("-")[0] |
|
normalized = Prepared.normalize(name) |
|
self.infos[normalized].append(path.joinpath(child)) |
|
elif base_is_egg and low == "egg-info": |
|
name = base.rpartition(".")[0].partition("-")[0] |
|
legacy_normalized = Prepared.legacy_normalize(name) |
|
self.eggs[legacy_normalized].append(path.joinpath(child)) |
|
|
|
self.infos.freeze() |
|
self.eggs.freeze() |
|
|
|
def search(self, prepared): |
|
infos = ( |
|
self.infos[prepared.normalized] |
|
if prepared |
|
else itertools.chain.from_iterable(self.infos.values()) |
|
) |
|
eggs = ( |
|
self.eggs[prepared.legacy_normalized] |
|
if prepared |
|
else itertools.chain.from_iterable(self.eggs.values()) |
|
) |
|
return itertools.chain(infos, eggs) |
|
|
|
|
|
class Prepared: |
|
""" |
|
A prepared search for metadata on a possibly-named package. |
|
""" |
|
|
|
normalized = None |
|
legacy_normalized = None |
|
|
|
def __init__(self, name): |
|
self.name = name |
|
if name is None: |
|
return |
|
self.normalized = self.normalize(name) |
|
self.legacy_normalized = self.legacy_normalize(name) |
|
|
|
@staticmethod |
|
def normalize(name): |
|
""" |
|
PEP 503 normalization plus dashes as underscores. |
|
""" |
|
return re.sub(r"[-_.]+", "-", name).lower().replace('-', '_') |
|
|
|
@staticmethod |
|
def legacy_normalize(name): |
|
""" |
|
Normalize the package name as found in the convention in |
|
older packaging tools versions and specs. |
|
""" |
|
return name.lower().replace('-', '_') |
|
|
|
def __bool__(self): |
|
return bool(self.name) |
|
|
|
|
|
class MetadataPathFinder(DistributionFinder): |
|
@classmethod |
|
def find_distributions(cls, context=DistributionFinder.Context()): |
|
""" |
|
Find distributions. |
|
|
|
Return an iterable of all Distribution instances capable of |
|
loading the metadata for packages matching ``context.name`` |
|
(or all names if ``None`` indicated) along the paths in the list |
|
of directories ``context.path``. |
|
""" |
|
found = cls._search_paths(context.name, context.path) |
|
return map(PathDistribution, found) |
|
|
|
@classmethod |
|
def _search_paths(cls, name, paths): |
|
"""Find metadata directories in paths heuristically.""" |
|
prepared = Prepared(name) |
|
return itertools.chain.from_iterable( |
|
path.search(prepared) for path in map(FastPath, paths) |
|
) |
|
|
|
@classmethod |
|
def invalidate_caches(cls): |
|
FastPath.__new__.cache_clear() |
|
|
|
|
|
class PathDistribution(Distribution): |
|
def __init__(self, path: SimplePath): |
|
"""Construct a distribution. |
|
|
|
:param path: SimplePath indicating the metadata directory. |
|
""" |
|
self._path = path |
|
|
|
def read_text(self, filename): |
|
with suppress( |
|
FileNotFoundError, |
|
IsADirectoryError, |
|
KeyError, |
|
NotADirectoryError, |
|
PermissionError, |
|
): |
|
return self._path.joinpath(filename).read_text(encoding='utf-8') |
|
|
|
read_text.__doc__ = Distribution.read_text.__doc__ |
|
|
|
def locate_file(self, path): |
|
return self._path.parent / path |
|
|
|
@property |
|
def _normalized_name(self): |
|
""" |
|
Performance optimization: where possible, resolve the |
|
normalized name from the file system path. |
|
""" |
|
stem = os.path.basename(str(self._path)) |
|
return ( |
|
pass_none(Prepared.normalize)(self._name_from_stem(stem)) |
|
or super()._normalized_name |
|
) |
|
|
|
@staticmethod |
|
def _name_from_stem(stem): |
|
""" |
|
>>> PathDistribution._name_from_stem('foo-3.0.egg-info') |
|
'foo' |
|
>>> PathDistribution._name_from_stem('CherryPy-3.0.dist-info') |
|
'CherryPy' |
|
>>> PathDistribution._name_from_stem('face.egg-info') |
|
'face' |
|
>>> PathDistribution._name_from_stem('foo.bar') |
|
""" |
|
filename, ext = os.path.splitext(stem) |
|
if ext not in ('.dist-info', '.egg-info'): |
|
return |
|
name, sep, rest = filename.partition('-') |
|
return name |
|
|
|
|
|
def distribution(distribution_name): |
|
"""Get the ``Distribution`` instance for the named package. |
|
|
|
:param distribution_name: The name of the distribution package as a string. |
|
:return: A ``Distribution`` instance (or subclass thereof). |
|
""" |
|
return Distribution.from_name(distribution_name) |
|
|
|
|
|
def distributions(**kwargs): |
|
"""Get all ``Distribution`` instances in the current environment. |
|
|
|
:return: An iterable of ``Distribution`` instances. |
|
""" |
|
return Distribution.discover(**kwargs) |
|
|
|
|
|
def metadata(distribution_name) -> _meta.PackageMetadata: |
|
"""Get the metadata for the named package. |
|
|
|
:param distribution_name: The name of the distribution package to query. |
|
:return: A PackageMetadata containing the parsed metadata. |
|
""" |
|
return Distribution.from_name(distribution_name).metadata |
|
|
|
|
|
def version(distribution_name): |
|
"""Get the version string for the named package. |
|
|
|
:param distribution_name: The name of the distribution package to query. |
|
:return: The version string for the package as defined in the package's |
|
"Version" metadata key. |
|
""" |
|
return distribution(distribution_name).version |
|
|
|
|
|
_unique = functools.partial( |
|
unique_everseen, |
|
key=operator.attrgetter('_normalized_name'), |
|
) |
|
""" |
|
Wrapper for ``distributions`` to return unique distributions by name. |
|
""" |
|
|
|
|
|
def entry_points(**params) -> Union[EntryPoints, SelectableGroups]: |
|
"""Return EntryPoint objects for all installed packages. |
|
|
|
Pass selection parameters (group or name) to filter the |
|
result to entry points matching those properties (see |
|
EntryPoints.select()). |
|
|
|
For compatibility, returns ``SelectableGroups`` object unless |
|
selection parameters are supplied. In the future, this function |
|
will return ``EntryPoints`` instead of ``SelectableGroups`` |
|
even when no selection parameters are supplied. |
|
|
|
For maximum future compatibility, pass selection parameters |
|
or invoke ``.select`` with parameters on the result. |
|
|
|
:return: EntryPoints or SelectableGroups for all installed packages. |
|
""" |
|
eps = itertools.chain.from_iterable( |
|
dist.entry_points for dist in _unique(distributions()) |
|
) |
|
return SelectableGroups.load(eps).select(**params) |
|
|
|
|
|
def files(distribution_name): |
|
"""Return a list of files for the named package. |
|
|
|
:param distribution_name: The name of the distribution package to query. |
|
:return: List of files composing the distribution. |
|
""" |
|
return distribution(distribution_name).files |
|
|
|
|
|
def requires(distribution_name): |
|
""" |
|
Return a list of requirements for the named package. |
|
|
|
:return: An iterator of requirements, suitable for |
|
packaging.requirement.Requirement. |
|
""" |
|
return distribution(distribution_name).requires |
|
|
|
|
|
def packages_distributions() -> Mapping[str, List[str]]: |
|
""" |
|
Return a mapping of top-level packages to their |
|
distributions. |
|
|
|
>>> import collections.abc |
|
>>> pkgs = packages_distributions() |
|
>>> all(isinstance(dist, collections.abc.Sequence) for dist in pkgs.values()) |
|
True |
|
""" |
|
pkg_to_dist = collections.defaultdict(list) |
|
for dist in distributions(): |
|
for pkg in _top_level_declared(dist) or _top_level_inferred(dist): |
|
pkg_to_dist[pkg].append(dist.metadata['Name']) |
|
return dict(pkg_to_dist) |
|
|
|
|
|
def _top_level_declared(dist): |
|
return (dist.read_text('top_level.txt') or '').split() |
|
|
|
|
|
def _top_level_inferred(dist): |
|
return { |
|
f.parts[0] if len(f.parts) > 1 else f.with_suffix('').name |
|
for f in always_iterable(dist.files) |
|
if f.suffix == ".py" |
|
} |
|
|