Spaces:
Running
Running
import asyncio | |
import asyncio.events | |
import functools | |
import inspect | |
import io | |
import numbers | |
import os | |
import re | |
import threading | |
from contextlib import contextmanager | |
from glob import has_magic | |
from typing import TYPE_CHECKING, Iterable | |
from .callbacks import DEFAULT_CALLBACK | |
from .exceptions import FSTimeoutError | |
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep | |
from .spec import AbstractBufferedFile, AbstractFileSystem | |
from .utils import glob_translate, is_exception, other_paths | |
# Matches single-underscore names such as "_ls" (but not dunders like "__init__").
private = re.compile(r"_[^_]")
# Module-level state shared by every non-asynchronous instance.
_lock = None  # global lock placeholder; allocated lazily by get_lock()
loop = [None]  # global event loop for any non-async instance
iothread = [None]  # dedicated fsspec IO thread
get_running_loop = asyncio.get_running_loop
def get_lock():
    """Return the module-wide threading lock, creating it on first use.

    Creation is deferred to the first call so that a forked process (after
    ``reset_lock``) allocates its own lock instead of reusing the parent's.
    """
    global _lock
    if _lock:
        return _lock
    _lock = threading.Lock()
    return _lock
def reset_lock():
    """Forget the global lock, IO loop and IO thread.

    Meant to be called only when initialising a forked process: with this
    state cleared, the child allocates a fresh lock (and loop) on next use
    instead of inheriting the parent's.
    """
    global _lock
    _lock = None
    loop[0] = None
    iothread[0] = None
async def _runner(event, coro, result, timeout=None): | |
timeout = timeout if timeout else None # convert 0 or 0.0 to None | |
if timeout is not None: | |
coro = asyncio.wait_for(coro, timeout=timeout) | |
try: | |
result[0] = await coro | |
except Exception as ex: | |
result[0] = ex | |
finally: | |
event.set() | |
def sync(loop, func, *args, timeout=None, **kwargs):
    """
    Make loop run coroutine until it returns. Runs in other thread

    ``func(*args, **kwargs)`` is called in the current thread to create the
    coroutine. The coroutine is then submitted to ``loop``, and the calling
    thread blocks until it has finished.

    Parameters
    ----------
    loop: asyncio event loop
        Loop to run the coroutine on. It must not be None or closed, and must
        not be the loop running in the calling thread.
    func: callable
        Coroutine function to call.
    timeout: number or None
        Seconds to wait; 0 or None mean no timeout.

    Returns
    -------
    Whatever the coroutine returns. Exceptions raised by the coroutine are
    re-raised here, with ``asyncio.TimeoutError`` converted to
    ``FSTimeoutError``.

    Raises
    ------
    RuntimeError
        If ``loop`` is None or closed.
    NotImplementedError
        If called from inside ``loop`` itself, where blocking on the result
        could never complete.
    FSTimeoutError
        On timeout.

    Examples
    --------
    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
    ...                  timeout=timeout, **kwargs)
    """
    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
    # NB: if the loop is not running *yet*, it is OK to submit work
    # and we will wait for it
    if loop is None or loop.is_closed():
        raise RuntimeError("Loop is not running")
    try:
        loop0 = asyncio.events.get_running_loop()
        if loop0 is loop:
            raise NotImplementedError("Calling sync() from within a running loop")
    except NotImplementedError:
        raise
    except RuntimeError:
        # no loop running in this thread: the normal, blocking case
        pass
    coro = func(*args, **kwargs)
    result = [None]
    event = threading.Event()
    future = asyncio.run_coroutine_threadsafe(
        _runner(event, coro, result, timeout), loop
    )
    while True:
        # wait in 1s slices so that the waiting thread stays interruptible
        if event.wait(1):
            break
        if timeout is not None:
            timeout -= 1
            if timeout < 0:
                # We are giving up on the result: cancel the task rather than
                # leave it running unobserved on the IO loop.
                future.cancel()
                raise FSTimeoutError

    return_result = result[0]
    if isinstance(return_result, asyncio.TimeoutError):
        # suppress asyncio.TimeoutError, raise FSTimeoutError
        raise FSTimeoutError from return_result
    elif isinstance(return_result, BaseException):
        raise return_result
    else:
        return return_result
