|
|
|
|
|
import json |
|
import os |
|
import random |
|
import string |
|
import logging |
|
from typing import Tuple |
|
|
|
import bridge.bridge |
|
import plugins |
|
from bridge.bridge import Bridge |
|
from bridge.context import ContextType |
|
from bridge.reply import Reply, ReplyType |
|
from common import const |
|
from config import conf, load_config, global_config |
|
from plugins import * |
|
|
|
|
|
# Commands available to every user.
#
# Each entry maps a canonical command name to:
#   "alias": the words accepted after the "#" prefix (matched exactly),
#   "args":  optional list of argument placeholders shown in the help text,
#   "desc":  one-line description shown in the help text.
#
# NOTE: "helpp" deliberately shares its aliases with "help"; alias lookup
# therefore always resolves to "help", whose handler switches to the
# per-plugin help when an argument is supplied.
COMMANDS = {
    "help": {
        "alias": ["help", "帮助"],
        "desc": "回复此帮助",
    },
    "helpp": {
        "alias": ["help", "帮助"],
        "args": ["插件名"],
        "desc": "回复指定插件的详细帮助",
    },
    "auth": {
        "alias": ["auth", "认证"],
        "args": ["口令"],
        "desc": "管理员认证",
    },
    "model": {
        "alias": ["model", "模型"],
        "desc": "查看和设置全局模型",
    },
    "set_openai_api_key": {
        "alias": ["set_openai_api_key"],
        "args": ["api_key"],
        "desc": "设置你的OpenAI私有api_key",
    },
    "reset_openai_api_key": {
        "alias": ["reset_openai_api_key"],
        "desc": "重置为默认的api_key",
    },
    "set_gpt_model": {
        "alias": ["set_gpt_model"],
        # The handler requires exactly one argument, so declare it like
        # set_openai_api_key does.
        "args": ["模型名"],
        "desc": "设置你的私有模型",
    },
    "reset_gpt_model": {
        "alias": ["reset_gpt_model"],
        "desc": "重置你的私有模型",
    },
    "gpt_model": {
        "alias": ["gpt_model"],
        "desc": "查询你使用的模型",
    },
    "id": {
        "alias": ["id", "用户"],
        "desc": "获取用户id",
    },
    "reset": {
        # Extra aliases may be appended at plugin start-up from the
        # "clear_memory_commands" configuration entry.
        "alias": ["reset", "重置会话"],
        "desc": "重置会话",
    },
}
|
|
|
# Commands restricted to authenticated administrators.
# Entries use the same layout as COMMANDS: "alias" (accepted words after the
# "#" prefix), optional "args" (placeholders for the help text) and "desc".
ADMIN_COMMANDS = {
    "resume": {
        "alias": ["resume", "恢复服务"],
        "desc": "恢复服务",
    },
    "stop": {
        "alias": ["stop", "暂停服务"],
        "desc": "暂停服务",
    },
    "reconf": {
        "alias": ["reconf", "重载配置"],
        "desc": "重载配置(不包含插件配置)",
    },
    "resetall": {
        "alias": ["resetall", "重置所有会话"],
        "desc": "重置所有会话",
    },
    "scanp": {
        "alias": ["scanp", "扫描插件"],
        "desc": "扫描插件目录是否有新插件",
    },
    "plist": {
        "alias": ["plist", "插件"],
        "desc": "打印当前插件列表",
    },
    "setpri": {
        "alias": ["setpri", "设置插件优先级"],
        "args": ["插件名", "优先级"],
        "desc": "设置指定插件的优先级,越大越优先",
    },
    "reloadp": {
        "alias": ["reloadp", "重载插件"],
        "args": ["插件名"],
        "desc": "重载指定插件配置",
    },
    "enablep": {
        "alias": ["enablep", "启用插件"],
        "args": ["插件名"],
        "desc": "启用指定插件",
    },
    "disablep": {
        "alias": ["disablep", "禁用插件"],
        "args": ["插件名"],
        "desc": "禁用指定插件",
    },
    "installp": {
        "alias": ["installp", "安装插件"],
        "args": ["仓库地址或插件名"],
        "desc": "安装指定插件",
    },
    "uninstallp": {
        "alias": ["uninstallp", "卸载插件"],
        "args": ["插件名"],
        "desc": "卸载指定插件",
    },
    "updatep": {
        "alias": ["updatep", "更新插件"],
        "args": ["插件名"],
        "desc": "更新指定插件",
    },
    "debug": {
        "alias": ["debug", "调试模式", "DEBUG"],
        "desc": "开启机器调试日志",
    },
}
|
|
|
|
|
|
|
def get_help_text(isadmin, isgroup):
    """Assemble the help message shown in reply to the help command.

    The message has up to three sections: the general commands, the short
    help of every enabled and non-hidden plugin, and (only when ``isadmin``
    is truthy) the administrator commands.

    Args:
        isadmin: Whether the requesting user is an administrator.
        isgroup: Whether the request came from a group chat (accepted for
            interface compatibility; not consulted here).

    Returns:
        The complete help text as a single string.
    """
    # Commands that exist but are intentionally left out of the help listing.
    unlisted = (
        "auth",
        "set_openai_api_key",
        "reset_openai_api_key",
        "set_gpt_model",
        "reset_gpt_model",
        "gpt_model",
    )

    def describe(entry):
        # Only the first alias is advertised, prefixed with '#'.
        shown = ",".join("#" + word for word in entry["alias"][:1])
        placeholders = " ".join(entry["args"]) if "args" in entry else ""
        return f"{shown} {placeholders}: {entry['desc']}\n"

    pieces = ["通用指令\n"]
    for name, entry in COMMANDS.items():
        if name in unlisted:
            continue
        # The "id" command is only advertised on the wxy / wechatmp channels.
        if name == "id" and conf().get("channel_type", "wx") not in ["wxy", "wechatmp"]:
            continue
        pieces.append(describe(entry))

    # Short help of each plugin that is enabled and not hidden.
    registry = PluginManager().list_plugins()
    pieces.append("\n可用插件")
    for key in registry:
        plugin_entry = registry[key]
        if not plugin_entry.enabled or plugin_entry.hidden:
            continue
        pieces.append("\n%s:" % plugin_entry.namecn)
        pieces.append(PluginManager().instances[key].get_help_text(verbose=False).strip())

    if ADMIN_COMMANDS and isadmin:
        pieces.append("\n\n管理员指令:\n")
        pieces.extend(describe(entry) for entry in ADMIN_COMMANDS.values())

    return "".join(pieces)
