| from collections import deque |
|
|
|
|
class Transaction:
    """Filesystem transaction write context

    Gathers files for deferred commit or discard, so that several write
    operations can be finalized semi-atomically. This works by having this
    instance as the ``.transaction`` attribute of the given filesystem
    """

    def __init__(self, fs, **kwargs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        **kwargs:
            Accepted for signature compatibility; not used by this class.
        """
        self.fs = fs
        self.files = deque()

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """End transaction and commit, if exit is not due to exception"""
        try:
            # Only commit if the ``with`` body finished without an exception.
            self.complete(commit=exc_type is None)
        finally:
            # Safety net: make sure the filesystem is released even when
            # ``complete`` (possibly overridden) raised before cleaning up.
            self._detach()

    def start(self):
        """Start a transaction on this FileSystem"""
        # A fresh queue drops anything left over from an earlier completion
        # that did not drain it.
        self.files = deque()
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, call ``commit()`` on every deferred file, otherwise call
            ``discard()``. Files are processed in the order they were queued.

        Any exception raised by a file's ``commit``/``discard`` propagates to
        the caller, but the filesystem's transaction flags are reset first.
        Calling this again after the transaction has finished is harmless.
        """
        try:
            while self.files:
                f = self.files.popleft()
                if commit:
                    f.commit()
                else:
                    f.discard()
        finally:
            self._detach()

    def _detach(self):
        """Clear the transaction flags on the filesystem and drop our reference.

        Idempotent: does nothing if the filesystem was already released.
        """
        fs, self.fs = self.fs, None
        if fs is not None:
            fs._intrans = False
            fs._transaction = None
|
|
|
|
class FileActor:
    """Holder for deferred files that can be committed or discarded together.

    Files are kept in a plain list in the order they were appended.
    """

    def __init__(self):
        self.files = list()

    def _finish(self, action):
        """Call the method named ``action`` on every held file, then empty the list.

        If one of the calls raises, the exception propagates and the list is
        left untouched.
        """
        pending = self.files
        for handle in pending:
            getattr(handle, action)()
        pending.clear()

    def commit(self):
        """Commit every held file, in order, then forget them all."""
        self._finish("commit")

    def discard(self):
        """Discard every held file, in order, then forget them all."""
        self._finish("discard")

    def append(self, f):
        """Add ``f`` to the end of the held files."""
        self.files += [f]
|
|
|
|
class DaskTransaction(Transaction):
    """Transaction whose deferred files are held by a ``FileActor`` Dask actor.

    ``self.files`` is the actor handle obtained from the default
    ``distributed`` client rather than a local deque.
    """

    def __init__(self, fs, **kwargs):
        """
        Parameters
        ----------
        fs: FileSystem instance
        **kwargs:
            Forwarded to ``Transaction.__init__`` so both constructors accept
            the same arguments.
        """
        # Function-scope import: ``distributed`` is only needed when this
        # transaction type is actually instantiated.
        import distributed

        super().__init__(fs, **kwargs)
        client = distributed.default_client()
        self.files = client.submit(FileActor, actor=True).result()

    def start(self):
        """Start a transaction on this FileSystem

        Overrides ``Transaction.start``, which rebinds ``self.files`` to a new
        local deque. Doing that here would drop the actor handle, and
        ``complete`` would then fail because a deque has no
        ``commit``/``discard`` methods.
        """
        self.fs._intrans = True

    def complete(self, commit=True):
        """Finish transaction: commit or discard all deferred files

        Parameters
        ----------
        commit: bool
            If True, ask the actor to commit its files, otherwise to discard
            them. The call blocks until the actor's result is available.

        Any exception from the actor call propagates, but the filesystem's
        transaction flags are reset first, matching ``Transaction.complete``.
        """
        try:
            if commit:
                self.files.commit().result()
            else:
                self.files.discard().result()
        finally:
            fs, self.fs = self.fs, None
            if fs is not None:
                fs._intrans = False
                # Also clear the filesystem's reference to this transaction,
                # as the base class does, so a finished instance (whose
                # ``fs`` is now None) is not left attached to it.
                fs._transaction = None
|
|