yerang's picture
Upload 1110 files
e3af00f verified
raw
history blame
10.1 kB
# Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Triton Inference Server class.
Use to start and maintain the Triton Inference Server process.
Examples of use:
server = TritonServer(
path="/path/to/tritonserver/binary",
libs_path="/path/to/tritonserver/libraries",
config=TritonServerConfig()
)
server.start()
"""
import ctypes.util
import importlib
import json
import logging
import os
import pathlib
import signal
import sys
import threading
import traceback
from typing import Callable, Dict, Literal, Optional, Sequence, Union
from pytriton.utils.logging import silence_3rd_party_loggers
from ..utils import endpoint_utils
from .triton_server_config import TritonServerConfig
LOGGER = logging.getLogger(__name__)
SERVER_OUTPUT_TIMEOUT_SECS = 30
_PROXY_REQUIRED_MODULES = ["numpy", "zmq"]
_PYTRITON_STARTED_IN_PY310 = (3, 10) <= sys.version_info < (3, 11)
silence_3rd_party_loggers()
def get_triton_python_backend_python_env() -> pathlib.Path:
"""Get the path to the python environment for the triton python backend.
Officially built python backend is built with python 3.8 so need to
use the same python version to run the python backend.
Also, python environment should contain packages required by the proxy.
Returns:
Path to the python environment with python 3.8
"""
env_path = pathlib.Path(sys.exec_prefix)
installed_modules = []
missing_modules = []
for module_name in _PROXY_REQUIRED_MODULES:
try:
importlib.import_module(module_name)
installed_modules.append(module_name)
except ImportError:
missing_modules.append(module_name)
if missing_modules:
raise RuntimeError(
"Python environment for python backend is missing required packages. "
f"Ensure that you have {', '.join(_PROXY_REQUIRED_MODULES)} installed in the {env_path} environment. "
f"Installed modules {', '.join(installed_modules)}. Missing modules {', '.join(missing_modules)}."
)
return env_path
class TritonServer:
"""Implementation of TritonServer interface that runs tritonserver locally as subprocess."""
def __init__(
self,
*,
path: Union[str, pathlib.Path],
libs_path: Union[str, pathlib.Path],
config: TritonServerConfig,
gpus: Optional[Sequence[int]] = None,
verbose: bool = True,
):
"""Triton server constructor.
Args:
path: The absolute path to the tritonserver executable
libs_path: The absolute path to the tritonserver libraries
config: The config object containing arguments for this server instance
gpus: sequence of GPUs device ids to attach to process
verbose: Enable verbose logging of server to STDOUT
"""
self._server_path = pathlib.Path(path)
self._server_libs_path = pathlib.Path(libs_path)
self._server_config = config
self._gpus = gpus
self._tritonserver_running_cmd = None
self._tritonserver_logs = ""
self._verbose = verbose
self._on_exit_lock = threading.RLock()
self._on_exit = []
assert self._server_config["model-repository"], "Triton Server requires --model-repository argument to be set."
def start(self) -> None:
"""Starts the tritonserver process.
The method can be executed multiple times and only single process is started.
"""
if self.is_alive():
raise RuntimeError(
f"You have to stop previously started tritonserver process first "
f"pid={self._tritonserver_running_cmd.pid}"
)
else:
env = self._get_env()
LOGGER.debug(f"Triton Server binary {self._server_path}. Environment:\n{json.dumps(env, indent=4)}")
tritonserver_cmd, *rest = self._server_path.as_posix().split(" ", 1)
import sh
tritonserver_cmd = sh.Command(tritonserver_cmd)
tritonserver_cmd = tritonserver_cmd.bake(*rest)
tritonserver_args = self._server_config.to_args_list()
def _preexec_fn():
PR_SET_PDEATHSIG = 1 # noqa
libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM)
self._tritonserver_logs = ""
self._tritonserver_running_cmd = tritonserver_cmd(
*tritonserver_args,
_env=env,
_err_to_out=True,
_out=self._record_logs,
_out_bufsize=0,
_err_bufsize=0,
_bg=True,
_bg_exc=False,
_done=self._handle_exit,
_preexec_fn=_preexec_fn,
)
def stop(self) -> None:
"""Send the SIGINT signal to running process and wait until server finished."""
if self.is_alive():
LOGGER.debug(
f"Stopping Triton Inference server - sending SIGINT signal and wait {SERVER_OUTPUT_TIMEOUT_SECS}s"
)
self._tritonserver_running_cmd.process.signal(signal.SIGINT)
try:
LOGGER.debug("Waiting for process to stop.")
self._tritonserver_running_cmd.wait(timeout=SERVER_OUTPUT_TIMEOUT_SECS)
except Exception:
message = traceback.format_exc()
LOGGER.debug(f"Error message: \n{message}")
try:
if self.is_alive():
LOGGER.debug("Timeout waiting for server. Trying to kill process.")
self._tritonserver_running_cmd.process.kill()
self._tritonserver_running_cmd.wait(timeout=SERVER_OUTPUT_TIMEOUT_SECS)
except Exception:
LOGGER.debug(f"Could not kill triton server pid={self._tritonserver_running_cmd.pid}")
message = traceback.format_exc()
LOGGER.debug(f"Error message: \n{message}")
def register_on_exit(self, callback: Callable) -> None:
"""Register callback executed on process exit.
Args:
callback: callable to register in callbacks
"""
with self._on_exit_lock:
self._on_exit.append(callback)
def unregister_on_exit(self, callback: Callable) -> None:
"""Unregister callback executed on process exit.
Args:
callback: callable to unregister from callbacks
"""
with self._on_exit_lock:
self._on_exit.remove(callback)
def is_alive(self) -> bool:
"""Verify if server is currently running.
Returns:
True when server is running, False otherwise
"""
return self._tritonserver_running_cmd is not None and self._tritonserver_running_cmd.is_alive()
def logs(self) -> str:
"""Return the server logs of running server.
Returns:
String with capture logs
"""
return self._tritonserver_logs
def get_endpoint(self, endpoint: Literal["http", "grpc", "metrics"]) -> str:
"""Get endpoint url.
Args:
endpoint: endpoint name
Returns:
endpoint url in form of {protocol}://{host}:{port}
"""
return endpoint_utils.get_endpoint(self._server_config, endpoint)
def _record_logs(self, line: Union[bytes, str]) -> None:
"""Record logs obtained from server process. If verbose logging enabled, print the log into STDOUT.
Args:
line: Log line obtained from server
"""
if isinstance(line, bytes):
line = line.decode("utf-8", errors="replace")
if self._verbose:
print(line, end="") # noqa: T201
self._tritonserver_logs += line
def _get_env(self) -> Dict:
"""Create and return environment variables for server execution.
Returns:
Dict with environment variables
"""
env = os.environ.copy()
if self._gpus and isinstance(self._gpus, (list, tuple)):
env["CUDA_VISIBLE_DEVICES"] = ",".join([str(gpu) for gpu in self._gpus])
if "LD_LIBRARY_PATH" in env:
env["LD_LIBRARY_PATH"] += ":" + self._server_libs_path.as_posix()
else:
env["LD_LIBRARY_PATH"] = self._server_libs_path.as_posix()
env_path = get_triton_python_backend_python_env()
python_bin_directory = env_path / "bin"
env["PATH"] = f"{python_bin_directory.as_posix()}:{env['PATH']}"
return env
def _handle_exit(self, _, success, exit_code) -> None:
"""Handle exit of server process. Trigger callbacks if provided.
Args:
success: Flag indicating if process succeeded or failed
exit_code: Exit code with which server process finished
"""
if not success:
LOGGER.warning("Triton Inference Server exited with failure. Please wait.")
LOGGER.debug(f"Triton Inference Server exit code {exit_code}")
else:
LOGGER.debug("Triton Inference Server stopped")
with self._on_exit_lock:
for callback in self._on_exit:
try:
callback(success, exit_code)
except Exception as e:
LOGGER.debug(f"Error during calling on_exit callback; {e}")