Spaces:
				
			
			
	
			
			
		Paused
		
	
	
	
			
			
	
	
	
	
		
		
		Paused
		
	| import argparse | |
| import logging | |
| import os | |
| import stat | |
| import threading | |
| import time | |
| from errno import EIO, ENOENT | |
| from fuse import FUSE, FuseOSError, LoggingMixIn, Operations | |
| from fsspec import __version__ | |
| from fsspec.core import url_to_fs | |
| logger = logging.getLogger("fsspec.fuse") | |
class FUSEr(Operations):
    """FUSE operations handler backed by an fsspec filesystem.

    Every callback receives a path relative to the mount point, maps it to a
    path below ``root`` on ``fs`` and forwards the request to the matching
    filesystem method.  File objects opened through ``create``/``open`` are
    kept in ``self.cache`` under an integer handle until ``release``.

    Parameters
    ----------
    fs: file-system instance
        Filesystem whose contents are exposed.
    path: str
        Location on ``fs`` that acts as the root of the mount.
    ready_file: bool
        If True, ``getattr`` and ``read`` answer for a synthetic
        ``.fuse_ready`` file whose content is ``b"ready"``.
    """

    # Spellings under which the synthetic readiness marker is recognised.
    _READY_PATHS = ("/.fuse_ready", ".fuse_ready")

    def __init__(self, fs, path, ready_file=False):
        self.fs = fs
        self.cache = {}  # handle (int) -> open file object
        self.root = path.rstrip("/") + "/"  # always ends with exactly one "/"
        self.counter = 0  # next handle to hand out
        logger.info("Starting FUSE at %s", path)
        self._ready_file = ready_file

    def _full_path(self, path):
        """Return ``path`` (relative to the mount point) as a path below ``root``."""
        return "".join([self.root, path.lstrip("/")])

    def _is_ready_file(self, path):
        """Tell whether ``path`` names the synthetic readiness marker."""
        return self._ready_file and path in self._READY_PATHS

    def _register(self, f):
        """Store the open file ``f`` in the cache and return its new handle."""
        handle = self.counter
        self.cache[handle] = f
        self.counter += 1
        return handle

    def getattr(self, path, fh=None):
        """Return the stat-like attribute dict for ``path``.

        Raises ``FuseOSError(ENOENT)`` when the filesystem reports that the
        path does not exist.
        """
        logger.debug("getattr %s", path)
        if self._is_ready_file(path):
            # Describe the marker as a read-only regular file so that it
            # carries a file type like every other entry built below.
            return {
                "type": "file",
                "st_mode": stat.S_IFREG | 0o444,
                "st_nlink": 1,
                "st_size": 5,  # len(b"ready"), see read()
            }
        path = self._full_path(path).rstrip("/")
        try:
            info = self.fs.info(path)
        except FileNotFoundError:
            raise FuseOSError(ENOENT)

        data = {"st_uid": info.get("uid", 1000), "st_gid": info.get("gid", 1000)}
        perm = info.get("mode", 0o777)
        if info["type"] != "file":
            # Anything that is not a plain file is presented as a directory.
            data["st_mode"] = stat.S_IFDIR | perm
            data["st_size"] = 0
            data["st_blksize"] = 0
        else:
            data["st_mode"] = stat.S_IFREG | perm
            data["st_size"] = info["size"]
            data["st_blksize"] = 5 * 2**20
            data["st_nlink"] = 1

        # Timestamps the backend does not provide fall back to "now".
        now = time.time()
        data["st_atime"] = info.get("atime", now)
        data["st_ctime"] = info.get("ctime", now)
        data["st_mtime"] = info.get("mtime", now)
        return data

    def readdir(self, path, fh):
        """List the entry names of directory ``path``, including "." and ".."."""
        logger.debug("readdir %s", path)
        entries = self.fs.ls(self._full_path(path), False)
        names = [os.path.basename(entry.rstrip("/")) for entry in entries]
        return [".", ".."] + names

    def mkdir(self, path, mode):
        """Create directory ``path``; ``mode`` is accepted but not used."""
        self.fs.mkdir(self._full_path(path))
        return 0

    def rmdir(self, path):
        """Remove directory ``path``."""
        self.fs.rmdir(self._full_path(path))
        return 0

    def read(self, path, size, offset, fh):
        """Read up to ``size`` bytes at ``offset`` from the file behind ``fh``."""
        logger.debug("read %s", (path, size, offset))
        if self._is_ready_file(path):
            # status indicator
            return b"ready"

        f = self.cache[fh]
        f.seek(offset)
        return f.read(size)

    def write(self, path, data, offset, fh):
        """Write ``data`` at ``offset`` to the file behind ``fh``.

        Returns the number of bytes handed over, i.e. ``len(data)``.
        """
        logger.debug("write %s", (path, offset))
        f = self.cache[fh]
        f.seek(offset)
        f.write(data)
        return len(data)

    def create(self, path, flags, fi=None):
        """Create ``path``, open it for binary writing and return its handle."""
        logger.debug("create %s", (path, flags))
        fn = self._full_path(path)
        self.fs.touch(fn)  # OS will want to get attributes immediately
        return self._register(self.fs.open(fn, "wb"))

    def open(self, path, flags):
        """Open ``path`` and return the handle of the cached file object.

        Even ``flags`` values open the file with mode ``"rb"``, odd values
        with ``"wb"``.
        """
        logger.debug("open %s", (path, flags))
        fn = self._full_path(path)
        # NOTE(review): only the lowest bit of ``flags`` is inspected, so a
        # read-write request with an even flag value is opened read-only --
        # confirm this is intended.
        mode = "rb" if flags % 2 == 0 else "wb"
        return self._register(self.fs.open(fn, mode))

    def truncate(self, path, length, fh=None):
        """Truncate ``path`` to zero length.

        Raises ``NotImplementedError`` for any non-zero ``length``.
        """
        fn = self._full_path(path)
        if length != 0:
            raise NotImplementedError
        # maybe should be no-op since open with write sets size to zero anyway
        self.fs.touch(fn)

    def unlink(self, path):
        """Delete the file ``path``.

        Raises ``FuseOSError(ENOENT)`` when the file does not exist and
        ``FuseOSError(EIO)`` for any other ``OSError`` from the filesystem.
        """
        fn = self._full_path(path)
        try:
            self.fs.rm(fn, False)
        except FileNotFoundError:
            # A missing file is reported as such, matching getattr().
            raise FuseOSError(ENOENT)
        except OSError:
            raise FuseOSError(EIO)

    def release(self, path, fh):
        """Close the file behind ``fh`` and forget the handle.

        Closing is best effort: a failure is logged and 0 is still returned.
        """
        # Drop the handle first so a failing close() cannot leave a stale
        # entry behind in the cache.
        f = self.cache.pop(fh, None)
        if f is not None:
            try:
                f.close()
            except Exception:
                logger.exception("Failed to close %s (handle %s)", path, fh)
        return 0

    def chmod(self, path, mode):
        """Forward ``chmod`` to the filesystem if it offers that method.

        Raises ``NotImplementedError`` when ``fs`` has no ``chmod``.
        """
        if hasattr(self.fs, "chmod"):
            return self.fs.chmod(self._full_path(path), mode)
        raise NotImplementedError