|
|
|
|
|
@plugins.register(
    name="Godcmd",
    desire_priority=999,
    hidden=True,
    desc="为你的机器人添加指令集,有用户和管理员两种角色,加载顺序请放在首位,初次运行后插件目录会生成配置文件, 填充管理员密码后即可认证",
    version="1.0",
    author="lanvent",
)
class Godcmd(Plugin):
    """Plugin that interprets ``#``-prefixed text messages as commands.

    Two command tables are served: ``COMMANDS`` (any user) and
    ``ADMIN_COMMANDS`` (administrators only, private chat only). Users become
    administrators by being listed in the plugin configuration or by
    authenticating with the configured (or temporary) password.

    Attributes set in ``__init__``:
        password: Configured admin password ("" when none is set).
        temp_password: One-off password generated when no password is
            configured, otherwise ``None``.
        admin_users: List of administrator ids; the same list object is
            published as ``global_config["admin_users"]``.
        isrunning: When ``False`` every non-command message is swallowed.
    """

    def __init__(self):
        """Load the plugin configuration and register the context handler."""
        super().__init__()

        config_path = os.path.join(os.path.dirname(__file__), "config.json")
        gconf = super().load_config()
        if not gconf:
            # Always fall back to in-memory defaults so the lookups below
            # cannot fail; only create the file when it does not exist yet so
            # an existing (but empty/unreadable) file is never overwritten.
            gconf = {"password": "", "admin_users": []}
            if not os.path.exists(config_path):
                with open(config_path, "w", encoding="utf-8") as f:
                    json.dump(gconf, f, indent=4)

        password = gconf.get("password") or ""
        admin_users = gconf.get("admin_users")
        if admin_users is None:
            admin_users = []

        if password == "":
            # The temporary password grants admin rights, so draw it from the
            # OS entropy source rather than the default Mersenne Twister.
            self.temp_password = "".join(random.SystemRandom().sample(string.digits, 4))
            logger.info("[Godcmd] 因未设置口令,本次的临时口令为%s。", self.temp_password)
        else:
            self.temp_password = None

        # Register the configured "clear memory" commands as extra aliases of
        # the reset command (only '#'-prefixed ones can reach this plugin).
        custom_commands = conf().get("clear_memory_commands", [])
        for custom_command in custom_commands:
            if custom_command and custom_command.startswith("#"):
                custom_command = custom_command[1:]
                if custom_command and custom_command not in COMMANDS["reset"]["alias"]:
                    COMMANDS["reset"]["alias"].append(custom_command)

        self.password = password
        self.admin_users = admin_users
        # Shared by reference: later changes to self.admin_users are visible
        # through global_config as well.
        global_config["admin_users"] = self.admin_users
        self.isrunning = True

        self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
        logger.info("[Godcmd] inited")

    def on_handle_context(self, e_context: EventContext):
        """Handle an incoming context, executing it when it is a command.

        Non-text messages and text not starting with ``#`` are passed on
        untouched, unless the service is stopped, in which case the event is
        terminated with ``EventAction.BREAK_PASS``. Commands are executed
        regardless of the running state and answered with an INFO (success)
        or ERROR (failure) reply.
        """
        context = e_context["context"]
        if context.type != ContextType.TEXT:
            if not self.isrunning:
                e_context.action = EventAction.BREAK_PASS
            return

        content = context.content
        logger.debug("[Godcmd] on_handle_context. content: %s", content)
        if not content.startswith("#"):
            if not self.isrunning:
                e_context.action = EventAction.BREAK_PASS
            return

        # Split into command word and arguments. A lone "#", or "#" followed
        # only by whitespace, yields no parts and is reported as empty.
        command_parts = content[1:].strip().split()
        if not command_parts:
            self._set_reply(e_context, False, "空指令,输入#help查看指令列表\n")
            return

        user = context["receiver"]
        isgroup = context.get("isgroup", False)
        bottype = Bridge().get_bot_type("chat")
        bot = Bridge().get_bot("chat")

        alias, args = command_parts[0], command_parts[1:]
        isadmin = user in self.admin_users

        cmd = self._resolve_command(alias, COMMANDS)
        if cmd is not None:
            ok, result = self._run_user_command(cmd, args, e_context, isadmin, bot, bottype)
            logger.debug("[Godcmd] command: %s by %s", cmd, user)
        elif self._resolve_command(alias, ADMIN_COMMANDS) is not None:
            if not isadmin:
                ok, result = False, "需要管理员权限才能执行该指令"
            elif isgroup:
                ok, result = False, "群聊不可执行管理员指令"
            else:
                cmd = self._resolve_command(alias, ADMIN_COMMANDS)
                ok, result = self._run_admin_command(cmd, args, e_context, bot, bottype)
                logger.debug("[Godcmd] admin command: %s by %s", cmd, user)
        else:
            trigger_prefix = conf().get("plugin_trigger_prefix", "$")
            if trigger_prefix == "#":
                # Plugins share the '#' prefix: let the event continue so
                # another plugin can handle this message.
                return
            ok, result = False, f"未知指令:{alias}\n查看指令列表请输入#help \n"

        self._set_reply(e_context, ok, result)

    @staticmethod
    def _resolve_command(alias, table):
        """Return the canonical name in ``table`` whose aliases contain ``alias``, else ``None``."""
        return next((name for name, info in table.items() if alias in info["alias"]), None)

    @staticmethod
    def _set_reply(e_context, ok, content):
        """Attach an INFO/ERROR reply to the event and stop further processing."""
        reply = Reply()
        reply.type = ReplyType.INFO if ok else ReplyType.ERROR
        reply.content = content
        e_context["reply"] = reply
        e_context.action = EventAction.BREAK_PASS

    @staticmethod
    def _supports_session_reset(bottype) -> bool:
        """Return whether sessions of the given bot type can be reset by this plugin."""
        return bottype in [
            const.OPEN_AI,
            const.CHATGPT,
            const.CHATGPTONAZURE,
            const.LINKAI,
            const.BAIDU,
            const.XUNFEI,
        ]

    def _plugin_help(self, query, isadmin, isgroup) -> Tuple[bool, str]:
        """Return the verbose help of the enabled plugin matching ``query``."""
        plugin_map = PluginManager().list_plugins()
        query_name = query.upper()
        for name, plugincls in plugin_map.items():
            if not plugincls.enabled:
                continue
            if query_name == name or query_name == plugincls.namecn:
                instance = PluginManager().instances[name]
                return True, instance.get_help_text(isgroup=isgroup, isadmin=isadmin, verbose=True)
        return False, "插件不存在或未启用"

    def _run_user_command(self, cmd, args, e_context, isadmin, bot, bottype) -> Tuple[bool, str]:
        """Execute a command from ``COMMANDS`` and return ``(ok, message)``."""
        context = e_context["context"]
        user = context["receiver"]
        isgroup = context.get("isgroup", False)

        if cmd == "auth":
            return self.authenticate(user, args, isadmin, isgroup)

        if cmd in ("help", "helpp"):
            if not args:
                return True, get_help_text(isadmin, isgroup)
            return self._plugin_help(args[0], isadmin, isgroup)

        if cmd == "model":
            if not isadmin and not self.is_admin_in_group(context):
                return False, "需要管理员权限执行"
            if not args:
                model = conf().get("model") or const.GPT35
                return True, "当前模型为: " + str(model)
            if len(args) == 1:
                if args[0] not in const.MODEL_LIST:
                    return False, "模型名称不存在"
                conf()["model"] = self.model_mapping(args[0])
                Bridge().reset_bot()
                model = conf().get("model") or const.GPT35
                return True, "模型设置为: " + str(model)
            # More than one argument: report it instead of replying with a
            # meaningless placeholder.
            return False, "请提供一个模型名称"

        if cmd == "id":
            return True, user

        if cmd == "set_openai_api_key":
            if len(args) != 1:
                return False, "请提供一个api_key"
            user_data = conf().get_user_data(user)
            user_data["openai_api_key"] = args[0]
            # NOTE(review): this echoes the full key back into the chat;
            # consider masking it.
            return True, "你的OpenAI私有api_key已设置为" + args[0]

        if cmd == "reset_openai_api_key":
            try:
                user_data = conf().get_user_data(user)
                user_data.pop("openai_api_key")
            except Exception:
                # Best effort: any failure is reported as "nothing to reset".
                return False, "你没有设置私有api_key"
            return True, "你的OpenAI私有api_key已清除"

        if cmd == "set_gpt_model":
            if len(args) != 1:
                return False, "请提供一个GPT模型"
            user_data = conf().get_user_data(user)
            user_data["gpt_model"] = args[0]
            return True, "你的GPT模型已设置为" + args[0]

        if cmd == "gpt_model":
            user_data = conf().get_user_data(user)
            model = conf().get("model")
            if "gpt_model" in user_data:
                model = user_data["gpt_model"]
            return True, "你的GPT模型为" + str(model)

        if cmd == "reset_gpt_model":
            try:
                user_data = conf().get_user_data(user)
                user_data.pop("gpt_model")
            except Exception:
                # Best effort: any failure is reported as "nothing to reset".
                return False, "你没有设置私有GPT模型"
            return True, "你的GPT模型已重置"

        if cmd == "reset":
            if not self._supports_session_reset(bottype):
                return False, "当前对话机器人不支持重置会话"
            session_id = context["session_id"]
            bot.sessions.clear_session(session_id)
            chat_bot = Bridge().chat_bots.get(bottype)
            if chat_bot:
                chat_bot.sessions.clear_session(session_id)
            e_context["channel"].cancel_session(session_id)
            return True, "会话已重置"

        # Only reachable if COMMANDS gains an entry without a handler here.
        return False, f"未知指令:{cmd}\n查看指令列表请输入#help \n"

    def _run_admin_command(self, cmd, args, e_context, bot, bottype) -> Tuple[bool, str]:
        """Execute a command from ``ADMIN_COMMANDS`` and return ``(ok, message)``.

        The caller is responsible for the permission and group-chat checks.
        """
        if cmd == "stop":
            self.isrunning = False
            return True, "服务已暂停"

        if cmd == "resume":
            self.isrunning = True
            return True, "服务已恢复"

        if cmd == "reconf":
            load_config()
            return True, "配置已重载"

        if cmd == "resetall":
            if not self._supports_session_reset(bottype):
                return False, "当前对话机器人不支持重置会话"
            e_context["channel"].cancel_all_session()
            bot.sessions.clear_all_session()
            return True, "重置所有会话成功"

        if cmd == "debug":
            # Toggle between DEBUG and INFO.
            if logger.getEffectiveLevel() == logging.DEBUG:
                logger.setLevel(logging.INFO)
                return True, "DEBUG模式已关闭"
            logger.setLevel(logging.DEBUG)
            return True, "DEBUG模式已开启"

        if cmd == "plist":
            plugin_map = PluginManager().list_plugins()
            lines = ["插件列表:\n"]
            for plugincls in plugin_map.values():
                state = "已启用\n" if plugincls.enabled else "未启用\n"
                lines.append(f"{plugincls.name}_v{plugincls.version} {plugincls.priority} - {state}")
            return True, "".join(lines)

        if cmd == "scanp":
            new_plugins = PluginManager().scan_plugins()
            PluginManager().activate_plugins()
            result = "插件扫描完成"
            if len(new_plugins) > 0:
                result += "\n发现新插件:\n"
                result += "\n".join(f"{p.name}_v{p.version}" for p in new_plugins)
            else:
                result += ", 未发现新插件"
            return True, result

        if cmd == "setpri":
            if len(args) != 2:
                return False, "请提供插件名和优先级"
            try:
                priority = int(args[1])
            except ValueError:
                # A non-numeric priority is a user error, not a crash.
                return False, "优先级必须为整数"
            ok = PluginManager().set_plugin_priority(args[0], priority)
            if ok:
                return ok, "插件" + args[0] + "优先级已设置为" + args[1]
            return ok, "插件不存在"

        if cmd == "reloadp":
            if len(args) != 1:
                return False, "请提供插件名"
            ok = PluginManager().reload_plugin(args[0])
            return ok, "插件配置已重载" if ok else "插件不存在"

        if cmd == "enablep":
            if len(args) != 1:
                return False, "请提供插件名"
            return PluginManager().enable_plugin(args[0])

        if cmd == "disablep":
            if len(args) != 1:
                return False, "请提供插件名"
            ok = PluginManager().disable_plugin(args[0])
            return ok, "插件已禁用" if ok else "插件不存在"

        if cmd == "installp":
            if len(args) != 1:
                return False, "请提供插件名或.git结尾的仓库地址"
            return PluginManager().install_plugin(args[0])

        if cmd == "uninstallp":
            if len(args) != 1:
                return False, "请提供插件名"
            return PluginManager().uninstall_plugin(args[0])

        if cmd == "updatep":
            if len(args) != 1:
                return False, "请提供插件名"
            return PluginManager().update_plugin(args[0])

        # Only reachable if ADMIN_COMMANDS gains an entry without a handler.
        return False, f"未知指令:{cmd}\n查看指令列表请输入#help \n"

    def _grant_admin(self, userid):
        """Record ``userid`` as administrator without creating duplicates.

        ``global_config["admin_users"]`` is normally the very same list as
        ``self.admin_users`` (see ``__init__``), so appending to both
        unconditionally would store the id twice.
        """
        for admins in (self.admin_users, global_config["admin_users"]):
            if userid not in admins:
                admins.append(userid)

    def authenticate(self, userid, args, isadmin, isgroup) -> Tuple[bool, str]:
        """Try to promote ``userid`` to administrator.

        Args:
            userid: Id of the user issuing the command.
            args: Command arguments; exactly one (the password) is expected.
            isadmin: Whether the user already is an administrator.
            isgroup: Whether the command was sent in a group chat.

        Returns:
            ``(ok, message)`` where ``ok`` tells whether the user was promoted.
        """
        if isgroup:
            return False, "请勿在群聊中认证"

        if isadmin:
            return False, "管理员账号无需认证"

        if len(args) != 1:
            return False, "请提供口令"

        password = args[0]
        if password == self.password:
            self._grant_admin(userid)
            return True, "认证成功"
        if password == self.temp_password:
            self._grant_admin(userid)
            return True, "认证成功,请尽快设置口令"
        return False, "认证失败"

    def get_help_text(self, isadmin=False, isgroup=False, **kwargs):
        """Return the command help text; extra keyword arguments are ignored."""
        return get_help_text(isadmin, isgroup)

    def is_admin_in_group(self, context):
        """Return whether a group message was sent by a known administrator.

        Returns ``False`` for non-group contexts and when the context carries
        no message object or the message has no ``actual_user_id``.
        """
        if not context.get("isgroup", False):
            return False
        msg = context.kwargs.get("msg")
        actual_user_id = getattr(msg, "actual_user_id", None)
        if actual_user_id is None:
            return False
        return actual_user_id in global_config["admin_users"]

    def model_mapping(self, model) -> str:
        """Translate a user-facing model name to the configured model id.

        Only ``"gpt-4-turbo"`` is remapped (to ``const.GPT4_TURBO_PREVIEW``);
        every other name is returned unchanged.
        """
        if model == "gpt-4-turbo":
            return const.GPT4_TURBO_PREVIEW
        return model
|
|