import base64
import urllib.parse

import requests

from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile


class DatabricksException(Exception):
    """
    Helper class for exceptions raised in this module.
    """

    def __init__(self, error_code, message):
        """Create a new DatabricksException"""
        super().__init__(message)

        self.error_code = error_code
        self.message = message


class DatabricksFileSystem(AbstractFileSystem):
    """
    Get access to the Databricks filesystem implementation over HTTP.
    Can be used inside and outside of a Databricks cluster.
    """

    def __init__(self, instance, token, **kwargs):
        """
        Create a new DatabricksFileSystem.

        Parameters
        ----------
        instance: str
            The instance URL of the Databricks cluster.
            For example, for an Azure Databricks cluster this
            has the form adb-<some-number>.<two digits>.azuredatabricks.net.
        token: str
            Your personal token. Find out more
            here: https://docs.databricks.com/dev-tools/api/latest/authentication.html
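
        Examples
        --------
        A hypothetical instantiation; the instance URL and token below are
        placeholders, not working credentials::

            fs = DatabricksFileSystem(
                instance="adb-1234567890123456.7.azuredatabricks.net",
                token="dapi...",
            )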
        """
        self.instance = instance
        self.token = token

        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {self.token}"})

        super().__init__(**kwargs)

    def ls(self, path, detail=True):
        """
        List the contents of the given path.

        Parameters
        ----------
        path: str
            Absolute path
        detail: bool
            Return not only the list of filenames,
            but also additional information on file sizes
            and types.
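
        Examples
        --------
        List a (hypothetical) mount point, names only; requires a live
        workspace::

            fs.ls("/mnt", detail=False)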
        """
        out = self._ls_from_cache(path)
        if not out:
            try:
                r = self._send_to_api(
                    method="get", endpoint="list", json={"path": path}
                )
            except DatabricksException as e:
                if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                    raise FileNotFoundError(e.message)

                raise e
            files = r["files"]
            out = [
                {
                    "name": o["path"],
                    "type": "directory" if o["is_dir"] else "file",
                    "size": o["file_size"],
                }
                for o in files
            ]
            self.dircache[path] = out

        if detail:
            return out
        return [o["name"] for o in out]

    def makedirs(self, path, exist_ok=True):
        """
        Create a given absolute path and all of its parents.

        Parameters
        ----------
        path: str
            Absolute path to create
        exist_ok: bool
            If False, raise a FileExistsError if the path
            already exists.
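
        Examples
        --------
        Create a (hypothetical) nested folder structure in one call::

            fs.makedirs("/tmp/a/b/c")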
        """
        if not exist_ok:
            try:
                # If this call succeeds, the path already exists
                self._send_to_api(
                    method="get", endpoint="get-status", json={"path": path}
                )
                raise FileExistsError(f"Path {path} already exists")
            except DatabricksException as e:
                if e.error_code != "RESOURCE_DOES_NOT_EXIST":
                    raise e

        try:
            self._send_to_api(method="post", endpoint="mkdirs", json={"path": path})
        except DatabricksException as e:
            if e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e
        self.invalidate_cache(self._parent(path))

    def mkdir(self, path, create_parents=True, **kwargs):
        """
        Create a given absolute path and all of its parents.

        Parameters
        ----------
        path: str
            Absolute path to create
        create_parents: bool
            Whether to create all parents or not.
            "False" is not implemented so far.
        """
        if not create_parents:
            raise NotImplementedError

        self.mkdirs(path, **kwargs)

    def rm(self, path, recursive=False):
        """
        Remove the file or folder at the given absolute path.

        Parameters
        ----------
        path: str
            Absolute path of the file or folder to remove
        recursive: bool
            Recursively delete all files in a folder.
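
        Examples
        --------
        Remove a (hypothetical) folder including its contents::

            fs.rm("/tmp/a", recursive=True)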
        """
        try:
            self._send_to_api(
                method="post",
                endpoint="delete",
                json={"path": path, "recursive": recursive},
            )
        except DatabricksException as e:
            if e.error_code == "PARTIAL_DELETE":
                # Not everything was deleted within the API's time limit;
                # retry the remainder instead of failing.
                self.rm(path=path, recursive=recursive)
                return
            elif e.error_code == "IO_ERROR":
                # Mirror the exception the os module would raise here
                raise OSError(e.message)

            raise e
        self.invalidate_cache(self._parent(path))

    def mv(self, source_path, destination_path, recursive=False, maxdepth=None):
        """
        Move a source to a destination path.

        A note from the original [Databricks API manual]
        (https://docs.databricks.com/dev-tools/api/latest/dbfs.html#move).

        When moving a large number of files the API call will time out after
        approximately 60s, potentially resulting in partially moved data.
        Therefore, for operations that move more than 10k files, we strongly
        discourage using the DBFS REST API.

        Parameters
        ----------
        source_path: str
            From where to move (absolute path)
        destination_path: str
            To where to move (absolute path)
        recursive: bool
            Not implemented so far.
        maxdepth:
            Not implemented so far.
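
        Examples
        --------
        A hypothetical move of a single file (requires a live workspace)::

            fs.mv("/tmp/source.txt", "/tmp/destination.txt")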
        """
        if recursive:
            raise NotImplementedError
        if maxdepth:
            raise NotImplementedError

        try:
            self._send_to_api(
                method="post",
                endpoint="move",
                json={"source_path": source_path, "destination_path": destination_path},
            )
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e
        self.invalidate_cache(self._parent(source_path))
        self.invalidate_cache(self._parent(destination_path))

    def _open(self, path, mode="rb", block_size="default", **kwargs):
        """
        Override the base class method to make sure a DatabricksFile is created.
        All arguments are copied from the base method.

        Only the default blocksize is allowed.
        """
        return DatabricksFile(self, path, mode=mode, block_size=block_size, **kwargs)

    def _send_to_api(self, method, endpoint, json):
        """
        Send the given json to the DBFS API
        using a get or post request (specified by the argument `method`).

        Parameters
        ----------
        method: str
            Which HTTP method to use for communication; "get" or "post".
        endpoint: str
            Where to send the request to (last part of the API URL)
        json: dict
            Dictionary of information to send
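
        Examples
        --------
        For instance "adb-1234.5.azuredatabricks.net" and endpoint "list",
        the request goes to (illustrative values; no request is made here)::

            https://adb-1234.5.azuredatabricks.net/api/2.0/dbfs/list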
        """
        if method == "post":
            session_call = self.session.post
        elif method == "get":
            session_call = self.session.get
        else:
            raise ValueError(f"Do not understand method {method}")

        url = urllib.parse.urljoin(f"https://{self.instance}/api/2.0/dbfs/", endpoint)

        r = session_call(url, json=json)

        # The DBFS API returns a json body also in the error case;
        # surface it as a DatabricksException where possible.
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            try:
                exception_json = e.response.json()
            except Exception:
                # No parseable error body: re-raise the plain HTTP error
                raise e

            raise DatabricksException(**exception_json)

        return r.json()

    def _create_handle(self, path, overwrite=True):
        """
        Internal function to create a handle, which can be used to
        write blocks of a file to DBFS.
        A handle has a unique identifier which needs to be passed
        whenever data is written during this transaction.
        The handle is active for 10 minutes - after that a new
        write transaction needs to be created.
        Make sure to close the handle after you are finished.

        Parameters
        ----------
        path: str
            Absolute path for this file.
        overwrite: bool
            If a file already exists at this location, either overwrite
            it or raise an exception.
        """
        try:
            r = self._send_to_api(
                method="post",
                endpoint="create",
                json={"path": path, "overwrite": overwrite},
            )
            return r["handle"]
        except DatabricksException as e:
            if e.error_code == "RESOURCE_ALREADY_EXISTS":
                raise FileExistsError(e.message)

            raise e

    def _close_handle(self, handle):
        """
        Close a handle, which was opened by :func:`_create_handle`.

        Parameters
        ----------
        handle: str
            Which handle to close.
        """
        try:
            self._send_to_api(method="post", endpoint="close", json={"handle": handle})
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)

            raise e

    def _add_data(self, handle, data):
        """
        Upload data to an already opened file handle
        (opened by :func:`_create_handle`).
        The maximal allowed data size is 1MB after
        conversion to base64.
        Remember to close the handle when you are finished.

        Parameters
        ----------
        handle: str
            Which handle to upload data to.
        data: bytes
            Block of data to add to the handle.
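
        Examples
        --------
        base64 encodes every 3 raw bytes as 4 ASCII characters, so the
        payload grows by roughly a third::

            >>> import base64
            >>> len(base64.b64encode(b"x" * 3))
            4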
        """
        data = base64.b64encode(data).decode()
        try:
            self._send_to_api(
                method="post",
                endpoint="add-block",
                json={"handle": handle, "data": data},
            )
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code == "MAX_BLOCK_SIZE_EXCEEDED":
                raise ValueError(e.message)

            raise e

    def _get_data(self, path, start, end):
        """
        Download a block of data as bytes from the given absolute path,
        covering the byte range [start, end).
        The maximum number of allowed bytes to read is 1MB.

        Parameters
        ----------
        path: str
            Absolute path to download data from
        start: int
            Start position of the block
        end: int
            End position of the block
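
        Examples
        --------
        Fetch the first kilobyte of a (hypothetical) file::

            fs._get_data("/tmp/file.bin", start=0, end=1024)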
        """
        try:
            r = self._send_to_api(
                method="get",
                endpoint="read",
                json={"path": path, "offset": start, "length": end - start},
            )
            return base64.b64decode(r["data"])
        except DatabricksException as e:
            if e.error_code == "RESOURCE_DOES_NOT_EXIST":
                raise FileNotFoundError(e.message)
            elif e.error_code in ["INVALID_PARAMETER_VALUE", "MAX_READ_SIZE_EXCEEDED"]:
                raise ValueError(e.message)

            raise e

    def invalidate_cache(self, path=None):
        if path is None:
            self.dircache.clear()
        else:
            self.dircache.pop(path, None)
        super().invalidate_cache(path)


class DatabricksFile(AbstractBufferedFile):
    """
    Helper class for files referenced in the DatabricksFileSystem.
    """

    DEFAULT_BLOCK_SIZE = 1 * 2**20

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        **kwargs,
    ):
        """
        Create a new instance of the DatabricksFile.

        The blocksize needs to be the default one.
        """
        if block_size is None or block_size == "default":
            block_size = self.DEFAULT_BLOCK_SIZE

        assert (
            block_size == self.DEFAULT_BLOCK_SIZE
        ), f"Only the default block size is allowed, not {block_size}"

        super().__init__(
            fs,
            path,
            mode=mode,
            block_size=block_size,
            autocommit=autocommit,
            cache_type=cache_type,
            cache_options=cache_options or {},
            **kwargs,
        )

    def _initiate_upload(self):
        """Internal function to start a file upload"""
        self.handle = self.fs._create_handle(self.path)

    def _upload_chunk(self, final=False):
        """Internal function to add a chunk of data to a started upload"""
        self.buffer.seek(0)
        data = self.buffer.getvalue()

        data_chunks = [
            data[start:end] for start, end in self._to_sized_blocks(len(data))
        ]

        for data_chunk in data_chunks:
            self.fs._add_data(handle=self.handle, data=data_chunk)

        if final:
            self.fs._close_handle(handle=self.handle)
            return True

    def _fetch_range(self, start, end):
        """Internal function to download a block of data"""
        return_buffer = b""
        length = end - start
        for chunk_start, chunk_end in self._to_sized_blocks(length, start):
            return_buffer += self.fs._get_data(
                path=self.path, start=chunk_start, end=chunk_end
            )

        return return_buffer

    def _to_sized_blocks(self, length, start=0):
        """Helper function to split the range [start, start+length) into block-sized chunks"""
        end = start + length
        for data_chunk in range(start, end, self.blocksize):
            data_start = data_chunk
            data_end = min(end, data_chunk + self.blocksize)
            yield data_start, data_end
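

# A minimal end-to-end sketch, commented out because it needs network access
# to a real workspace; the instance URL and token below are placeholders:
#
#     fs = DatabricksFileSystem(
#         instance="adb-1234567890123456.7.azuredatabricks.net",
#         token="dapi...",
#     )
#     with fs.open("/tmp/example.txt", "wb") as f:  # uploads via create/add-block/close
#         f.write(b"hello dbfs")
#     fs.open("/tmp/example.txt", "rb").read()      # downloads via the read endpoint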