def run(
    fs,
    path,
    mount_point,
    foreground=True,
    threads=False,
    ready_file=False,
    ops_class=FUSEr,
):
    """Expose a path of an fsspec filesystem as a local FUSE mount.

    fusepy is used to make ``path`` on ``fs`` look as if it lived inside the
    local directory ``mount_point``.

    fusepy must be installed and FUSE must be available on the system
    (typically via a package from apt, yum, brew, etc.).

    Parameters
    ----------
    fs: file-system instance
        From one of the compatible implementations
    path: str
        Location on that file-system to regard as the root directory to
        mount. Note that you typically should include the terminating "/"
        character.
    mount_point: str
        An empty directory on the local file-system where the contents of
        the remote path will appear.
    foreground: bool
        Whether or not calling this function will block. Operation will
        typically be more stable if True.
    threads: bool
        Whether or not to create threads when responding to file operations
        within the mounted directory. Operation will typically be more
        stable if False.
    ready_file: bool
        Whether the FUSE process is ready. The ``.fuse_ready`` file will
        exist in the ``mount_point`` directory if True. Debugging purpose.
    ops_class: FUSEr or Subclass of FUSEr
        To override the default behavior of FUSEr. For Example, logging
        to file.

    Returns
    -------
    The started daemon ``threading.Thread`` when ``foreground`` is False,
    otherwise None (after the foreground call has finished).
    """

    def serve():
        # Building the FUSE object is what mounts and serves the filesystem.
        return FUSE(
            ops_class(fs, path, ready_file=ready_file),
            mount_point,
            nothreads=not threads,
            foreground=foreground,
        )

    if foreground:  # pragma: no cover
        # Blocking mode: Ctrl-C simply ends the call.
        try:
            serve()
        except KeyboardInterrupt:
            pass
        return None

    # Non-blocking mode: serve from a daemon thread and hand it to the caller.
    worker = threading.Thread(target=serve)
    worker.daemon = True
    worker.start()
    return worker
