|
from __future__ import annotations |
|
|
|
import logging |
|
from datetime import datetime, timezone |
|
from errno import ENOTEMPTY |
|
from io import BytesIO |
|
from typing import Any, ClassVar |
|
|
|
from fsspec import AbstractFileSystem |
|
|
|
logger = logging.Logger("fsspec.memoryfs") |
|
|
|
|
|
class MemoryFileSystem(AbstractFileSystem): |
|
"""A filesystem based on a dict of BytesIO objects |
|
|
|
This is a global filesystem so instances of this class all point to the same |
|
in memory filesystem. |
|
""" |
|
|
|
store: ClassVar[dict[str, Any]] = {} |
|
pseudo_dirs = [""] |
|
protocol = "memory" |
|
root_marker = "/" |
|
|
|
@classmethod |
|
def _strip_protocol(cls, path): |
|
if path.startswith("memory://"): |
|
path = path[len("memory://") :] |
|
if "::" in path or "://" in path: |
|
return path.rstrip("/") |
|
path = path.lstrip("/").rstrip("/") |
|
return "/" + path if path else "" |
|
|
|
def ls(self, path, detail=True, **kwargs): |
|
path = self._strip_protocol(path) |
|
if path in self.store: |
|
|
|
if not detail: |
|
return [path] |
|
return [ |
|
{ |
|
"name": path, |
|
"size": self.store[path].size, |
|
"type": "file", |
|
"created": self.store[path].created.timestamp(), |
|
} |
|
] |
|
paths = set() |
|
starter = path + "/" |
|
out = [] |
|
for p2 in tuple(self.store): |
|
if p2.startswith(starter): |
|
if "/" not in p2[len(starter) :]: |
|
|
|
out.append( |
|
{ |
|
"name": p2, |
|
"size": self.store[p2].size, |
|
"type": "file", |
|
"created": self.store[p2].created.timestamp(), |
|
} |
|
) |
|
elif len(p2) > len(starter): |
|
|
|
ppath = starter + p2[len(starter) :].split("/", 1)[0] |
|
if ppath not in paths: |
|
out = out or [] |
|
out.append( |
|
{ |
|
"name": ppath, |
|
"size": 0, |
|
"type": "directory", |
|
} |
|
) |
|
paths.add(ppath) |
|
for p2 in self.pseudo_dirs: |
|
if p2.startswith(starter): |
|
if "/" not in p2[len(starter) :]: |
|
|
|
if p2 not in paths: |
|
out.append({"name": p2, "size": 0, "type": "directory"}) |
|
paths.add(p2) |
|
else: |
|
|
|
ppath = starter + p2[len(starter) :].split("/", 1)[0] |
|
if ppath not in paths: |
|
out.append({"name": ppath, "size": 0, "type": "directory"}) |
|
paths.add(ppath) |
|
if not out: |
|
if path in self.pseudo_dirs: |
|
|
|
return [] |
|
raise FileNotFoundError(path) |
|
if detail: |
|
return out |
|
return sorted([f["name"] for f in out]) |
|
|
|
def mkdir(self, path, create_parents=True, **kwargs): |
|
path = self._strip_protocol(path) |
|
if path in self.store or path in self.pseudo_dirs: |
|
raise FileExistsError(path) |
|
if self._parent(path).strip("/") and self.isfile(self._parent(path)): |
|
raise NotADirectoryError(self._parent(path)) |
|
if create_parents and self._parent(path).strip("/"): |
|
try: |
|
self.mkdir(self._parent(path), create_parents, **kwargs) |
|
except FileExistsError: |
|
pass |
|
if path and path not in self.pseudo_dirs: |
|
self.pseudo_dirs.append(path) |
|
|
|
def makedirs(self, path, exist_ok=False): |
|
try: |
|
self.mkdir(path, create_parents=True) |
|
except FileExistsError: |
|
if not exist_ok: |
|
raise |
|
|
|
def pipe_file(self, path, value, **kwargs): |
|
"""Set the bytes of given file |
|
|
|
Avoids copies of the data if possible |
|
""" |
|
self.open(path, "wb", data=value) |
|
|
|
def rmdir(self, path): |
|
path = self._strip_protocol(path) |
|
if path == "": |
|
|
|
return |
|
if path in self.pseudo_dirs: |
|
if not self.ls(path): |
|
self.pseudo_dirs.remove(path) |
|
else: |
|
raise OSError(ENOTEMPTY, "Directory not empty", path) |
|
else: |
|
raise FileNotFoundError(path) |
|
|
|
def info(self, path, **kwargs): |
|
path = self._strip_protocol(path) |
|
if path in self.pseudo_dirs or any( |
|
p.startswith(path + "/") for p in list(self.store) + self.pseudo_dirs |
|
): |
|
return { |
|
"name": path, |
|
"size": 0, |
|
"type": "directory", |
|
} |
|
elif path in self.store: |
|
filelike = self.store[path] |
|
return { |
|
"name": path, |
|
"size": filelike.size, |
|
"type": "file", |
|
"created": getattr(filelike, "created", None), |
|
} |
|
else: |
|
raise FileNotFoundError(path) |
|
|
|
def _open( |
|
self, |
|
path, |
|
mode="rb", |
|
block_size=None, |
|
autocommit=True, |
|
cache_options=None, |
|
**kwargs, |
|
): |
|
path = self._strip_protocol(path) |
|
if path in self.pseudo_dirs: |
|
raise IsADirectoryError(path) |
|
parent = path |
|
while len(parent) > 1: |
|
parent = self._parent(parent) |
|
if self.isfile(parent): |
|
raise FileExistsError(parent) |
|
if mode in ["rb", "ab", "r+b"]: |
|
if path in self.store: |
|
f = self.store[path] |
|
if mode == "ab": |
|
|
|
f.seek(0, 2) |
|
else: |
|
|
|
f.seek(0) |
|
return f |
|
else: |
|
raise FileNotFoundError(path) |
|
elif mode == "wb": |
|
m = MemoryFile(self, path, kwargs.get("data")) |
|
if not self._intrans: |
|
m.commit() |
|
return m |
|
else: |
|
name = self.__class__.__name__ |
|
raise ValueError(f"unsupported file mode for {name}: {mode!r}") |
|
|
|
def cp_file(self, path1, path2, **kwargs): |
|
path1 = self._strip_protocol(path1) |
|
path2 = self._strip_protocol(path2) |
|
if self.isfile(path1): |
|
self.store[path2] = MemoryFile( |
|
self, path2, self.store[path1].getvalue() |
|
) |
|
elif self.isdir(path1): |
|
if path2 not in self.pseudo_dirs: |
|
self.pseudo_dirs.append(path2) |
|
else: |
|
raise FileNotFoundError(path1) |
|
|
|
def cat_file(self, path, start=None, end=None, **kwargs): |
|
path = self._strip_protocol(path) |
|
try: |
|
return bytes(self.store[path].getbuffer()[start:end]) |
|
except KeyError: |
|
raise FileNotFoundError(path) |
|
|
|
def _rm(self, path): |
|
path = self._strip_protocol(path) |
|
try: |
|
del self.store[path] |
|
except KeyError as e: |
|
raise FileNotFoundError(path) from e |
|
|
|
def modified(self, path): |
|
path = self._strip_protocol(path) |
|
try: |
|
return self.store[path].modified |
|
except KeyError: |
|
raise FileNotFoundError(path) |
|
|
|
def created(self, path): |
|
path = self._strip_protocol(path) |
|
try: |
|
return self.store[path].created |
|
except KeyError: |
|
raise FileNotFoundError(path) |
|
|
|
def rm(self, path, recursive=False, maxdepth=None): |
|
if isinstance(path, str): |
|
path = self._strip_protocol(path) |
|
else: |
|
path = [self._strip_protocol(p) for p in path] |
|
paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) |
|
for p in reversed(paths): |
|
|
|
|
|
|
|
|
|
if not self.exists(p): |
|
continue |
|
if self.isfile(p): |
|
self.rm_file(p) |
|
else: |
|
self.rmdir(p) |
|
|
|
|
|
class MemoryFile(BytesIO): |
|
"""A BytesIO which can't close and works as a context manager |
|
|
|
Can initialise with data. Each path should only be active once at any moment. |
|
|
|
No need to provide fs, path if auto-committing (default) |
|
""" |
|
|
|
def __init__(self, fs=None, path=None, data=None): |
|
logger.debug("open file %s", path) |
|
self.fs = fs |
|
self.path = path |
|
self.created = datetime.now(tz=timezone.utc) |
|
self.modified = datetime.now(tz=timezone.utc) |
|
if data: |
|
super().__init__(data) |
|
self.seek(0) |
|
|
|
@property |
|
def size(self): |
|
return self.getbuffer().nbytes |
|
|
|
def __enter__(self): |
|
return self |
|
|
|
def close(self): |
|
pass |
|
|
|
def discard(self): |
|
pass |
|
|
|
def commit(self): |
|
self.fs.store[self.path] = self |
|
self.modified = datetime.now(tz=timezone.utc) |
|
|