File size: 4,977 Bytes
b6068b4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
from __future__ import annotations
from io import BytesIO
from os import PathLike
from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess
from typing import (
IO,
Any,
AsyncIterable,
Mapping,
Sequence,
cast,
)
from ..abc import Process
from ._eventloop import get_asynclib
from ._tasks import create_task_group
async def run_process(
command: str | bytes | Sequence[str | bytes],
*,
input: bytes | None = None,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
check: bool = True,
cwd: str | bytes | PathLike[str] | None = None,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
) -> CompletedProcess[bytes]:
"""
Run an external command in a subprocess and wait until it completes.
.. seealso:: :func:`subprocess.run`
:param command: either a string to pass to the shell, or an iterable of strings containing the
executable name or path and its arguments
:param input: bytes passed to the standard input of the subprocess
:param stdout: either :data:`subprocess.PIPE` or :data:`subprocess.DEVNULL`
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL` or
:data:`subprocess.STDOUT`
:param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the process
terminates with a return code other than 0
:param cwd: If not ``None``, change the working directory to this before running the command
:param env: if not ``None``, this mapping replaces the inherited environment variables from the
parent process
:param start_new_session: if ``true`` the setsid() system call will be made in the child
process prior to the execution of the subprocess. (POSIX only)
:return: an object representing the completed process
:raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process exits with a
nonzero return code
"""
async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
buffer = BytesIO()
async for chunk in stream:
buffer.write(chunk)
stream_contents[index] = buffer.getvalue()
async with await open_process(
command,
stdin=PIPE if input else DEVNULL,
stdout=stdout,
stderr=stderr,
cwd=cwd,
env=env,
start_new_session=start_new_session,
) as process:
stream_contents: list[bytes | None] = [None, None]
try:
async with create_task_group() as tg:
if process.stdout:
tg.start_soon(drain_stream, process.stdout, 0)
if process.stderr:
tg.start_soon(drain_stream, process.stderr, 1)
if process.stdin and input:
await process.stdin.send(input)
await process.stdin.aclose()
await process.wait()
except BaseException:
process.kill()
raise
output, errors = stream_contents
if check and process.returncode != 0:
raise CalledProcessError(cast(int, process.returncode), command, output, errors)
return CompletedProcess(command, cast(int, process.returncode), output, errors)
async def open_process(
command: str | bytes | Sequence[str | bytes],
*,
stdin: int | IO[Any] | None = PIPE,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
cwd: str | bytes | PathLike[str] | None = None,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
) -> Process:
"""
Start an external command in a subprocess.
.. seealso:: :class:`subprocess.Popen`
:param command: either a string to pass to the shell, or an iterable of strings containing the
executable name or path and its arguments
:param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
file-like object, or ``None``
:param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
a file-like object, or ``None``
:param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
:data:`subprocess.STDOUT`, a file-like object, or ``None``
:param cwd: If not ``None``, the working directory is changed before executing
:param env: If env is not ``None``, it must be a mapping that defines the environment
variables for the new process
:param start_new_session: if ``true`` the setsid() system call will be made in the child
process prior to the execution of the subprocess. (POSIX only)
:return: an asynchronous process object
"""
shell = isinstance(command, str)
return await get_asynclib().open_process(
command,
shell=shell,
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd,
env=env,
start_new_session=start_new_session,
)
|