|
|
|
|
|
import json |
|
import os |
|
|
|
import plugins |
|
from bridge.bridge import Bridge |
|
from bridge.context import ContextType |
|
from bridge.reply import Reply, ReplyType |
|
from common import const |
|
from common.log import logger |
|
from config import conf |
|
from plugins import * |
|
|
|
|
|
class RolePlay: |
|
def __init__(self, bot, sessionid, desc, wrapper=None): |
|
self.bot = bot |
|
self.sessionid = sessionid |
|
self.wrapper = wrapper or "%s" |
|
self.desc = desc |
|
self.bot.sessions.build_session(self.sessionid, system_prompt=self.desc) |
|
|
|
def reset(self): |
|
self.bot.sessions.clear_session(self.sessionid) |
|
|
|
def action(self, user_action): |
|
session = self.bot.sessions.build_session(self.sessionid) |
|
if session.system_prompt != self.desc: |
|
session.set_system_prompt(self.desc) |
|
prompt = self.wrapper % user_action |
|
return prompt |
|
|
|
|
|
@plugins.register( |
|
name="Role", |
|
desire_priority=0, |
|
namecn="角色扮演", |
|
desc="为你的Bot设置预设角色", |
|
version="1.0", |
|
author="lanvent", |
|
) |
|
class Role(Plugin): |
|
def __init__(self): |
|
super().__init__() |
|
curdir = os.path.dirname(__file__) |
|
config_path = os.path.join(curdir, "roles.json") |
|
try: |
|
with open(config_path, "r", encoding="utf-8") as f: |
|
config = json.load(f) |
|
self.tags = {tag: (desc, []) for tag, desc in config["tags"].items()} |
|
self.roles = {} |
|
for role in config["roles"]: |
|
self.roles[role["title"].lower()] = role |
|
for tag in role["tags"]: |
|
if tag not in self.tags: |
|
logger.warning(f"[Role] unknown tag {tag} ") |
|
self.tags[tag] = (tag, []) |
|
self.tags[tag][1].append(role) |
|
for tag in list(self.tags.keys()): |
|
if len(self.tags[tag][1]) == 0: |
|
logger.debug(f"[Role] no role found for tag {tag} ") |
|
del self.tags[tag] |
|
|
|
if len(self.roles) == 0: |
|
raise Exception("no role found") |
|
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context |
|
self.roleplays = {} |
|
logger.info("[Role] inited") |
|
except Exception as e: |
|
if isinstance(e, FileNotFoundError): |
|
logger.warn(f"[Role] init failed, {config_path} not found, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .") |
|
else: |
|
logger.warn("[Role] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .") |
|
raise e |
|
|
|
def get_role(self, name, find_closest=True, min_sim=0.35): |
|
name = name.lower() |
|
found_role = None |
|
if name in self.roles: |
|
found_role = name |
|
elif find_closest: |
|
import difflib |
|
|
|
def str_simularity(a, b): |
|
return difflib.SequenceMatcher(None, a, b).ratio() |
|
|
|
max_sim = min_sim |
|
max_role = None |
|
for role in self.roles: |
|
sim = str_simularity(name, role) |
|
if sim >= max_sim: |
|
max_sim = sim |
|
max_role = role |
|
found_role = max_role |
|
return found_role |
|
|
|
def on_handle_context(self, e_context: EventContext): |
|
if e_context["context"].type != ContextType.TEXT: |
|
return |
|
btype = Bridge().get_bot_type("chat") |
|
if btype not in [const.OPEN_AI, const.CHATGPT, const.CHATGPTONAZURE, const.LINKAI]: |
|
return |
|
bot = Bridge().get_bot("chat") |
|
content = e_context["context"].content[:] |
|
clist = e_context["context"].content.split(maxsplit=1) |
|
desckey = None |
|
customize = False |
|
sessionid = e_context["context"]["session_id"] |
|
trigger_prefix = conf().get("plugin_trigger_prefix", "$") |
|
if clist[0] == f"{trigger_prefix}停止扮演": |
|
if sessionid in self.roleplays: |
|
self.roleplays[sessionid].reset() |
|
del self.roleplays[sessionid] |
|
reply = Reply(ReplyType.INFO, "角色扮演结束!") |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.BREAK_PASS |
|
return |
|
elif clist[0] == f"{trigger_prefix}角色": |
|
desckey = "descn" |
|
elif clist[0].lower() == f"{trigger_prefix}role": |
|
desckey = "description" |
|
elif clist[0] == f"{trigger_prefix}设定扮演": |
|
customize = True |
|
elif clist[0] == f"{trigger_prefix}角色类型": |
|
if len(clist) > 1: |
|
tag = clist[1].strip() |
|
help_text = "角色列表:\n" |
|
for key, value in self.tags.items(): |
|
if value[0] == tag: |
|
tag = key |
|
break |
|
if tag == "所有": |
|
for role in self.roles.values(): |
|
help_text += f"{role['title']}: {role['remark']}\n" |
|
elif tag in self.tags: |
|
for role in self.tags[tag][1]: |
|
help_text += f"{role['title']}: {role['remark']}\n" |
|
else: |
|
help_text = f"未知角色类型。\n" |
|
help_text += "目前的角色类型有: \n" |
|
help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "\n" |
|
else: |
|
help_text = f"请输入角色类型。\n" |
|
help_text += "目前的角色类型有: \n" |
|
help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "\n" |
|
reply = Reply(ReplyType.INFO, help_text) |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.BREAK_PASS |
|
return |
|
elif sessionid not in self.roleplays: |
|
return |
|
logger.debug("[Role] on_handle_context. content: %s" % content) |
|
if desckey is not None: |
|
if len(clist) == 1 or (len(clist) > 1 and clist[1].lower() in ["help", "帮助"]): |
|
reply = Reply(ReplyType.INFO, self.get_help_text(verbose=True)) |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.BREAK_PASS |
|
return |
|
role = self.get_role(clist[1]) |
|
if role is None: |
|
reply = Reply(ReplyType.ERROR, "角色不存在") |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.BREAK_PASS |
|
return |
|
else: |
|
self.roleplays[sessionid] = RolePlay( |
|
bot, |
|
sessionid, |
|
self.roles[role][desckey], |
|
self.roles[role].get("wrapper", "%s"), |
|
) |
|
reply = Reply(ReplyType.INFO, f"预设角色为 {role}:\n" + self.roles[role][desckey]) |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.BREAK_PASS |
|
elif customize == True: |
|
self.roleplays[sessionid] = RolePlay(bot, sessionid, clist[1], "%s") |
|
reply = Reply(ReplyType.INFO, f"角色设定为:\n{clist[1]}") |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.BREAK_PASS |
|
else: |
|
prompt = self.roleplays[sessionid].action(content) |
|
e_context["context"].type = ContextType.TEXT |
|
e_context["context"].content = prompt |
|
e_context.action = EventAction.BREAK |
|
|
|
def get_help_text(self, verbose=False, **kwargs): |
|
help_text = "让机器人扮演不同的角色。\n" |
|
if not verbose: |
|
return help_text |
|
trigger_prefix = conf().get("plugin_trigger_prefix", "$") |
|
help_text = f"使用方法:\n{trigger_prefix}角色" + " 预设角色名: 设定角色为{预设角色名}。\n" + f"{trigger_prefix}role" + " 预设角色名: 同上,但使用英文设定。\n" |
|
help_text += f"{trigger_prefix}设定扮演" + " 角色设定: 设定自定义角色人设为{角色设定}。\n" |
|
help_text += f"{trigger_prefix}停止扮演: 清除设定的角色。\n" |
|
help_text += f"{trigger_prefix}角色类型" + " 角色类型: 查看某类{角色类型}的所有预设角色,为所有时输出所有预设角色。\n" |
|
help_text += "\n目前的角色类型有: \n" |
|
help_text += ",".join([self.tags[tag][0] for tag in self.tags]) + "。\n" |
|
help_text += f"\n命令例子: \n{trigger_prefix}角色 写作助理\n" |
|
help_text += f"{trigger_prefix}角色类型 所有\n" |
|
help_text += f"{trigger_prefix}停止扮演\n" |
|
return help_text |
|
|