xVASynth / resources /app /python /plugins_manager.py
Pendrokar's picture
relocate folders
ed18ebf
raw
history blame contribute delete
No virus
10.5 kB
import json
import traceback
class PluginManager(object):
def __init__(self, APP_VERSION, PROD, CPU_ONLY, logger):
super(PluginManager, self).__init__()
self.APP_VERSION = APP_VERSION
self.CPU_ONLY = CPU_ONLY
self.PROD = PROD
self.path = "./resources/app" if PROD else "."
self.modules_path = "resources.app." if PROD else ""
self.logger = logger
self.setupModules = set()
self.enabledPlugins = set()
self.teardownModules = {}
self.refresh_active_plugins()
def reset_plugins (self):
self.plugins = {
"custom-event": [],
"start": {
"pre": [],
"mid": [],
"post": []
},
"load-model": {
"pre": [],
"mid": [],
"post": []
},
"synth-line": {
"pre": [],
"mid": [],
"pre_energy": [],
"post": []
},
"batch-synth-line": {
"pre": [],
"mid": [],
"post": []
},
"mp-output-audio": {
"pre": [],
"post": []
},
"arpabet-replace": {
"pre": [],
"post": []
},
"output-audio": {
"pre": [],
"mid": [],
"post": []
},
}
self.plugins_context_cache = {}
for key in self.plugins.keys():
if key=="custom-event":
pass
else:
self.plugins_context_cache[key] = {}
for sub_key in self.plugins[key].keys():
self.plugins_context_cache[key][sub_key] = {}
def set_context_cache (self, event, hook, plugin_id, data):
self.plugins_context_cache[event][hook][plugin_id] = data
def get_active_plugins_count (self):
active_plugins = []
for _ in self.plugins["custom-event"]:
active_plugins.append(["custom-event", None])
for key in self.plugins.keys():
if key=="custom-event":
continue
plugin_triggers = list(self.plugins[key].keys())
for trigger_type in plugin_triggers:
for plugin in self.plugins[key][trigger_type]:
active_plugins.append([key, trigger_type])
return len(active_plugins)
# For ease of access
def set_context (self, data):
self.context = data
def refresh_active_plugins (self):
with open("plugins.txt") as f:
lines = f.read().split("\n")
removed_plugins = []
for line in lines:
if not line.startswith("*") and line in self.enabledPlugins:
removed_plugins.append(line)
for plugin_id in removed_plugins:
if plugin_id in self.teardownModules:
for func in self.teardownModules[plugin_id]:
params = {"logger": self.logger, "appVersion": self.APP_VERSION, "isCPUonly": self.CPU_ONLY, "isDev": not self.PROD}
func(params)
self.reset_plugins()
status = []
for line in lines:
if line.startswith("*"):
plugin_id = line[1:]
self.logger.info(f'plugin_id: {plugin_id}')
try:
with open(f'{self.path}/plugins/{plugin_id}/plugin.json') as f:
plugin_json = f.read()
plugin_json = json.loads(plugin_json)
minVersionOk = checkVersionRequirements(plugin_json["min-app-version"] if "min-app-version" in plugin_json else None, self.APP_VERSION)
maxVersionOk = checkVersionRequirements(plugin_json["max-app-version"] if "max-app-version" in plugin_json else None, self.APP_VERSION, True)
if not minVersionOk or not maxVersionOk:
self.logger.info(f'minVersionOk {minVersionOk}')
self.logger.info(f'maxVersionOk {maxVersionOk}')
continue
for key in self.plugins.keys():
if key=="custom-event":
self.load_module_function(plugin_json, plugin_id, ["back-end-hooks", "custom-event"], [])
else:
plugin_triggers = list(self.plugins[key].keys())
for trigger_type in plugin_triggers:
self.load_module_function(plugin_json, plugin_id, ["back-end-hooks", key, trigger_type], [])
self.enabledPlugins.add(plugin_id)
status.append("OK")
except:
self.logger.info(traceback.format_exc())
status.append(plugin_id)
return status
def load_module_function (self, plugin_json, plugin_name, structure, structure2):
if structure[0] in plugin_json and plugin_json[structure[0]] is not None:
key = structure[0]
structure2.append(key)
plugin_json = plugin_json[key]
del structure[0]
if len(structure):
return self.load_module_function(plugin_json, plugin_name, structure, structure2)
else:
file_name = plugin_json["file"]
function = plugin_json["function"]
if not file_name:
return
if file_name.endswith(".py"):
setup = {"logger": self.logger, "appVersion": self.APP_VERSION, "isCPUonly": self.CPU_ONLY, "isDev": not self.PROD}
with open(f'{self.path}/plugins/{plugin_name}/{file_name}') as f:
locals_data = locals()
locals_data["setupData"] = setup
locals_data["plugins"] = self.plugins
exec(f.read(), None, locals_data)
import types
for key in list(locals_data.keys()):
if isinstance(locals_data[key], types.FunctionType):
if key==function:
if structure2[-1]=="custom-event":
self.plugins[structure2[-1]].append([plugin_name, file_name, locals_data[key]])
else:
self.plugins[structure2[-2]][structure2[-1]].append([plugin_name, file_name, locals_data[key]])
elif key=="setup":
if f'{plugin_name}/{file_name}' not in self.setupModules:
self.setupModules.add(f'{plugin_name}/{file_name}')
locals_data[key](setup)
elif key=="teardown":
if plugin_name not in self.teardownModules.keys():
self.teardownModules[plugin_name] = []
self.teardownModules[plugin_name].append(locals_data[key])
else:
self.logger.info(f'[Plugin: {plugin_name}]: Cannot import {file_name} file for {structure2[-1]} {structure2[-2]} entry-point: Only python files are supported right now.')
def run_plugins (self, plist, event="", data=None):
response = None
if event=="custom-event" and "pluginId" not in data:
self.logger.info(f'Custom event called, but "pluginId" not specified. Not running.')
return None
if len(plist):
self.logger.info("Running plugins for event:" + event)
for [plugin_name, file_name, function] in plist:
if event=="custom-event" and plugin_name!=data["pluginId"]:
continue
hook, eventName = event.split(" ")
if plugin_name in self.plugins_context_cache[eventName][hook].keys():
data["context_cache"] = self.plugins_context_cache[eventName][hook][plugin_name]
# else:
# data["context_cache"] = None
try:
self.logger.info(plugin_name)
self.logger.set_logger_prefix(plugin_name)
function(data)
if "context_cache" in data.keys():
self.plugins_context_cache[eventName][hook][plugin_name] = data["context_cache"]
self.logger.set_logger_prefix("")
except:
self.logger.info(f'[Plugin run error at event "{event}": {plugin_name}]')
self.logger.info(traceback.format_exc())
return response
def checkVersionRequirements (requirements, appVersion, checkMax=False):
if not requirements:
return True
appVersionRequirement = [int(val) for val in str(requirements).split(".")]
appVersionInts = [int(val) for val in str(appVersion).split(".")]
appVersionOk = True
if checkMax:
if appVersionRequirement[0] >= appVersionInts[0]:
if len(appVersionRequirement)>1 and int(appVersionRequirement[0])==appVersionInts[0]:
if appVersionRequirement[1] >= appVersionInts[1]:
if len(appVersionRequirement)>2 and int(appVersionRequirement[1])==appVersionInts[1]:
if appVersionRequirement[2] >= appVersionInts[2]:
pass
else:
appVersion = False
else:
appVersionOk = False
else:
appVersionOk = False
else:
if appVersionRequirement[0] <= appVersionInts[0]:
if len(appVersionRequirement)>1 and int(appVersionRequirement[0])==appVersionInts[0]:
if appVersionRequirement[1] <= appVersionInts[1]:
if len(appVersionRequirement)>2 and int(appVersionRequirement[1])==appVersionInts[1]:
if appVersionRequirement[2] <= appVersionInts[2]:
pass
else:
appVersion = False
else:
appVersionOk = False
else:
appVersionOk = False
return appVersionOk