Spaces:
				
			
			
	
			
			
		Runtime error
		
	
	
	
			
			
	
	
	
	
		
		
		Runtime error
		
	| """Utilities for extracting common archive formats""" | |
| import zipfile | |
| import tarfile | |
| import os | |
| import shutil | |
| import posixpath | |
| import contextlib | |
| from distutils.errors import DistutilsError | |
| from ._path import ensure_directory | |
# Public names exported by a star-import of this module.
__all__ = [
    "unpack_archive",
    "unpack_zipfile",
    "unpack_tarfile",
    "default_filter",
    "UnrecognizedFormat",
    "extraction_drivers",
    "unpack_directory",
]
class UnrecognizedFormat(DistutilsError):
    """Raised when an archive's type cannot be recognized by a driver."""
def default_filter(src, dst):
    """Default progress/filter callback: accept every entry unchanged.

    `src` is the entry's path inside the archive and `dst` the proposed
    filesystem path.  The proposed destination is returned as-is, so nothing
    is skipped and no extraction path is altered.
    """
    return dst
def unpack_archive(
        filename, extract_dir, progress_filter=default_filter,
        drivers=None):
    """Extract `filename` into `extract_dir` using the first driver that works.

    `progress_filter` is called with two arguments: the entry's path inside
    the archive ('/'-separated) and the filesystem path it would be extracted
    to.  It must return the path to extract to (possibly the one it was
    given), or ``None`` to skip that file or directory.  This lets the
    callback report progress, filter entries, or redirect where they land.

    `drivers`, when given, must be a non-empty sequence of callables taking
    the same arguments as this function except `drivers`.  Each one raises
    ``UnrecognizedFormat`` if it cannot handle the archive.  They are tried in
    order until one completes without raising ``UnrecognizedFormat``.  When no
    sequence is supplied, the module-level ``extraction_drivers`` constant is
    used instead.

    Raises ``UnrecognizedFormat`` if every driver rejects the archive.
    """
    candidates = drivers or extraction_drivers
    for attempt in candidates:
        try:
            attempt(filename, extract_dir, progress_filter)
        except UnrecognizedFormat:
            # This driver does not understand the archive; try the next one.
            continue
        # The driver finished without complaint, so extraction is done.
        return

    raise UnrecognizedFormat(
        "Not a recognized archive type: %s" % filename
    )
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy a directory tree as though it were an archive being unpacked.

    Uses the same interface as the archive drivers; see ``unpack_archive()``
    for the meaning of `progress_filter`.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory.
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)

    # Maps each walked directory to a pair: its '/'-separated prefix relative
    # to the root (as an archive would name it) and its destination directory.
    locations = {filename: ('', extract_dir)}

    for current, subdirs, leaves in os.walk(filename):
        prefix, dest_dir = locations[current]

        # Register the children before os.walk descends into them.
        locations.update(
            (
                os.path.join(current, sub),
                (prefix + sub + '/', os.path.join(dest_dir, sub)),
            )
            for sub in subdirs
        )

        for leaf in leaves:
            destination = progress_filter(
                prefix + leaf, os.path.join(dest_dir, leaf)
            )
            if not destination:
                # The filter declined this file.
                continue
            # NOTE(review): ensure_directory comes from ._path; presumably it
            # creates the parent directory of `destination` - confirm.
            ensure_directory(destination)
            origin = os.path.join(current, leaf)
            shutil.copyfile(origin, destination)
            shutil.copystat(origin, destination)
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the zip archive `filename` into `extract_dir`.

    See ``unpack_archive()`` for the meaning of `progress_filter`.

    Raises ``UnrecognizedFormat`` if ``zipfile.is_zipfile()`` does not
    recognize `filename` as a zip file.
    """
    if zipfile.is_zipfile(filename):
        with zipfile.ZipFile(filename) as archive:
            _unpack_zipfile_obj(archive, extract_dir, progress_filter)
        return

    raise UnrecognizedFormat("%s is not a zip file" % (filename,))
def _unpack_zipfile_obj(zipfile_obj, extract_dir, progress_filter=default_filter):
    """Internal/private API used by other parts of setuptools.

    Works like ``unpack_zipfile``, except that it takes an already opened
    :obj:`zipfile.ZipFile` object rather than a filename.
    """
    for entry in zipfile_obj.infolist():
        member = entry.filename
        parts = member.split('/')

        # Refuse absolute paths and any path with a '..' component.
        if member.startswith('/') or '..' in parts:
            continue

        destination = progress_filter(
            member, os.path.join(extract_dir, *parts)
        )
        if not destination:
            # The filter declined this entry.
            continue

        # Directory and file entries both go through ensure_directory.
        # NOTE(review): ensure_directory comes from ._path; presumably it
        # creates the parent directory of the given path - confirm.
        ensure_directory(destination)
        if not member.endswith('/'):
            # A file entry: write its contents to the destination.
            payload = zipfile_obj.read(entry.filename)
            with open(destination, 'wb') as out:
                out.write(payload)

        # The upper 16 bits of external_attr are applied as the mode; a zero
        # value means there is nothing to apply.
        mode = entry.external_attr >> 16
        if mode:
            os.chmod(destination, mode)
def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
    """Resolve any links and return the file or directory they point to.

    `tar_obj` is the open tar archive in which link targets are looked up by
    name; `tar_member_obj` is the member to resolve.

    Returns the member that `tar_member_obj` ultimately refers to, which is
    `tar_member_obj` itself when it is already a regular file or directory.

    Raises ``LookupError`` if a link target is missing from the archive, if
    the chain of links loops back on itself, or if the resolved member is
    neither a regular file nor a directory.
    """
    # Identities of the link members already followed.  Without this, a
    # cyclic chain (``a -> b -> a``, or a hard link naming itself) would keep
    # this loop spinning forever on a crafted archive.
    visited = set()

    while tar_member_obj is not None and (
            tar_member_obj.islnk() or tar_member_obj.issym()):
        member_id = id(tar_member_obj)
        if member_id in visited:
            raise LookupError('Got circular link')
        visited.add(member_id)

        linkpath = tar_member_obj.linkname
        if tar_member_obj.issym():
            # A symlink target is relative to the directory holding the link;
            # a hard link's name is used as given.
            base = posixpath.dirname(tar_member_obj.name)
            linkpath = posixpath.join(base, linkpath)
            linkpath = posixpath.normpath(linkpath)
        tar_member_obj = tar_obj._getmember(linkpath)

    is_file_or_dir = (
        tar_member_obj is not None and
        (tar_member_obj.isfile() or tar_member_obj.isdir())
    )
    if is_file_or_dir:
        return tar_member_obj

    raise LookupError('Got unknown file type')
def _iter_open_tar(tar_obj, extract_dir, progress_filter):
    """Yield ``(member, destination)`` pairs for the entries of a tar archive.

    Entries with absolute paths or '..' components, entries that cannot be
    resolved to a file or directory, and entries rejected by
    `progress_filter` are skipped.  `tar_obj` is closed once iteration ends.
    """
    # Replace the archive's chown hook with a no-op so ownership is never
    # changed during extraction.
    tar_obj.chown = lambda *args: None

    with contextlib.closing(tar_obj):
        for entry in tar_obj:
            entry_name = entry.name
            parts = entry_name.split('/')

            # Refuse absolute paths and any path with a '..' component.
            if entry_name.startswith('/') or '..' in parts:
                continue

            tentative = os.path.join(extract_dir, *parts)

            try:
                resolved = _resolve_tar_file_or_dir(tar_obj, entry)
            except LookupError:
                # Unresolvable link, or not a file or directory.
                continue

            destination = progress_filter(entry_name, tentative)
            if not destination:
                continue

            # Drop a single trailing path separator, if there is one.
            trimmed = (
                destination[:-1]
                if destination.endswith(os.sep)
                else destination
            )
            yield resolved, trimmed
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Extract the tar/tar.gz/tar.bz2 archive `filename` into `extract_dir`.

    See ``unpack_archive()`` for the meaning of `progress_filter`.

    Returns ``True`` once every selected member has been processed.

    Raises ``UnrecognizedFormat`` if ``tarfile.open()`` cannot open
    `filename` as a tar archive.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError as exc:
        message = (
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )
        raise UnrecognizedFormat(message) from exc

    selected = _iter_open_tar(archive, extract_dir, progress_filter)
    for entry, destination in selected:
        # An ExtractError here means chown/chmod/mkfifo/mknode/makedev
        # failed; that is deliberately ignored.
        with contextlib.suppress(tarfile.ExtractError):
            # XXX Relies on a private tarfile API.
            archive._extract_member(entry, destination)

    return True
# Drivers tried, in this order, by ``unpack_archive`` when the caller does
# not supply its own sequence.
extraction_drivers = (unpack_directory, unpack_zipfile, unpack_tarfile)