def sync_wrapper(func, obj=None):
    """Given a function, make so can be called in blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.

    The returned wrapper runs ``func`` on ``self.loop`` through ``sync``.
    Here ``self`` is ``obj`` when given, otherwise the first positional
    argument. The wrapper copies ``func``'s name, docstring and other
    metadata (``functools.wraps``).
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Compare against None explicitly: an instance that happens to be
        # falsy (e.g. defines __len__) must still be used as ``self``.
        self = obj if obj is not None else args[0]
        return sync(self.loop, func, *args, **kwargs)

    return wrapper
def _selector_policy(): | |
original_policy = asyncio.get_event_loop_policy() | |
try: | |
if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): | |
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) | |
yield | |
finally: | |
asyncio.set_event_loop_policy(original_policy) | |
def get_loop():
    """Return the default fsspec IO loop, creating it on the first call.

    The loop runs forever on a daemon thread named "fsspecIO". The loop is
    cached in ``loop[0]`` and the thread in ``iothread[0]``.
    """
    if loop[0] is not None:
        return loop[0]
    with get_lock():
        # Check again under the lock: another thread may have created the
        # loop while this one was waiting.
        if loop[0] is None:
            with _selector_policy():
                loop[0] = asyncio.new_event_loop()
                io_thread = threading.Thread(
                    target=loop[0].run_forever, name="fsspecIO", daemon=True
                )
                io_thread.start()
                iothread[0] = io_thread
    return loop[0]
if TYPE_CHECKING:
    import resource

    ResourceError = resource.error
else:
    # ``resource`` may be unavailable on some platforms; degrade gracefully.
    try:
        import resource
    except ImportError:
        resource = None
    if resource is None:
        ResourceError = OSError
    else:
        ResourceError = getattr(resource, "error", OSError)

# Default batch sizes for _get_batch_size. The "nofiles" value is used when
# that key is not configured. The other is used for file-opening operations
# when the open-file limit cannot be read.
_NOFILES_DEFAULT_BATCH_SIZE = 1280
_DEFAULT_BATCH_SIZE = 128
def _get_batch_size(nofiles=False):
    """Return the default number of coroutines to run concurrently.

    Parameters
    ----------
    nofiles: bool
        True if the operation does not open local files; such operations use
        a separate (larger) default.

    Resolution order:

    1. The ``nofiles_gather_batch_size`` / ``gather_batch_size`` key of
       ``fsspec.config.conf``, if set.
    2. For ``nofiles``, ``_NOFILES_DEFAULT_BATCH_SIZE``.
    3. Otherwise, 1/8th of the soft ``RLIMIT_NOFILE`` (at least 1), or -1
       ("no throttling") if that limit is unlimited.
    4. ``_DEFAULT_BATCH_SIZE`` if the limit cannot be read.
    """
    from fsspec.config import conf

    if nofiles:
        if "nofiles_gather_batch_size" in conf:
            return conf["nofiles_gather_batch_size"]
        return _NOFILES_DEFAULT_BATCH_SIZE
    if "gather_batch_size" in conf:
        return conf["gather_batch_size"]
    if resource is None:
        return _DEFAULT_BATCH_SIZE

    try:
        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
    except (ImportError, ValueError, ResourceError):
        return _DEFAULT_BATCH_SIZE

    if soft_limit == resource.RLIM_INFINITY:
        return -1
    # A soft limit below 8 would otherwise give 0, which callers reject as a
    # batch size; always allow at least one coroutine in flight.
    return max(1, soft_limit // 8)
def running_async() -> bool:
    """Return True when called from code driven by a running asyncio loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # raised when no event loop is running in this thread
        return False
    return True
