# Residue from the hosting site's file view (not part of the program):
# mpsk's picture
# Create bot.py
# 511c1dc
# raw
# history blame
# 3.48 kB
import copy
import json
import socket
from enum import Enum

import requests
from loguru import logger
class MSGTYPE(Enum):
    """Kinds of notification a message can carry.

    Every member is also a key of the module-level ``icons`` table, and the
    member name is what ends up as the card title.
    """

    INCIDENT = 1
    ERROR = 2
    REPORT = 3
    INFO = 4
    SUCCESS = 0
# Icon URL placed in the card's "source" section, looked up by message type.
icons = {
    MSGTYPE.SUCCESS: "https://img.icons8.com/ios-glyphs/30/000000/rocket.png",
    MSGTYPE.INCIDENT: "https://img.icons8.com/ios-filled/30/car-accident.png",
    MSGTYPE.ERROR: "https://img.icons8.com/ios-glyphs/30/000000/error--v2.png",
    MSGTYPE.REPORT: "https://img.icons8.com/ios-glyphs/90/000000/checkmark--v2.png",
    MSGTYPE.INFO: "https://img.icons8.com/ios-glyphs/30/000000/info--v2.png",
}
class Message:
    """Notification payload consumed by ``Bot``.

    Every field is a class-level default. ``Bot`` overwrites ``type`` on the
    message it is handed; ``type_hint`` and ``content`` are presumably set by
    the caller on an instance before sending.
    """

    # Host identity is resolved once, when this class body executes.
    host_name = socket.gethostname()
    try:
        host_ip = socket.gethostbyname(host_name)
    except OSError:
        # The local host name is not always resolvable (e.g. no matching
        # hosts entry). That must not break importing the module, so fall
        # back to a placeholder instead of propagating socket.gaierror.
        host_ip = "unknown"
    type = MSGTYPE.INCIDENT
    type_hint = ""
    content = ""
class dummyRequest:
    """Placeholder result returned by ``Bot`` when sending is disabled.

    It carries a single attribute, ``status_code``, which is always 200.
    """

    def __init__(self) -> None:
        # Only attribute of the stand-in.
        self.status_code = 200
class Bot:
    """Post ``template_card`` notifications to a WeCom webhook.

    Args:
        app_name: Label shown as the card source and in the "App Name" row.
        bot_key: Webhook key appended to the send URL.
        enabled: When false, nothing is sent and every sender returns a
            ``dummyRequest`` instead of an HTTP response.
        timeout: Seconds to wait for the webhook before ``requests`` raises
            a timeout error. ``None`` waits indefinitely, which is what the
            class did before this argument existed.
    """

    # Cap on the card's key/value rows; mirrors the 6-item slice the
    # payload builder has always applied.
    _MAX_ROWS = 6

    def __init__(
        self,
        app_name="",
        bot_key="",
        enabled=True,
        timeout=10,
    ):
        self.Bot_URL = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={bot_key}"
        # Skeleton of every outgoing card. It is never sent directly: each
        # message is built from a deep copy so the skeleton stays pristine.
        self.template = {
            "msgtype": "template_card",
            "template_card": {
                "card_type": "text_notice",
                "source": {
                    "icon_url": "",
                    "desc": app_name,
                    "desc_color": 0,
                },
                "main_title": {
                    "title": "",
                    "desc": "",
                },
                "emphasis_content": {
                    "title": "",
                    "desc": "",
                },
                "horizontal_content_list": [],
                "card_action": {
                    "type": 1,
                    "url": "https://huggingface.co/myscale",
                },
            },
        }
        self.app_name = app_name
        self.enabled = enabled
        self.timeout = timeout

    def __construct_msg(self, msg: "Message"):
        """Return the payload dict for ``msg``.

        The payload is a deep copy of ``self.template``. A shallow copy would
        share the nested dicts and the row list with the template, so rows
        appended here would pile up across successive messages.
        """
        payload = copy.deepcopy(self.template)
        card = payload["template_card"]
        card["source"]["icon_url"] = icons[msg.type]
        card["main_title"]["title"] = msg.type.name[:13]
        card["main_title"]["desc"] = msg.type_hint[:15]
        rows = card["horizontal_content_list"]
        rows.extend(self.__convert_dict2clist(msg))
        del rows[self._MAX_ROWS:]
        card["sub_title_text"] = msg.content
        return payload

    def __convert_dict2clist(self, msg: "Message"):
        """Return the key/value rows describing the app and the sending host."""
        return [
            {"keyname": "App Name", "value": self.app_name},
            {"keyname": "Host Name", "value": msg.host_name},
            {"keyname": "Host IP", "value": str(msg.host_ip)},
        ]

    def __send_md_msg(self, msg: "Message"):
        """Serialize ``msg`` and POST it to the webhook; return the response."""
        body = json.dumps(self.__construct_msg(msg), ensure_ascii=True)
        return requests.post(
            self.Bot_URL,
            data=body,
            headers={"Content-Type": "application/json"},
            # Without a timeout an unreachable webhook blocks the caller.
            timeout=self.timeout,
        )

    def __dispatch(self, msg: "Message", msg_type):
        """Stamp ``msg`` with ``msg_type`` and send it unless disabled.

        The type is written onto the caller's message object even when the
        bot is disabled, matching the behavior of the public senders.
        """
        msg.type = msg_type
        if not self.enabled:
            return dummyRequest()
        return self.__send_md_msg(msg)

    def error(self, msg: "Message"):
        """Send ``msg`` as an ERROR card."""
        return self.__dispatch(msg, MSGTYPE.ERROR)

    def incident(self, msg: "Message"):
        """Send ``msg`` as an INCIDENT card."""
        return self.__dispatch(msg, MSGTYPE.INCIDENT)

    def report(self, msg: "Message"):
        """Send ``msg`` as a REPORT card."""
        return self.__dispatch(msg, MSGTYPE.REPORT)

    def info(self, msg: "Message"):
        """Send ``msg`` as an INFO card."""
        return self.__dispatch(msg, MSGTYPE.INFO)

    def success(self, msg: "Message"):
        """Send ``msg`` as a SUCCESS card."""
        return self.__dispatch(msg, MSGTYPE.SUCCESS)