# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import fcntl
import logging
import os
import pathlib
import re
import select
import socket
import subprocess
import threading
import typing

LOGGER = logging.getLogger(__name__)

DEFAULT_LOG_FORMAT = "%(asctime)s - %(levelname)8s - %(process)8d - %(threadName)s - %(name)s: %(message)s"
| def _read_outputs(_process, _logger, _outputs): | |
| # Set stdout and stderr file descriptors to non-blocking mode | |
| try: | |
| fcntl.fcntl(_process.stdout, fcntl.F_SETFL, os.O_NONBLOCK) | |
| fcntl.fcntl(_process.stderr, fcntl.F_SETFL, os.O_NONBLOCK) | |
| except ValueError: # when selecting on closed files | |
| return | |
| buffers = {_process.stdout: "", _process.stderr: ""} | |
| rds = [_process.stdout, _process.stderr] | |
| while rds: | |
| try: | |
| readable, _, _ = select.select(rds, [], [], 1) | |
| except ValueError: # when selecting on closed files | |
| break | |
| for rd in readable: | |
| try: | |
| data = os.read(rd.fileno(), 4096) | |
| if not data: | |
| rds.remove(rd) | |
| continue | |
| decoded_data = data.decode("utf-8") | |
| buffers[rd] += decoded_data | |
| lines = buffers[rd].splitlines(keepends=True) | |
| if buffers[rd].endswith("\n"): | |
| complete_lines = lines | |
| buffers[rd] = "" | |
| else: | |
| complete_lines = lines[:-1] | |
| buffers[rd] = lines[-1] | |
| for line in complete_lines: | |
| line = line.rstrip() | |
| _logger.info(line) | |
| _outputs.append(line) | |
| except OSError: # Reading from an empty non-blocking file | |
| pass | |
class ScriptThread(threading.Thread):
    """Daemon thread that runs a command, logging and collecting its output.

    Can be used as a context manager: entering starts the thread and waits
    until the subprocess is spawned (or spawning failed); exiting stops the
    subprocess-watch loop and joins the thread.
    """

    def __init__(self, cmd, workdir=None, group=None, target=None, name=None, args=(), kwargs=None) -> None:
        """Initialize the thread.

        Args:
            cmd: Command as an argument list, e.g. ``["python", "script.py"]``.
            workdir: Working directory for the subprocess; None keeps the current one.
            group/target/name/args/kwargs: Forwarded to ``threading.Thread``.
        """
        super().__init__(group, target, name, args, kwargs, daemon=True)
        self.cmd = cmd
        self.workdir = workdir
        # Event set once the subprocess is spawned or spawning raised
        self._process_spawned_or_spawn_error_flag = None
        self.active = False
        self._process = None
        self.returncode = None
        self._output = []
        self._logger = logging.getLogger(self.name)

    def __enter__(self):
        self.start(threading.Event())
        # Block until run() either spawned the process or failed trying
        self._process_spawned_or_spawn_error_flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()
        self._process_spawned_or_spawn_error_flag = None

    def start(self, flag: typing.Optional[threading.Event] = None) -> None:
        """Start the thread; *flag* is set once the process spawned or spawning failed."""
        if flag is None:
            flag = threading.Event()
        self._logger.info(f"Starting {self.name} script with \"{' '.join(self.cmd)}\" cmd")
        self._process_spawned_or_spawn_error_flag = flag
        super().start()

    def stop(self):
        """Request the watch loop in run() to finish; does not kill the subprocess."""
        self._logger.info(f"Stopping {self.name} script")
        self.active = False

    def run(self):
        import psutil

        self.returncode = None
        self._output = []
        self._process = None
        os.environ.setdefault("PYTHONUNBUFFERED", "1")  # to not buffer logs
        try:
            with psutil.Popen(
                self.cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, cwd=self.workdir
            ) as process:
                self._process = process
                self.active = True
                if self._process_spawned_or_spawn_error_flag:
                    self._process_spawned_or_spawn_error_flag.set()
                while self.active and process.poll() is None and process.returncode is None:
                    try:
                        _read_outputs(process, self._logger, self._output)
                    except KeyboardInterrupt:
                        self.stop()
        finally:
            # Unblock __enter__ even if Popen raised before the flag was set
            if self._process_spawned_or_spawn_error_flag:
                self._process_spawned_or_spawn_error_flag.set()
            if self.process:
                # Drain any remaining output until the process really exits
                while self.process.poll() is None:
                    _read_outputs(self.process, self._logger, self._output)
                _read_outputs(self.process, self._logger, self._output)
                self.returncode = process.wait()  # pytype: disable=name-error
                self._logger.info(f"{self.name} process finished with {self.returncode}")
            self.active = False
            self._process = None

    # NOTE: `output` and `process` must be properties - the finally block in
    # run() uses `self.process` as the Popen object (`self.process.poll()`),
    # which raises AttributeError if `process` is a plain method.
    @property
    def output(self):
        """Collected output lines joined with newlines."""
        return "\n".join(self._output)

    @property
    def process(self):
        """The underlying ``psutil.Popen`` object, or None when not running."""
        return self._process
