from __future__ import annotations import os import pathlib import sys from dataclasses import dataclass from functools import partial from os import PathLike from typing import ( IO, TYPE_CHECKING, Any, AnyStr, AsyncIterator, Callable, Generic, Iterable, Iterator, Sequence, cast, overload, ) from .. import to_thread from ..abc import AsyncResource if sys.version_info >= (3, 8): from typing import Final else: from typing_extensions import Final if TYPE_CHECKING: from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer else: ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object class AsyncFile(AsyncResource, Generic[AnyStr]): """ An asynchronous file object. This class wraps a standard file object and provides async friendly versions of the following blocking methods (where available on the original file object): * read * read1 * readline * readlines * readinto * readinto1 * write * writelines * truncate * seek * tell * flush All other methods are directly passed through. This class supports the asynchronous context manager protocol which closes the underlying file at the end of the context block. This class also supports asynchronous iteration:: async with await open_file(...) as f: async for line in f: print(line) """ def __init__(self, fp: IO[AnyStr]) -> None: self._fp: Any = fp def __getattr__(self, name: str) -> object: return getattr(self._fp, name) @property def wrapped(self) -> IO[AnyStr]: """The wrapped file object.""" return self._fp async def __aiter__(self) -> AsyncIterator[AnyStr]: while True: line = await self.readline() if line: yield line else: break async def aclose(self) -> None: return await to_thread.run_sync(self._fp.close) async def read(self, size: int = -1) -> AnyStr: return await to_thread.run_sync(self._fp.read, size) async def read1(self: AsyncFile[bytes], size: int = -1) -> bytes: return await to_thread.run_sync(self._fp.read1, size) async def readline(self) -> AnyStr: return await to_thread.run_sync(self._fp.readline) async def readlines(self) -> list[AnyStr]: return await to_thread.run_sync(self._fp.readlines) async def readinto(self: AsyncFile[bytes], b: WriteableBuffer) -> bytes: return await to_thread.run_sync(self._fp.readinto, b) async def readinto1(self: AsyncFile[bytes], b: WriteableBuffer) -> bytes: return await to_thread.run_sync(self._fp.readinto1, b) @overload async def write(self: AsyncFile[bytes], b: ReadableBuffer) -> int: ... @overload async def write(self: AsyncFile[str], b: str) -> int: ... async def write(self, b: ReadableBuffer | str) -> int: return await to_thread.run_sync(self._fp.write, b) @overload async def writelines( self: AsyncFile[bytes], lines: Iterable[ReadableBuffer] ) -> None: ... @overload async def writelines(self: AsyncFile[str], lines: Iterable[str]) -> None: ... async def writelines(self, lines: Iterable[ReadableBuffer] | Iterable[str]) -> None: return await to_thread.run_sync(self._fp.writelines, lines) async def truncate(self, size: int | None = None) -> int: return await to_thread.run_sync(self._fp.truncate, size) async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int: return await to_thread.run_sync(self._fp.seek, offset, whence) async def tell(self) -> int: return await to_thread.run_sync(self._fp.tell) async def flush(self) -> None: return await to_thread.run_sync(self._fp.flush) @overload async def open_file( file: str | PathLike[str] | int, mode: OpenBinaryMode, buffering: int = ..., encoding: str | None = ..., errors: str | None = ..., newline: str | None = ..., closefd: bool = ..., opener: Callable[[str, int], int] | None = ..., ) -> AsyncFile[bytes]: ... @overload async def open_file( file: str | PathLike[str] | int, mode: OpenTextMode = ..., buffering: int = ..., encoding: str | None = ..., errors: str | None = ..., newline: str | None = ..., closefd: bool = ..., opener: Callable[[str, int], int] | None = ..., ) -> AsyncFile[str]: ... async def open_file( file: str | PathLike[str] | int, mode: str = "r", buffering: int = -1, encoding: str | None = None, errors: str | None = None, newline: str | None = None, closefd: bool = True, opener: Callable[[str, int], int] | None = None, ) -> AsyncFile[Any]: """ Open a file asynchronously. The arguments are exactly the same as for the builtin :func:`open`. :return: an asynchronous file object """ fp = await to_thread.run_sync( open, file, mode, buffering, encoding, errors, newline, closefd, opener ) return AsyncFile(fp) def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]: """ Wrap an existing file as an asynchronous file. :param file: an existing file-like object :return: an asynchronous file object """ return AsyncFile(file) @dataclass(eq=False) class _PathIterator(AsyncIterator["Path"]): iterator: Iterator[PathLike[str]] async def __anext__(self) -> Path: nextval = await to_thread.run_sync(next, self.iterator, None, cancellable=True) if nextval is None: raise StopAsyncIteration from None return Path(cast("PathLike[str]", nextval)) class Path: """ An asynchronous version of :class:`pathlib.Path`. This class cannot be substituted for :class:`pathlib.Path` or :class:`pathlib.PurePath`, but it is compatible with the :class:`os.PathLike` interface. It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for the deprecated :meth:`~pathlib.Path.link_to` method. Any methods that do disk I/O need to be awaited on. These methods are: * :meth:`~pathlib.Path.absolute` * :meth:`~pathlib.Path.chmod` * :meth:`~pathlib.Path.cwd` * :meth:`~pathlib.Path.exists` * :meth:`~pathlib.Path.expanduser` * :meth:`~pathlib.Path.group` * :meth:`~pathlib.Path.hardlink_to` * :meth:`~pathlib.Path.home` * :meth:`~pathlib.Path.is_block_device` * :meth:`~pathlib.Path.is_char_device` * :meth:`~pathlib.Path.is_dir` * :meth:`~pathlib.Path.is_fifo` * :meth:`~pathlib.Path.is_file` * :meth:`~pathlib.Path.is_mount` * :meth:`~pathlib.Path.lchmod` * :meth:`~pathlib.Path.lstat` * :meth:`~pathlib.Path.mkdir` * :meth:`~pathlib.Path.open` * :meth:`~pathlib.Path.owner` * :meth:`~pathlib.Path.read_bytes` * :meth:`~pathlib.Path.read_text` * :meth:`~pathlib.Path.readlink` * :meth:`~pathlib.Path.rename` * :meth:`~pathlib.Path.replace` * :meth:`~pathlib.Path.rmdir` * :meth:`~pathlib.Path.samefile` * :meth:`~pathlib.Path.stat` * :meth:`~pathlib.Path.touch` * :meth:`~pathlib.Path.unlink` * :meth:`~pathlib.Path.write_bytes` * :meth:`~pathlib.Path.write_text` Additionally, the following methods return an async iterator yielding :class:`~.Path` objects: * :meth:`~pathlib.Path.glob` * :meth:`~pathlib.Path.iterdir` * :meth:`~pathlib.Path.rglob` """ __slots__ = "_path", "__weakref__" __weakref__: Any def __init__(self, *args: str | PathLike[str]) -> None: self._path: Final[pathlib.Path] = pathlib.Path(*args) def __fspath__(self) -> str: return self._path.__fspath__() def __str__(self) -> str: return self._path.__str__() def __repr__(self) -> str: return f"{self.__class__.__name__}({self.as_posix()!r})" def __bytes__(self) -> bytes: return self._path.__bytes__() def __hash__(self) -> int: return self._path.__hash__() def __eq__(self, other: object) -> bool: target = other._path if isinstance(other, Path) else other return self._path.__eq__(target) def __lt__(self, other: Path) -> bool: target = other._path if isinstance(other, Path) else other return self._path.__lt__(target) def __le__(self, other: Path) -> bool: target = other._path if isinstance(other, Path) else other return self._path.__le__(target) def __gt__(self, other: Path) -> bool: target = other._path if isinstance(other, Path) else other return self._path.__gt__(target) def __ge__(self, other: Path) -> bool: target = other._path if isinstance(other, Path) else other return self._path.__ge__(target) def __truediv__(self, other: Any) -> Path: return Path(self._path / other) def __rtruediv__(self, other: Any) -> Path: return Path(other) / self @property def parts(self) -> tuple[str, ...]: return self._path.parts @property def drive(self) -> str: return self._path.drive @property def root(self) -> str: return self._path.root @property def anchor(self) -> str: return self._path.anchor @property def parents(self) -> Sequence[Path]: return tuple(Path(p) for p in self._path.parents) @property def parent(self) -> Path: return Path(self._path.parent) @property def name(self) -> str: return self._path.name @property def suffix(self) -> str: return self._path.suffix @property def suffixes(self) -> list[str]: return self._path.suffixes @property def stem(self) -> str: return self._path.stem async def absolute(self) -> Path: path = await to_thread.run_sync(self._path.absolute) return Path(path) def as_posix(self) -> str: return self._path.as_posix() def as_uri(self) -> str: return self._path.as_uri() def match(self, path_pattern: str) -> bool: return self._path.match(path_pattern) def is_relative_to(self, *other: str | PathLike[str]) -> bool: try: self.relative_to(*other) return True except ValueError: return False async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None: func = partial(os.chmod, follow_symlinks=follow_symlinks) return await to_thread.run_sync(func, self._path, mode) @classmethod async def cwd(cls) -> Path: path = await to_thread.run_sync(pathlib.Path.cwd) return cls(path) async def exists(self) -> bool: return await to_thread.run_sync(self._path.exists, cancellable=True) async def expanduser(self) -> Path: return Path(await to_thread.run_sync(self._path.expanduser, cancellable=True)) def glob(self, pattern: str) -> AsyncIterator[Path]: gen = self._path.glob(pattern) return _PathIterator(gen) async def group(self) -> str: return await to_thread.run_sync(self._path.group, cancellable=True) async def hardlink_to(self, target: str | pathlib.Path | Path) -> None: if isinstance(target, Path): target = target._path await to_thread.run_sync(os.link, target, self) @classmethod async def home(cls) -> Path: home_path = await to_thread.run_sync(pathlib.Path.home) return cls(home_path) def is_absolute(self) -> bool: return self._path.is_absolute() async def is_block_device(self) -> bool: return await to_thread.run_sync(self._path.is_block_device, cancellable=True) async def is_char_device(self) -> bool: return await to_thread.run_sync(self._path.is_char_device, cancellable=True) async def is_dir(self) -> bool: return await to_thread.run_sync(self._path.is_dir, cancellable=True) async def is_fifo(self) -> bool: return await to_thread.run_sync(self._path.is_fifo, cancellable=True) async def is_file(self) -> bool: return await to_thread.run_sync(self._path.is_file, cancellable=True) async def is_mount(self) -> bool: return await to_thread.run_sync(os.path.ismount, self._path, cancellable=True) def is_reserved(self) -> bool: return self._path.is_reserved() async def is_socket(self) -> bool: return await to_thread.run_sync(self._path.is_socket, cancellable=True) async def is_symlink(self) -> bool: return await to_thread.run_sync(self._path.is_symlink, cancellable=True) def iterdir(self) -> AsyncIterator[Path]: gen = self._path.iterdir() return _PathIterator(gen) def joinpath(self, *args: str | PathLike[str]) -> Path: return Path(self._path.joinpath(*args)) async def lchmod(self, mode: int) -> None: await to_thread.run_sync(self._path.lchmod, mode) async def lstat(self) -> os.stat_result: return await to_thread.run_sync(self._path.lstat, cancellable=True) async def mkdir( self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False ) -> None: await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok) @overload async def open( self, mode: OpenBinaryMode, buffering: int = ..., encoding: str | None = ..., errors: str | None = ..., newline: str | None = ..., ) -> AsyncFile[bytes]: ... @overload async def open( self, mode: OpenTextMode = ..., buffering: int = ..., encoding: str | None = ..., errors: str | None = ..., newline: str | None = ..., ) -> AsyncFile[str]: ... async def open( self, mode: str = "r", buffering: int = -1, encoding: str | None = None, errors: str | None = None, newline: str | None = None, ) -> AsyncFile[Any]: fp = await to_thread.run_sync( self._path.open, mode, buffering, encoding, errors, newline ) return AsyncFile(fp) async def owner(self) -> str: return await to_thread.run_sync(self._path.owner, cancellable=True) async def read_bytes(self) -> bytes: return await to_thread.run_sync(self._path.read_bytes) async def read_text( self, encoding: str | None = None, errors: str | None = None ) -> str: return await to_thread.run_sync(self._path.read_text, encoding, errors) def relative_to(self, *other: str | PathLike[str]) -> Path: return Path(self._path.relative_to(*other)) async def readlink(self) -> Path: target = await to_thread.run_sync(os.readlink, self._path) return Path(cast(str, target)) async def rename(self, target: str | pathlib.PurePath | Path) -> Path: if isinstance(target, Path): target = target._path await to_thread.run_sync(self._path.rename, target) return Path(target) async def replace(self, target: str | pathlib.PurePath | Path) -> Path: if isinstance(target, Path): target = target._path await to_thread.run_sync(self._path.replace, target) return Path(target) async def resolve(self, strict: bool = False) -> Path: func = partial(self._path.resolve, strict=strict) return Path(await to_thread.run_sync(func, cancellable=True)) def rglob(self, pattern: str) -> AsyncIterator[Path]: gen = self._path.rglob(pattern) return _PathIterator(gen) async def rmdir(self) -> None: await to_thread.run_sync(self._path.rmdir) async def samefile( self, other_path: str | bytes | int | pathlib.Path | Path ) -> bool: if isinstance(other_path, Path): other_path = other_path._path return await to_thread.run_sync( self._path.samefile, other_path, cancellable=True ) async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result: func = partial(os.stat, follow_symlinks=follow_symlinks) return await to_thread.run_sync(func, self._path, cancellable=True) async def symlink_to( self, target: str | pathlib.Path | Path, target_is_directory: bool = False, ) -> None: if isinstance(target, Path): target = target._path await to_thread.run_sync(self._path.symlink_to, target, target_is_directory) async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None: await to_thread.run_sync(self._path.touch, mode, exist_ok) async def unlink(self, missing_ok: bool = False) -> None: try: await to_thread.run_sync(self._path.unlink) except FileNotFoundError: if not missing_ok: raise def with_name(self, name: str) -> Path: return Path(self._path.with_name(name)) def with_stem(self, stem: str) -> Path: return Path(self._path.with_name(stem + self._path.suffix)) def with_suffix(self, suffix: str) -> Path: return Path(self._path.with_suffix(suffix)) async def write_bytes(self, data: bytes) -> int: return await to_thread.run_sync(self._path.write_bytes, data) async def write_text( self, data: str, encoding: str | None = None, errors: str | None = None, newline: str | None = None, ) -> int: # Path.write_text() does not support the "newline" parameter before Python 3.10 def sync_write_text() -> int: with self._path.open( "w", encoding=encoding, errors=errors, newline=newline ) as fp: return fp.write(data) return await to_thread.run_sync(sync_write_text) PathLike.register(Path)