kushalExplores's picture
Upload folder using huggingface_hub
53dbcc1 verified
import os
from collections import defaultdict
from enum import IntEnum, unique
from io import BytesIO
from multiprocessing.pool import ThreadPool
from threading import Lock, RLock
from types import MappingProxyType
from typing import Any, Callable, Optional, Union
from .abstract import AbstractContext, AbstractOperation
# Use os.fdatasync where the platform provides it; otherwise fall back to
# os.fsync so that callers always get a callable.
fdsync = os.fdatasync if hasattr(os, "fdatasync") else os.fsync

# True only when both positional I/O primitives (os.pread and os.pwrite) are
# available; Context selects its read/write implementation based on this flag.
NATIVE_PREAD_PWRITE = all(hasattr(os, name) for name in ("pread", "pwrite"))
@unique
class OpCode(IntEnum):
    """
    Identifiers of the operation kinds understood by ``Context``.

    ``@unique`` guarantees that no two members share the same value.
    """

    # Read ``nbytes`` from a file descriptor at a given offset.
    READ = 0
    # Write the operation's buffer to a file descriptor at a given offset.
    WRITE = 1
    # Call ``os.fsync`` on the file descriptor.
    FSYNC = 2
    # Call the module-level ``fdsync`` helper on the file descriptor.
    FDSYNC = 3
    # Do nothing.
    NOOP = -1
class Context(AbstractContext):
    """
    python aio context implementation

    Operations are executed by the workers of a ``ThreadPool``. The number
    of operations that may be in flight at the same time is bounded by
    ``max_requests``.
    """

    MAX_POOL_SIZE = 128

    def __init__(self, max_requests: int = 32, pool_size: int = 8):
        """
        Create the context and start its thread pool.

        :param max_requests: maximum number of simultaneously running
            operations; exceeding it makes ``submit`` raise ``RuntimeError``
        :param pool_size: number of worker threads, must be lower than
            ``MAX_POOL_SIZE``
        """
        assert pool_size < self.MAX_POOL_SIZE

        self.__max_requests = max_requests
        self.pool = ThreadPool(pool_size)
        self._in_progress = 0
        # The counter is incremented by the submitting thread and decremented
        # from the pool's callback thread, so updates must be serialized.
        self._in_progress_lock = Lock()
        self._closed = False
        self._closed_lock = Lock()

        if not NATIVE_PREAD_PWRITE:
            # Per-descriptor locks make the emulated "seek + read/write"
            # sequence atomic with respect to other workers.
            self._locks_cleaner = RLock()  # type: ignore
            self._locks = defaultdict(RLock)  # type: ignore

    @property
    def max_requests(self) -> int:
        """Maximum number of operations allowed to run simultaneously."""
        return self.__max_requests

    def _release_slot(self):
        """Give back one slot previously reserved by ``_execute``."""
        with self._in_progress_lock:
            self._in_progress -= 1

    def _execute(self, operation: "Operation"):
        """
        Schedule a single operation on the thread pool.

        On completion the operation's ``written`` (and, on failure,
        ``exception``) attributes are filled in and its callback, if one was
        set, is invoked with the result (``None`` on failure).

        :raises RuntimeError: when ``max_requests`` operations are already
            in progress
        """
        handler = self._OP_MAP[operation.opcode]

        def notify(value):
            # An operation submitted without set_callback() has no callback;
            # calling None would raise inside the pool's callback thread.
            callback = operation.callback
            if callback is not None:
                callback(value)

        def on_error(exc):
            self._release_slot()
            operation.exception = exc
            operation.written = 0
            notify(None)

        def on_success(result):
            self._release_slot()
            operation.written = result
            notify(result)

        # Check and reserve atomically; ">=" keeps the number of running
        # operations at most max_requests (">" allowed one extra).
        with self._in_progress_lock:
            if self._in_progress >= self.__max_requests:
                raise RuntimeError(
                    "Maximum simultaneous requests have been reached",
                )
            self._in_progress += 1

        try:
            self.pool.apply_async(
                handler, args=(self, operation),
                callback=on_success,
                error_callback=on_error,
            )
        except Exception:
            # Scheduling failed (e.g. the pool is already closed): no
            # callback will ever run, so release the slot here.
            self._release_slot()
            raise

    if NATIVE_PREAD_PWRITE:
        def __pread(self, fd, size, offset):
            """Read up to ``size`` bytes from ``fd`` at ``offset``."""
            return os.pread(fd, size, offset)

        def __pwrite(self, fd, data, offset):
            """Write ``data`` to ``fd`` at ``offset``; return bytes written."""
            return os.pwrite(fd, data, offset)
    else:
        def __pread(self, fd, size, offset):
            """Emulate ``os.pread`` with a locked seek followed by a read."""
            with self._locks[fd]:
                os.lseek(fd, offset, os.SEEK_SET)
                return os.read(fd, size)

        def __pwrite(self, fd, data, offset):
            """Emulate ``os.pwrite`` with a locked seek followed by a write."""
            with self._locks[fd]:
                os.lseek(fd, offset, os.SEEK_SET)
                return os.write(fd, data)

    def _handle_read(self, operation: "Operation"):
        """Read into the operation's buffer; return the number of bytes."""
        return operation.buffer.write(
            self.__pread(
                operation.fileno,
                operation.nbytes,
                operation.offset,
            ),
        )

    def _handle_write(self, operation: "Operation"):
        """Write the operation's buffer; return the number of bytes written."""
        return self.__pwrite(
            operation.fileno, operation.buffer.getvalue(), operation.offset,
        )

    def _handle_fsync(self, operation: "Operation"):
        """Call ``os.fsync`` on the operation's file descriptor."""
        return os.fsync(operation.fileno)

    def _handle_fdsync(self, operation: "Operation"):
        """Call the module-level ``fdsync`` on the operation's descriptor."""
        return fdsync(operation.fileno)

    def _handle_noop(self, operation: "Operation"):
        """Do nothing."""
        return

    def submit(self, *aio_operations) -> int:
        """
        Schedule the given operations for execution.

        All arguments are validated before any of them is scheduled.

        :returns: the number of scheduled operations
        :raises ValueError: when an argument is not an ``Operation``
        :raises RuntimeError: when the ``max_requests`` limit is reached
        """
        for operation in aio_operations:
            if not isinstance(operation, Operation):
                # Interpolate explicitly: exceptions do not apply %-style
                # arguments the way logging calls do.
                raise ValueError("Invalid Operation %r" % (operation,))

        for operation in aio_operations:
            self._execute(operation)

        return len(aio_operations)

    def cancel(self, *aio_operations) -> int:
        """
        Cancels multiple Operations.

        Context.cancel(aio_op1, aio_op2, aio_opN, ...) -> int

        (Always returns zero, this method exists for compatibility reasons)
        """
        return 0

    def close(self):
        """Close the thread pool; subsequent calls are no-ops."""
        if self._closed:
            return

        with self._closed_lock:
            # Re-check under the lock so concurrent callers close only once.
            if self._closed:
                return
            self.pool.close()
            self._closed = True

    def __del__(self):
        # __init__ may have failed before the pool and its lock were
        # created; there is nothing to close in that case.
        if not hasattr(self, "_closed_lock"):
            return
        self.close()

    # Dispatch table from opcode to the plain (unbound) handler function;
    # handlers are called with the context passed explicitly as ``self``.
    _OP_MAP = MappingProxyType({
        OpCode.READ: _handle_read,
        OpCode.WRITE: _handle_write,
        OpCode.FSYNC: _handle_fsync,
        OpCode.FDSYNC: _handle_fdsync,
        OpCode.NOOP: _handle_noop,
    })
