Spaces:
Sleeping
Sleeping
| import os | |
| from collections import defaultdict | |
| from enum import IntEnum, unique | |
| from io import BytesIO | |
| from multiprocessing.pool import ThreadPool | |
| from threading import Lock, RLock | |
| from types import MappingProxyType | |
| from typing import Any, Callable, Optional, Union | |
| from .abstract import AbstractContext, AbstractOperation | |
| fdsync = getattr(os, "fdatasync", os.fsync) | |
| NATIVE_PREAD_PWRITE = hasattr(os, "pread") and hasattr(os, "pwrite") | |
| class OpCode(IntEnum): | |
| READ = 0 | |
| WRITE = 1 | |
| FSYNC = 2 | |
| FDSYNC = 3 | |
| NOOP = -1 | |
| class Context(AbstractContext): | |
| """ | |
| python aio context implementation | |
| """ | |
| MAX_POOL_SIZE = 128 | |
| def __init__(self, max_requests: int = 32, pool_size: int = 8): | |
| assert pool_size < self.MAX_POOL_SIZE | |
| self.__max_requests = max_requests | |
| self.pool = ThreadPool(pool_size) | |
| self._in_progress = 0 | |
| self._closed = False | |
| self._closed_lock = Lock() | |
| if not NATIVE_PREAD_PWRITE: | |
| self._locks_cleaner = RLock() # type: ignore | |
| self._locks = defaultdict(RLock) # type: ignore | |
| def max_requests(self) -> int: | |
| return self.__max_requests | |
| def _execute(self, operation: "Operation"): | |
| handler = self._OP_MAP[operation.opcode] | |
| def on_error(exc): | |
| self._in_progress -= 1 | |
| operation.exception = exc | |
| operation.written = 0 | |
| operation.callback(None) | |
| def on_success(result): | |
| self._in_progress -= 1 | |
| operation.written = result | |
| operation.callback(result) | |
| if self._in_progress > self.__max_requests: | |
| raise RuntimeError( | |
| "Maximum simultaneous requests have been reached", | |
| ) | |
| self._in_progress += 1 | |
| self.pool.apply_async( | |
| handler, args=(self, operation), | |
| callback=on_success, | |
| error_callback=on_error, | |
| ) | |
| if NATIVE_PREAD_PWRITE: | |
| def __pread(self, fd, size, offset): | |
| return os.pread(fd, size, offset) | |
| def __pwrite(self, fd, bytes, offset): | |
| return os.pwrite(fd, bytes, offset) | |
| else: | |
| def __pread(self, fd, size, offset): | |
| with self._locks[fd]: | |
| os.lseek(fd, 0, os.SEEK_SET) | |
| os.lseek(fd, offset, os.SEEK_SET) | |
| return os.read(fd, size) | |
| def __pwrite(self, fd, bytes, offset): | |
| with self._locks[fd]: | |
| os.lseek(fd, 0, os.SEEK_SET) | |
| os.lseek(fd, offset, os.SEEK_SET) | |
| return os.write(fd, bytes) | |
| def _handle_read(self, operation: "Operation"): | |
| return operation.buffer.write( | |
| self.__pread( | |
| operation.fileno, | |
| operation.nbytes, | |
| operation.offset, | |
| ), | |
| ) | |
| def _handle_write(self, operation: "Operation"): | |
| return self.__pwrite( | |
| operation.fileno, operation.buffer.getvalue(), operation.offset, | |
| ) | |
| def _handle_fsync(self, operation: "Operation"): | |
| return os.fsync(operation.fileno) | |
| def _handle_fdsync(self, operation: "Operation"): | |
| return fdsync(operation.fileno) | |
| def _handle_noop(self, operation: "Operation"): | |
| return | |
| def submit(self, *aio_operations) -> int: | |
| operations = [] | |
| for operation in aio_operations: | |
| if not isinstance(operation, Operation): | |
| raise ValueError("Invalid Operation %r", operation) | |
| operations.append(operation) | |
| count = 0 | |
| for operation in operations: | |
| self._execute(operation) | |
| count += 1 | |
| return count | |
| def cancel(self, *aio_operations) -> int: | |
| """ | |
| Cancels multiple Operations. Returns | |
| Operation.cancel(aio_op1, aio_op2, aio_opN, ...) -> int | |
| (Always returns zero, this method exists for compatibility reasons) | |
| """ | |
| return 0 | |
| def close(self): | |
| if self._closed: | |
| return | |
| with self._closed_lock: | |
| self.pool.close() | |
| self._closed = True | |
| def __del__(self): | |
| if self.pool.close(): | |
| self.close() | |
| _OP_MAP = MappingProxyType({ | |
| OpCode.READ: _handle_read, | |
| OpCode.WRITE: _handle_write, | |
| OpCode.FSYNC: _handle_fsync, | |
| OpCode.FDSYNC: _handle_fdsync, | |
| OpCode.NOOP: _handle_noop, | |
| }) | |
| # noinspection PyPropertyDefinition | |
| class Operation(AbstractOperation): | |
| """ | |
| python aio operation implementation | |
| """ | |
| def __init__( | |
| self, | |
| fd: int, | |
| nbytes: Optional[int], | |
| offset: Optional[int], | |
| opcode: OpCode, | |
| payload: Optional[bytes] = None, | |
| priority: Optional[int] = None, | |
| ): | |
| self.callback = None # type: Optional[Callable[[int], Any]] | |
| self.buffer = BytesIO() | |
| if opcode == OpCode.WRITE and payload: | |
| self.buffer = BytesIO(payload) | |
| self.opcode = opcode | |
| self.__fileno = fd | |
| self.__offset = offset or 0 | |
| self.__opcode = opcode | |
| self.__nbytes = nbytes or 0 | |
| self.__priority = priority or 0 | |
| self.exception = None | |
| self.written = 0 | |
| def read( | |
| cls, nbytes: int, fd: int, offset: int, priority=0, | |
| ) -> "Operation": | |
| """ | |
| Creates a new instance of Operation on read mode. | |
| """ | |
| return cls(fd, nbytes, offset, opcode=OpCode.READ, priority=priority) | |
| def write( | |
| cls, payload_bytes: bytes, fd: int, offset: int, priority=0, | |
| ) -> "Operation": | |
| """ | |
| Creates a new instance of AIOOperation on write mode. | |
| """ | |
| return cls( | |
| fd, | |
| len(payload_bytes), | |
| offset, | |
| payload=payload_bytes, | |
| opcode=OpCode.WRITE, | |
| priority=priority, | |
| ) | |
| def fsync(cls, fd: int, priority=0) -> "Operation": | |
| """ | |
| Creates a new instance of AIOOperation on fsync mode. | |
| """ | |
| return cls(fd, None, None, opcode=OpCode.FSYNC, priority=priority) | |
| def fdsync(cls, fd: int, priority=0) -> "Operation": | |
| """ | |
| Creates a new instance of AIOOperation on fdsync mode. | |
| """ | |
| return cls(fd, None, None, opcode=OpCode.FDSYNC, priority=priority) | |
| def get_value(self) -> Union[bytes, int]: | |
| """ | |
| Method returns a bytes value of AIOOperation's result or None. | |
| """ | |
| if self.exception: | |
| raise self.exception | |
| if self.opcode == OpCode.WRITE: | |
| return self.written | |
| if self.buffer is None: | |
| return | |
| return self.buffer.getvalue() | |
| def fileno(self) -> int: | |
| return self.__fileno | |
| def offset(self) -> int: | |
| return self.__offset | |
| def payload(self) -> Optional[memoryview]: | |
| return self.buffer.getbuffer() | |
| def nbytes(self) -> int: | |
| return self.__nbytes | |
| def set_callback(self, callback: Callable[[int], Any]) -> bool: | |
| self.callback = callback | |
| return True | |