| | from collections import deque |
| |
|
| |
|
| | class Transaction: |
| | """Filesystem transaction write context |
| | |
| | Gathers files for deferred commit or discard, so that several write |
| | operations can be finalized semi-atomically. This works by having this |
| | instance as the ``.transaction`` attribute of the given filesystem |
| | """ |
| |
|
| | def __init__(self, fs, **kwargs): |
| | """ |
| | Parameters |
| | ---------- |
| | fs: FileSystem instance |
| | """ |
| | self.fs = fs |
| | self.files = deque() |
| |
|
| | def __enter__(self): |
| | self.start() |
| | return self |
| |
|
| | def __exit__(self, exc_type, exc_val, exc_tb): |
| | """End transaction and commit, if exit is not due to exception""" |
| | |
| | self.complete(commit=exc_type is None) |
| | if self.fs: |
| | self.fs._intrans = False |
| | self.fs._transaction = None |
| | self.fs = None |
| |
|
| | def start(self): |
| | """Start a transaction on this FileSystem""" |
| | self.files = deque() |
| | self.fs._intrans = True |
| |
|
| | def complete(self, commit=True): |
| | """Finish transaction: commit or discard all deferred files""" |
| | while self.files: |
| | f = self.files.popleft() |
| | if commit: |
| | f.commit() |
| | else: |
| | f.discard() |
| | self.fs._intrans = False |
| | self.fs._transaction = None |
| | self.fs = None |
| |
|
| |
|
| | class FileActor: |
| | def __init__(self): |
| | self.files = [] |
| |
|
| | def commit(self): |
| | for f in self.files: |
| | f.commit() |
| | self.files.clear() |
| |
|
| | def discard(self): |
| | for f in self.files: |
| | f.discard() |
| | self.files.clear() |
| |
|
| | def append(self, f): |
| | self.files.append(f) |
| |
|
| |
|
| | class DaskTransaction(Transaction): |
| | def __init__(self, fs): |
| | """ |
| | Parameters |
| | ---------- |
| | fs: FileSystem instance |
| | """ |
| | import distributed |
| |
|
| | super().__init__(fs) |
| | client = distributed.default_client() |
| | self.files = client.submit(FileActor, actor=True).result() |
| |
|
| | def complete(self, commit=True): |
| | """Finish transaction: commit or discard all deferred files""" |
| | if commit: |
| | self.files.commit().result() |
| | else: |
| | self.files.discard().result() |
| | self.fs._intrans = False |
| | self.fs = None |
| |
|