|
import contextlib |
|
import _imp |
|
import importlib |
|
import importlib.util |
|
import os |
|
import shutil |
|
import sys |
|
import unittest |
|
import warnings |
|
|
|
from .os_helper import unlink, temp_dir |
|
|
|
|
|
@contextlib.contextmanager |
|
def _ignore_deprecated_imports(ignore=True): |
|
"""Context manager to suppress package and module deprecation |
|
warnings when importing them. |
|
|
|
If ignore is False, this context manager has no effect. |
|
""" |
|
if ignore: |
|
with warnings.catch_warnings(): |
|
warnings.filterwarnings("ignore", ".+ (module|package)", |
|
DeprecationWarning) |
|
yield |
|
else: |
|
yield |
|
|
|
|
|
def unload(name): |
|
try: |
|
del sys.modules[name] |
|
except KeyError: |
|
pass |
|
|
|
|
|
def forget(modname): |
|
"""'Forget' a module was ever imported. |
|
|
|
This removes the module from sys.modules and deletes any PEP 3147/488 or |
|
legacy .pyc files. |
|
""" |
|
unload(modname) |
|
for dirname in sys.path: |
|
source = os.path.join(dirname, modname + '.py') |
|
|
|
|
|
unlink(source + 'c') |
|
for opt in ('', 1, 2): |
|
unlink(importlib.util.cache_from_source(source, optimization=opt)) |
|
|
|
|
|
def make_legacy_pyc(source): |
|
"""Move a PEP 3147/488 pyc file to its legacy pyc location. |
|
|
|
:param source: The file system path to the source file. The source file |
|
does not need to exist, however the PEP 3147/488 pyc file must exist. |
|
:return: The file system path to the legacy pyc file. |
|
""" |
|
pyc_file = importlib.util.cache_from_source(source) |
|
up_one = os.path.dirname(os.path.abspath(source)) |
|
legacy_pyc = os.path.join(up_one, source + 'c') |
|
shutil.move(pyc_file, legacy_pyc) |
|
return legacy_pyc |
|
|
|
|
|
def import_module(name, deprecated=False, *, required_on=()): |
|
"""Import and return the module to be tested, raising SkipTest if |
|
it is not available. |
|
|
|
If deprecated is True, any module or package deprecation messages |
|
will be suppressed. If a module is required on a platform but optional for |
|
others, set required_on to an iterable of platform prefixes which will be |
|
compared against sys.platform. |
|
""" |
|
with _ignore_deprecated_imports(deprecated): |
|
try: |
|
return importlib.import_module(name) |
|
except ImportError as msg: |
|
if sys.platform.startswith(tuple(required_on)): |
|
raise |
|
raise unittest.SkipTest(str(msg)) |
|
|
|
|
|
def _save_and_remove_modules(names): |
|
orig_modules = {} |
|
prefixes = tuple(name + '.' for name in names) |
|
for modname in list(sys.modules): |
|
if modname in names or modname.startswith(prefixes): |
|
orig_modules[modname] = sys.modules.pop(modname) |
|
return orig_modules |
|
|
|
|
|
@contextlib.contextmanager |
|
def frozen_modules(enabled=True): |
|
"""Force frozen modules to be used (or not). |
|
|
|
This only applies to modules that haven't been imported yet. |
|
Also, some essential modules will always be imported frozen. |
|
""" |
|
_imp._override_frozen_modules_for_tests(1 if enabled else -1) |
|
try: |
|
yield |
|
finally: |
|
_imp._override_frozen_modules_for_tests(0) |
|
|
|
|
|
def import_fresh_module(name, fresh=(), blocked=(), *, |
|
deprecated=False, |
|
usefrozen=False, |
|
): |
|
"""Import and return a module, deliberately bypassing sys.modules. |
|
|
|
This function imports and returns a fresh copy of the named Python module |
|
by removing the named module from sys.modules before doing the import. |
|
Note that unlike reload, the original module is not affected by |
|
this operation. |
|
|
|
*fresh* is an iterable of additional module names that are also removed |
|
from the sys.modules cache before doing the import. If one of these |
|
modules can't be imported, None is returned. |
|
|
|
*blocked* is an iterable of module names that are replaced with None |
|
in the module cache during the import to ensure that attempts to import |
|
them raise ImportError. |
|
|
|
The named module and any modules named in the *fresh* and *blocked* |
|
parameters are saved before starting the import and then reinserted into |
|
sys.modules when the fresh import is complete. |
|
|
|
Module and package deprecation messages are suppressed during this import |
|
if *deprecated* is True. |
|
|
|
This function will raise ImportError if the named module cannot be |
|
imported. |
|
|
|
If "usefrozen" is False (the default) then the frozen importer is |
|
disabled (except for essential modules like importlib._bootstrap). |
|
""" |
|
|
|
|
|
with _ignore_deprecated_imports(deprecated): |
|
|
|
|
|
fresh = list(fresh) |
|
blocked = list(blocked) |
|
names = {name, *fresh, *blocked} |
|
orig_modules = _save_and_remove_modules(names) |
|
for modname in blocked: |
|
sys.modules[modname] = None |
|
|
|
try: |
|
with frozen_modules(usefrozen): |
|
|
|
try: |
|
for modname in fresh: |
|
__import__(modname) |
|
except ImportError: |
|
return None |
|
return importlib.import_module(name) |
|
finally: |
|
_save_and_remove_modules(names) |
|
sys.modules.update(orig_modules) |
|
|
|
|
|
class CleanImport(object): |
|
"""Context manager to force import to return a new module reference. |
|
|
|
This is useful for testing module-level behaviours, such as |
|
the emission of a DeprecationWarning on import. |
|
|
|
Use like this: |
|
|
|
with CleanImport("foo"): |
|
importlib.import_module("foo") # new reference |
|
|
|
If "usefrozen" is False (the default) then the frozen importer is |
|
disabled (except for essential modules like importlib._bootstrap). |
|
""" |
|
|
|
def __init__(self, *module_names, usefrozen=False): |
|
self.original_modules = sys.modules.copy() |
|
for module_name in module_names: |
|
if module_name in sys.modules: |
|
module = sys.modules[module_name] |
|
|
|
|
|
|
|
|
|
if module.__name__ != module_name: |
|
del sys.modules[module.__name__] |
|
del sys.modules[module_name] |
|
self._frozen_modules = frozen_modules(usefrozen) |
|
|
|
def __enter__(self): |
|
self._frozen_modules.__enter__() |
|
return self |
|
|
|
def __exit__(self, *ignore_exc): |
|
sys.modules.update(self.original_modules) |
|
self._frozen_modules.__exit__(*ignore_exc) |
|
|
|
|
|
class DirsOnSysPath(object): |
|
"""Context manager to temporarily add directories to sys.path. |
|
|
|
This makes a copy of sys.path, appends any directories given |
|
as positional arguments, then reverts sys.path to the copied |
|
settings when the context ends. |
|
|
|
Note that *all* sys.path modifications in the body of the |
|
context manager, including replacement of the object, |
|
will be reverted at the end of the block. |
|
""" |
|
|
|
def __init__(self, *paths): |
|
self.original_value = sys.path[:] |
|
self.original_object = sys.path |
|
sys.path.extend(paths) |
|
|
|
def __enter__(self): |
|
return self |
|
|
|
def __exit__(self, *ignore_exc): |
|
sys.path = self.original_object |
|
sys.path[:] = self.original_value |
|
|
|
|
|
def modules_setup(): |
|
return sys.modules.copy(), |
|
|
|
|
|
def modules_cleanup(oldmodules): |
|
|
|
|
|
|
|
encodings = [(k, v) for k, v in sys.modules.items() |
|
if k.startswith('encodings.')] |
|
sys.modules.clear() |
|
sys.modules.update(encodings) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sys.modules.update(oldmodules) |
|
|
|
|
|
@contextlib.contextmanager |
|
def isolated_modules(): |
|
""" |
|
Save modules on entry and cleanup on exit. |
|
""" |
|
(saved,) = modules_setup() |
|
try: |
|
yield |
|
finally: |
|
modules_cleanup(saved) |
|
|
|
|
|
def mock_register_at_fork(func): |
|
|
|
|
|
|
|
from unittest import mock |
|
return mock.patch('os.register_at_fork', create=True)(func) |
|
|
|
|
|
@contextlib.contextmanager |
|
def ready_to_import(name=None, source=""): |
|
from test.support import script_helper |
|
|
|
|
|
|
|
|
|
|
|
name = name or "spam" |
|
with temp_dir() as tempdir: |
|
path = script_helper.make_script(tempdir, name, source) |
|
old_module = sys.modules.pop(name, None) |
|
try: |
|
sys.path.insert(0, tempdir) |
|
yield name, path |
|
sys.path.remove(tempdir) |
|
finally: |
|
if old_module is not None: |
|
sys.modules[name] = old_module |
|
else: |
|
sys.modules.pop(name, None) |
|
|