def find_free_port() -> int:
    """Return a TCP port number that was free at the time of the call.

    The socket is closed before returning, so the port may in principle be
    reclaimed by another process before the caller binds it.
    """
    probe_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with contextlib.closing(probe_socket):
        probe_socket.bind(("", 0))  # port 0 -> kernel picks a free ephemeral port
        probe_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _, port = probe_socket.getsockname()
        return port
class ProcessMonitoring:
    """A class that dumps the state of a process and its children.

    This class uses the py-spy tool to dump the stack trace of a process and its
    children recursively. It also dumps the process information such as the parent
    and the command line. It allows registering custom monitors that can perform
    additional actions on the process.

    Attributes:
        _logger (logging.Logger): The logger object to write messages.
        _process (psutil.Process): The process object to monitor.
        _children_processes (list[psutil.Process]): The list of child processes to monitor.
        _log (logging.Logger.method): The logging method to use for messages.
        _remove_color (bool): Whether to remove ANSI escape sequences from the output.
        _ansi_escape (re.Pattern): The regular expression object to match ANSI escape sequences.
        _custom_monitors (list[typing.Callable[[int], None]]): The list of custom monitor functions to execute on each dump cycle.
    """

    def __init__(
        self,
        pid: int,
        logger: typing.Optional[logging.Logger] = None,
        loglevel: int = logging.INFO,
        remove_color: bool = False,
    ):
        """Initializes the ProcessMonitoring object.

        Args:
            pid (int): The process ID of the process to monitor.
            logger (typing.Optional[logging.Logger], optional): The logger object to write messages. Defaults to None.
            loglevel (int, optional): The logging level to use for messages. Defaults to logging.INFO.
            remove_color (bool, optional): Whether to remove ANSI escape sequences from the output. Defaults to False.
        """
        import psutil

        self._logger = logger or logging.getLogger("monitoring")
        self._process = psutil.Process(pid)
        self._children_processes = list(self._process.children(recursive=True))
        # Bind the logging method matching the requested level once
        self._log = {
            logging.DEBUG: self._logger.debug,
            logging.INFO: self._logger.info,
            logging.WARNING: self._logger.warning,
            logging.ERROR: self._logger.error,
        }[loglevel]
        self._log(f"Initial list of children processes: {self._children_processes}")
        self._remove_color = remove_color
        # Matches ANSI color escape sequences, e.g. "\x1b[31m"
        pattern = r"\x1b\[.*?m"
        self._ansi_escape = re.compile(pattern)
        self._custom_monitors = []

    def register_custom_monitor(self, custom_monitor: typing.Callable[[int], None]) -> None:
        """Registers a custom monitor for the process.

        This method adds a custom monitor function to the list of monitors that are
        executed on each dump cycle. A custom monitor function should take an integer
        as an argument (the process ID) and return None.

        Args:
            custom_monitor (typing.Callable[[int], None]): The custom monitor function to register.
        """
        self._custom_monitors.append(custom_monitor)

    def dump_state(self) -> None:
        """Dumps the state of the process and its children.

        This method calls the _dump_processes_stacktrace and _dump_child_processes
        methods to dump the stack trace and the process information of the process
        and its children recursively.
        """
        self._dump_processes_stacktrace()
        self._dump_child_processes()

    def _dump_processes_stacktrace(self):
        """Run py-spy against the process and each child, logging the stack traces."""
        import psutil
        import sh

        self._log("==== Dump process stacktrace")
        pyspy_cmd = sh.Command("py-spy")
        for process in [self._process] + self.children:
            try:
                result = pyspy_cmd("dump", "-ll", "--nonblocking", "-p", str(process.pid))
                if self._remove_color:
                    result = self._ansi_escape.sub("", str(result))
                self._log(f"Dump stack trace for process (pid={process.pid}) with cmd {process.cmdline()}")
                for custom_monitor in self._custom_monitors:
                    custom_monitor(process.pid)
                self._log(result)
            except psutil.NoSuchProcess as e:
                self._log(f"Error during handling process: {e}")
            except sh.ErrorReturnCode_1 as e:
                self._log(f"Error during calling py-spy process: {e}")

    def _dump_child_processes(self):
        """Log each monitored process together with its parent."""
        import psutil

        self._log("==== Dump process info (with its children)")
        for process in [self._process] + self.children:
            try:
                self._log(f"{process} parent={process.parent()} ")
            except psutil.NoSuchProcess:
                self._log(f"{process} is missing in process table")

    # NOTE: must be a property - callers use `[self._process] + self.children`,
    # which raises TypeError (list + bound method) if this is a plain method.
    @property
    def children(self):
        """Returns the list of child processes to monitor.

        This property returns the list of child processes to monitor, and updates it
        with any new children that are created by the process.

        Returns:
            list[psutil.Process]: The list of child processes to monitor.
        """
        import psutil

        try:
            children = list(self._process.children(recursive=True))
            # Keep previously-seen children even after they exit (set union)
            self._children_processes = list(set(self._children_processes + children))
        except psutil.NoSuchProcess:
            pass
        return self._children_processes
