|
import logging, copy, pickle |
|
from weakref import ref |
|
|
|
from ..returnvalues import ReturnValue |
|
from ..utils import update_info_dict |
|
|
|
logger = logging.getLogger('itchat') |
|
|
|
class AttributeDict(dict): |
|
def __getattr__(self, value): |
|
keyName = value[0].upper() + value[1:] |
|
try: |
|
return self[keyName] |
|
except KeyError: |
|
raise AttributeError("'%s' object has no attribute '%s'" % ( |
|
self.__class__.__name__.split('.')[-1], keyName)) |
|
def get(self, v, d=None): |
|
try: |
|
return self[v] |
|
except KeyError: |
|
return d |
|
|
|
class UnInitializedItchat(object): |
|
def _raise_error(self, *args, **kwargs): |
|
logger.warning('An itchat instance is called before initialized') |
|
def __getattr__(self, value): |
|
return self._raise_error |
|
|
|
class ContactList(list): |
|
''' when a dict is append, init function will be called to format that dict ''' |
|
def __init__(self, *args, **kwargs): |
|
super(ContactList, self).__init__(*args, **kwargs) |
|
self.__setstate__(None) |
|
@property |
|
def core(self): |
|
return getattr(self, '_core', lambda: fakeItchat)() or fakeItchat |
|
@core.setter |
|
def core(self, value): |
|
self._core = ref(value) |
|
def set_default_value(self, initFunction=None, contactClass=None): |
|
if hasattr(initFunction, '__call__'): |
|
self.contactInitFn = initFunction |
|
if hasattr(contactClass, '__call__'): |
|
self.contactClass = contactClass |
|
def append(self, value): |
|
contact = self.contactClass(value) |
|
contact.core = self.core |
|
if self.contactInitFn is not None: |
|
contact = self.contactInitFn(self, contact) or contact |
|
super(ContactList, self).append(contact) |
|
def __deepcopy__(self, memo): |
|
r = self.__class__([copy.deepcopy(v) for v in self]) |
|
r.contactInitFn = self.contactInitFn |
|
r.contactClass = self.contactClass |
|
r.core = self.core |
|
return r |
|
def __getstate__(self): |
|
return 1 |
|
def __setstate__(self, state): |
|
self.contactInitFn = None |
|
self.contactClass = User |
|
def __str__(self): |
|
return '[%s]' % ', '.join([repr(v) for v in self]) |
|
def __repr__(self): |
|
return '<%s: %s>' % (self.__class__.__name__.split('.')[-1], |
|
self.__str__()) |
|
|
|
class AbstractUserDict(AttributeDict): |
|
def __init__(self, *args, **kwargs): |
|
super(AbstractUserDict, self).__init__(*args, **kwargs) |
|
@property |
|
def core(self): |
|
return getattr(self, '_core', lambda: fakeItchat)() or fakeItchat |
|
@core.setter |
|
def core(self, value): |
|
self._core = ref(value) |
|
def update(self): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not be updated' % \ |
|
self.__class__.__name__, }, }) |
|
def set_alias(self, alias): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not set alias' % \ |
|
self.__class__.__name__, }, }) |
|
def set_pinned(self, isPinned=True): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not be pinned' % \ |
|
self.__class__.__name__, }, }) |
|
def verify(self): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s do not need verify' % \ |
|
self.__class__.__name__, }, }) |
|
def get_head_image(self, imageDir=None): |
|
return self.core.get_head_img(self.userName, picDir=imageDir) |
|
def delete_member(self, userName): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not delete member' % \ |
|
self.__class__.__name__, }, }) |
|
def add_member(self, userName): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not add member' % \ |
|
self.__class__.__name__, }, }) |
|
def send_raw_msg(self, msgType, content): |
|
return self.core.send_raw_msg(msgType, content, self.userName) |
|
def send_msg(self, msg='Test Message'): |
|
return self.core.send_msg(msg, self.userName) |
|
def send_file(self, fileDir, mediaId=None): |
|
return self.core.send_file(fileDir, self.userName, mediaId) |
|
def send_image(self, fileDir, mediaId=None): |
|
return self.core.send_image(fileDir, self.userName, mediaId) |
|
def send_video(self, fileDir=None, mediaId=None): |
|
return self.core.send_video(fileDir, self.userName, mediaId) |
|
def send(self, msg, mediaId=None): |
|
return self.core.send(msg, self.userName, mediaId) |
|
def search_member(self, name=None, userName=None, remarkName=None, nickName=None, |
|
wechatAccount=None): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s do not have members' % \ |
|
self.__class__.__name__, }, }) |
|
def __deepcopy__(self, memo): |
|
r = self.__class__() |
|
for k, v in self.items(): |
|
r[copy.deepcopy(k)] = copy.deepcopy(v) |
|
r.core = self.core |
|
return r |
|
def __str__(self): |
|
return '{%s}' % ', '.join( |
|
['%s: %s' % (repr(k),repr(v)) for k,v in self.items()]) |
|
def __repr__(self): |
|
return '<%s: %s>' % (self.__class__.__name__.split('.')[-1], |
|
self.__str__()) |
|
def __getstate__(self): |
|
return 1 |
|
def __setstate__(self, state): |
|
pass |
|
|
|
class User(AbstractUserDict): |
|
def __init__(self, *args, **kwargs): |
|
super(User, self).__init__(*args, **kwargs) |
|
self.__setstate__(None) |
|
def update(self): |
|
r = self.core.update_friend(self.userName) |
|
if r: |
|
update_info_dict(self, r) |
|
return r |
|
def set_alias(self, alias): |
|
return self.core.set_alias(self.userName, alias) |
|
def set_pinned(self, isPinned=True): |
|
return self.core.set_pinned(self.userName, isPinned) |
|
def verify(self): |
|
return self.core.add_friend(**self.verifyDict) |
|
def __deepcopy__(self, memo): |
|
r = super(User, self).__deepcopy__(memo) |
|
r.verifyDict = copy.deepcopy(self.verifyDict) |
|
return r |
|
def __setstate__(self, state): |
|
super(User, self).__setstate__(state) |
|
self.verifyDict = {} |
|
self['MemberList'] = fakeContactList |
|
|
|
class MassivePlatform(AbstractUserDict): |
|
def __init__(self, *args, **kwargs): |
|
super(MassivePlatform, self).__init__(*args, **kwargs) |
|
self.__setstate__(None) |
|
def __setstate__(self, state): |
|
super(MassivePlatform, self).__setstate__(state) |
|
self['MemberList'] = fakeContactList |
|
|
|
class Chatroom(AbstractUserDict): |
|
def __init__(self, *args, **kwargs): |
|
super(Chatroom, self).__init__(*args, **kwargs) |
|
memberList = ContactList() |
|
userName = self.get('UserName', '') |
|
refSelf = ref(self) |
|
def init_fn(parentList, d): |
|
d.chatroom = refSelf() or \ |
|
parentList.core.search_chatrooms(userName=userName) |
|
memberList.set_default_value(init_fn, ChatroomMember) |
|
if 'MemberList' in self: |
|
for member in self.memberList: |
|
memberList.append(member) |
|
self['MemberList'] = memberList |
|
@property |
|
def core(self): |
|
return getattr(self, '_core', lambda: fakeItchat)() or fakeItchat |
|
@core.setter |
|
def core(self, value): |
|
self._core = ref(value) |
|
self.memberList.core = value |
|
for member in self.memberList: |
|
member.core = value |
|
def update(self, detailedMember=False): |
|
r = self.core.update_chatroom(self.userName, detailedMember) |
|
if r: |
|
update_info_dict(self, r) |
|
self['MemberList'] = r['MemberList'] |
|
return r |
|
def set_alias(self, alias): |
|
return self.core.set_chatroom_name(self.userName, alias) |
|
def set_pinned(self, isPinned=True): |
|
return self.core.set_pinned(self.userName, isPinned) |
|
def delete_member(self, userName): |
|
return self.core.delete_member_from_chatroom(self.userName, userName) |
|
def add_member(self, userName): |
|
return self.core.add_member_into_chatroom(self.userName, userName) |
|
def search_member(self, name=None, userName=None, remarkName=None, nickName=None, |
|
wechatAccount=None): |
|
with self.core.storageClass.updateLock: |
|
if (name or userName or remarkName or nickName or wechatAccount) is None: |
|
return None |
|
elif userName: |
|
for m in self.memberList: |
|
if m.userName == userName: |
|
return copy.deepcopy(m) |
|
else: |
|
matchDict = { |
|
'RemarkName' : remarkName, |
|
'NickName' : nickName, |
|
'Alias' : wechatAccount, } |
|
for k in ('RemarkName', 'NickName', 'Alias'): |
|
if matchDict[k] is None: |
|
del matchDict[k] |
|
if name: |
|
contact = [] |
|
for m in self.memberList: |
|
if any([m.get(k) == name for k in ('RemarkName', 'NickName', 'Alias')]): |
|
contact.append(m) |
|
else: |
|
contact = self.memberList[:] |
|
if matchDict: |
|
friendList = [] |
|
for m in contact: |
|
if all([m.get(k) == v for k, v in matchDict.items()]): |
|
friendList.append(m) |
|
return copy.deepcopy(friendList) |
|
else: |
|
return copy.deepcopy(contact) |
|
def __setstate__(self, state): |
|
super(Chatroom, self).__setstate__(state) |
|
if not 'MemberList' in self: |
|
self['MemberList'] = fakeContactList |
|
|
|
class ChatroomMember(AbstractUserDict): |
|
def __init__(self, *args, **kwargs): |
|
super(AbstractUserDict, self).__init__(*args, **kwargs) |
|
self.__setstate__(None) |
|
@property |
|
def chatroom(self): |
|
r = getattr(self, '_chatroom', lambda: fakeChatroom)() |
|
if r is None: |
|
userName = getattr(self, '_chatroomUserName', '') |
|
r = self.core.search_chatrooms(userName=userName) |
|
if isinstance(r, dict): |
|
self.chatroom = r |
|
return r or fakeChatroom |
|
@chatroom.setter |
|
def chatroom(self, value): |
|
if isinstance(value, dict) and 'UserName' in value: |
|
self._chatroom = ref(value) |
|
self._chatroomUserName = value['UserName'] |
|
def get_head_image(self, imageDir=None): |
|
return self.core.get_head_img(self.userName, self.chatroom.userName, picDir=imageDir) |
|
def delete_member(self, userName): |
|
return self.core.delete_member_from_chatroom(self.chatroom.userName, self.userName) |
|
def send_raw_msg(self, msgType, content): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not send message directly' % \ |
|
self.__class__.__name__, }, }) |
|
def send_msg(self, msg='Test Message'): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not send message directly' % \ |
|
self.__class__.__name__, }, }) |
|
def send_file(self, fileDir, mediaId=None): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not send message directly' % \ |
|
self.__class__.__name__, }, }) |
|
def send_image(self, fileDir, mediaId=None): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not send message directly' % \ |
|
self.__class__.__name__, }, }) |
|
def send_video(self, fileDir=None, mediaId=None): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not send message directly' % \ |
|
self.__class__.__name__, }, }) |
|
def send(self, msg, mediaId=None): |
|
return ReturnValue({'BaseResponse': { |
|
'Ret': -1006, |
|
'ErrMsg': '%s can not send message directly' % \ |
|
self.__class__.__name__, }, }) |
|
def __setstate__(self, state): |
|
super(ChatroomMember, self).__setstate__(state) |
|
self['MemberList'] = fakeContactList |
|
|
|
def wrap_user_dict(d): |
|
userName = d.get('UserName') |
|
if '@@' in userName: |
|
r = Chatroom(d) |
|
elif d.get('VerifyFlag', 8) & 8 == 0: |
|
r = User(d) |
|
else: |
|
r = MassivePlatform(d) |
|
return r |
|
|
|
fakeItchat = UnInitializedItchat() |
|
fakeContactList = ContactList() |
|
fakeChatroom = Chatroom() |
|
|