def main(args):
    """Mount filesystem from chained URL to MOUNT_POINT.

    Examples:

    python3 -m fsspec.fuse memory /usr/share /tmp/mem

    python3 -m fsspec.fuse local /tmp/source /tmp/local \\
            -l /tmp/fsspecfuse.log

    You can also mount chained-URLs and use special settings:

    python3 -m fsspec.fuse 'filecache::zip::file://data.zip' \\
            / /tmp/zip \\
            -o 'filecache-cache_storage=/tmp/simplecache'

    You can specify the type of the setting by using `[int]` or `[bool]`,
    (`true`, `yes`, `1` represents the Boolean value `True`):

    python3 -m fsspec.fuse 'simplecache::ftp://ftp1.at.proftpd.org' \\
            /historic/packages/RPMS /tmp/ftp \\
            -o 'simplecache-cache_storage=/tmp/simplecache' \\
            -o 'simplecache-check_files=false[bool]' \\
            -o 'ftp-listings_expiry_time=60[int]' \\
            -o 'ftp-username=anonymous' \\
            -o 'ftp-password=xieyanbo'
    """
    # The docstring above is shown verbatim as the ``--help`` description, so
    # developer notes are kept in comments.
    #
    # ``args`` is the list of command-line arguments without the program name
    # (for example ``sys.argv[1:]``).  They are parsed, the ``-o`` options are
    # turned into keyword arguments for ``url_to_fs``, and the resulting
    # filesystem is handed to ``run``.

    class RawDescriptionArgumentParser(argparse.ArgumentParser):
        def format_help(self):
            # Replace the second help paragraph (the description as laid out
            # by argparse) with the raw description text so that the example
            # commands keep their line breaks.
            usage = super().format_help()
            parts = usage.split("\n\n")
            parts[1] = self.description.rstrip()
            return "\n\n".join(parts)

    parser = RawDescriptionArgumentParser(prog="fsspec.fuse", description=main.__doc__)
    parser.add_argument("--version", action="version", version=__version__)
    parser.add_argument("url", type=str, help="fs url")
    parser.add_argument("source_path", type=str, help="source directory in fs")
    parser.add_argument("mount_point", type=str, help="local directory")
    parser.add_argument(
        "-o",
        "--option",
        action="append",
        help="Any options of protocol included in the chained URL",
    )
    parser.add_argument(
        "-l", "--log-file", type=str, help="Logging FUSE debug info (Default: '')"
    )
    # ``run`` returns right after starting a daemon thread when
    # ``foreground`` is False, so the command line has to stay in the
    # foreground by default.  The runtime effect of this flag is therefore
    # left as it was; only the help text, which wrongly claimed
    # "Default: False", is corrected.
    parser.add_argument(
        "-f",
        "--foreground",
        action="store_false",
        help="Passing this flag turns foreground mode OFF and serves FUSE "
        "from a background thread (Default: run in foreground)",
    )
    # ``store_true`` makes these two flags opt-in, as their help text and the
    # defaults of ``run`` describe.  With ``store_false`` they were switched
    # on by default and passing the flag switched them off.
    parser.add_argument(
        "-t",
        "--threads",
        action="store_true",
        help="Running with threads support (Default: False)",
    )
    parser.add_argument(
        "-r",
        "--ready-file",
        action="store_true",
        help="The `.fuse_ready` file will exist after FUSE is ready. "
        "(Debugging purpose, Default: False)",
    )
    args = parser.parse_args(args)

    # Translate every ``-o key=value`` into a keyword argument.  A key of the
    # form ``<fs>-<setting>`` is grouped into a per-filesystem dict.
    kwargs = {}
    for item in args.option or []:
        key, sep, value = item.partition("=")
        if not sep:
            parser.error(message=f"Wrong option: {item!r}")
        val = value.lower()
        if val.endswith("[int]"):
            try:
                value = int(value[: -len("[int]")])
            except ValueError:
                # Report a malformed number like any other bad option
                # instead of failing with a traceback.
                parser.error(message=f"Wrong option: {item!r}")
        elif val.endswith("[bool]"):
            value = val[: -len("[bool]")] in ["1", "yes", "true"]

        if "-" in key:
            fs_name, setting_name = key.split("-", 1)
            kwargs.setdefault(fs_name, {})[setting_name] = value
        else:
            kwargs[key] = value

    if args.log_file:
        logging.basicConfig(
            level=logging.DEBUG,
            filename=args.log_file,
            format="%(asctime)s %(message)s",
        )

        # The mixin has to precede FUSEr in the bases: listed after it, the
        # ``__call__`` inherited through FUSEr's base class is found first in
        # the method resolution order and the mixin's logging ``__call__`` is
        # never used.
        class LoggingFUSEr(LoggingMixIn, FUSEr):
            pass

        fuser = LoggingFUSEr
    else:
        fuser = FUSEr

    fs, url_path = url_to_fs(args.url, **kwargs)
    logger.debug("Mounting %s to %s", url_path, str(args.mount_point))
    run(
        fs,
        args.source_path,
        args.mount_point,
        foreground=args.foreground,
        threads=args.threads,
        ready_file=args.ready_file,
        ops_class=fuser,
    )
if __name__ == "__main__":
    # Script entry point: pass everything after the program name to the
    # command-line handler.
    import sys

    cli_args = sys.argv[1:]
    main(cli_args)