def get_current_container_version():
    """Return the NVIDIA framework container version from the environment.

    Reads ``NVIDIA_PYTORCH_VERSION`` first, falling back to
    ``NVIDIA_TENSORFLOW_VERSION``. TF versions have the format
    ``<year_month_version>-<tf_version>``; only the part before the first
    dash is returned. Returns None when neither variable is set.
    """
    version = os.environ.get("NVIDIA_PYTORCH_VERSION") or os.environ.get("NVIDIA_TENSORFLOW_VERSION")
    if not version:
        return version
    return version.partition("-")[0]
def verify_docker_image_in_readme_same_as_tested(readme_path, image_name_with_version):
    """Assert every framework image reference in the README matches the tested image.

    Scans the README for ``<registry>/<framework>:<version>`` occurrences of the
    same framework as *image_name_with_version* and raises AssertionError on the
    first mismatch.

    Args:
        readme_path: Path to the README file to scan.
        image_name_with_version: Expected reference, e.g. ``nvcr.io/nvidia/pytorch:23.05-py3``.
    """
    image_name, _image_version = image_name_with_version.split(":")
    framework_name = image_name.rsplit("/", 1)[-1]
    readme_payload = pathlib.Path(readme_path).read_text()
    image_reference_pattern = (
        rf"(?P<container_registry>[\w/.\-:]+)/{framework_name}:(?P<image_version_with_python_version>[\w.-]+)"
    )
    for entry in re.finditer(image_reference_pattern, readme_payload):
        assert entry.group() == image_name_with_version, f"{entry.group()} != {image_name_with_version}"
def search_warning_on_too_verbose_log_level(logs: str):
    """Search *logs* for Triton's verbose-logging performance warning.

    Returns:
        A ``re.Match`` object when the warning is present, otherwise None.
    """
    verbose_warning_re = re.compile(
        r"Triton Inference Server is running with enabled verbose logs.*It may affect inference performance."
    )
    return verbose_warning_re.search(logs)
