import json import traceback class PluginManager(object): def __init__(self, APP_VERSION, PROD, CPU_ONLY, logger): super(PluginManager, self).__init__() self.APP_VERSION = APP_VERSION self.CPU_ONLY = CPU_ONLY self.PROD = PROD self.path = "./resources/app" if PROD else "." self.modules_path = "resources.app." if PROD else "" self.logger = logger self.setupModules = set() self.enabledPlugins = set() self.teardownModules = {} self.refresh_active_plugins() def reset_plugins (self): self.plugins = { "custom-event": [], "start": { "pre": [], "mid": [], "post": [] }, "load-model": { "pre": [], "mid": [], "post": [] }, "synth-line": { "pre": [], "mid": [], "pre_energy": [], "post": [] }, "batch-synth-line": { "pre": [], "mid": [], "post": [] }, "mp-output-audio": { "pre": [], "post": [] }, "arpabet-replace": { "pre": [], "post": [] }, "output-audio": { "pre": [], "mid": [], "post": [] }, } self.plugins_context_cache = {} for key in self.plugins.keys(): if key=="custom-event": pass else: self.plugins_context_cache[key] = {} for sub_key in self.plugins[key].keys(): self.plugins_context_cache[key][sub_key] = {} def set_context_cache (self, event, hook, plugin_id, data): self.plugins_context_cache[event][hook][plugin_id] = data def get_active_plugins_count (self): active_plugins = [] for _ in self.plugins["custom-event"]: active_plugins.append(["custom-event", None]) for key in self.plugins.keys(): if key=="custom-event": continue plugin_triggers = list(self.plugins[key].keys()) for trigger_type in plugin_triggers: for plugin in self.plugins[key][trigger_type]: active_plugins.append([key, trigger_type]) return len(active_plugins) # For ease of access def set_context (self, data): self.context = data def refresh_active_plugins (self): with open("plugins.txt") as f: lines = f.read().split("\n") removed_plugins = [] for line in lines: if not line.startswith("*") and line in self.enabledPlugins: removed_plugins.append(line) for plugin_id in removed_plugins: if plugin_id in self.teardownModules: for func in self.teardownModules[plugin_id]: params = {"logger": self.logger, "appVersion": self.APP_VERSION, "isCPUonly": self.CPU_ONLY, "isDev": not self.PROD} func(params) self.reset_plugins() status = [] for line in lines: if line.startswith("*"): plugin_id = line[1:] self.logger.info(f'plugin_id: {plugin_id}') try: with open(f'{self.path}/plugins/{plugin_id}/plugin.json') as f: plugin_json = f.read() plugin_json = json.loads(plugin_json) minVersionOk = checkVersionRequirements(plugin_json["min-app-version"] if "min-app-version" in plugin_json else None, self.APP_VERSION) maxVersionOk = checkVersionRequirements(plugin_json["max-app-version"] if "max-app-version" in plugin_json else None, self.APP_VERSION, True) if not minVersionOk or not maxVersionOk: self.logger.info(f'minVersionOk {minVersionOk}') self.logger.info(f'maxVersionOk {maxVersionOk}') continue for key in self.plugins.keys(): if key=="custom-event": self.load_module_function(plugin_json, plugin_id, ["back-end-hooks", "custom-event"], []) else: plugin_triggers = list(self.plugins[key].keys()) for trigger_type in plugin_triggers: self.load_module_function(plugin_json, plugin_id, ["back-end-hooks", key, trigger_type], []) self.enabledPlugins.add(plugin_id) status.append("OK") except: self.logger.info(traceback.format_exc()) status.append(plugin_id) return status def load_module_function (self, plugin_json, plugin_name, structure, structure2): if structure[0] in plugin_json and plugin_json[structure[0]] is not None: key = structure[0] structure2.append(key) plugin_json = plugin_json[key] del structure[0] if len(structure): return self.load_module_function(plugin_json, plugin_name, structure, structure2) else: file_name = plugin_json["file"] function = plugin_json["function"] if not file_name: return if file_name.endswith(".py"): setup = {"logger": self.logger, "appVersion": self.APP_VERSION, "isCPUonly": self.CPU_ONLY, "isDev": not self.PROD} with open(f'{self.path}/plugins/{plugin_name}/{file_name}') as f: locals_data = locals() locals_data["setupData"] = setup locals_data["plugins"] = self.plugins exec(f.read(), None, locals_data) import types for key in list(locals_data.keys()): if isinstance(locals_data[key], types.FunctionType): if key==function: if structure2[-1]=="custom-event": self.plugins[structure2[-1]].append([plugin_name, file_name, locals_data[key]]) else: self.plugins[structure2[-2]][structure2[-1]].append([plugin_name, file_name, locals_data[key]]) elif key=="setup": if f'{plugin_name}/{file_name}' not in self.setupModules: self.setupModules.add(f'{plugin_name}/{file_name}') locals_data[key](setup) elif key=="teardown": if plugin_name not in self.teardownModules.keys(): self.teardownModules[plugin_name] = [] self.teardownModules[plugin_name].append(locals_data[key]) else: self.logger.info(f'[Plugin: {plugin_name}]: Cannot import {file_name} file for {structure2[-1]} {structure2[-2]} entry-point: Only python files are supported right now.') def run_plugins (self, plist, event="", data=None): response = None if event=="custom-event" and "pluginId" not in data: self.logger.info(f'Custom event called, but "pluginId" not specified. Not running.') return None if len(plist): self.logger.info("Running plugins for event:" + event) for [plugin_name, file_name, function] in plist: if event=="custom-event" and plugin_name!=data["pluginId"]: continue hook, eventName = event.split(" ") if plugin_name in self.plugins_context_cache[eventName][hook].keys(): data["context_cache"] = self.plugins_context_cache[eventName][hook][plugin_name] # else: # data["context_cache"] = None try: self.logger.info(plugin_name) self.logger.set_logger_prefix(plugin_name) function(data) if "context_cache" in data.keys(): self.plugins_context_cache[eventName][hook][plugin_name] = data["context_cache"] self.logger.set_logger_prefix("") except: self.logger.info(f'[Plugin run error at event "{event}": {plugin_name}]') self.logger.info(traceback.format_exc()) return response def checkVersionRequirements (requirements, appVersion, checkMax=False): if not requirements: return True appVersionRequirement = [int(val) for val in str(requirements).split(".")] appVersionInts = [int(val) for val in str(appVersion).split(".")] appVersionOk = True if checkMax: if appVersionRequirement[0] >= appVersionInts[0]: if len(appVersionRequirement)>1 and int(appVersionRequirement[0])==appVersionInts[0]: if appVersionRequirement[1] >= appVersionInts[1]: if len(appVersionRequirement)>2 and int(appVersionRequirement[1])==appVersionInts[1]: if appVersionRequirement[2] >= appVersionInts[2]: pass else: appVersion = False else: appVersionOk = False else: appVersionOk = False else: if appVersionRequirement[0] <= appVersionInts[0]: if len(appVersionRequirement)>1 and int(appVersionRequirement[0])==appVersionInts[0]: if appVersionRequirement[1] <= appVersionInts[1]: if len(appVersionRequirement)>2 and int(appVersionRequirement[1])==appVersionInts[1]: if appVersionRequirement[2] <= appVersionInts[2]: pass else: appVersion = False else: appVersionOk = False else: appVersionOk = False return appVersionOk