page / bt-source /panel /mod /base /push_mod /mod_node_push.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
a757bd3 verified
import json
import os
import time
from typing import Tuple, Union, Optional, List
from .mods import PUSH_DATA_PATH
from .send_tool import WxAccountMsg
from .base_task import BaseTask, BaseTaskViewMsg
from .util import read_file, DB, write_file
class NodeHttpLoadTask(BaseTask):
def __init__(self):
super().__init__()
self.source_name = "nodes_nginx_http_load_push"
self.template_name = "节点管理-负载均衡"
self._tip_counter = None
@property
def tip_counter(self) -> dict:
if self._tip_counter is not None:
return self._tip_counter
tip_counter = '{}/node_load_balance_push.json'.format(PUSH_DATA_PATH)
if os.path.exists(tip_counter):
try:
self._tip_counter = json.loads(read_file(tip_counter))
except json.JSONDecodeError:
self._tip_counter = {}
else:
self._tip_counter = {}
return self._tip_counter
@staticmethod
def _get_load_name(load_id: int):
data = DB("load_sites").where("load_id = ? and site_type=?", (load_id, "http")).find()
if isinstance(data, dict):
return data["ps"]
return ""
@staticmethod
def _get_all_load() -> List[dict]:
data = DB("load_sites").where("site_type=?", ("http",)).field("load_id,ps").select()
if isinstance(data, list):
return data
return []
def save_tip_counter(self):
tip_counter = '{}/node_load_balance_push.json'.format(PUSH_DATA_PATH)
write_file(tip_counter, json.dumps(self.tip_counter))
def get_title(self, task_data: dict) -> str:
if task_data["project"] == "all":
return "节点管理-负载节点异常告警"
return "节点管理-负载[{}]异常告警".format(self._get_load_name(task_data["project"]))
def check_task_data(self, task_data: dict) -> Union[dict, str]:
all_loads = self._get_all_load()
all_load_id = [i["load_id"] for i in all_loads]
if not bool(all_load_id):
return '没有负载均衡配置,无法设置告警'
if task_data["project"] not in all_load_id:
return '没有该负载均衡配置,无法设置告警'
codes = []
for i in task_data["codes"].split(","):
if bool(i) and i.isdecimal():
code = int(i)
if 100 <= code < 600:
codes.append(str(code))
if not bool(codes):
return '没有指定任何错误码,无法设置告警'
task_data["codes"] = ",".join(codes)
task_data["interval"] = 120
return task_data
def get_keyword(self, task_data: dict) -> str:
return task_data["project"]
def _check_func(self, load_id: int, codes: str, interval: int) -> list:
from mod.project.node.loadutil.load_check import RequestChecker
from mod.project.node.dbutil import HttpNode, NodeDB
access_codes = [int(i) for i in codes.split(",") if bool(i.strip())]
nodes, err = NodeDB().get_nodes(load_id, "http")
if not nodes:
return []
res_list = []
for node in nodes:
# 检查每个节点,返回有问题的节点信息
node, _ = HttpNode.bind(node)
if not node:
continue
node_name = node.node_site_name + ":" + str(node.port)
if not RequestChecker.check_http_node(node, access_codes):
if node_name in self.tip_counter:
self.tip_counter[node_name].append(int(time.time()))
self.tip_counter[node_name] = list(filter(
lambda i: interval * 4 + i >= time.time(),
self.tip_counter[node_name]
))
# 如果一个节点连续三次出现在告警列表中,则视为需要告警
if len(self.tip_counter[node_name]) >= 3:
res_list.append(node_name)
self.tip_counter[node_name] = []
else:
self.tip_counter[node_name] = [int(time.time()), ]
self.save_tip_counter()
return res_list
def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
err_nodes = self._check_func(task_data["project"], task_data["codes"], task_data.get("interval", 180))
if not err_nodes:
return None
pj = "负载均衡:【{}】".format(self._get_load_name(task_data["project"]))
nodes = '、'.join(err_nodes)
self.title = self.get_title(task_data)
return {
"msg_list": [
">通知类型:节点管理-负载均衡告警",
">告警内容:<font color=#ff0000>{}配置下的节点【{}】出现访问错误,请及时关注节点情况并处理。</font> ".format(
pj, nodes),
],
"pj": pj,
"nodes": nodes
}
def filter_template(self, template: dict) -> Optional[dict]:
all_loads = self._get_all_load()
if not all_loads:
return None
for item in all_loads:
template["field"][0]["items"].append({
"title": item["ps"],
"value": item["load_id"]
})
template["field"][0]["default"] = all_loads[0]["load_id"]
return template
def to_sms_msg(self, push_data: dict, push_public_data: dict) -> Tuple[str, dict]:
return '', {}
def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
msg = WxAccountMsg.new_msg()
msg.thing_type = "节点管理-负载均衡告警"
msg.msg = "节点管理-负载均衡出现节点异常,请登录面板查看"
return msg
class NodeTcpLoadTask(NodeHttpLoadTask):
def __init__(self):
super().__init__()
self.source_name = "nodes_nginx_tcp_load_push"
self.template_name = "节点管理-tcp负载均衡"
self._tip_counter = None
@property
def tip_counter(self) -> dict:
if self._tip_counter is not None:
return self._tip_counter
tip_counter = '{}/node_tcp_load_balance_push.json'.format(PUSH_DATA_PATH)
if os.path.exists(tip_counter):
try:
self._tip_counter = json.loads(read_file(tip_counter))
except json.JSONDecodeError:
self._tip_counter = {}
else:
self._tip_counter = {}
return self._tip_counter
@staticmethod
def _get_load_name(load_id: int):
data = DB("load_sites").where("load_id = ? and site_type=?", (load_id, "tcp")).find()
if isinstance(data, dict):
return data["ps"]
return ""
@staticmethod
def _get_all_load() -> List[dict]:
data = DB("load_sites").where("site_type=? and tcp_config like ?", ("tcp", "%\"tcp\"%")).field(
"load_id,ps").select()
if isinstance(data, list):
return data
return []
def save_tip_counter(self):
tip_counter = '{}/node_tcp_load_balance_push.json'.format(PUSH_DATA_PATH)
write_file(tip_counter, json.dumps(self.tip_counter))
def check_task_data(self, task_data: dict) -> Union[dict, str]:
all_loads = self._get_all_load()
all_load_id = [i["load_id"] for i in all_loads]
if not bool(all_load_id):
return '没有负载均衡配置,无法设置告警'
if task_data["project"] not in all_load_id:
return '没有该负载均衡配置,无法设置告警'
task_data["err_num"] = max(1, task_data.get("err_num", 1))
task_data["interval"] = 120
return task_data
def _check_func(self, load_id: int, err_num: int, interval: int) -> list:
from mod.project.node.loadutil.load_check import RequestChecker
from mod.project.node.dbutil import TcpNode, NodeDB
nodes, err = NodeDB().get_nodes(load_id, "tcp")
if not nodes:
return []
res_list = []
for node in nodes:
# 检查每个节点,返回有问题的节点信息
node, _ = TcpNode.bind(node)
if not node:
continue
node_name = node.host + ":" + str(node.port)
if not RequestChecker.check_tcp_node(node):
if node_name in self.tip_counter:
self.tip_counter[node_name].append(int(time.time()))
self.tip_counter[node_name] = list(filter(
lambda i: interval * (err_num + 1) + i >= time.time(),
self.tip_counter[node_name]
))
# 如果一个节点连续多次次出现在告警列表中,则视为需要告警
if len(self.tip_counter[node_name]) >= err_num:
res_list.append(node_name)
self.tip_counter[node_name] = []
else:
self.tip_counter[node_name] = [int(time.time()), ]
self.save_tip_counter()
return res_list
def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
err_nodes = self._check_func(task_data["project"], task_data["err_num"], task_data.get("interval", 180))
if not err_nodes:
return None
pj = "tcp负载均衡:【{}】".format(self._get_load_name(task_data["project"]))
nodes = '、'.join(err_nodes)
self.title = self.get_title(task_data)
return {
"msg_list": [
">通知类型:节点管理-tcp负载均衡告警",
">告警内容:<font color=#ff0000>{}配置下的节点【{}】出现访问错误,请及时关注节点情况并处理。</font> ".format(
pj, nodes),
],
"pj": pj,
"nodes": nodes
}
def to_sms_msg(self, push_data: dict, push_public_data: dict) -> Tuple[str, dict]:
return '', {}
def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
msg = WxAccountMsg.new_msg()
msg.thing_type = "节点管理-tcp负载均衡告警"
msg.msg = "节点管理-tcp负载均衡出现节点异常"
return msg
class NodeMysqlSlave(BaseTask):
def __init__(self):
super().__init__()
self.source_name = "nodes_mysql_slave_err_push"
self.template_name = "节点管理-Mysql主从异常告警"
self._tip_counter = None
def get_title(self, task_data: dict) -> str:
return "节点管理-Mysql主从异常告警"
def check_task_data(self, task_data: dict) -> Union[dict, str]:
task_data["interval"] = 120
return task_data
def get_keyword(self, task_data: dict) -> str:
return "nodes_mysql_slave_err_push"
def _check_func(self) -> list:
from mod.project.node.mysql_slaveMod import main as mysql_slave
return mysql_slave().get_slave_error_info(get=None)
def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
err_nodes = self._check_func()
if not err_nodes:
return None
self.title = self.get_title(task_data)
err_list = []
wx_node_err = "{}主从中断,中断时间{}".format(err_nodes[0]["slave_ip"], err_nodes[0]["error_time"])
for node in err_nodes:
err_list.append("<font color=#ff0000>【{}】主从中断,中断时间【{}】;</font>".format(node["slave_ip"], node["error_time"]))
err_list.append("详情请前往节点管理-主从同步查看具体报错信息")
return {
"msg_list": [
">通知类型:节点管理-Mysql主从告警",
">告警内容:",
*err_list
],
"node": wx_node_err
}
def filter_template(self, template: dict) -> Optional[dict]:
return template
def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
msg = WxAccountMsg.new_msg()
msg.thing_type = "节点管理-Mysql主从告警"
msg.msg = "节点管理-Mysql主从出现节点异常,请登录面板查看"
return msg
class NodeRedisSlave(BaseTask):
def __init__(self):
super().__init__()
self.source_name = "nodes_redis_slave_err_push"
self.template_name = "节点管理-Redis主从异常告警"
self._tip_counter = None
def get_title(self, task_data: dict) -> str:
return "节点管理-Redis主从异常告警"
def check_task_data(self, task_data: dict) -> Union[dict, str]:
task_data["interval"] = 120
return task_data
def get_keyword(self, task_data: dict) -> str:
return "nodes_redis_slave_err_push"
def _check_func(self) -> list:
from mod.project.node.redis_slaveMod import main as redis_slave
ret = redis_slave().get_alert_info(get=None)
if "status" in ret and ret["status"] and "data" in ret and isinstance(ret["data"], list):
return ret["data"]
return []
def get_push_data(self, task_id: str, task_data: dict) -> Optional[dict]:
err_nodes = self._check_func()
if len(err_nodes) == 0:
return None
self.title = self.get_title(task_data)
err_list = []
wx_node_err = "{}主从中断,中断时间{}".format(err_nodes[0]["server_ip"], err_nodes[0]["timestamp"])
for node in err_nodes:
err_list.append("<font color=#ff0000>Redis主从服务器【{}】,异常:{},时间:{};</font>".format(
node["server_ip"], node["message"], node["timestamp"]))
err_list.append("详情请前往节点管理-主从同步查看具体报错信息")
return {
"msg_list": [
">通知类型:节点管理-Redis主从异常告警",
">告警内容:",
*err_list
],
"node": wx_node_err
}
def filter_template(self, template: dict) -> Optional[dict]:
return template
def to_wx_account_msg(self, push_data: dict, push_public_data: dict) -> WxAccountMsg:
msg = WxAccountMsg.new_msg()
msg.thing_type = "节点管理-Redis主从异常告警"
msg.msg = "节点管理-Redis主从异常告警,请登录面板查看" if not push_data.get("node", None) else push_data["node"]
return msg
class ViewMsgFormat(BaseTaskViewMsg):
def get_msg(self, task: dict) -> Optional[str]:
if task["template_id"] == "220":
return "<span>负载节点测试路径访问状态码不为{}时,推送告警信息(每日推送{}次后不再推送)<span>".format(
task.get("task_data", {}).get("codes", "").replace(",", "|"),
task.get("number_rule", {}).get("day_num"))
if task["template_id"] == "221":
return "<span>负载节点连接异常时,推送告警信息(每日推送{}次后不再推送)<span>".format(
task.get("number_rule", {}).get("day_num"))
if task["template_id"] == "222":
return "<span>任意Mysql主从配置异常时,推送告警信息(每日推送{}次后不再推送)<span>".format(
task.get("number_rule", {}).get("day_num"))
if task["template_id"] == "223":
return "<span>任意Redis主从配置异常时,推送告警信息(每日推送{}次后不再推送)<span>".format(
task.get("number_rule", {}).get("day_num"))
return None
NodeMysqlSlave.VIEW_MSG = NodeHttpLoadTask.VIEW_MSG = NodeTcpLoadTask.VIEW_MSG = ViewMsgFormat