Spaces:
Paused
Paused
| # mypy: allow-untyped-defs | |
| """Implementation of the cache provider.""" | |
| # This plugin was not named "cache" to avoid conflicts with the external | |
| # pytest-cache version. | |
| import dataclasses | |
| import errno | |
| import json | |
| import os | |
| from pathlib import Path | |
| import tempfile | |
| from typing import Dict | |
| from typing import final | |
| from typing import Generator | |
| from typing import Iterable | |
| from typing import List | |
| from typing import Optional | |
| from typing import Set | |
| from typing import Union | |
| from .pathlib import resolve_from_str | |
| from .pathlib import rm_rf | |
| from .reports import CollectReport | |
| from _pytest import nodes | |
| from _pytest._io import TerminalWriter | |
| from _pytest.config import Config | |
| from _pytest.config import ExitCode | |
| from _pytest.config import hookimpl | |
| from _pytest.config.argparsing import Parser | |
| from _pytest.deprecated import check_ispytest | |
| from _pytest.fixtures import fixture | |
| from _pytest.fixtures import FixtureRequest | |
| from _pytest.main import Session | |
| from _pytest.nodes import Directory | |
| from _pytest.nodes import File | |
| from _pytest.reports import TestReport | |
| README_CONTENT = """\ | |
| # pytest cache directory # | |
| This directory contains data from the pytest's cache plugin, | |
| which provides the `--lf` and `--ff` options, as well as the `cache` fixture. | |
| **Do not** commit this to version control. | |
| See [the docs](https://docs.pytest.org/en/stable/how-to/cache.html) for more information. | |
| """ | |
| CACHEDIR_TAG_CONTENT = b"""\ | |
| Signature: 8a477f597d28d172789f06886806bc55 | |
| # This file is a cache directory tag created by pytest. | |
| # For information about cache directory tags, see: | |
| # https://bford.info/cachedir/spec.html | |
| """ | |
| class Cache: | |
| """Instance of the `cache` fixture.""" | |
| _cachedir: Path = dataclasses.field(repr=False) | |
| _config: Config = dataclasses.field(repr=False) | |
| # Sub-directory under cache-dir for directories created by `mkdir()`. | |
| _CACHE_PREFIX_DIRS = "d" | |
| # Sub-directory under cache-dir for values created by `set()`. | |
| _CACHE_PREFIX_VALUES = "v" | |
| def __init__( | |
| self, cachedir: Path, config: Config, *, _ispytest: bool = False | |
| ) -> None: | |
| check_ispytest(_ispytest) | |
| self._cachedir = cachedir | |
| self._config = config | |
| def for_config(cls, config: Config, *, _ispytest: bool = False) -> "Cache": | |
| """Create the Cache instance for a Config. | |
| :meta private: | |
| """ | |
| check_ispytest(_ispytest) | |
| cachedir = cls.cache_dir_from_config(config, _ispytest=True) | |
| if config.getoption("cacheclear") and cachedir.is_dir(): | |
| cls.clear_cache(cachedir, _ispytest=True) | |
| return cls(cachedir, config, _ispytest=True) | |
| def clear_cache(cls, cachedir: Path, _ispytest: bool = False) -> None: | |
| """Clear the sub-directories used to hold cached directories and values. | |
| :meta private: | |
| """ | |
| check_ispytest(_ispytest) | |
| for prefix in (cls._CACHE_PREFIX_DIRS, cls._CACHE_PREFIX_VALUES): | |
| d = cachedir / prefix | |
| if d.is_dir(): | |
| rm_rf(d) | |
| def cache_dir_from_config(config: Config, *, _ispytest: bool = False) -> Path: | |
| """Get the path to the cache directory for a Config. | |
| :meta private: | |
| """ | |
| check_ispytest(_ispytest) | |
| return resolve_from_str(config.getini("cache_dir"), config.rootpath) | |
| def warn(self, fmt: str, *, _ispytest: bool = False, **args: object) -> None: | |
| """Issue a cache warning. | |
| :meta private: | |
| """ | |
| check_ispytest(_ispytest) | |
| import warnings | |
| from _pytest.warning_types import PytestCacheWarning | |
| warnings.warn( | |
| PytestCacheWarning(fmt.format(**args) if args else fmt), | |
| self._config.hook, | |
| stacklevel=3, | |
| ) | |
| def _mkdir(self, path: Path) -> None: | |
| self._ensure_cache_dir_and_supporting_files() | |
| path.mkdir(exist_ok=True, parents=True) | |
| def mkdir(self, name: str) -> Path: | |
| """Return a directory path object with the given name. | |
| If the directory does not yet exist, it will be created. You can use | |
| it to manage files to e.g. store/retrieve database dumps across test | |
| sessions. | |
| .. versionadded:: 7.0 | |
| :param name: | |
| Must be a string not containing a ``/`` separator. | |
| Make sure the name contains your plugin or application | |
| identifiers to prevent clashes with other cache users. | |
| """ | |
| path = Path(name) | |
| if len(path.parts) > 1: | |
| raise ValueError("name is not allowed to contain path separators") | |
| res = self._cachedir.joinpath(self._CACHE_PREFIX_DIRS, path) | |
| self._mkdir(res) | |
| return res | |
| def _getvaluepath(self, key: str) -> Path: | |
| return self._cachedir.joinpath(self._CACHE_PREFIX_VALUES, Path(key)) | |
| def get(self, key: str, default): | |
| """Return the cached value for the given key. | |
| If no value was yet cached or the value cannot be read, the specified | |
| default is returned. | |
| :param key: | |
| Must be a ``/`` separated value. Usually the first | |
| name is the name of your plugin or your application. | |
| :param default: | |
| The value to return in case of a cache-miss or invalid cache value. | |
| """ | |
| path = self._getvaluepath(key) | |
| try: | |
| with path.open("r", encoding="UTF-8") as f: | |
| return json.load(f) | |
| except (ValueError, OSError): | |
| return default | |
| def set(self, key: str, value: object) -> None: | |
| """Save value for the given key. | |
| :param key: | |
| Must be a ``/`` separated value. Usually the first | |
| name is the name of your plugin or your application. | |
| :param value: | |
| Must be of any combination of basic python types, | |
| including nested types like lists of dictionaries. | |
| """ | |
| path = self._getvaluepath(key) | |
| try: | |
| self._mkdir(path.parent) | |
| except OSError as exc: | |
| self.warn( | |
| f"could not create cache path {path}: {exc}", | |
| _ispytest=True, | |
| ) | |
| return | |
| data = json.dumps(value, ensure_ascii=False, indent=2) | |
| try: | |
| f = path.open("w", encoding="UTF-8") | |
| except OSError as exc: | |
| self.warn( | |
| f"cache could not write path {path}: {exc}", | |
| _ispytest=True, | |
| ) | |
| else: | |
| with f: | |
| f.write(data) | |
| def _ensure_cache_dir_and_supporting_files(self) -> None: | |
| """Create the cache dir and its supporting files.""" | |
| if self._cachedir.is_dir(): | |
| return | |
| self._cachedir.parent.mkdir(parents=True, exist_ok=True) | |
| with tempfile.TemporaryDirectory( | |
| prefix="pytest-cache-files-", | |
| dir=self._cachedir.parent, | |
| ) as newpath: | |
| path = Path(newpath) | |
| # Reset permissions to the default, see #12308. | |
| # Note: there's no way to get the current umask atomically, eek. | |
| umask = os.umask(0o022) | |
| os.umask(umask) | |
| path.chmod(0o777 - umask) | |
| with open(path.joinpath("README.md"), "xt", encoding="UTF-8") as f: | |
| f.write(README_CONTENT) | |
| with open(path.joinpath(".gitignore"), "xt", encoding="UTF-8") as f: | |
| f.write("# Created by pytest automatically.\n*\n") | |
| with open(path.joinpath("CACHEDIR.TAG"), "xb") as f: | |
| f.write(CACHEDIR_TAG_CONTENT) | |
| try: | |
| path.rename(self._cachedir) | |
| except OSError as e: | |
| # If 2 concurrent pytests both race to the rename, the loser | |
| # gets "Directory not empty" from the rename. In this case, | |
| # everything is handled so just continue (while letting the | |
| # temporary directory be cleaned up). | |
| if e.errno != errno.ENOTEMPTY: | |
| raise | |
| else: | |
| # Create a directory in place of the one we just moved so that | |
| # `TemporaryDirectory`'s cleanup doesn't complain. | |
| # | |
| # TODO: pass ignore_cleanup_errors=True when we no longer support python < 3.10. | |
| # See https://github.com/python/cpython/issues/74168. Note that passing | |
| # delete=False would do the wrong thing in case of errors and isn't supported | |
| # until python 3.12. | |
| path.mkdir() | |
| class LFPluginCollWrapper: | |
| def __init__(self, lfplugin: "LFPlugin") -> None: | |
| self.lfplugin = lfplugin | |
| self._collected_at_least_one_failure = False | |
| def pytest_make_collect_report( | |
| self, collector: nodes.Collector | |
| ) -> Generator[None, CollectReport, CollectReport]: | |
| res = yield | |
| if isinstance(collector, (Session, Directory)): | |
| # Sort any lf-paths to the beginning. | |
| lf_paths = self.lfplugin._last_failed_paths | |
| # Use stable sort to prioritize last failed. | |
| def sort_key(node: Union[nodes.Item, nodes.Collector]) -> bool: | |
| return node.path in lf_paths | |
| res.result = sorted( | |
| res.result, | |
| key=sort_key, | |
| reverse=True, | |
| ) | |
| elif isinstance(collector, File): | |
| if collector.path in self.lfplugin._last_failed_paths: | |
| result = res.result | |
| lastfailed = self.lfplugin.lastfailed | |
| # Only filter with known failures. | |
| if not self._collected_at_least_one_failure: | |
| if not any(x.nodeid in lastfailed for x in result): | |
| return res | |
| self.lfplugin.config.pluginmanager.register( | |
| LFPluginCollSkipfiles(self.lfplugin), "lfplugin-collskip" | |
| ) | |
| self._collected_at_least_one_failure = True | |
| session = collector.session | |
| result[:] = [ | |
| x | |
| for x in result | |
| if x.nodeid in lastfailed | |
| # Include any passed arguments (not trivial to filter). | |
| or session.isinitpath(x.path) | |
| # Keep all sub-collectors. | |
| or isinstance(x, nodes.Collector) | |
| ] | |
| return res | |
| class LFPluginCollSkipfiles: | |
| def __init__(self, lfplugin: "LFPlugin") -> None: | |
| self.lfplugin = lfplugin | |
| def pytest_make_collect_report( | |
| self, collector: nodes.Collector | |
| ) -> Optional[CollectReport]: | |
| if isinstance(collector, File): | |
| if collector.path not in self.lfplugin._last_failed_paths: | |
| self.lfplugin._skipped_files += 1 | |
| return CollectReport( | |
| collector.nodeid, "passed", longrepr=None, result=[] | |
| ) | |
| return None | |
| class LFPlugin: | |
| """Plugin which implements the --lf (run last-failing) option.""" | |
| def __init__(self, config: Config) -> None: | |
| self.config = config | |
| active_keys = "lf", "failedfirst" | |
| self.active = any(config.getoption(key) for key in active_keys) | |
| assert config.cache | |
| self.lastfailed: Dict[str, bool] = config.cache.get("cache/lastfailed", {}) | |
| self._previously_failed_count: Optional[int] = None | |
| self._report_status: Optional[str] = None | |
| self._skipped_files = 0 # count skipped files during collection due to --lf | |
| if config.getoption("lf"): | |
| self._last_failed_paths = self.get_last_failed_paths() | |
| config.pluginmanager.register( | |
| LFPluginCollWrapper(self), "lfplugin-collwrapper" | |
| ) | |
| def get_last_failed_paths(self) -> Set[Path]: | |
| """Return a set with all Paths of the previously failed nodeids and | |
| their parents.""" | |
| rootpath = self.config.rootpath | |
| result = set() | |
| for nodeid in self.lastfailed: | |
| path = rootpath / nodeid.split("::")[0] | |
| result.add(path) | |
| result.update(path.parents) | |
| return {x for x in result if x.exists()} | |
| def pytest_report_collectionfinish(self) -> Optional[str]: | |
| if self.active and self.config.getoption("verbose") >= 0: | |
| return "run-last-failure: %s" % self._report_status | |
| return None | |
| def pytest_runtest_logreport(self, report: TestReport) -> None: | |
| if (report.when == "call" and report.passed) or report.skipped: | |
| self.lastfailed.pop(report.nodeid, None) | |
| elif report.failed: | |
| self.lastfailed[report.nodeid] = True | |
| def pytest_collectreport(self, report: CollectReport) -> None: | |
| passed = report.outcome in ("passed", "skipped") | |
| if passed: | |
| if report.nodeid in self.lastfailed: | |
| self.lastfailed.pop(report.nodeid) | |
| self.lastfailed.update((item.nodeid, True) for item in report.result) | |
| else: | |
| self.lastfailed[report.nodeid] = True | |
| def pytest_collection_modifyitems( | |
| self, config: Config, items: List[nodes.Item] | |
| ) -> Generator[None, None, None]: | |
| res = yield | |
| if not self.active: | |
| return res | |
| if self.lastfailed: | |
| previously_failed = [] | |
| previously_passed = [] | |
| for item in items: | |
| if item.nodeid in self.lastfailed: | |
| previously_failed.append(item) | |
| else: | |
| previously_passed.append(item) | |
| self._previously_failed_count = len(previously_failed) | |
| if not previously_failed: | |
| # Running a subset of all tests with recorded failures | |
| # only outside of it. | |
| self._report_status = "%d known failures not in selected tests" % ( | |
| len(self.lastfailed), | |
| ) | |
| else: | |
| if self.config.getoption("lf"): | |
| items[:] = previously_failed | |
| config.hook.pytest_deselected(items=previously_passed) | |
| else: # --failedfirst | |
| items[:] = previously_failed + previously_passed | |
| noun = "failure" if self._previously_failed_count == 1 else "failures" | |
| suffix = " first" if self.config.getoption("failedfirst") else "" | |
| self._report_status = ( | |
| f"rerun previous {self._previously_failed_count} {noun}{suffix}" | |
| ) | |
| if self._skipped_files > 0: | |
| files_noun = "file" if self._skipped_files == 1 else "files" | |
| self._report_status += f" (skipped {self._skipped_files} {files_noun})" | |
| else: | |
| self._report_status = "no previously failed tests, " | |
| if self.config.getoption("last_failed_no_failures") == "none": | |
| self._report_status += "deselecting all items." | |
| config.hook.pytest_deselected(items=items[:]) | |
| items[:] = [] | |
| else: | |
| self._report_status += "not deselecting items." | |
| return res | |
| def pytest_sessionfinish(self, session: Session) -> None: | |
| config = self.config | |
| if config.getoption("cacheshow") or hasattr(config, "workerinput"): | |
| return | |
| assert config.cache is not None | |
| saved_lastfailed = config.cache.get("cache/lastfailed", {}) | |
| if saved_lastfailed != self.lastfailed: | |
| config.cache.set("cache/lastfailed", self.lastfailed) | |
| class NFPlugin: | |
| """Plugin which implements the --nf (run new-first) option.""" | |
| def __init__(self, config: Config) -> None: | |
| self.config = config | |
| self.active = config.option.newfirst | |
| assert config.cache is not None | |
| self.cached_nodeids = set(config.cache.get("cache/nodeids", [])) | |
| def pytest_collection_modifyitems( | |
| self, items: List[nodes.Item] | |
| ) -> Generator[None, None, None]: | |
| res = yield | |
| if self.active: | |
| new_items: Dict[str, nodes.Item] = {} | |
| other_items: Dict[str, nodes.Item] = {} | |
| for item in items: | |
| if item.nodeid not in self.cached_nodeids: | |
| new_items[item.nodeid] = item | |
| else: | |
| other_items[item.nodeid] = item | |
| items[:] = self._get_increasing_order( | |
| new_items.values() | |
| ) + self._get_increasing_order(other_items.values()) | |
| self.cached_nodeids.update(new_items) | |
| else: | |
| self.cached_nodeids.update(item.nodeid for item in items) | |
| return res | |
| def _get_increasing_order(self, items: Iterable[nodes.Item]) -> List[nodes.Item]: | |
| return sorted(items, key=lambda item: item.path.stat().st_mtime, reverse=True) | |
| def pytest_sessionfinish(self) -> None: | |
| config = self.config | |
| if config.getoption("cacheshow") or hasattr(config, "workerinput"): | |
| return | |
| if config.getoption("collectonly"): | |
| return | |
| assert config.cache is not None | |
| config.cache.set("cache/nodeids", sorted(self.cached_nodeids)) | |
| def pytest_addoption(parser: Parser) -> None: | |
| group = parser.getgroup("general") | |
| group.addoption( | |
| "--lf", | |
| "--last-failed", | |
| action="store_true", | |
| dest="lf", | |
| help="Rerun only the tests that failed " | |
| "at the last run (or all if none failed)", | |
| ) | |
| group.addoption( | |
| "--ff", | |
| "--failed-first", | |
| action="store_true", | |
| dest="failedfirst", | |
| help="Run all tests, but run the last failures first. " | |
| "This may re-order tests and thus lead to " | |
| "repeated fixture setup/teardown.", | |
| ) | |
| group.addoption( | |
| "--nf", | |
| "--new-first", | |
| action="store_true", | |
| dest="newfirst", | |
| help="Run tests from new files first, then the rest of the tests " | |
| "sorted by file mtime", | |
| ) | |
| group.addoption( | |
| "--cache-show", | |
| action="append", | |
| nargs="?", | |
| dest="cacheshow", | |
| help=( | |
| "Show cache contents, don't perform collection or tests. " | |
| "Optional argument: glob (default: '*')." | |
| ), | |
| ) | |
| group.addoption( | |
| "--cache-clear", | |
| action="store_true", | |
| dest="cacheclear", | |
| help="Remove all cache contents at start of test run", | |
| ) | |
| cache_dir_default = ".pytest_cache" | |
| if "TOX_ENV_DIR" in os.environ: | |
| cache_dir_default = os.path.join(os.environ["TOX_ENV_DIR"], cache_dir_default) | |
| parser.addini("cache_dir", default=cache_dir_default, help="Cache directory path") | |
| group.addoption( | |
| "--lfnf", | |
| "--last-failed-no-failures", | |
| action="store", | |
| dest="last_failed_no_failures", | |
| choices=("all", "none"), | |
| default="all", | |
| help="With ``--lf``, determines whether to execute tests when there " | |
| "are no previously (known) failures or when no " | |
| "cached ``lastfailed`` data was found. " | |
| "``all`` (the default) runs the full test suite again. " | |
| "``none`` just emits a message about no known failures and exits successfully.", | |
| ) | |
| def pytest_cmdline_main(config: Config) -> Optional[Union[int, ExitCode]]: | |
| if config.option.cacheshow and not config.option.help: | |
| from _pytest.main import wrap_session | |
| return wrap_session(config, cacheshow) | |
| return None | |
| def pytest_configure(config: Config) -> None: | |
| config.cache = Cache.for_config(config, _ispytest=True) | |
| config.pluginmanager.register(LFPlugin(config), "lfplugin") | |
| config.pluginmanager.register(NFPlugin(config), "nfplugin") | |
| def cache(request: FixtureRequest) -> Cache: | |
| """Return a cache object that can persist state between testing sessions. | |
| cache.get(key, default) | |
| cache.set(key, value) | |
| Keys must be ``/`` separated strings, where the first part is usually the | |
| name of your plugin or application to avoid clashes with other cache users. | |
| Values can be any object handled by the json stdlib module. | |
| """ | |
| assert request.config.cache is not None | |
| return request.config.cache | |
| def pytest_report_header(config: Config) -> Optional[str]: | |
| """Display cachedir with --cache-show and if non-default.""" | |
| if config.option.verbose > 0 or config.getini("cache_dir") != ".pytest_cache": | |
| assert config.cache is not None | |
| cachedir = config.cache._cachedir | |
| # TODO: evaluate generating upward relative paths | |
| # starting with .., ../.. if sensible | |
| try: | |
| displaypath = cachedir.relative_to(config.rootpath) | |
| except ValueError: | |
| displaypath = cachedir | |
| return f"cachedir: {displaypath}" | |
| return None | |
| def cacheshow(config: Config, session: Session) -> int: | |
| from pprint import pformat | |
| assert config.cache is not None | |
| tw = TerminalWriter() | |
| tw.line("cachedir: " + str(config.cache._cachedir)) | |
| if not config.cache._cachedir.is_dir(): | |
| tw.line("cache is empty") | |
| return 0 | |
| glob = config.option.cacheshow[0] | |
| if glob is None: | |
| glob = "*" | |
| dummy = object() | |
| basedir = config.cache._cachedir | |
| vdir = basedir / Cache._CACHE_PREFIX_VALUES | |
| tw.sep("-", "cache values for %r" % glob) | |
| for valpath in sorted(x for x in vdir.rglob(glob) if x.is_file()): | |
| key = str(valpath.relative_to(vdir)) | |
| val = config.cache.get(key, dummy) | |
| if val is dummy: | |
| tw.line("%s contains unreadable content, will be ignored" % key) | |
| else: | |
| tw.line("%s contains:" % key) | |
| for line in pformat(val).splitlines(): | |
| tw.line(" " + line) | |
| ddir = basedir / Cache._CACHE_PREFIX_DIRS | |
| if ddir.is_dir(): | |
| contents = sorted(ddir.rglob(glob)) | |
| tw.sep("-", "cache directories for %r" % glob) | |
| for p in contents: | |
| # if p.is_dir(): | |
| # print("%s/" % p.relative_to(basedir)) | |
| if p.is_file(): | |
| key = str(p.relative_to(basedir)) | |
| tw.line(f"{key} is a file of length {p.stat().st_size:d}") | |
| return 0 | |