async def _run_coros_in_chunks(
    coros,
    batch_size=None,
    callback=DEFAULT_CALLBACK,
    timeout=None,
    return_exceptions=False,
    nofiles=False,
):
    """Run the given coroutines in chunks.

    Parameters
    ----------
    coros: list of coroutines to run
    batch_size: int or None
        Number of coroutines to submit/wait on simultaneously.
        If -1, then it will not be any throttling. If
        None, it will be inferred from _get_batch_size()
    callback: fsspec.callbacks.Callback instance
        Gets a relative_update when each coroutine completes
    timeout: number or None
        If given, each coroutine times out after this time. Note that, since
        there are multiple batches, the total run time of this function will in
        general be longer
    return_exceptions: bool
        Same meaning as in asyncio.gather
    nofiles: bool
        If inferring the batch_size: set True when this operation does NOT
        open local files. Such operations get a larger default batch, while
        file-opening ones are limited by the open-file limit.

    Returns
    -------
    list
        Results in the same order as ``coros``.

    Raises
    ------
    ValueError
        If ``batch_size`` is neither -1 nor a positive number.

    If a coroutine fails (with ``return_exceptions=False``) or this coroutine
    is itself cancelled, the tasks still in flight are cancelled. Coroutines
    not yet started are closed before the exception propagates.
    """
    if batch_size is None:
        batch_size = _get_batch_size(nofiles=nofiles)

    if batch_size == -1:
        # no throttling; max() stops an empty input from giving batch_size 0
        batch_size = max(len(coros), 1)

    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive or -1, got {batch_size}")

    async def _run_coro(coro, i):
        # Return (result, index) so out-of-order completions can be placed.
        try:
            return await asyncio.wait_for(coro, timeout=timeout), i
        except Exception as e:
            if not return_exceptions:
                raise
            return e, i
        finally:
            callback.relative_update(1)

    i = 0
    n = len(coros)
    results = [None] * n
    pending = set()

    try:
        while pending or i < n:
            # top the in-flight set back up to batch_size
            while len(pending) < batch_size and i < n:
                pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
                i += 1

            if not pending:
                break

            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            while done:
                result, k = await done.pop()
                results[k] = result
    except BaseException:
        # Do not leave orphaned work behind: stop in-flight tasks and close
        # coroutines that were never scheduled (avoids "never awaited").
        for task in pending:
            task.cancel()
        for coro in coros[i:]:
            close = getattr(coro, "close", None)
            if close is not None:
                close()
        raise

    return results
