Spaces:
Build error
Build error
| import os | |
| import pathlib | |
| import tempfile | |
| import functools | |
| import contextlib | |
| import types | |
| import importlib | |
| from typing import Union, Any, Optional | |
| from .abc import ResourceReader, Traversable | |
| from ._adapters import wrap_spec | |
| Package = Union[types.ModuleType, str] | |
def files(package):
    # type: (Package) -> Traversable
    """
    Return the Traversable for a package given by name or module object.
    """
    module = get_package(package)
    return from_package(module)
def normalize_path(path):
    # type: (Any) -> str
    """Coerce *path* to a string and require it to be a bare file name.

    ValueError is raised when the string form has a directory component.
    """
    directory, name = os.path.split(str(path))
    if not directory:
        return name
    raise ValueError(f'{path!r} must be only a file name')
def get_resource_reader(package):
    # type: (types.ModuleType) -> Optional[ResourceReader]
    """
    Ask the package's loader for a resource reader.

    Returns None when the loader exposes no ``get_resource_reader``.
    """
    # Probe for the method rather than using an issubclass() check: the
    # ABC's __subclasscheck__() hook wants to create a weak reference to
    # the object, but zipimport.zipimporter does not support weak
    # references, so that check ends in a TypeError.
    spec = package.__spec__
    make_reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
    return None if make_reader is None else make_reader(spec.name)  # type: ignore
def resolve(cand):
    # type: (Package) -> types.ModuleType
    """Return *cand* itself if it is a module; otherwise import it by name."""
    if isinstance(cand, types.ModuleType):
        return cand
    return importlib.import_module(cand)
def get_package(package):
    # type: (Package) -> types.ModuleType
    """Resolve a package name or module object to the module itself.

    TypeError is raised when the resolved module's wrapped spec has no
    submodule search locations, i.e. it is not a package.
    """
    module = resolve(package)
    search_locations = wrap_spec(module).submodule_search_locations
    if search_locations is not None:
        return module
    raise TypeError(f'{package!r} is not a package')
def from_package(package):
    """
    Build the Traversable for *package* from its loader's resource reader.
    """
    wrapped = wrap_spec(package)
    return wrapped.loader.get_resource_reader(wrapped.name).files()
@contextlib.contextmanager
def _tempfile(reader, suffix='',
              # gh-93353: Keep a reference to call os.remove() in late Python
              # finalization.
              *, _os_remove=os.remove):
    """
    Context manager that materializes ``reader()`` as a temporary file.

    ``reader`` is called once with no arguments and must return bytes; they
    are written to a new file created by ``tempfile.mkstemp`` with the given
    ``suffix``. The file's ``pathlib.Path`` is yielded, and the file is
    removed when the context exits, whether or not an error occurred.
    """
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on Windows
    # properly.
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
    try:
        try:
            os.write(fd, reader())
        finally:
            # Close the descriptor even when reader() or the write fails,
            # so an error does not leak it.
            os.close(fd)
        # Drop our reference to the reader before suspending at the yield.
        del reader
        yield pathlib.Path(raw_path)
    finally:
        try:
            _os_remove(raw_path)
        except FileNotFoundError:
            # Already removed (e.g. by the caller); nothing left to clean up.
            pass
@functools.singledispatch
def as_file(path):
    """
    Given a Traversable object, return that object as a
    path on the local file system in a context manager.

    This generic implementation copies the object's bytes into a temporary
    file named with the object's ``name`` as suffix. Implementations
    registered for specific types take precedence over it.
    """
    return _tempfile(path.read_bytes, suffix=path.name)


@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    Degenerate behavior for pathlib.Path objects.

    The given path is yielded unchanged; no temporary copy is made.
    """
    yield path