Spaces:
Sleeping
Sleeping
| # mypy: allow-untyped-defs | |
| """Access and control log capturing.""" | |
| from __future__ import annotations | |
| from contextlib import contextmanager | |
| from contextlib import nullcontext | |
| from datetime import datetime | |
| from datetime import timedelta | |
| from datetime import timezone | |
| import io | |
| from io import StringIO | |
| import logging | |
| from logging import LogRecord | |
| import os | |
| from pathlib import Path | |
| import re | |
| from types import TracebackType | |
| from typing import AbstractSet | |
| from typing import Dict | |
| from typing import final | |
| from typing import Generator | |
| from typing import Generic | |
| from typing import List | |
| from typing import Literal | |
| from typing import Mapping | |
| from typing import TYPE_CHECKING | |
| from typing import TypeVar | |
| from _pytest import nodes | |
| from _pytest._io import TerminalWriter | |
| from _pytest.capture import CaptureManager | |
| from _pytest.config import _strtobool | |
| from _pytest.config import Config | |
| from _pytest.config import create_terminal_writer | |
| from _pytest.config import hookimpl | |
| from _pytest.config import UsageError | |
| from _pytest.config.argparsing import Parser | |
| from _pytest.deprecated import check_ispytest | |
| from _pytest.fixtures import fixture | |
| from _pytest.fixtures import FixtureRequest | |
| from _pytest.main import Session | |
| from _pytest.stash import StashKey | |
| from _pytest.terminal import TerminalReporter | |
| if TYPE_CHECKING: | |
| logging_StreamHandler = logging.StreamHandler[StringIO] | |
| else: | |
| logging_StreamHandler = logging.StreamHandler | |
| DEFAULT_LOG_FORMAT = "%(levelname)-8s %(name)s:%(filename)s:%(lineno)d %(message)s" | |
| DEFAULT_LOG_DATE_FORMAT = "%H:%M:%S" | |
| _ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m") | |
| caplog_handler_key = StashKey["LogCaptureHandler"]() | |
| caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]() | |
| def _remove_ansi_escape_sequences(text: str) -> str: | |
| return _ANSI_ESCAPE_SEQ.sub("", text) | |
| class DatetimeFormatter(logging.Formatter): | |
| """A logging formatter which formats record with | |
| :func:`datetime.datetime.strftime` formatter instead of | |
| :func:`time.strftime` in case of microseconds in format string. | |
| """ | |
| def formatTime(self, record: LogRecord, datefmt: str | None = None) -> str: | |
| if datefmt and "%f" in datefmt: | |
| ct = self.converter(record.created) | |
| tz = timezone(timedelta(seconds=ct.tm_gmtoff), ct.tm_zone) | |
| # Construct `datetime.datetime` object from `struct_time` | |
| # and msecs information from `record` | |
| # Using int() instead of round() to avoid it exceeding 1_000_000 and causing a ValueError (#11861). | |
| dt = datetime(*ct[0:6], microsecond=int(record.msecs * 1000), tzinfo=tz) | |
| return dt.strftime(datefmt) | |
| # Use `logging.Formatter` for non-microsecond formats | |
| return super().formatTime(record, datefmt) | |
| class ColoredLevelFormatter(DatetimeFormatter): | |
| """A logging formatter which colorizes the %(levelname)..s part of the | |
| log format passed to __init__.""" | |
| LOGLEVEL_COLOROPTS: Mapping[int, AbstractSet[str]] = { | |
| logging.CRITICAL: {"red"}, | |
| logging.ERROR: {"red", "bold"}, | |
| logging.WARNING: {"yellow"}, | |
| logging.WARN: {"yellow"}, | |
| logging.INFO: {"green"}, | |
| logging.DEBUG: {"purple"}, | |
| logging.NOTSET: set(), | |
| } | |
| LEVELNAME_FMT_REGEX = re.compile(r"%\(levelname\)([+-.]?\d*(?:\.\d+)?s)") | |
| def __init__(self, terminalwriter: TerminalWriter, *args, **kwargs) -> None: | |
| super().__init__(*args, **kwargs) | |
| self._terminalwriter = terminalwriter | |
| self._original_fmt = self._style._fmt | |
| self._level_to_fmt_mapping: dict[int, str] = {} | |
| for level, color_opts in self.LOGLEVEL_COLOROPTS.items(): | |
| self.add_color_level(level, *color_opts) | |
| def add_color_level(self, level: int, *color_opts: str) -> None: | |
| """Add or update color opts for a log level. | |
| :param level: | |
| Log level to apply a style to, e.g. ``logging.INFO``. | |
| :param color_opts: | |
| ANSI escape sequence color options. Capitalized colors indicates | |
| background color, i.e. ``'green', 'Yellow', 'bold'`` will give bold | |
| green text on yellow background. | |
| .. warning:: | |
| This is an experimental API. | |
| """ | |
| assert self._fmt is not None | |
| levelname_fmt_match = self.LEVELNAME_FMT_REGEX.search(self._fmt) | |
| if not levelname_fmt_match: | |
| return | |
| levelname_fmt = levelname_fmt_match.group() | |
| formatted_levelname = levelname_fmt % {"levelname": logging.getLevelName(level)} | |
| # add ANSI escape sequences around the formatted levelname | |
| color_kwargs = {name: True for name in color_opts} | |
| colorized_formatted_levelname = self._terminalwriter.markup( | |
| formatted_levelname, **color_kwargs | |
| ) | |
| self._level_to_fmt_mapping[level] = self.LEVELNAME_FMT_REGEX.sub( | |
| colorized_formatted_levelname, self._fmt | |
| ) | |
| def format(self, record: logging.LogRecord) -> str: | |
| fmt = self._level_to_fmt_mapping.get(record.levelno, self._original_fmt) | |
| self._style._fmt = fmt | |
| return super().format(record) | |
| class PercentStyleMultiline(logging.PercentStyle): | |
| """A logging style with special support for multiline messages. | |
| If the message of a record consists of multiple lines, this style | |
| formats the message as if each line were logged separately. | |
| """ | |
| def __init__(self, fmt: str, auto_indent: int | str | bool | None) -> None: | |
| super().__init__(fmt) | |
| self._auto_indent = self._get_auto_indent(auto_indent) | |
| def _get_auto_indent(auto_indent_option: int | str | bool | None) -> int: | |
| """Determine the current auto indentation setting. | |
| Specify auto indent behavior (on/off/fixed) by passing in | |
| extra={"auto_indent": [value]} to the call to logging.log() or | |
| using a --log-auto-indent [value] command line or the | |
| log_auto_indent [value] config option. | |
| Default behavior is auto-indent off. | |
| Using the string "True" or "on" or the boolean True as the value | |
| turns auto indent on, using the string "False" or "off" or the | |
| boolean False or the int 0 turns it off, and specifying a | |
| positive integer fixes the indentation position to the value | |
| specified. | |
| Any other values for the option are invalid, and will silently be | |
| converted to the default. | |
| :param None|bool|int|str auto_indent_option: | |
| User specified option for indentation from command line, config | |
| or extra kwarg. Accepts int, bool or str. str option accepts the | |
| same range of values as boolean config options, as well as | |
| positive integers represented in str form. | |
| :returns: | |
| Indentation value, which can be | |
| -1 (automatically determine indentation) or | |
| 0 (auto-indent turned off) or | |
| >0 (explicitly set indentation position). | |
| """ | |
| if auto_indent_option is None: | |
| return 0 | |
| elif isinstance(auto_indent_option, bool): | |
| if auto_indent_option: | |
| return -1 | |
| else: | |
| return 0 | |
| elif isinstance(auto_indent_option, int): | |
| return int(auto_indent_option) | |
| elif isinstance(auto_indent_option, str): | |
| try: | |
| return int(auto_indent_option) | |
| except ValueError: | |
| pass | |
| try: | |
| if _strtobool(auto_indent_option): | |
| return -1 | |
| except ValueError: | |
| return 0 | |
| return 0 | |
| def format(self, record: logging.LogRecord) -> str: | |
| if "\n" in record.message: | |
| if hasattr(record, "auto_indent"): | |
| # Passed in from the "extra={}" kwarg on the call to logging.log(). | |
| auto_indent = self._get_auto_indent(record.auto_indent) | |
| else: | |
| auto_indent = self._auto_indent | |
| if auto_indent: | |
| lines = record.message.splitlines() | |
| formatted = self._fmt % {**record.__dict__, "message": lines[0]} | |
| if auto_indent < 0: | |
| indentation = _remove_ansi_escape_sequences(formatted).find( | |
| lines[0] | |
| ) | |
| else: | |
| # Optimizes logging by allowing a fixed indentation. | |
| indentation = auto_indent | |
| lines[0] = formatted | |
| return ("\n" + " " * indentation).join(lines) | |
| return self._fmt % record.__dict__ | |
| def get_option_ini(config: Config, *names: str): | |
| for name in names: | |
| ret = config.getoption(name) # 'default' arg won't work as expected | |
| if ret is None: | |
| ret = config.getini(name) | |
| if ret: | |
| return ret | |
| def pytest_addoption(parser: Parser) -> None: | |
| """Add options to control log capturing.""" | |
| group = parser.getgroup("logging") | |
| def add_option_ini(option, dest, default=None, type=None, **kwargs): | |
| parser.addini( | |
| dest, default=default, type=type, help="Default value for " + option | |
| ) | |
| group.addoption(option, dest=dest, **kwargs) | |
| add_option_ini( | |
| "--log-level", | |
| dest="log_level", | |
| default=None, | |
| metavar="LEVEL", | |
| help=( | |
| "Level of messages to catch/display." | |
| " Not set by default, so it depends on the root/parent log handler's" | |
| ' effective level, where it is "WARNING" by default.' | |
| ), | |
| ) | |
| add_option_ini( | |
| "--log-format", | |
| dest="log_format", | |
| default=DEFAULT_LOG_FORMAT, | |
| help="Log format used by the logging module", | |
| ) | |
| add_option_ini( | |
| "--log-date-format", | |
| dest="log_date_format", | |
| default=DEFAULT_LOG_DATE_FORMAT, | |
| help="Log date format used by the logging module", | |
| ) | |
| parser.addini( | |
| "log_cli", | |
| default=False, | |
| type="bool", | |
| help='Enable log display during test run (also known as "live logging")', | |
| ) | |
| add_option_ini( | |
| "--log-cli-level", dest="log_cli_level", default=None, help="CLI logging level" | |
| ) | |
| add_option_ini( | |
| "--log-cli-format", | |
| dest="log_cli_format", | |
| default=None, | |
| help="Log format used by the logging module", | |
| ) | |
| add_option_ini( | |
| "--log-cli-date-format", | |
| dest="log_cli_date_format", | |
| default=None, | |
| help="Log date format used by the logging module", | |
| ) | |
| add_option_ini( | |
| "--log-file", | |
| dest="log_file", | |
| default=None, | |
| help="Path to a file when logging will be written to", | |
| ) | |
| add_option_ini( | |
| "--log-file-mode", | |
| dest="log_file_mode", | |
| default="w", | |
| choices=["w", "a"], | |
| help="Log file open mode", | |
| ) | |
| add_option_ini( | |
| "--log-file-level", | |
| dest="log_file_level", | |
| default=None, | |
| help="Log file logging level", | |
| ) | |
| add_option_ini( | |
| "--log-file-format", | |
| dest="log_file_format", | |
| default=None, | |
| help="Log format used by the logging module", | |
| ) | |
| add_option_ini( | |
| "--log-file-date-format", | |
| dest="log_file_date_format", | |
| default=None, | |
| help="Log date format used by the logging module", | |
| ) | |
| add_option_ini( | |
| "--log-auto-indent", | |
| dest="log_auto_indent", | |
| default=None, | |
| help="Auto-indent multiline messages passed to the logging module. Accepts true|on, false|off or an integer.", | |
| ) | |
| group.addoption( | |
| "--log-disable", | |
| action="append", | |
| default=[], | |
| dest="logger_disable", | |
| help="Disable a logger by name. Can be passed multiple times.", | |
| ) | |
| _HandlerType = TypeVar("_HandlerType", bound=logging.Handler) | |
| # Not using @contextmanager for performance reasons. | |
| class catching_logs(Generic[_HandlerType]): | |
| """Context manager that prepares the whole logging machinery properly.""" | |
| __slots__ = ("handler", "level", "orig_level") | |
| def __init__(self, handler: _HandlerType, level: int | None = None) -> None: | |
| self.handler = handler | |
| self.level = level | |
| def __enter__(self) -> _HandlerType: | |
| root_logger = logging.getLogger() | |
| if self.level is not None: | |
| self.handler.setLevel(self.level) | |
| root_logger.addHandler(self.handler) | |
| if self.level is not None: | |
| self.orig_level = root_logger.level | |
| root_logger.setLevel(min(self.orig_level, self.level)) | |
| return self.handler | |
| def __exit__( | |
| self, | |
| exc_type: type[BaseException] | None, | |
| exc_val: BaseException | None, | |
| exc_tb: TracebackType | None, | |
| ) -> None: | |
| root_logger = logging.getLogger() | |
| if self.level is not None: | |
| root_logger.setLevel(self.orig_level) | |
| root_logger.removeHandler(self.handler) | |
| class LogCaptureHandler(logging_StreamHandler): | |
| """A logging handler that stores log records and the log text.""" | |
| def __init__(self) -> None: | |
| """Create a new log handler.""" | |
| super().__init__(StringIO()) | |
| self.records: list[logging.LogRecord] = [] | |
| def emit(self, record: logging.LogRecord) -> None: | |
| """Keep the log records in a list in addition to the log text.""" | |
| self.records.append(record) | |
| super().emit(record) | |
| def reset(self) -> None: | |
| self.records = [] | |
| self.stream = StringIO() | |
| def clear(self) -> None: | |
| self.records.clear() | |
| self.stream = StringIO() | |
| def handleError(self, record: logging.LogRecord) -> None: | |
| if logging.raiseExceptions: | |
| # Fail the test if the log message is bad (emit failed). | |
| # The default behavior of logging is to print "Logging error" | |
| # to stderr with the call stack and some extra details. | |
| # pytest wants to make such mistakes visible during testing. | |
| raise # noqa: PLE0704 | |
| class LogCaptureFixture: | |
| """Provides access and control of log capturing.""" | |
| def __init__(self, item: nodes.Node, *, _ispytest: bool = False) -> None: | |
| check_ispytest(_ispytest) | |
| self._item = item | |
| self._initial_handler_level: int | None = None | |
| # Dict of log name -> log level. | |
| self._initial_logger_levels: dict[str | None, int] = {} | |
| self._initial_disabled_logging_level: int | None = None | |
| def _finalize(self) -> None: | |
| """Finalize the fixture. | |
| This restores the log levels and the disabled logging levels changed by :meth:`set_level`. | |
| """ | |
| # Restore log levels. | |
| if self._initial_handler_level is not None: | |
| self.handler.setLevel(self._initial_handler_level) | |
| for logger_name, level in self._initial_logger_levels.items(): | |
| logger = logging.getLogger(logger_name) | |
| logger.setLevel(level) | |
| # Disable logging at the original disabled logging level. | |
| if self._initial_disabled_logging_level is not None: | |
| logging.disable(self._initial_disabled_logging_level) | |
| self._initial_disabled_logging_level = None | |
| def handler(self) -> LogCaptureHandler: | |
| """Get the logging handler used by the fixture.""" | |
| return self._item.stash[caplog_handler_key] | |
| def get_records( | |
| self, when: Literal["setup", "call", "teardown"] | |
| ) -> list[logging.LogRecord]: | |
| """Get the logging records for one of the possible test phases. | |
| :param when: | |
| Which test phase to obtain the records from. | |
| Valid values are: "setup", "call" and "teardown". | |
| :returns: The list of captured records at the given stage. | |
| .. versionadded:: 3.4 | |
| """ | |
| return self._item.stash[caplog_records_key].get(when, []) | |
| def text(self) -> str: | |
| """The formatted log text.""" | |
| return _remove_ansi_escape_sequences(self.handler.stream.getvalue()) | |
| def records(self) -> list[logging.LogRecord]: | |
| """The list of log records.""" | |
| return self.handler.records | |
| def record_tuples(self) -> list[tuple[str, int, str]]: | |
| """A list of a stripped down version of log records intended | |
| for use in assertion comparison. | |
| The format of the tuple is: | |
| (logger_name, log_level, message) | |
| """ | |
| return [(r.name, r.levelno, r.getMessage()) for r in self.records] | |
| def messages(self) -> list[str]: | |
| """A list of format-interpolated log messages. | |
| Unlike 'records', which contains the format string and parameters for | |
| interpolation, log messages in this list are all interpolated. | |
| Unlike 'text', which contains the output from the handler, log | |
| messages in this list are unadorned with levels, timestamps, etc, | |
| making exact comparisons more reliable. | |
| Note that traceback or stack info (from :func:`logging.exception` or | |
| the `exc_info` or `stack_info` arguments to the logging functions) is | |
| not included, as this is added by the formatter in the handler. | |
| .. versionadded:: 3.7 | |
| """ | |
| return [r.getMessage() for r in self.records] | |
| def clear(self) -> None: | |
| """Reset the list of log records and the captured log text.""" | |
| self.handler.clear() | |
| def _force_enable_logging( | |
| self, level: int | str, logger_obj: logging.Logger | |
| ) -> int: | |
| """Enable the desired logging level if the global level was disabled via ``logging.disabled``. | |
| Only enables logging levels greater than or equal to the requested ``level``. | |
| Does nothing if the desired ``level`` wasn't disabled. | |
| :param level: | |
| The logger level caplog should capture. | |
| All logging is enabled if a non-standard logging level string is supplied. | |
| Valid level strings are in :data:`logging._nameToLevel`. | |
| :param logger_obj: The logger object to check. | |
| :return: The original disabled logging level. | |
| """ | |
| original_disable_level: int = logger_obj.manager.disable | |
| if isinstance(level, str): | |
| # Try to translate the level string to an int for `logging.disable()` | |
| level = logging.getLevelName(level) | |
| if not isinstance(level, int): | |
| # The level provided was not valid, so just un-disable all logging. | |
| logging.disable(logging.NOTSET) | |
| elif not logger_obj.isEnabledFor(level): | |
| # Each level is `10` away from other levels. | |
| # https://docs.python.org/3/library/logging.html#logging-levels | |
| disable_level = max(level - 10, logging.NOTSET) | |
| logging.disable(disable_level) | |
| return original_disable_level | |
| def set_level(self, level: int | str, logger: str | None = None) -> None: | |
| """Set the threshold level of a logger for the duration of a test. | |
| Logging messages which are less severe than this level will not be captured. | |
| .. versionchanged:: 3.4 | |
| The levels of the loggers changed by this function will be | |
| restored to their initial values at the end of the test. | |
| Will enable the requested logging level if it was disabled via :func:`logging.disable`. | |
| :param level: The level. | |
| :param logger: The logger to update. If not given, the root logger. | |
| """ | |
| logger_obj = logging.getLogger(logger) | |
| # Save the original log-level to restore it during teardown. | |
| self._initial_logger_levels.setdefault(logger, logger_obj.level) | |
| logger_obj.setLevel(level) | |
| if self._initial_handler_level is None: | |
| self._initial_handler_level = self.handler.level | |
| self.handler.setLevel(level) | |
| initial_disabled_logging_level = self._force_enable_logging(level, logger_obj) | |
| if self._initial_disabled_logging_level is None: | |
| self._initial_disabled_logging_level = initial_disabled_logging_level | |
| def at_level(self, level: int | str, logger: str | None = None) -> Generator[None]: | |
| """Context manager that sets the level for capturing of logs. After | |
| the end of the 'with' statement the level is restored to its original | |
| value. | |
| Will enable the requested logging level if it was disabled via :func:`logging.disable`. | |
| :param level: The level. | |
| :param logger: The logger to update. If not given, the root logger. | |
| """ | |
| logger_obj = logging.getLogger(logger) | |
| orig_level = logger_obj.level | |
| logger_obj.setLevel(level) | |
| handler_orig_level = self.handler.level | |
| self.handler.setLevel(level) | |
| original_disable_level = self._force_enable_logging(level, logger_obj) | |
| try: | |
| yield | |
| finally: | |
| logger_obj.setLevel(orig_level) | |
| self.handler.setLevel(handler_orig_level) | |
| logging.disable(original_disable_level) | |
| def filtering(self, filter_: logging.Filter) -> Generator[None]: | |
| """Context manager that temporarily adds the given filter to the caplog's | |
| :meth:`handler` for the 'with' statement block, and removes that filter at the | |
| end of the block. | |
| :param filter_: A custom :class:`logging.Filter` object. | |
| .. versionadded:: 7.5 | |
| """ | |
| self.handler.addFilter(filter_) | |
| try: | |
| yield | |
| finally: | |
| self.handler.removeFilter(filter_) | |
| def caplog(request: FixtureRequest) -> Generator[LogCaptureFixture]: | |
| """Access and control log capturing. | |
| Captured logs are available through the following properties/methods:: | |
| * caplog.messages -> list of format-interpolated log messages | |
| * caplog.text -> string containing formatted log output | |
| * caplog.records -> list of logging.LogRecord instances | |
| * caplog.record_tuples -> list of (logger_name, level, message) tuples | |
| * caplog.clear() -> clear captured records and formatted log output string | |
| """ | |
| result = LogCaptureFixture(request.node, _ispytest=True) | |
| yield result | |
| result._finalize() | |
| def get_log_level_for_setting(config: Config, *setting_names: str) -> int | None: | |
| for setting_name in setting_names: | |
| log_level = config.getoption(setting_name) | |
| if log_level is None: | |
| log_level = config.getini(setting_name) | |
| if log_level: | |
| break | |
| else: | |
| return None | |
| if isinstance(log_level, str): | |
| log_level = log_level.upper() | |
| try: | |
| return int(getattr(logging, log_level, log_level)) | |
| except ValueError as e: | |
| # Python logging does not recognise this as a logging level | |
| raise UsageError( | |
| f"'{log_level}' is not recognized as a logging level name for " | |
| f"'{setting_name}'. Please consider passing the " | |
| "logging level num instead." | |
| ) from e | |
| # run after terminalreporter/capturemanager are configured | |
| def pytest_configure(config: Config) -> None: | |
| config.pluginmanager.register(LoggingPlugin(config), "logging-plugin") | |
| class LoggingPlugin: | |
| """Attaches to the logging module and captures log messages for each test.""" | |
| def __init__(self, config: Config) -> None: | |
| """Create a new plugin to capture log messages. | |
| The formatter can be safely shared across all handlers so | |
| create a single one for the entire test session here. | |
| """ | |
| self._config = config | |
| # Report logging. | |
| self.formatter = self._create_formatter( | |
| get_option_ini(config, "log_format"), | |
| get_option_ini(config, "log_date_format"), | |
| get_option_ini(config, "log_auto_indent"), | |
| ) | |
| self.log_level = get_log_level_for_setting(config, "log_level") | |
| self.caplog_handler = LogCaptureHandler() | |
| self.caplog_handler.setFormatter(self.formatter) | |
| self.report_handler = LogCaptureHandler() | |
| self.report_handler.setFormatter(self.formatter) | |
| # File logging. | |
| self.log_file_level = get_log_level_for_setting( | |
| config, "log_file_level", "log_level" | |
| ) | |
| log_file = get_option_ini(config, "log_file") or os.devnull | |
| if log_file != os.devnull: | |
| directory = os.path.dirname(os.path.abspath(log_file)) | |
| if not os.path.isdir(directory): | |
| os.makedirs(directory) | |
| self.log_file_mode = get_option_ini(config, "log_file_mode") or "w" | |
| self.log_file_handler = _FileHandler( | |
| log_file, mode=self.log_file_mode, encoding="UTF-8" | |
| ) | |
| log_file_format = get_option_ini(config, "log_file_format", "log_format") | |
| log_file_date_format = get_option_ini( | |
| config, "log_file_date_format", "log_date_format" | |
| ) | |
| log_file_formatter = DatetimeFormatter( | |
| log_file_format, datefmt=log_file_date_format | |
| ) | |
| self.log_file_handler.setFormatter(log_file_formatter) | |
| # CLI/live logging. | |
| self.log_cli_level = get_log_level_for_setting( | |
| config, "log_cli_level", "log_level" | |
| ) | |
| if self._log_cli_enabled(): | |
| terminal_reporter = config.pluginmanager.get_plugin("terminalreporter") | |
| # Guaranteed by `_log_cli_enabled()`. | |
| assert terminal_reporter is not None | |
| capture_manager = config.pluginmanager.get_plugin("capturemanager") | |
| # if capturemanager plugin is disabled, live logging still works. | |
| self.log_cli_handler: ( | |
| _LiveLoggingStreamHandler | _LiveLoggingNullHandler | |
| ) = _LiveLoggingStreamHandler(terminal_reporter, capture_manager) | |
| else: | |
| self.log_cli_handler = _LiveLoggingNullHandler() | |
| log_cli_formatter = self._create_formatter( | |
| get_option_ini(config, "log_cli_format", "log_format"), | |
| get_option_ini(config, "log_cli_date_format", "log_date_format"), | |
| get_option_ini(config, "log_auto_indent"), | |
| ) | |
| self.log_cli_handler.setFormatter(log_cli_formatter) | |
| self._disable_loggers(loggers_to_disable=config.option.logger_disable) | |
| def _disable_loggers(self, loggers_to_disable: list[str]) -> None: | |
| if not loggers_to_disable: | |
| return | |
| for name in loggers_to_disable: | |
| logger = logging.getLogger(name) | |
| logger.disabled = True | |
| def _create_formatter(self, log_format, log_date_format, auto_indent): | |
| # Color option doesn't exist if terminal plugin is disabled. | |
| color = getattr(self._config.option, "color", "no") | |
| if color != "no" and ColoredLevelFormatter.LEVELNAME_FMT_REGEX.search( | |
| log_format | |
| ): | |
| formatter: logging.Formatter = ColoredLevelFormatter( | |
| create_terminal_writer(self._config), log_format, log_date_format | |
| ) | |
| else: | |
| formatter = DatetimeFormatter(log_format, log_date_format) | |
| formatter._style = PercentStyleMultiline( | |
| formatter._style._fmt, auto_indent=auto_indent | |
| ) | |
| return formatter | |
| def set_log_path(self, fname: str) -> None: | |
| """Set the filename parameter for Logging.FileHandler(). | |
| Creates parent directory if it does not exist. | |
| .. warning:: | |
| This is an experimental API. | |
| """ | |
| fpath = Path(fname) | |
| if not fpath.is_absolute(): | |
| fpath = self._config.rootpath / fpath | |
| if not fpath.parent.exists(): | |
| fpath.parent.mkdir(exist_ok=True, parents=True) | |
| # https://github.com/python/mypy/issues/11193 | |
| stream: io.TextIOWrapper = fpath.open(mode=self.log_file_mode, encoding="UTF-8") # type: ignore[assignment] | |
| old_stream = self.log_file_handler.setStream(stream) | |
| if old_stream: | |
| old_stream.close() | |
| def _log_cli_enabled(self) -> bool: | |
| """Return whether live logging is enabled.""" | |
| enabled = self._config.getoption( | |
| "--log-cli-level" | |
| ) is not None or self._config.getini("log_cli") | |
| if not enabled: | |
| return False | |
| terminal_reporter = self._config.pluginmanager.get_plugin("terminalreporter") | |
| if terminal_reporter is None: | |
| # terminal reporter is disabled e.g. by pytest-xdist. | |
| return False | |
| return True | |
| def pytest_sessionstart(self) -> Generator[None]: | |
| self.log_cli_handler.set_when("sessionstart") | |
| with catching_logs(self.log_cli_handler, level=self.log_cli_level): | |
| with catching_logs(self.log_file_handler, level=self.log_file_level): | |
| return (yield) | |
| def pytest_collection(self) -> Generator[None]: | |
| self.log_cli_handler.set_when("collection") | |
| with catching_logs(self.log_cli_handler, level=self.log_cli_level): | |
| with catching_logs(self.log_file_handler, level=self.log_file_level): | |
| return (yield) | |
| def pytest_runtestloop(self, session: Session) -> Generator[None, object, object]: | |
| if session.config.option.collectonly: | |
| return (yield) | |
| if self._log_cli_enabled() and self._config.get_verbosity() < 1: | |
| # The verbose flag is needed to avoid messy test progress output. | |
| self._config.option.verbose = 1 | |
| with catching_logs(self.log_cli_handler, level=self.log_cli_level): | |
| with catching_logs(self.log_file_handler, level=self.log_file_level): | |
| return (yield) # Run all the tests. | |
| def pytest_runtest_logstart(self) -> None: | |
| self.log_cli_handler.reset() | |
| self.log_cli_handler.set_when("start") | |
| def pytest_runtest_logreport(self) -> None: | |
| self.log_cli_handler.set_when("logreport") | |
| def _runtest_for(self, item: nodes.Item, when: str) -> Generator[None]: | |
| """Implement the internals of the pytest_runtest_xxx() hooks.""" | |
| with catching_logs( | |
| self.caplog_handler, | |
| level=self.log_level, | |
| ) as caplog_handler, catching_logs( | |
| self.report_handler, | |
| level=self.log_level, | |
| ) as report_handler: | |
| caplog_handler.reset() | |
| report_handler.reset() | |
| item.stash[caplog_records_key][when] = caplog_handler.records | |
| item.stash[caplog_handler_key] = caplog_handler | |
| try: | |
| yield | |
| finally: | |
| log = report_handler.stream.getvalue().strip() | |
| item.add_report_section(when, "log", log) | |
| def pytest_runtest_setup(self, item: nodes.Item) -> Generator[None]: | |
| self.log_cli_handler.set_when("setup") | |
| empty: dict[str, list[logging.LogRecord]] = {} | |
| item.stash[caplog_records_key] = empty | |
| yield from self._runtest_for(item, "setup") | |
| def pytest_runtest_call(self, item: nodes.Item) -> Generator[None]: | |
| self.log_cli_handler.set_when("call") | |
| yield from self._runtest_for(item, "call") | |
| def pytest_runtest_teardown(self, item: nodes.Item) -> Generator[None]: | |
| self.log_cli_handler.set_when("teardown") | |
| try: | |
| yield from self._runtest_for(item, "teardown") | |
| finally: | |
| del item.stash[caplog_records_key] | |
| del item.stash[caplog_handler_key] | |
| def pytest_runtest_logfinish(self) -> None: | |
| self.log_cli_handler.set_when("finish") | |
| def pytest_sessionfinish(self) -> Generator[None]: | |
| self.log_cli_handler.set_when("sessionfinish") | |
| with catching_logs(self.log_cli_handler, level=self.log_cli_level): | |
| with catching_logs(self.log_file_handler, level=self.log_file_level): | |
| return (yield) | |
| def pytest_unconfigure(self) -> None: | |
| # Close the FileHandler explicitly. | |
| # (logging.shutdown might have lost the weakref?!) | |
| self.log_file_handler.close() | |
| class _FileHandler(logging.FileHandler): | |
| """A logging FileHandler with pytest tweaks.""" | |
| def handleError(self, record: logging.LogRecord) -> None: | |
| # Handled by LogCaptureHandler. | |
| pass | |
| class _LiveLoggingStreamHandler(logging_StreamHandler): | |
| """A logging StreamHandler used by the live logging feature: it will | |
| write a newline before the first log message in each test. | |
| During live logging we must also explicitly disable stdout/stderr | |
| capturing otherwise it will get captured and won't appear in the | |
| terminal. | |
| """ | |
| # Officially stream needs to be a IO[str], but TerminalReporter | |
| # isn't. So force it. | |
| stream: TerminalReporter = None # type: ignore | |
| def __init__( | |
| self, | |
| terminal_reporter: TerminalReporter, | |
| capture_manager: CaptureManager | None, | |
| ) -> None: | |
| super().__init__(stream=terminal_reporter) # type: ignore[arg-type] | |
| self.capture_manager = capture_manager | |
| self.reset() | |
| self.set_when(None) | |
| self._test_outcome_written = False | |
| def reset(self) -> None: | |
| """Reset the handler; should be called before the start of each test.""" | |
| self._first_record_emitted = False | |
| def set_when(self, when: str | None) -> None: | |
| """Prepare for the given test phase (setup/call/teardown).""" | |
| self._when = when | |
| self._section_name_shown = False | |
| if when == "start": | |
| self._test_outcome_written = False | |
| def emit(self, record: logging.LogRecord) -> None: | |
| ctx_manager = ( | |
| self.capture_manager.global_and_fixture_disabled() | |
| if self.capture_manager | |
| else nullcontext() | |
| ) | |
| with ctx_manager: | |
| if not self._first_record_emitted: | |
| self.stream.write("\n") | |
| self._first_record_emitted = True | |
| elif self._when in ("teardown", "finish"): | |
| if not self._test_outcome_written: | |
| self._test_outcome_written = True | |
| self.stream.write("\n") | |
| if not self._section_name_shown and self._when: | |
| self.stream.section("live log " + self._when, sep="-", bold=True) | |
| self._section_name_shown = True | |
| super().emit(record) | |
| def handleError(self, record: logging.LogRecord) -> None: | |
| # Handled by LogCaptureHandler. | |
| pass | |
| class _LiveLoggingNullHandler(logging.NullHandler): | |
| """A logging handler used when live logging is disabled.""" | |
| def reset(self) -> None: | |
| pass | |
| def set_when(self, when: str) -> None: | |
| pass | |
| def handleError(self, record: logging.LogRecord) -> None: | |
| # Handled by LogCaptureHandler. | |
| pass | |