# Coroutine methods that any async-capable backend is expected to implement;
# mirror_sync_methods walks this list when attaching blocking counterparts.
async_methods = [
    "_ls",
    # single-file operations
    "_cat_file", "_get_file", "_put_file", "_rm_file", "_cp_file", "_pipe_file",
    "_expand_path",
    # metadata / existence checks
    "_info", "_isfile", "_isdir", "_exists",
    # tree traversal
    "_walk", "_glob", "_find",
    # sizes
    "_du", "_size",
    # directory creation
    "_mkdir", "_makedirs",
]
class AsyncFileSystem(AbstractFileSystem):
    """Async file operations, default implementations

    Passes bulk operations to ``_run_coros_in_chunks`` for concurrent,
    batched operation.

    Implementations that have concurrent batch operations and/or async methods
    should inherit from this class instead of AbstractFileSystem. Docstrings are
    copied from the un-underscored method in AbstractFileSystem, if not given.
    """

    # note that methods do not have docstring here; they will be copied
    # for _* methods and inferred for overridden methods.
    # (For that reason, explanatory notes on the _* coroutines below are
    # written as "#" comments rather than docstrings.)

    async_impl = True
    mirror_sync_methods = True
    disable_throttling = False

    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
        # asynchronous=True: no loop is attached (self._loop stays None).
        # Otherwise blocking calls run on ``loop``, or on the shared fsspec IO
        # loop from get_loop().
        # batch_size: per-instance default for the bulk methods (None = infer).
        self.asynchronous = asynchronous
        self._pid = os.getpid()
        if not asynchronous:
            self._loop = loop or get_loop()
        else:
            self._loop = None
        self.batch_size = batch_size
        super().__init__(*args, **kwargs)

    @property
    def loop(self):
        """Event loop used to run this instance's coroutines synchronously.

        Raises RuntimeError when accessed from a process other than the one
        that created the instance (e.g. after a fork).
        """
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop

    async def _rm_file(self, path, **kwargs):
        raise NotImplementedError

    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
        # TODO: implement on_error
        batch_size = batch_size or self.batch_size
        path = await self._expand_path(path, recursive=recursive)
        # _expand_path returns sorted paths, so reversing submits deeper
        # entries before their parents.
        return await _run_coros_in_chunks(
            [self._rm_file(p, **kwargs) for p in reversed(path)],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _cp_file(self, path1, path2, **kwargs):
        raise NotImplementedError

    async def _copy(
        self,
        path1,
        path2,
        recursive=False,
        on_error=None,
        maxdepth=None,
        batch_size=None,
        **kwargs,
    ):
        # on_error defaults to "ignore" (skip FileNotFoundError) for recursive
        # copies and to "raise" otherwise.
        if on_error is None and recursive:
            on_error = "ignore"
        elif on_error is None:
            on_error = "raise"

        if isinstance(path1, list) and isinstance(path2, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            paths1 = path1
            paths2 = path2
        else:
            source_is_str = isinstance(path1, str)
            paths1 = await self._expand_path(
                path1, maxdepth=maxdepth, recursive=recursive
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                paths1 = [
                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not paths1:
                    return

            source_is_file = len(paths1) == 1
            dest_is_dir = isinstance(path2, str) and (
                trailing_sep(path2) or await self._isdir(path2)
            )

            exists = source_is_str and (
                (has_magic(path1) and source_is_file)
                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
            )
            paths2 = other_paths(
                paths1,
                path2,
                exists=exists,
                flatten=not source_is_str,
            )

        batch_size = batch_size or self.batch_size
        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
        result = await _run_coros_in_chunks(
            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
        )

        for ex in filter(is_exception, result):
            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
                continue
            raise ex

    async def _pipe_file(self, path, value, **kwargs):
        raise NotImplementedError

    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
        # ``path`` is either a single path (written with ``value``) or a
        # dict mapping path -> value.
        if isinstance(path, str):
            path = {path: value}
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
            batch_size=batch_size,
            nofiles=True,
        )

    async def _process_limits(self, url, start, end):
        """Helper for "Range"-based _cat_file

        Translate python-style ``start``/``end`` offsets into an HTTP
        ``Range`` header value with an inclusive end. ``end`` is exclusive,
        negative values count from the end of the file, and None means the
        file boundary. Examples: (0, 10) -> "bytes=0-9" and
        (-10, None) -> "bytes=-10" (a suffix range: the last 10 bytes).
        """
        size = None
        suff = False
        if start is not None and start < 0:
            # if start is negative and end None, end is the "suffix length"
            if end is None:
                end = -start
                start = ""
                suff = True
            else:
                size = size or (await self._info(url))["size"]
                start = size + start
        elif start is None:
            start = 0
        if not suff:
            if end is not None and end < 0:
                if start is not None:
                    size = size or (await self._info(url))["size"]
                    end = size + end
            elif end is None:
                end = ""
            # A suffix length is a count, not a position, so only a real end
            # offset is converted to the inclusive form.
            if isinstance(end, numbers.Integral):
                end -= 1  # bytes range is inclusive
        return f"bytes={start}-{end}"

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        raise NotImplementedError

    async def _cat(
        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
    ):
        # on_error: "raise" re-raises the first failure; "omit" drops failed
        # paths from a dict result; any other value keeps the exception
        # objects in the result.
        paths = await self._expand_path(path, recursive=recursive)
        coros = [self._cat_file(p, **kwargs) for p in paths]
        batch_size = batch_size or self.batch_size
        out = await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )
        if on_error == "raise":
            ex = next(filter(is_exception, out), False)
            if ex:
                raise ex
        if (
            len(paths) > 1
            or isinstance(path, list)
            or paths[0] != self._strip_protocol(path)
        ):
            # several paths, a list input, or an expanded pattern:
            # return a path -> contents mapping
            return {
                k: v
                for k, v in zip(paths, out)
                if on_error != "omit" or not is_exception(v)
            }
        else:
            # exactly the single path that was asked for
            return out[0]

    async def _cat_ranges(
        self,
        paths,
        starts,
        ends,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Get the contents of byte ranges from one or more files

        Parameters
        ----------
        paths: list
            A list of of filepaths on this filesystems
        starts, ends: int or list
            Bytes limits of the read. If using a single int, the same value will be
            used to read all the specified files.

        Failed reads are returned as exception objects in the output list.
        """
        # TODO: on_error
        if max_gap is not None:
            # use utils.merge_offset_ranges
            raise NotImplementedError
        if not isinstance(paths, list):
            raise TypeError
        if not isinstance(starts, Iterable):
            starts = [starts] * len(paths)
        if not isinstance(ends, Iterable):
            ends = [ends] * len(paths)
        if len(starts) != len(paths) or len(ends) != len(paths):
            raise ValueError
        coros = [
            self._cat_file(p, start=s, end=e, **kwargs)
            for p, s, e in zip(paths, starts, ends)
        ]
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
        )

    async def _put_file(self, lpath, rpath, **kwargs):
        raise NotImplementedError

    async def _put(
        self,
        lpath,
        rpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        batch_size=None,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) from local.

        Copies a specific file or tree of files (if recursive=True). If rpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within.

        The put_file method will be called concurrently on a batch of files. The
        batch_size option can configure the amount of futures that can be executed
        at the same time. If it is -1, then all the files will be uploaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(lpath, str)
            if source_is_str:
                lpath = make_path_posix(lpath)
            fs = LocalFileSystem()
            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
                if not lpaths:
                    return

            source_is_file = len(lpaths) == 1
            dest_is_dir = isinstance(rpath, str) and (
                trailing_sep(rpath) or await self._isdir(rpath)
            )

            rpath = self._strip_protocol(rpath)
            exists = source_is_str and (
                (has_magic(lpath) and source_is_file)
                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
            )
            rpaths = other_paths(
                lpaths,
                rpath,
                exists=exists,
                flatten=not source_is_str,
            )

        # Local directories become remote directories; only regular files
        # are uploaded.
        is_dir = {lp: os.path.isdir(lp) for lp in lpaths}
        rdirs = [rp for lp, rp in zip(lpaths, rpaths) if is_dir[lp]]
        file_pairs = [(lp, rp) for lp, rp in zip(lpaths, rpaths) if not is_dir[lp]]

        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
        batch_size = batch_size or self.batch_size

        coros = []
        callback.set_size(len(file_pairs))
        for lfile, rfile in file_pairs:
            put_file = callback.branch_coro(self._put_file)
            coros.append(put_file(lfile, rfile, **kwargs))

        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _get_file(self, rpath, lpath, **kwargs):
        raise NotImplementedError

    async def _get(
        self,
        rpath,
        lpath,
        recursive=False,
        callback=DEFAULT_CALLBACK,
        maxdepth=None,
        **kwargs,
    ):
        """Copy file(s) to local.

        Copies a specific file or tree of files (if recursive=True). If lpath
        ends with a "/", it will be assumed to be a directory, and target files
        will go within. Can submit a list of paths, which may be glob-patterns
        and will be expanded.

        The get_file method will be called concurrently on a batch of files. The
        batch_size option (passed as a keyword argument) can configure the amount
        of futures that can be executed at the same time. If it is -1, then all
        the files will be downloaded concurrently.
        The default can be set for this instance by passing "batch_size" in the
        constructor, or for all instances by setting the "gather_batch_size" key
        in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
        """
        if isinstance(lpath, list) and isinstance(rpath, list):
            # No need to expand paths when both source and destination
            # are provided as lists
            rpaths = rpath
            lpaths = lpath
        else:
            source_is_str = isinstance(rpath, str)
            # First check for rpath trailing slash as _strip_protocol removes it.
            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
            rpath = self._strip_protocol(rpath)
            rpaths = await self._expand_path(
                rpath, recursive=recursive, maxdepth=maxdepth
            )
            if source_is_str and (not recursive or maxdepth is not None):
                # Non-recursive glob does not copy directories
                rpaths = [
                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
                ]
                if not rpaths:
                    return

            lpath = make_path_posix(lpath)
            source_is_file = len(rpaths) == 1
            dest_is_dir = isinstance(lpath, str) and (
                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
            )

            exists = source_is_str and (
                (has_magic(rpath) and source_is_file)
                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
            )
            lpaths = other_paths(
                rpaths,
                lpath,
                exists=exists,
                flatten=not source_is_str,
            )

        for local_path in lpaths:
            parent = os.path.dirname(local_path)
            # A bare relative filename has no parent component, and
            # os.makedirs("") would raise FileNotFoundError.
            if parent:
                os.makedirs(parent, exist_ok=True)
        # same fallback rule as the other bulk methods: falsy -> instance default
        batch_size = kwargs.pop("batch_size", None) or self.batch_size

        coros = []
        callback.set_size(len(lpaths))
        for lp, rp in zip(lpaths, rpaths):
            get_file = callback.branch_coro(self._get_file)
            coros.append(get_file(rp, lp, **kwargs))
        return await _run_coros_in_chunks(
            coros, batch_size=batch_size, callback=callback
        )

    async def _isfile(self, path):
        # Any failure to fetch info counts as "not a file". Catch Exception
        # only, so task cancellation (CancelledError) still propagates.
        try:
            return (await self._info(path))["type"] == "file"
        except Exception:
            return False

    async def _isdir(self, path):
        try:
            return (await self._info(path))["type"] == "directory"
        except OSError:
            return False

    async def _size(self, path):
        return (await self._info(path)).get("size", None)

    async def _sizes(self, paths, batch_size=None):
        batch_size = batch_size or self.batch_size
        return await _run_coros_in_chunks(
            [self._size(p) for p in paths], batch_size=batch_size
        )

    async def _exists(self, path, **kwargs):
        try:
            await self._info(path, **kwargs)
            return True
        except FileNotFoundError:
            return False

    async def _info(self, path, **kwargs):
        raise NotImplementedError

    async def _ls(self, path, detail=True, **kwargs):
        raise NotImplementedError

    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
        # Async generator yielding (root, dirs, files) top-down, one tuple per
        # directory. dirs/files are lists of basenames, or name -> info dicts
        # when detail=True.
        # on_error: "raise" re-raises listing errors; a callable is called with
        # the exception; otherwise the failing directory yields empty
        # listings. It applies to nested directories as well.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        path = self._strip_protocol(path)
        full_dirs = {}
        dirs = {}
        files = {}

        detail = kwargs.pop("detail", False)
        try:
            listing = await self._ls(path, detail=True, **kwargs)
        except (FileNotFoundError, OSError) as e:
            if on_error == "raise":
                raise
            elif callable(on_error):
                on_error(e)
            if detail:
                yield path, {}, {}
            else:
                yield path, [], []
            return

        for info in listing:
            # each info name must be at least [path]/part , but here
            # we check also for names like [path]/part/
            pathname = info["name"].rstrip("/")
            name = pathname.rsplit("/", 1)[-1]
            if info["type"] == "directory" and pathname != path:
                # do not include "self" path
                full_dirs[name] = pathname
                dirs[name] = info
            elif pathname == path:
                # file-like with same name as give path
                files[""] = info
            else:
                files[name] = info

        if detail:
            yield path, dirs, files
        else:
            yield path, list(dirs), list(files)

        if maxdepth is not None:
            maxdepth -= 1
            if maxdepth < 1:
                return

        for d in dirs:
            async for _ in self._walk(
                full_dirs[d],
                maxdepth=maxdepth,
                on_error=on_error,
                detail=detail,
                **kwargs,
            ):
                yield _

    async def _glob(self, path, maxdepth=None, **kwargs):
        # Expand a glob pattern by listing from the deepest non-magic prefix
        # (via _find) and filtering with the translated regex.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
        path = self._strip_protocol(path)
        append_slash_to_dirname = ends_with_sep or path.endswith(
            tuple(sep + "**" for sep in seps)
        )
        # position of the first magic character ("*", "?" or "[")
        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
        idx_bracket = path.find("[") if path.find("[") >= 0 else len(path)

        min_idx = min(idx_star, idx_qmark, idx_bracket)

        detail = kwargs.pop("detail", False)

        if not has_magic(path):
            if await self._exists(path, **kwargs):
                if not detail:
                    return [path]
                else:
                    return {path: await self._info(path, **kwargs)}
            else:
                if not detail:
                    return []  # glob of non-existent returns empty
                else:
                    return {}
        elif "/" in path[:min_idx]:
            min_idx = path[:min_idx].rindex("/")
            root = path[: min_idx + 1]
            depth = path[min_idx + 1 :].count("/") + 1
        else:
            root = ""
            depth = path[min_idx + 1 :].count("/") + 1

        if "**" in path:
            if maxdepth is not None:
                idx_double_stars = path.find("**")
                depth_double_stars = path[idx_double_stars:].count("/") + 1
                depth = depth - depth_double_stars + maxdepth
            else:
                depth = None

        allpaths = await self._find(
            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
        )

        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
        pattern = re.compile(pattern)

        out = {
            p: info
            for p, info in sorted(allpaths.items())
            if pattern.match(
                (
                    p + "/"
                    if append_slash_to_dirname and info["type"] == "directory"
                    else p
                )
            )
        }

        if detail:
            return out
        else:
            return list(out)

    async def _du(self, path, total=True, maxdepth=None, **kwargs):
        # Sizes of everything _find returns: their sum if total, else a
        # name -> size dict. One _info call per entry.
        sizes = {}
        # async for?
        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
            info = await self._info(f)
            sizes[info["name"]] = info["size"]
        if total:
            return sum(sizes.values())
        else:
            return sizes

    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
        # All entries below ``path`` (files only unless withdirs), sorted by
        # name; a list of names, or a name -> info dict when detail=True.
        path = self._strip_protocol(path)
        out = {}
        detail = kwargs.pop("detail", False)

        # Add the root directory if withdirs is requested
        # This is needed for posix glob compliance
        if withdirs and path != "" and await self._isdir(path):
            out[path] = await self._info(path)

        # async for?
        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
            if withdirs:
                files.update(dirs)
            out.update({info["name"]: info for info in files.values()})
        if not out and (await self._isfile(path)):
            # walk works on directories, but find should also return [path]
            # when path happens to be a file
            out[path] = {}
        names = sorted(out)
        if not detail:
            return names
        else:
            return {name: out[name] for name in names}

    async def _expand_path(self, path, recursive=False, maxdepth=None):
        # Turn a path, pattern or list of them into a sorted list of concrete
        # paths. Raises FileNotFoundError if nothing matches.
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        if isinstance(path, str):
            out = await self._expand_path([path], recursive, maxdepth)
        else:
            out = set()
            path = [self._strip_protocol(p) for p in path]
            for p in path:  # can gather here
                if has_magic(p):
                    bit = set(await self._glob(p, maxdepth=maxdepth))
                    out |= bit
                    if recursive:
                        # glob call above expanded one depth so if maxdepth is defined
                        # then decrement it in expand_path call below. If it is zero
                        # after decrementing then avoid expand_path call.
                        if maxdepth is not None and maxdepth <= 1:
                            continue
                        out |= set(
                            await self._expand_path(
                                list(bit),
                                recursive=recursive,
                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
                            )
                        )
                    continue
                elif recursive:
                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
                    out |= rec
                if p not in out and (recursive is False or (await self._exists(p))):
                    # should only check once, for the root
                    out.add(p)
        if not out:
            raise FileNotFoundError(path)
        return sorted(out)

    async def _mkdir(self, path, create_parents=True, **kwargs):
        pass  # not necessary to implement, may not have directories

    async def _makedirs(self, path, exist_ok=False):
        pass  # not necessary to implement, may not have directories

    async def open_async(self, path, mode="rb", **kwargs):
        # Only binary modes without compression are accepted; the actual
        # opening must be provided by subclasses.
        if "b" not in mode or kwargs.get("compression"):
            raise ValueError
        raise NotImplementedError
def mirror_sync_methods(obj):
    """Attach blocking versions of obj's coroutine methods.

    Candidate names come from ``async_methods`` plus every attribute of
    ``AsyncFileSystem``. For each single-underscore name ``_x`` that is a
    coroutine function on ``obj``, a blocking ``x`` built with
    ``sync_wrapper`` is set on ``obj``. This happens only while ``x`` still
    resolves to AbstractFileSystem's own implementation, i.e. the subclass
    has not overridden it. A wrapper without a docstring receives
    AbstractFileSystem's docstring for ``x``.
    """
    from fsspec import AbstractFileSystem

    for async_name in async_methods + dir(AsyncFileSystem):
        # single leading underscore only ("_ls" yes, "__init__" no)
        if not private.match(async_name):
            continue
        sync_name = async_name[1:]
        is_coroutine = inspect.iscoroutinefunction(getattr(obj, async_name, None))
        bound_sync = getattr(obj, sync_name, False)
        current_impl = getattr(bound_sync, "__func__", None)
        inherits_default = current_impl is getattr(AbstractFileSystem, sync_name, "")
        if not (is_coroutine and inherits_default):
            continue

        blocking = sync_wrapper(getattr(obj, async_name), obj=obj)
        setattr(obj, sync_name, blocking)
        if not blocking.__doc__:
            default_impl = getattr(AbstractFileSystem, sync_name, None)
            blocking.__doc__ = getattr(default_impl, "__doc__", "")
class FSSpecCoroutineCancel(Exception):
    """Default exception injected into tasks killed by ``_dump_running_tasks``."""
def _dump_running_tasks(
    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
):
    """Debugging helper: report, and optionally kill, unfinished tasks on the
    fsspec IO loop (``loop[0]``).

    Parameters
    ----------
    printout: bool
        If True, print each unfinished task's stack (``Task.print_stack``).
    cancel: bool
        If True, forcibly cancel every unfinished task (see comments below).
    exc: exception class
        Exception set on, and thrown into, each cancelled task.
    with_task: bool
        If True, include the Task object itself in each returned record.

    Returns
    -------
    list of dict
        One record per unfinished task. Each holds the coroutine frame's
        locals ("locals"), file name ("file"), first line number
        ("firstline"), current line number ("linelo"), formatted stack
        ("stack") and the task, or None ("task").
    """
    import traceback

    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
    if printout:
        [task.print_stack() for task in tasks]
    out = [
        {
            "locals": task._coro.cr_frame.f_locals,
            "file": task._coro.cr_frame.f_code.co_filename,
            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
            "linelo": task._coro.cr_frame.f_lineno,
            "stack": traceback.format_stack(task._coro.cr_frame),
            "task": task if with_task else None,
        }
        for task in tasks
    ]
    if cancel:
        for t in tasks:
            # NOTE(review): relies on private asyncio internals
            # (Task._callbacks, Task._coro); may break across Python versions.
            cbs = t._callbacks
            t.cancel()
            # Calls the base Future methods on the task directly, presumably
            # to mark it finished immediately, bypassing Task's own overrides.
            asyncio.futures.Future.set_exception(t, exc)
            asyncio.futures.Future.cancel(t)
            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
            try:
                t._coro.throw(exc)  # exits coro, unless explicitly handled
            except exc:
                pass
    return out
class AbstractAsyncStreamedFile(AbstractBufferedFile):
    # no read buffering, and always auto-commit
    # TODO: readahead might still be useful here, but needs async version
    # Subclasses implement _fetch_range (reads), _upload_chunk and optionally
    # _initiate_upload (writes).

    async def read(self, length=-1):
        """
        Return data from the file, fetching the requested range directly

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            # Clamp at 0: when positioned past EOF, read nothing rather than
            # request a range that ends before it starts.
            length = max(0, self.size - self.loc)
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = await self._fetch_range(self.loc, self.loc + length)
        self.loc += len(out)
        return out

    async def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.

        Returns the number of bytes written to the buffer.
        """
        if self.mode not in {"wb", "ab"}:
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            await self.flush()
        return out

    async def close(self):
        """Close file

        Finalizes writes (a final forced flush unless one already happened),
        discards cache, and invalidates the filesystem's cached listings for
        the path and its parent.
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        if self.mode == "rb":
            self.cache = None
        else:
            if not self.forced:
                await self.flush(force=True)

            if self.fs is not None:
                self.fs.invalidate_cache(self.path)
                self.fs.invalidate_cache(self.fs._parent(self.path))

        self.closed = True

    async def flush(self, force=False):
        """Send buffered data to the backend.

        Parameters
        ----------
        force: bool
            Upload even if the buffer is smaller than ``blocksize``, and mark
            the upload as final. After a forced flush only ``close`` is
            allowed, and a second forced flush raises ValueError.
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.mode not in {"wb", "ab"}:
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                await self._initiate_upload()
            except BaseException:
                # mark the file unusable on any failure, then re-raise
                self.closed = True
                raise

        # _upload_chunk returning False means "keep the buffer" (nothing sent)
        if await self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def _fetch_range(self, start, end):
        raise NotImplementedError

    async def _initiate_upload(self):
        pass

    async def _upload_chunk(self, final=False):
        raise NotImplementedError