class ProcessMonitoringThread:
    """Runs a ProcessMonitoring dump loop on a background daemon thread.

    The loop dumps the monitored process state immediately after start() and
    then once per interval until stop() is called. Can also be used as a
    context manager.

    Attributes:
        _monitoring (ProcessMonitoring): Object performing the actual dumps.
        _stop_event (threading.Event): Signals the loop to finish.
        _thread (threading.Thread): Daemon thread executing _run.
        _interval (float): Seconds between consecutive dump cycles.
    """

    def __init__(self, monitoring: ProcessMonitoring, interval: float = 60):
        """Store the monitoring object and the dump interval.

        Args:
            monitoring (ProcessMonitoring): Object performing the actual dumps.
            interval (float, optional): Seconds between dump cycles. Defaults to 60.
        """
        self._monitoring = monitoring
        self._interval = interval

    def start(self) -> None:
        """Spawn the daemon thread and begin the periodic dump loop."""
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Signal the loop to stop and wait for the thread to finish."""
        self._stop_event.set()
        self._thread.join()

    def __enter__(self):
        """Start monitoring when entering the context."""
        self.start()
        return self

    def __exit__(self, *args):
        """Stop monitoring when leaving the context."""
        self.stop()

    def _run(self):
        # Dump immediately, then once per `_interval` seconds; Event.wait
        # returns True as soon as stop() sets the event, ending the loop.
        while True:
            logging.info("Monitoring process")
            self._monitoring.dump_state()
            if self._stop_event.wait(self._interval):
                break
class TestMonitoringContext:
    """A context manager that monitors test processes.

    This context manager creates threads to monitor the test processes and dumps
    their state periodically. It can extend argparse args with additional arguments.
    It supports splitting log into different files. The standard output log can have one level
    and the file log can have another level. It uses log rotation.
    """

    # NOTE: must be a staticmethod - it takes no `self`, and without the
    # decorator calling it on an instance would pass the instance as `parser`.
    @staticmethod
    def extend_args(parser):
        """Add the monitoring and logging CLI options to *parser* in place.

        Args:
            parser (argparse.ArgumentParser): The parser to extend.
        """
        parser.add_argument(
            "--verbose",
            action="store_true",
            help="Provide verbose logs",
        )
        parser.add_argument(
            "--log-path",
            type=str,
            default=None,
            help="Provide the path of external log for rotation",
        )
        parser.add_argument(
            "--compress-logs",
            action="store_true",
            help="Enable logs compression",
        )
        parser.add_argument(
            "--maximum-log-file",
            type=int,
            default=10 * 1024 * 1024,
            help="Maximum logfile size before rotation is started",
            required=False,
        )
        parser.add_argument(
            "--enable-fault-handler",
            action="store_true",
            help="Enable faulthandler",
        )
        parser.add_argument(
            "--faulthandler-interval",
            type=float,
            default=None,
            help="Enable faulthandler after specified number of seconds with repeat",
            required=False,
        )
        parser.add_argument(
            "--process-monitoring-interval",
            type=float,
            default=None,
            help="Enable process monitoring after specified number of seconds with repeat",
            required=False,
        )

    def __init__(self, args):
        """Initializes the TestMonitoringContext object.

        Args:
            args (argparse.Namespace): The argparse args object to extend with additional arguments.
        """
        self._args = args

    def __enter__(self):
        """Configure logging, faulthandler, and process monitoring per self._args."""
        import faulthandler
        import logging.handlers

        import psutil

        args = self._args
        self._loglevel = log_level = logging.DEBUG if args.verbose else logging.INFO
        # Root logger captures everything; per-handler levels do the filtering
        logging.basicConfig(level=logging.DEBUG, format=DEFAULT_LOG_FORMAT)
        logger = logging.getLogger()

        if args.log_path is not None:
            # Rotating file handler: size limit from CLI, up to 500 rotated files
            file_handler = logging.handlers.RotatingFileHandler(
                args.log_path, maxBytes=args.maximum_log_file, backupCount=500
            )
            file_handler.setFormatter(logging.Formatter(DEFAULT_LOG_FORMAT))
            file_handler.setLevel(logging.DEBUG)
            if args.compress_logs:
                file_handler.namer = lambda name: name + ".gz"

                def gzip_rotation(source, dest):
                    """Compress the rotated log file and remove the original."""
                    import gzip

                    with open(source, "rb") as f_in:
                        with gzip.open(dest, "wb") as f_out:
                            f_out.writelines(f_in)
                    os.remove(source)

                file_handler.rotator = gzip_rotation
            # Add the file handler to the default logger
            logger.addHandler(file_handler)

        # Align the stream handler's level with the --verbose flag.
        # NOTE(review): assumes handlers[0] is the stream handler basicConfig
        # installed; if logging was configured before this point, handlers[0]
        # may be a different handler - confirm against callers.
        stream_handler = logger.handlers[0]
        stream_handler.setLevel(log_level)

        if args.enable_fault_handler:
            faulthandler.enable()

        if args.faulthandler_interval is not None:
            faulthandler.dump_traceback_later(args.faulthandler_interval, repeat=True, exit=False)

        def _make_monitor(label, probe):
            """Build a monitor that logs ``probe(psutil.Process(pid))`` under *label*."""

            def _monitor(pid=None):
                if pid is None:
                    pid = os.getpid()
                process = psutil.Process(pid)
                logger.debug(f"{label} ({pid}): {probe(process)}")

            return _monitor

        # One monitor per process metric; log messages match the historic format
        custom_monitors = [
            _make_monitor("MONITOR RAM USAGE", lambda process: process.memory_info()),
            _make_monitor("MONITOR FILE DESCRIPTORS", lambda process: process.num_fds()),
            _make_monitor("MONITOR CPU USAGE", lambda process: process.cpu_percent()),
            _make_monitor("MONITOR THREADS", lambda process: process.num_threads()),
            _make_monitor("MONITOR PROCESS DICT", lambda process: process.as_dict()),
        ]

        if args.process_monitoring_interval is not None:
            monitoring = ProcessMonitoring(os.getpid(), logger, loglevel=logging.DEBUG, remove_color=True)
            for monitor in custom_monitors:
                monitoring.register_custom_monitor(monitor)
            self._monitor = ProcessMonitoringThread(monitoring, interval=args.process_monitoring_interval)
            self._monitor.start()
        return self

    def __exit__(self, *args):
        """Cancel periodic faulthandler dumps and stop the monitoring thread."""
        import faulthandler

        if self._args.faulthandler_interval is not None:
            # Stop the periodic traceback dumps scheduled in __enter__
            faulthandler.cancel_dump_traceback_later()
        # Guard against double-exit: _monitor may be absent or already cleared
        if getattr(self, "_monitor", None) is not None:
            self._monitor.stop()
            self._monitor = None