|
from __future__ import annotations |
|
|
|
from io import BytesIO |
|
from os import PathLike |
|
from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess |
|
from typing import ( |
|
IO, |
|
Any, |
|
AsyncIterable, |
|
Mapping, |
|
Sequence, |
|
cast, |
|
) |
|
|
|
from ..abc import Process |
|
from ._eventloop import get_asynclib |
|
from ._tasks import create_task_group |
|
|
|
|
|
async def run_process(
    command: str | bytes | Sequence[str | bytes],
    *,
    input: bytes | None = None,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    check: bool = True,
    cwd: str | bytes | PathLike[str] | None = None,
    env: Mapping[str, str] | None = None,
    start_new_session: bool = False,
) -> CompletedProcess[bytes]:
    """
    Run an external command in a subprocess and wait until it completes.

    .. seealso:: :func:`subprocess.run`

    :param command: either a string to pass to the shell, or an iterable of strings containing the
        executable name or path and its arguments
    :param input: bytes passed to the standard input of the subprocess; if ``None`` or empty,
        the standard input of the subprocess is connected to :data:`subprocess.DEVNULL`
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a file-like
        object, or ``None``; the output is only captured when the process exposes a standard
        output stream
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``; the output is only captured
        when the process exposes a standard error stream
    :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process
        terminates with a return code other than 0
    :param cwd: If not ``None``, change the working directory to this before running the command
    :param env: if not ``None``, this mapping replaces the inherited environment variables from the
        parent process
    :param start_new_session: if ``True`` the setsid() system call will be made in the child
        process prior to the execution of the subprocess. (POSIX only)
    :return: an object representing the completed process
    :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process exits with a
        nonzero return code

    """

    async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
        # Collect everything the stream yields and store it in the slot reserved for it
        # (0 = standard output, 1 = standard error).
        buffer = BytesIO()
        async for chunk in stream:
            buffer.write(chunk)

        stream_contents[index] = buffer.getvalue()

    async with await open_process(
        command,
        stdin=PIPE if input else DEVNULL,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        start_new_session=start_new_session,
    ) as process:
        # A slot stays None when the corresponding stream is not exposed by the process.
        stream_contents: list[bytes | None] = [None, None]
        try:
            async with create_task_group() as tg:
                # Drain the output streams concurrently with feeding the input so that
                # neither side stalls waiting for the other.
                if process.stdout:
                    tg.start_soon(drain_stream, process.stdout, 0)

                if process.stderr:
                    tg.start_soon(drain_stream, process.stderr, 1)

                if process.stdin and input:
                    await process.stdin.send(input)
                    await process.stdin.aclose()

                await process.wait()
        except BaseException:
            # Do not leave the child running if anything goes wrong (including cancellation).
            try:
                process.kill()
            except ProcessLookupError:
                # The child may already have exited (e.g. it quit before consuming its
                # input); a failed kill must not mask the exception being propagated.
                pass

            raise

    output, errors = stream_contents
    if check and process.returncode != 0:
        raise CalledProcessError(cast(int, process.returncode), command, output, errors)

    return CompletedProcess(command, cast(int, process.returncode), output, errors)
|
|
|
|
|
async def open_process(
    command: str | bytes | Sequence[str | bytes],
    *,
    stdin: int | IO[Any] | None = PIPE,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    cwd: str | bytes | PathLike[str] | None = None,
    env: Mapping[str, str] | None = None,
    start_new_session: bool = False,
) -> Process:
    """
    Start an external command in a subprocess.

    .. seealso:: :class:`subprocess.Popen`

    :param command: either a string (or bytes) to pass to the shell, or an iterable of strings
        containing the executable name or path and its arguments
    :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
        file-like object, or ``None``
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``
    :param cwd: If not ``None``, the working directory is changed before executing
    :param env: If env is not ``None``, it must be a mapping that defines the environment
        variables for the new process
    :param start_new_session: if ``True`` the setsid() system call will be made in the child
        process prior to the execution of the subprocess. (POSIX only)
    :return: an asynchronous process object

    """
    # A single str/bytes value is a shell command line; anything else is treated as an
    # argument sequence. Bytes must be included here, otherwise a bytes command would be
    # handed to the backend as if it were a sequence of arguments.
    shell = isinstance(command, (str, bytes))
    return await get_asynclib().open_process(
        command,
        shell=shell,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        start_new_session=start_new_session,
    )
|
|