|
import re |
|
|
|
from bridge.context import ContextType |
|
from channel.chat_message import ChatMessage |
|
from common.log import logger |
|
from common.tmp_dir import TmpDir |
|
from lib import itchat |
|
from lib.itchat.content import * |
|
|
|
class WechatMessage(ChatMessage): |
|
def __init__(self, itchat_msg, is_group=False): |
|
super().__init__(itchat_msg) |
|
self.msg_id = itchat_msg["MsgId"] |
|
self.create_time = itchat_msg["CreateTime"] |
|
self.is_group = is_group |
|
|
|
if itchat_msg["Type"] == TEXT: |
|
self.ctype = ContextType.TEXT |
|
self.content = itchat_msg["Text"] |
|
elif itchat_msg["Type"] == VOICE: |
|
self.ctype = ContextType.VOICE |
|
self.content = TmpDir().path() + itchat_msg["FileName"] |
|
self._prepare_fn = lambda: itchat_msg.download(self.content) |
|
elif itchat_msg["Type"] == PICTURE and itchat_msg["MsgType"] == 3: |
|
self.ctype = ContextType.IMAGE |
|
self.content = TmpDir().path() + itchat_msg["FileName"] |
|
self._prepare_fn = lambda: itchat_msg.download(self.content) |
|
elif itchat_msg["Type"] == NOTE and itchat_msg["MsgType"] == 10000: |
|
if is_group and ("加入群聊" in itchat_msg["Content"] or "加入了群聊" in itchat_msg["Content"]): |
|
|
|
if "加入了群聊" in itchat_msg["Content"]: |
|
self.ctype = ContextType.JOIN_GROUP |
|
self.content = itchat_msg["Content"] |
|
self.actual_user_nickname = re.findall(r"\"(.*?)\"", itchat_msg["Content"])[-1] |
|
elif "加入群聊" in itchat_msg["Content"]: |
|
self.ctype = ContextType.JOIN_GROUP |
|
self.content = itchat_msg["Content"] |
|
self.actual_user_nickname = re.findall(r"\"(.*?)\"", itchat_msg["Content"])[0] |
|
|
|
elif is_group and ("移出了群聊" in itchat_msg["Content"]): |
|
self.ctype = ContextType.EXIT_GROUP |
|
self.content = itchat_msg["Content"] |
|
self.actual_user_nickname = re.findall(r"\"(.*?)\"", itchat_msg["Content"])[0] |
|
|
|
elif "你已添加了" in itchat_msg["Content"]: |
|
self.ctype = ContextType.ACCEPT_FRIEND |
|
self.content = itchat_msg["Content"] |
|
elif "拍了拍我" in itchat_msg["Content"]: |
|
self.ctype = ContextType.PATPAT |
|
self.content = itchat_msg["Content"] |
|
if is_group: |
|
self.actual_user_nickname = re.findall(r"\"(.*?)\"", itchat_msg["Content"])[0] |
|
else: |
|
raise NotImplementedError("Unsupported note message: " + itchat_msg["Content"]) |
|
elif itchat_msg["Type"] == ATTACHMENT: |
|
self.ctype = ContextType.FILE |
|
self.content = TmpDir().path() + itchat_msg["FileName"] |
|
self._prepare_fn = lambda: itchat_msg.download(self.content) |
|
elif itchat_msg["Type"] == SHARING: |
|
self.ctype = ContextType.SHARING |
|
self.content = itchat_msg.get("Url") |
|
|
|
else: |
|
raise NotImplementedError("Unsupported message type: Type:{} MsgType:{}".format(itchat_msg["Type"], itchat_msg["MsgType"])) |
|
|
|
self.from_user_id = itchat_msg["FromUserName"] |
|
self.to_user_id = itchat_msg["ToUserName"] |
|
|
|
user_id = itchat.instance.storageClass.userName |
|
nickname = itchat.instance.storageClass.nickName |
|
|
|
|
|
|
|
if self.from_user_id == user_id: |
|
self.from_user_nickname = nickname |
|
if self.to_user_id == user_id: |
|
self.to_user_nickname = nickname |
|
try: |
|
|
|
self.my_msg = itchat_msg["ToUserName"] == itchat_msg["User"]["UserName"] and \ |
|
itchat_msg["ToUserName"] != itchat_msg["FromUserName"] |
|
self.other_user_id = itchat_msg["User"]["UserName"] |
|
self.other_user_nickname = itchat_msg["User"]["NickName"] |
|
if self.other_user_id == self.from_user_id: |
|
self.from_user_nickname = self.other_user_nickname |
|
if self.other_user_id == self.to_user_id: |
|
self.to_user_nickname = self.other_user_nickname |
|
if itchat_msg["User"].get("Self"): |
|
|
|
self.self_display_name = itchat_msg["User"].get("Self").get("DisplayName") |
|
except KeyError as e: |
|
logger.warn("[WX]get other_user_id failed: " + str(e)) |
|
if self.from_user_id == user_id: |
|
self.other_user_id = self.to_user_id |
|
else: |
|
self.other_user_id = self.from_user_id |
|
|
|
if self.is_group: |
|
self.is_at = itchat_msg["IsAt"] |
|
self.actual_user_id = itchat_msg["ActualUserName"] |
|
if self.ctype not in [ContextType.JOIN_GROUP, ContextType.PATPAT, ContextType.EXIT_GROUP]: |
|
self.actual_user_nickname = itchat_msg["ActualNickName"] |
|
|