# noinspection PyPropertyDefinition
class Operation(AbstractOperation):
    """
    python aio operation implementation

    Describes one request (opcode, file descriptor, offset, size and
    optional payload) and stores its outcome in the ``written`` and
    ``exception`` attributes.
    """

    def __init__(
        self,
        fd: int,
        nbytes: Optional[int],
        offset: Optional[int],
        opcode: OpCode,
        payload: Optional[bytes] = None,
        priority: Optional[int] = None,
    ):
        """
        :param fd: file descriptor the operation works on
        :param nbytes: number of bytes to process, ``None`` is stored as 0
        :param offset: file offset, ``None`` is stored as 0
        :param opcode: kind of the operation
        :param payload: initial buffer content, used for WRITE only
        :param priority: stored as given, ``None`` is stored as 0
        """
        self.callback = None  # type: Optional[Callable[[int], Any]]

        # Only a WRITE with a non-empty payload starts with a filled buffer.
        prefilled = opcode == OpCode.WRITE and payload
        self.buffer = BytesIO(payload) if prefilled else BytesIO()

        self.opcode = opcode
        self.__fileno = fd
        self.__offset = offset if offset else 0
        self.__opcode = opcode
        self.__nbytes = nbytes if nbytes else 0
        self.__priority = priority if priority else 0
        self.exception = None
        self.written = 0

    @classmethod
    def read(
        cls, nbytes: int, fd: int, offset: int, priority=0,
    ) -> "Operation":
        """
        Build an Operation that reads ``nbytes`` from ``fd`` at ``offset``.
        """
        operation = cls(
            fd, nbytes, offset, opcode=OpCode.READ, priority=priority,
        )
        return operation

    @classmethod
    def write(
        cls, payload_bytes: bytes, fd: int, offset: int, priority=0,
    ) -> "Operation":
        """
        Build an Operation that writes ``payload_bytes`` to ``fd`` at
        ``offset``.
        """
        size = len(payload_bytes)
        operation = cls(
            fd,
            size,
            offset,
            payload=payload_bytes,
            opcode=OpCode.WRITE,
            priority=priority,
        )
        return operation

    @classmethod
    def fsync(cls, fd: int, priority=0) -> "Operation":
        """
        Build an Operation with the FSYNC opcode for ``fd``.
        """
        operation = cls(fd, None, None, opcode=OpCode.FSYNC, priority=priority)
        return operation

    @classmethod
    def fdsync(cls, fd: int, priority=0) -> "Operation":
        """
        Build an Operation with the FDSYNC opcode for ``fd``.
        """
        operation = cls(
            fd, None, None, opcode=OpCode.FDSYNC, priority=priority,
        )
        return operation

    def get_value(self) -> Union[bytes, int]:
        """
        Return the result of the operation.

        Re-raises the stored exception when there is one. For a WRITE
        operation the ``written`` counter is returned; otherwise the
        content of the buffer is returned as bytes (``None`` if the buffer
        attribute has been set to ``None``).
        """
        error = self.exception
        if error:
            raise error

        if self.opcode == OpCode.WRITE:
            return self.written

        buffer = self.buffer
        return None if buffer is None else buffer.getvalue()

    @property
    def fileno(self) -> int:
        """File descriptor passed to the constructor."""
        return self.__fileno

    @property
    def offset(self) -> int:
        """File offset of the operation (0 when none was given)."""
        return self.__offset

    @property
    def payload(self) -> Optional[memoryview]:
        """View over the operation's buffer (``BytesIO.getbuffer()``)."""
        view = self.buffer.getbuffer()
        return view

    @property
    def nbytes(self) -> int:
        """Number of bytes requested (0 when none was given)."""
        return self.__nbytes

    def set_callback(self, callback: Callable[[int], Any]) -> bool:
        """Store the completion callback; always returns ``True``."""
        self.callback = callback
        return True