Spaces:
Runtime error
Runtime error
from __future__ import annotations | |
from collections.abc import AsyncIterable, Mapping, Sequence | |
from io import BytesIO | |
from os import PathLike | |
from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess | |
from typing import IO, Any, cast | |
from ..abc import Process | |
from ._eventloop import get_async_backend | |
from ._tasks import create_task_group | |
async def run_process(
    command: str | bytes | Sequence[str | bytes],
    *,
    input: bytes | None = None,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    check: bool = True,
    cwd: str | bytes | PathLike[str] | None = None,
    env: Mapping[str, str] | None = None,
    start_new_session: bool = False,
) -> CompletedProcess[bytes]:
    """
    Launch an external command, feed it optional input and wait for it to finish.

    Whatever the process exposes as its standard output and standard error streams
    is read to the end concurrently while the process runs, and the collected bytes
    are attached to the returned object.

    .. seealso:: :func:`subprocess.run`

    :param command: either a string to pass to the shell, or an iterable of strings
        containing the executable name or path and its arguments
    :param input: bytes written to the standard input of the subprocess; when this
        is ``None`` or empty, standard input is connected to
        :data:`subprocess.DEVNULL` instead of a pipe
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``
    :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the
        process terminates with a return code other than 0
    :param cwd: if not ``None``, change the working directory to this before running
        the command
    :param env: if not ``None``, this mapping replaces the inherited environment
        variables from the parent process
    :param start_new_session: if ``True`` the setsid() system call will be made in
        the child process prior to the execution of the subprocess. (POSIX only)
    :return: an object representing the completed process
    :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
        exits with a nonzero return code

    """
    # Slot 0 receives the collected stdout bytes, slot 1 the collected stderr bytes.
    # A slot keeps its None value when the process exposes no such stream.
    captured: list[bytes | None] = [None, None]

    async def collect(stream: AsyncIterable[bytes], slot: int) -> None:
        # Read the stream until it is exhausted, then publish the joined result
        chunks = [chunk async for chunk in stream]
        captured[slot] = b"".join(chunks)

    # A stdin pipe is only requested when there is non-empty input to write
    stdin_target = PIPE if input else DEVNULL

    async with await open_process(
        command,
        stdin=stdin_target,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        start_new_session=start_new_session,
    ) as process:
        try:
            async with create_task_group() as tg:
                # Start the readers before writing any input so that output is
                # consumed while the process is running
                for slot, stream in enumerate((process.stdout, process.stderr)):
                    if stream:
                        tg.start_soon(collect, stream, slot)

                if process.stdin and input:
                    await process.stdin.send(input)
                    # Closing stdin signals end-of-input to the child
                    await process.stdin.aclose()

                await process.wait()
        except BaseException:
            # On any failure (including cancellation), kill the child before
            # propagating the exception
            process.kill()
            raise

    output, errors = captured
    returncode = cast(int, process.returncode)
    if check and returncode != 0:
        raise CalledProcessError(returncode, command, output, errors)

    return CompletedProcess(command, returncode, output, errors)
async def open_process(
    command: str | bytes | Sequence[str | bytes],
    *,
    stdin: int | IO[Any] | None = PIPE,
    stdout: int | IO[Any] | None = PIPE,
    stderr: int | IO[Any] | None = PIPE,
    cwd: str | bytes | PathLike[str] | None = None,
    env: Mapping[str, str] | None = None,
    start_new_session: bool = False,
) -> Process:
    """
    Spawn an external command as a subprocess via the current async backend.

    A ``str`` or ``bytes`` command is handed to the backend with ``shell=True``;
    any other sequence is handed over with ``shell=False``.

    .. seealso:: :class:`subprocess.Popen`

    :param command: either a string to pass to the shell, or an iterable of strings
        containing the executable name or path and its arguments
    :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a
        file-like object, or ``None``
    :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        a file-like object, or ``None``
    :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`,
        :data:`subprocess.STDOUT`, a file-like object, or ``None``
    :param cwd: if not ``None``, the working directory is changed before executing
    :param env: if not ``None``, it must be a mapping that defines the environment
        variables for the new process
    :param start_new_session: if ``True`` the setsid() system call will be made in
        the child process prior to the execution of the subprocess. (POSIX only)
    :return: an asynchronous process object

    """
    # The command's type alone decides whether it goes through the shell
    use_shell = isinstance(command, (str, bytes))
    backend = get_async_backend()
    return await backend.open_process(
        command,
        shell=use_shell,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        start_new_session=start_new_session,
    )