|
|
|
|
|
|
|
|
import importlib |
|
|
import sys |
|
|
import types |
|
|
|
|
|
|
|
|
def get_func_name(func): |
|
|
if isinstance(func, str): |
|
|
return func |
|
|
return '.'.join((func.__module__, func.__qualname__)) |
|
|
|
|
|
|
|
|
def dummy_function_wrapper(func_name): |
|
|
def dummy_function(*args, **kwargs): |
|
|
raise RuntimeError('function {} no exist'.format(func_name)) |
|
|
|
|
|
return dummy_function |
|
|
|
|
|
|
|
|
class Patch: |
|
|
def __init__(self, orig_func_name, new_func, create_dummy): |
|
|
split_name = orig_func_name.rsplit('.', 1) |
|
|
if len(split_name) == 1: |
|
|
self.orig_module_name, self.orig_func_name = orig_func_name, None |
|
|
else: |
|
|
self.orig_module_name, self.orig_func_name = split_name |
|
|
self.orig_module = None |
|
|
self.orig_func = None |
|
|
|
|
|
self.patch_func = None |
|
|
self.wrappers = [] |
|
|
if new_func is None: |
|
|
new_func = dummy_function_wrapper(orig_func_name) |
|
|
self.set_patch_func(new_func) |
|
|
self.is_applied = False |
|
|
self.create_dummy = create_dummy |
|
|
|
|
|
@property |
|
|
def orig_func_id(self): |
|
|
return id(self.orig_func) |
|
|
|
|
|
@property |
|
|
def patch_func_id(self): |
|
|
return id(self.patch_func) |
|
|
|
|
|
def set_patch_func(self, new_func, force_patch=False): |
|
|
if hasattr(new_func, '__name__') and new_func.__name__.endswith(('wrapper', 'decorator')): |
|
|
self.wrappers.append(new_func) |
|
|
else: |
|
|
if self.patch_func and not force_patch: |
|
|
raise RuntimeError('the patch of {} exist !'.format(self.orig_func_name)) |
|
|
self.patch_func = new_func |
|
|
self.is_applied = False |
|
|
|
|
|
def apply_patch(self): |
|
|
if self.is_applied: |
|
|
return |
|
|
|
|
|
self.orig_module, self.orig_func = Patch.parse_path(self.orig_module_name, self.orig_func_name, self.create_dummy) |
|
|
|
|
|
final_patch_func = self.orig_func |
|
|
if self.patch_func is not None: |
|
|
final_patch_func = self.patch_func |
|
|
|
|
|
for wrapper in self.wrappers: |
|
|
final_patch_func = wrapper(final_patch_func) |
|
|
|
|
|
if self.orig_func_name is not None: |
|
|
setattr(self.orig_module, self.orig_func_name, final_patch_func) |
|
|
for key, value in sys.modules.copy().items(): |
|
|
if self.orig_func_name is not None and hasattr(value, self.orig_func_name) \ |
|
|
and id(getattr(value, self.orig_func_name)) == self.orig_func_id: |
|
|
setattr(value, self.orig_func_name, final_patch_func) |
|
|
self.is_applied = True |
|
|
|
|
|
@staticmethod |
|
|
def parse_path(module_path, function_name, create_dummy): |
|
|
from importlib.machinery import ModuleSpec |
|
|
modules = module_path.split('.') |
|
|
for i in range(1, len(modules) + 1): |
|
|
parent = '.'.join(modules[:i - 1]) |
|
|
path = '.'.join(modules[:i]) |
|
|
try: |
|
|
importlib.import_module(path) |
|
|
except ModuleNotFoundError as e: |
|
|
if not parent or not hasattr(importlib.import_module(parent), modules[i - 1]): |
|
|
if not create_dummy: |
|
|
raise ModuleNotFoundError(e) from e |
|
|
sys.modules[path] = types.ModuleType(path) |
|
|
sys.modules[path].__file__ = 'mindspeed.dummy_module.py' |
|
|
sys.modules[path].__spec__ = ModuleSpec(path, None) |
|
|
if parent: |
|
|
setattr(importlib.import_module(parent), modules[i - 1], sys.modules[path]) |
|
|
else: |
|
|
module = getattr(importlib.import_module(parent), modules[i - 1]) |
|
|
if hasattr(module, function_name): |
|
|
return module, getattr(module, function_name) |
|
|
elif create_dummy: |
|
|
return module, dummy_function_wrapper(function_name) |
|
|
else: |
|
|
raise RuntimeError('no exist {} of {}'.format(function_name, module)) |
|
|
|
|
|
if function_name is not None and not hasattr(sys.modules[module_path], function_name): |
|
|
setattr(sys.modules[module_path], function_name, None) |
|
|
return sys.modules[module_path], getattr(sys.modules[module_path], function_name) if function_name is not None else None |
|
|
|
|
|
|
|
|
class MindSpeedPatchesManager: |
|
|
patches_info = {} |
|
|
|
|
|
@staticmethod |
|
|
def register_patch(orig_func_name, new_func=None, force_patch=False, create_dummy=False): |
|
|
if orig_func_name not in MindSpeedPatchesManager.patches_info: |
|
|
MindSpeedPatchesManager.patches_info[orig_func_name] = Patch(orig_func_name, new_func, create_dummy) |
|
|
else: |
|
|
MindSpeedPatchesManager.patches_info.get(orig_func_name).set_patch_func(new_func, force_patch) |
|
|
|
|
|
@staticmethod |
|
|
def apply_patches(): |
|
|
for patch in MindSpeedPatchesManager.patches_info.values(): |
|
|
patch.apply_patch() |
|
|
|