|
import logging |
|
import tarfile |
|
|
|
import fsspec |
|
from fsspec.archive import AbstractArchiveFileSystem |
|
from fsspec.compression import compr |
|
from fsspec.utils import infer_compression |
|
|
|
typemap = {b"0": "file", b"5": "directory"} |
|
|
|
logger = logging.getLogger("tar") |
|
|
|
|
|
class TarFileSystem(AbstractArchiveFileSystem): |
|
"""Compressed Tar archives as a file-system (read-only) |
|
|
|
Supports the following formats: |
|
tar.gz, tar.bz2, tar.xz |
|
""" |
|
|
|
root_marker = "" |
|
protocol = "tar" |
|
cachable = False |
|
|
|
def __init__( |
|
self, |
|
fo="", |
|
index_store=None, |
|
target_options=None, |
|
target_protocol=None, |
|
compression=None, |
|
**kwargs, |
|
): |
|
super().__init__(**kwargs) |
|
target_options = target_options or {} |
|
|
|
if isinstance(fo, str): |
|
self.of = fsspec.open(fo, protocol=target_protocol, **target_options) |
|
fo = self.of.open() |
|
|
|
|
|
if compression is None: |
|
name = None |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
|
if hasattr(fo, "original"): |
|
name = fo.original |
|
|
|
|
|
elif hasattr(fo, "path"): |
|
name = fo.path |
|
|
|
|
|
elif hasattr(fo, "name"): |
|
name = fo.name |
|
|
|
|
|
elif hasattr(fo, "info"): |
|
name = fo.info()["name"] |
|
|
|
except Exception as ex: |
|
logger.warning( |
|
f"Unable to determine file name, not inferring compression: {ex}" |
|
) |
|
|
|
if name is not None: |
|
compression = infer_compression(name) |
|
logger.info(f"Inferred compression {compression} from file name {name}") |
|
|
|
if compression is not None: |
|
|
|
|
|
fo = compr[compression](fo) |
|
|
|
self._fo_ref = fo |
|
self.fo = fo |
|
self.tar = tarfile.TarFile(fileobj=self.fo) |
|
self.dir_cache = None |
|
|
|
self.index_store = index_store |
|
self.index = None |
|
self._index() |
|
|
|
def _index(self): |
|
|
|
out = {} |
|
for ti in self.tar: |
|
info = ti.get_info() |
|
info["type"] = typemap.get(info["type"], "file") |
|
name = ti.get_info()["name"].rstrip("/") |
|
out[name] = (info, ti.offset_data) |
|
|
|
self.index = out |
|
|
|
|
|
def _get_dirs(self): |
|
if self.dir_cache is not None: |
|
return |
|
|
|
|
|
self.dir_cache = { |
|
dirname: {"name": dirname, "size": 0, "type": "directory"} |
|
for dirname in self._all_dirnames(self.tar.getnames()) |
|
} |
|
for member in self.tar.getmembers(): |
|
info = member.get_info() |
|
info["name"] = info["name"].rstrip("/") |
|
info["type"] = typemap.get(info["type"], "file") |
|
self.dir_cache[info["name"]] = info |
|
|
|
def _open(self, path, mode="rb", **kwargs): |
|
if mode != "rb": |
|
raise ValueError("Read-only filesystem implementation") |
|
details, offset = self.index[path] |
|
if details["type"] != "file": |
|
raise ValueError("Can only handle regular files") |
|
return self.tar.extractfile(path) |
|
|