# encoding:utf-8 import json import os import plugins from bridge.bridge import Bridge from bridge.context import ContextType from bridge.reply import Reply, ReplyType from common import const from common.log import logger from config import conf from plugins import * class RolePlay: def __init__(self, bot, sessionid, desc, wrapper=None): self.bot = bot self.sessionid = sessionid self.wrapper = wrapper or "%s" # 用于包装用户输入 self.desc = desc self.bot.sessions.build_session(self.sessionid, system_prompt=self.desc) def reset(self): self.bot.sessions.clear_session(self.sessionid) def action(self, user_action): session = self.bot.sessions.build_session(self.sessionid) if session.system_prompt != self.desc: # 目前没有触发session过期事件,这里先简单判断,然后重置 session.set_system_prompt(self.desc) prompt = self.wrapper % user_action return prompt @plugins.register( name="Role", desire_priority=0, namecn="角色扮演", desc="为你的Bot设置预设角色", version="1.0", author="lanvent", ) class Role(Plugin): def __init__(self): super().__init__() curdir = os.path.dirname(__file__) config_path = os.path.join(curdir, "roles.json") try: with open(config_path, "r", encoding="utf-8") as f: config = json.load(f) self.tags = {tag: (desc, []) for tag, desc in config["tags"].items()} self.roles = {} for role in config["roles"]: self.roles[role["title"].lower()] = role for tag in role["tags"]: if tag not in self.tags: logger.warning(f"[Role] unknown tag {tag} ") self.tags[tag] = (tag, []) self.tags[tag][1].append(role) for tag in list(self.tags.keys()): if len(self.tags[tag][1]) == 0: logger.debug(f"[Role] no role found for tag {tag} ") del self.tags[tag] if len(self.roles) == 0: raise Exception("no role found") self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context self.roleplays = {} logger.info("[Role] inited") except Exception as e: if isinstance(e, FileNotFoundError): logger.warn(f"[Role] init failed, {config_path} not found, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .") else: logger.warn("[Role] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .") raise e def get_role(self, name, find_closest=True, min_sim=0.35): name = name.lower() found_role = None if name in self.roles: found_role = name elif find_closest: import difflib def str_simularity(a, b): return difflib.SequenceMatcher(None, a, b).ratio() max_sim = min_sim max_role = None for role in self.roles: sim = str_simularity(name, role) if sim >= max_sim: max_sim = sim max_role = role found_role = max_role return found_role def on_handle_context(self, e_context: EventContext): if e_context["context"].type != ContextType.TEXT: return btype = Bridge().get_bot_type("chat") if btype not in [const.OPEN_AI, const.CHATGPT, const.CHATGPTONAZURE, const.LINKAI]: return bot = Bridge().get_bot("chat") content = e_context["context"].content[:] clist = e_context["context"].content.split(maxsplit=1) desckey = None customize = False sessionid = e_context["context"]["session_id"] trigger_prefix = conf().get("plugin_trigger_prefix", "$") if clist[0] == f"{trigger_prefix}停止扮演": if sessionid in self.roleplays: self.roleplays[sessionid].reset() del self.roleplays[sessionid] reply = Reply(ReplyType.INFO, "角色扮演结束!") e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS return elif clist[0] == f"{trigger_prefix}角色": desckey = "descn" elif clist[0].lower() == f"{trigger_prefix}role": desckey = "description" elif clist[0] == f"{trigger_prefix}设定扮演": customize = True elif clist[0] == f"{trigger_prefix}角色类型": if len(clist) > 1: tag = clist[1].strip() help_text = "角色列表:\n" for key, value in self.tags.items(): if value[0] == tag: tag = key break if tag == "所有": for role in self.roles.values(): help_text += f"{role['title']}: {role['remark']}\n" elif tag in self.tags: for role in self.tags[tag][1]: help_text += f"{role['title']}: {role['remark']}\n" else: help_text = f"未知角色类型。\n" help_text += "目前的角色类型有: \n" help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "\n" else: help_text = f"请输入角色类型。\n" help_text += "目前的角色类型有: \n" help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "\n" reply = Reply(ReplyType.INFO, help_text) e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS return elif sessionid not in self.roleplays: return logger.debug("[Role] on_handle_context. content: %s" % content) if desckey is not None: if len(clist) == 1 or (len(clist) > 1 and clist[1].lower() in ["help", "帮助"]): reply = Reply(ReplyType.INFO, self.get_help_text(verbose=True)) e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS return role = self.get_role(clist[1]) if role is None: reply = Reply(ReplyType.ERROR, "角色不存在") e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS return else: self.roleplays[sessionid] = RolePlay( bot, sessionid, self.roles[role][desckey], self.roles[role].get("wrapper", "%s"), ) reply = Reply(ReplyType.INFO, f"预设角色为 {role}:\n" + self.roles[role][desckey]) e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS elif customize == True: self.roleplays[sessionid] = RolePlay(bot, sessionid, clist[1], "%s") reply = Reply(ReplyType.INFO, f"角色设定为:\n{clist[1]}") e_context["reply"] = reply e_context.action = EventAction.BREAK_PASS else: prompt = self.roleplays[sessionid].action(content) e_context["context"].type = ContextType.TEXT e_context["context"].content = prompt e_context.action = EventAction.BREAK def get_help_text(self, verbose=False, **kwargs): help_text = "让机器人扮演不同的角色。\n" if not verbose: return help_text trigger_prefix = conf().get("plugin_trigger_prefix", "$") help_text = f"使用方法:\n{trigger_prefix}角色" + " 预设角色名: 设定角色为{预设角色名}。\n" + f"{trigger_prefix}role" + " 预设角色名: 同上,但使用英文设定。\n" help_text += f"{trigger_prefix}设定扮演" + " 角色设定: 设定自定义角色人设为{角色设定}。\n" help_text += f"{trigger_prefix}停止扮演: 清除设定的角色。\n" help_text += f"{trigger_prefix}角色类型" + " 角色类型: 查看某类{角色类型}的所有预设角色,为所有时输出所有预设角色。\n" help_text += "\n目前的角色类型有: \n" help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "。\n" help_text += f"\n命令例子: \n{trigger_prefix}角色 写作助理\n" help_text += f"{trigger_prefix}角色类型 所有\n" help_text += f"{trigger_prefix}停止扮演\n" return help_text