"""Helper functions for a standard streaming compression API""" from zipfile import ZipFile import fsspec.utils from fsspec.spec import AbstractBufferedFile def noop_file(file, mode, **kwargs): return file # TODO: files should also be available as contexts # should be functions of the form func(infile, mode=, **kwargs) -> file-like compr = {None: noop_file} def register_compression(name, callback, extensions, force=False): """Register an "inferable" file compression type. Registers transparent file compression type for use with fsspec.open. Compression can be specified by name in open, or "infer"-ed for any files ending with the given extensions. Args: name: (str) The compression type name. Eg. "gzip". callback: A callable of form (infile, mode, **kwargs) -> file-like. Accepts an input file-like object, the target mode and kwargs. Returns a wrapped file-like object. extensions: (str, Iterable[str]) A file extension, or list of file extensions for which to infer this compression scheme. Eg. "gz". force: (bool) Force re-registration of compression type or extensions. Raises: ValueError: If name or extensions already registered, and not force. """ if isinstance(extensions, str): extensions = [extensions] # Validate registration if name in compr and not force: raise ValueError(f"Duplicate compression registration: {name}") for ext in extensions: if ext in fsspec.utils.compressions and not force: raise ValueError(f"Duplicate compression file extension: {ext} ({name})") compr[name] = callback for ext in extensions: fsspec.utils.compressions[ext] = name def unzip(infile, mode="rb", filename=None, **kwargs): if "r" not in mode: filename = filename or "file" z = ZipFile(infile, mode="w", **kwargs) fo = z.open(filename, mode="w") fo.close = lambda closer=fo.close: closer() or z.close() return fo z = ZipFile(infile) if filename is None: filename = z.namelist()[0] return z.open(filename, mode="r", **kwargs) register_compression("zip", unzip, "zip") try: from bz2 import BZ2File except ImportError: pass else: register_compression("bz2", BZ2File, "bz2") try: # pragma: no cover from isal import igzip def isal(infile, mode="rb", **kwargs): return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs) register_compression("gzip", isal, "gz") except ImportError: from gzip import GzipFile register_compression( "gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz" ) try: from lzma import LZMAFile register_compression("lzma", LZMAFile, "xz") register_compression("xz", LZMAFile, "xz", force=True) except ImportError: pass try: import lzmaffi register_compression("lzma", lzmaffi.LZMAFile, "xz", force=True) register_compression("xz", lzmaffi.LZMAFile, "xz", force=True) except ImportError: pass class SnappyFile(AbstractBufferedFile): def __init__(self, infile, mode, **kwargs): import snappy super().__init__( fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs ) self.infile = infile if "r" in mode: self.codec = snappy.StreamDecompressor() else: self.codec = snappy.StreamCompressor() def _upload_chunk(self, final=False): self.buffer.seek(0) out = self.codec.add_chunk(self.buffer.read()) self.infile.write(out) return True def seek(self, loc, whence=0): raise NotImplementedError("SnappyFile is not seekable") def seekable(self): return False def _fetch_range(self, start, end): """Get the specified set of bytes from remote""" data = self.infile.read(end - start) return self.codec.decompress(data) try: import snappy snappy.compress # Snappy may use the .sz file extension, but this is not part of the # standard implementation. register_compression("snappy", SnappyFile, []) except (ImportError, NameError, AttributeError): pass try: import lz4.frame register_compression("lz4", lz4.frame.open, "lz4") except ImportError: pass try: import zstandard as zstd def zstandard_file(infile, mode="rb"): if "r" in mode: cctx = zstd.ZstdDecompressor() return cctx.stream_reader(infile) else: cctx = zstd.ZstdCompressor(level=10) return cctx.stream_writer(infile) register_compression("zstd", zstandard_file, "zst") except ImportError: pass def available_compressions(): """Return a list of the implemented compressions.""" return list(compr)