import dask
from distributed.client import Client, _get_global_client
from distributed.worker import Worker

from fsspec import filesystem
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
from fsspec.utils import infer_storage_options


def _get_client(client):
    """Resolve ``client`` to a live distributed ``Client`` instance.

    Accepts an existing ``Client``, anything ``Client()`` accepts (such as
    a scheduler address), or ``None``, in which case the current global
    client is used.
    """
    if client is None:
        return _get_global_client()
    elif isinstance(client, Client):
        return client
    else:
        # e.g. a scheduler address string such as "tcp://127.0.0.1:8786"
        return Client(client)
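
# Illustrative behaviour of _get_client (the address below is made up):
#     _get_client(None)                   -> current global Client
#     _get_client(existing_client)        -> existing_client, unchanged
#     _get_client("tcp://10.0.0.1:8786")  -> Client("tcp://10.0.0.1:8786")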


def _in_worker():
    # True when this process is running at least one distributed Worker
    return bool(Worker._instances)


class DaskWorkerFileSystem(AbstractFileSystem):
    """View files accessible to a worker as any other remote file-system

    When instances run on a worker, they use the real filesystem. When run
    on the client, they call out to a worker to provide the information or
    data.

    **Warning**: this implementation is experimental and read-only for now.
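
    Examples
    --------
    A sketch of client-side use, assuming a running ``distributed`` cluster
    and that this class is registered with fsspec under the ``"dask"``
    protocol::

        import fsspec

        fs = fsspec.filesystem("dask", target_protocol="file")
        fs.ls("/tmp")                      # executed on a worker
        with fs.open("/tmp/data.bin") as f:
            header = f.read(64)            # bytes fetched via fetch_range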
    """

    def __init__(
        self, target_protocol=None, target_options=None, fs=None, client=None, **kwargs
    ):
        super().__init__(**kwargs)
        if not (fs is None) ^ (target_protocol is None):
            raise ValueError(
                "Please provide exactly one of a filesystem instance (fs)"
                " or a target_protocol"
            )
        self.target_protocol = target_protocol
        self.target_options = target_options
        self.worker = None
        self.client = client
        self.fs = fs
        self._determine_worker()
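
    # Construction sketch (illustrative): pass either a protocol for the
    # worker to instantiate, e.g.
    #     DaskWorkerFileSystem(target_protocol="s3",
    #                          target_options={"anon": True})
    # or a ready-made filesystem instance, e.g.
    #     DaskWorkerFileSystem(fs=filesystem("file"))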

    @staticmethod
    def _get_kwargs_from_urls(path):
        so = infer_storage_options(path)
        if "host" in so and "port" in so:
            return {"client": f"{so['host']}:{so['port']}"}
        else:
            return {}
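
    # Illustrative: for a URL like "dask://10.0.0.1:8786/path/to/file",
    # infer_storage_options yields host "10.0.0.1" and port 8786, so the
    # filesystem is constructed with client="10.0.0.1:8786"; URLs without
    # host/port fall back to the current global client.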

    def _determine_worker(self):
        if _in_worker():
            # running on a worker: operate on the real filesystem directly
            self.worker = True
            if self.fs is None:
                self.fs = filesystem(
                    self.target_protocol, **(self.target_options or {})
                )
        else:
            # running on the client: route every operation through a
            # delayed proxy of this instance, executed on a worker
            self.worker = False
            self.client = _get_client(self.client)
            self.rfs = dask.delayed(self)
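
    # Dispatch sketch: on the client, e.g. self.rfs.ls(path).compute()
    # ships this instance to a worker as part of a delayed task. fsspec
    # filesystems pickle by re-running the constructor on deserialization,
    # so _determine_worker runs again there, finds a Worker, and the call
    # executes against the real self.fs.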

    def mkdir(self, *args, **kwargs):
        if self.worker:
            self.fs.mkdir(*args, **kwargs)
        else:
            self.rfs.mkdir(*args, **kwargs).compute()

    def rm(self, *args, **kwargs):
        if self.worker:
            self.fs.rm(*args, **kwargs)
        else:
            self.rfs.rm(*args, **kwargs).compute()

    def copy(self, *args, **kwargs):
        if self.worker:
            self.fs.copy(*args, **kwargs)
        else:
            self.rfs.copy(*args, **kwargs).compute()

    def mv(self, *args, **kwargs):
        if self.worker:
            self.fs.mv(*args, **kwargs)
        else:
            self.rfs.mv(*args, **kwargs).compute()

    def ls(self, *args, **kwargs):
        if self.worker:
            return self.fs.ls(*args, **kwargs)
        else:
            return self.rfs.ls(*args, **kwargs).compute()

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        if self.worker:
            return self.fs._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )
        else:
            # on the client, return a lazy file object whose reads are
            # served by fetch_range round-trips to a worker
            return DaskFile(
                fs=self,
                path=path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )

    def fetch_range(self, path, mode, start, end):
        if self.worker:
            with self._open(path, mode) as f:
                f.seek(start)
                return f.read(end - start)
        else:
            return self.rfs.fetch_range(path, mode, start, end).compute()
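
    # Read path (sketch): a cache miss in a client-side DaskFile calls
    # DaskFile._fetch_range -> fetch_range, which submits a delayed task;
    # on the worker, the same method opens the real file, seeks to `start`,
    # and returns exactly the requested bytes.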


class DaskFile(AbstractBufferedFile):
    def __init__(self, mode="rb", **kwargs):
        if mode != "rb":
            raise ValueError('Remote dask files can only be opened in "rb" mode')
        super().__init__(**kwargs)

    def _upload_chunk(self, final=False):
        # read-only filesystem: uploads never happen, so this is a no-op
        pass

    def _initiate_upload(self):
        """Create remote file/upload"""
        pass

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        return self.fs.fetch_range(self.path, self.mode, start, end)
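
# End-to-end sketch (hypothetical paths; assumes a local cluster):
#
#     from distributed import Client, LocalCluster
#
#     client = Client(LocalCluster(n_workers=1))
#     fs = DaskWorkerFileSystem(target_protocol="file", client=client)
#     fs.ls("/tmp")
#     with fs.open("/tmp/example.bin") as f:
#         f.read(16)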