|
|
|
|
|
import json |
|
import os |
|
|
|
import plugins |
|
from bridge.context import ContextType |
|
from bridge.reply import Reply, ReplyType |
|
from common.log import logger |
|
from plugins import * |
|
|
|
from .lib.WordsSearch import WordsSearch |
|
|
|
|
|
@plugins.register( |
|
name="Banwords", |
|
desire_priority=100, |
|
hidden=True, |
|
desc="判断消息中是否有敏感词、决定是否回复。", |
|
version="1.0", |
|
author="lanvent", |
|
) |
|
class Banwords(Plugin): |
|
def __init__(self): |
|
super().__init__() |
|
try: |
|
|
|
conf = super().load_config() |
|
curdir = os.path.dirname(__file__) |
|
if not conf: |
|
|
|
config_path = os.path.join(curdir, "config.json") |
|
if not os.path.exists(config_path): |
|
conf = {"action": "ignore"} |
|
with open(config_path, "w") as f: |
|
json.dump(conf, f, indent=4) |
|
|
|
self.searchr = WordsSearch() |
|
self.action = conf["action"] |
|
banwords_path = os.path.join(curdir, "banwords.txt") |
|
with open(banwords_path, "r", encoding="utf-8") as f: |
|
words = [] |
|
for line in f: |
|
word = line.strip() |
|
if word: |
|
words.append(word) |
|
self.searchr.SetKeywords(words) |
|
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context |
|
if conf.get("reply_filter", True): |
|
self.handlers[Event.ON_DECORATE_REPLY] = self.on_decorate_reply |
|
self.reply_action = conf.get("reply_action", "ignore") |
|
logger.info("[Banwords] inited") |
|
except Exception as e: |
|
logger.warn("[Banwords] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/banwords .") |
|
raise e |
|
|
|
def on_handle_context(self, e_context: EventContext): |
|
if e_context["context"].type not in [ |
|
ContextType.TEXT, |
|
ContextType.IMAGE_CREATE, |
|
]: |
|
return |
|
|
|
content = e_context["context"].content |
|
logger.debug("[Banwords] on_handle_context. content: %s" % content) |
|
if self.action == "ignore": |
|
f = self.searchr.FindFirst(content) |
|
if f: |
|
logger.info("[Banwords] %s in message" % f["Keyword"]) |
|
e_context.action = EventAction.BREAK_PASS |
|
return |
|
elif self.action == "replace": |
|
if self.searchr.ContainsAny(content): |
|
reply = Reply(ReplyType.INFO, "发言中包含敏感词,请重试: \n" + self.searchr.Replace(content)) |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.BREAK_PASS |
|
return |
|
|
|
def on_decorate_reply(self, e_context: EventContext): |
|
if e_context["reply"].type not in [ReplyType.TEXT]: |
|
return |
|
|
|
reply = e_context["reply"] |
|
content = reply.content |
|
if self.reply_action == "ignore": |
|
f = self.searchr.FindFirst(content) |
|
if f: |
|
logger.info("[Banwords] %s in reply" % f["Keyword"]) |
|
e_context["reply"] = None |
|
e_context.action = EventAction.BREAK_PASS |
|
return |
|
elif self.reply_action == "replace": |
|
if self.searchr.ContainsAny(content): |
|
reply = Reply(ReplyType.INFO, "已替换回复中的敏感词: \n" + self.searchr.Replace(content)) |
|
e_context["reply"] = reply |
|
e_context.action = EventAction.CONTINUE |
|
return |
|
|
|
def get_help_text(self, **kwargs): |
|
return "过滤消息中的敏感词。" |
|
|