|
import base64 |
|
import io |
|
import re |
|
|
|
import requests |
|
|
|
import fsspec |
|
|
|
|
|
class JupyterFileSystem(fsspec.AbstractFileSystem): |
|
"""View of the files as seen by a Jupyter server (notebook or lab)""" |
|
|
|
protocol = ("jupyter", "jlab") |
|
|
|
def __init__(self, url, tok=None, **kwargs): |
|
""" |
|
|
|
Parameters |
|
---------- |
|
url : str |
|
Base URL of the server, like "http://127.0.0.1:8888". May include |
|
token in the string, which is given by the process when starting up |
|
tok : str |
|
If the token is obtained separately, can be given here |
|
kwargs |
|
""" |
|
if "?" in url: |
|
if tok is None: |
|
try: |
|
tok = re.findall("token=([a-z0-9]+)", url)[0] |
|
except IndexError as e: |
|
raise ValueError("Could not determine token") from e |
|
url = url.split("?", 1)[0] |
|
self.url = url.rstrip("/") + "/api/contents" |
|
self.session = requests.Session() |
|
if tok: |
|
self.session.headers["Authorization"] = f"token {tok}" |
|
|
|
super().__init__(**kwargs) |
|
|
|
def ls(self, path, detail=True, **kwargs): |
|
path = self._strip_protocol(path) |
|
r = self.session.get(f"{self.url}/{path}") |
|
if r.status_code == 404: |
|
return FileNotFoundError(path) |
|
r.raise_for_status() |
|
out = r.json() |
|
|
|
if out["type"] == "directory": |
|
out = out["content"] |
|
else: |
|
out = [out] |
|
for o in out: |
|
o["name"] = o.pop("path") |
|
o.pop("content") |
|
if o["type"] == "notebook": |
|
o["type"] = "file" |
|
if detail: |
|
return out |
|
return [o["name"] for o in out] |
|
|
|
def cat_file(self, path, start=None, end=None, **kwargs): |
|
path = self._strip_protocol(path) |
|
r = self.session.get(f"{self.url}/{path}") |
|
if r.status_code == 404: |
|
return FileNotFoundError(path) |
|
r.raise_for_status() |
|
out = r.json() |
|
if out["format"] == "text": |
|
|
|
b = out["content"].encode() |
|
else: |
|
b = base64.b64decode(out["content"]) |
|
return b[start:end] |
|
|
|
def pipe_file(self, path, value, **_): |
|
path = self._strip_protocol(path) |
|
json = { |
|
"name": path.rsplit("/", 1)[-1], |
|
"path": path, |
|
"size": len(value), |
|
"content": base64.b64encode(value).decode(), |
|
"format": "base64", |
|
"type": "file", |
|
} |
|
self.session.put(f"{self.url}/{path}", json=json) |
|
|
|
def mkdir(self, path, create_parents=True, **kwargs): |
|
path = self._strip_protocol(path) |
|
if create_parents and "/" in path: |
|
self.mkdir(path.rsplit("/", 1)[0], True) |
|
json = { |
|
"name": path.rsplit("/", 1)[-1], |
|
"path": path, |
|
"size": None, |
|
"content": None, |
|
"type": "directory", |
|
} |
|
self.session.put(f"{self.url}/{path}", json=json) |
|
|
|
def _rm(self, path): |
|
path = self._strip_protocol(path) |
|
self.session.delete(f"{self.url}/{path}") |
|
|
|
def _open(self, path, mode="rb", **kwargs): |
|
path = self._strip_protocol(path) |
|
if mode == "rb": |
|
data = self.cat_file(path) |
|
return io.BytesIO(data) |
|
else: |
|
return SimpleFileWriter(self, path, mode="wb") |
|
|
|
|
|
class SimpleFileWriter(fsspec.spec.AbstractBufferedFile): |
|
def _upload_chunk(self, final=False): |
|
"""Never uploads a chunk until file is done |
|
|
|
Not suitable for large files |
|
""" |
|
if final is False: |
|
return False |
|
self.buffer.seek(0) |
|
data = self.buffer.read() |
|
self.fs.pipe_file(self.path, data) |
|
|