|
import zipfile |
|
|
|
import fsspec |
|
from fsspec.archive import AbstractArchiveFileSystem |
|
|
|
|
|
class ZipFileSystem(AbstractArchiveFileSystem):
    """Read/Write contents of ZIP archive as a file-system

    Keeps file object open while instance lives.

    This class is pickleable, but not necessarily thread-safe
    """

    root_marker = ""
    protocol = "zip"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        compression=zipfile.ZIP_STORED,
        allowZip64=True,
        compresslevel=None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            Contains ZIP. If a str, will fetch file using
            :func:`fsspec.open` with binary mode ``"rb"``, ``"wb"`` or
            ``"r+b"`` (for ``mode="a"``). A file-like object is used as
            given, after calling its ``__enter__``.
        mode: str
            Accept: "r", "w", "a"
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        compression, allowZip64, compresslevel: passed to ZipFile
            Only relevant when creating a ZIP
        **kwargs:
            Forwarded to the parent class initialiser.

        Raises
        ------
        ValueError
            If ``mode`` is not one of "r", "w" or "a".
        """
        # Only keyword options are forwarded: ``self`` is already bound by
        # ``super()`` and must not be passed again as a positional argument.
        super().__init__(**kwargs)
        if mode not in {"r", "w", "a"}:
            raise ValueError(f"mode '{mode}' not understood")
        self.mode = mode
        if isinstance(fo, str):
            # Appending needs a target that is both readable and writable.
            if mode == "a":
                m = "r+b"
            else:
                m = mode + "b"
            fo = fsspec.open(
                fo, mode=m, protocol=target_protocol, **(target_options or {})
            )
        self.of = fo
        # The entered file object stays open for the lifetime of the instance.
        self.fo = fo.__enter__()
        self.zip = zipfile.ZipFile(
            self.fo,
            mode=mode,
            compression=compression,
            allowZip64=allowZip64,
            compresslevel=compresslevel,
        )
        # Listing cache; populated on demand by ``_get_dirs``.
        self.dir_cache = None

    @classmethod
    def _strip_protocol(cls, path):
        """Return ``path`` without protocol prefix and without leading "/"."""
        # Paths inside the archive are relative to its root (root_marker is
        # ""), so any leading slashes are dropped.
        return super()._strip_protocol(path).lstrip("/")

    def __del__(self):
        # ``zip`` is missing if ``__init__`` failed before creating it.
        if hasattr(self, "zip"):
            self.close()
            del self.zip

    def close(self):
        """Commits any write changes to the file. Done on ``del`` too."""
        # Only the ZipFile is closed here; ``self.fo`` is not explicitly
        # closed by this method.
        self.zip.close()

    def _get_dirs(self):
        """Populate ``self.dir_cache`` with one info dict per archive entry.

        In read mode the cache is built once; in "w"/"a" mode it is rebuilt
        on every call, so the listing reflects the members currently known
        to the ZipFile.
        """
        if self.dir_cache is None or self.mode in {"w", "a"}:
            files = self.zip.infolist()
            # Start with the directories derived from the member names ...
            self.dir_cache = {
                dirname.rstrip("/"): {
                    "name": dirname.rstrip("/"),
                    "size": 0,
                    "type": "directory",
                }
                for dirname in self._all_dirnames(self.zip.namelist())
            }
            # ... then add (or overwrite with) the real archive entries,
            # carrying over every ZipInfo attribute alongside the standard
            # "name"/"size"/"type" keys.
            for z in files:
                f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__}
                f.update(
                    {
                        "name": z.filename.rstrip("/"),
                        "size": z.file_size,
                        "type": ("directory" if z.is_dir() else "file"),
                    }
                )
                self.dir_cache[f["name"]] = f

    def pipe_file(self, path, value, **kwargs):
        """Write ``value`` as the complete content of archive member ``path``.

        ``kwargs`` are passed on to :meth:`zipfile.ZipFile.writestr`.
        """
        self.zip.writestr(path, value, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open the archive member ``path`` and return the file object.

        ``block_size``, ``autocommit``, ``cache_options`` and ``kwargs`` are
        accepted but not used by this implementation.

        Raises
        ------
        OSError
            If a member is opened for reading on an archive opened for
            writing/appending, or for writing on an archive opened for
            reading.
        FileNotFoundError
            If a read is requested on an archive opened for
            writing/appending and ``path`` does not exist.
        """
        path = self._strip_protocol(path)
        if "r" in mode and self.mode in {"w", "a"}:
            if self.exists(path):
                raise OSError("ZipFS can only be open for reading or writing, not both")
            raise FileNotFoundError(path)
        if "r" in self.mode and "w" in mode:
            raise OSError("ZipFS can only be open for reading or writing, not both")
        # ZipFile.open takes "r"/"w" without the binary flag.
        out = self.zip.open(path, mode.strip("b"))
        if "r" in mode:
            # Attach size and name from the listing to the returned file.
            info = self.info(path)
            out.size = info["size"]
            out.name = info["name"]
        return out
|
|