| import json |
| import os |
| import re |
| from typing import List |
|
|
| import simple_websocket |
| from mod.base import json_response |
| from mod.project.node.dbutil import LoadSite, HttpNode, NodeDB, TcpNode, ServerNodeDB |
| from mod.project.node.loadutil import load_check, config_generator |
| from mod.project.node.loadutil.nginx_utils import NginxUtils |
| from mod.project.node.loadutil.log_analyze import get_log_analyze, get_log_file |
| from mod.project.node.nodeutil import ServerNode |
|
|
| import public |
|
|
|
|
| class main(): |
| def __init__(self): |
| pass |
|
|
| def create_http_load(self, get): |
| """ |
| @route /ws_modsoc/node/load/create_http_load |
| @param name str |
| @param site_name str |
| @param ps str |
| @param http_config.proxy_next_upstream str |
| @param http_config.http_alg str |
| @param nodes.[].node_site str |
| @param nodes.[].port int |
| @param nodes.[].path str |
| @param nodes.[].node_status str |
| @param nodes.[].weight int |
| @param nodes.[].node_id int |
| @param nodes.[].max_conns int |
| @param nodes.[].max_fail int |
| @param nodes.[].fail_timeout int |
| @param nodes.[].ps str |
| @return ws { |
| "type": str, |
| "data": str |
| } |
| """ |
| ws: simple_websocket.Server = getattr(get, '_ws', None) |
| if not ws: |
| return json_response(False, "请使用 WebSocket 连接") |
| if not NginxUtils.nginx_exists(): |
| ws.send(json.dumps({"type": "error", "data": "该功能依赖nginx,请先安装nginx服务"})) |
| return |
| name = get.get("name", "") |
| if not name: |
| name = public.GetRandomString(8) |
| get.name = name |
| public.set_module_logs("nodes_create_http_load_9", "create_http_load") |
| load, err = LoadSite.bind_http_load(get) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
|
|
| nodes: List[HttpNode] = [] |
| for node_data in get.get("nodes", []): |
| node, err = HttpNode.bind(node_data) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析节点%s参数错误:%s" % (node_data.get("node_site_name"), err)})) |
| return |
| nodes.append(node) |
|
|
| if len(nodes) == 0: |
| ws.send(json.dumps({"type": "error", "data": "节点不能为空"})) |
| return |
| node_db = NodeDB() |
| if node_db.name_exist(load.name): |
| ws.send(json.dumps({"type": "error", "data": "名称已存在"})) |
| return |
| if node_db.load_site_name_exist(load.site_name): |
| ws.send(json.dumps({"type": "error", "data": "站点已存在"})) |
| return |
| log_call = lambda x: ws.send(json.dumps({"type": "log", "data": x})) |
| log_call("开始检查负载配置...") |
| err = load_check.check_http_load_data(load, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
|
|
| for n in nodes: |
| err = load_check.check_http_node(n, load.site_name, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
|
|
| log_call("开始生成配置文件...") |
| cgr = config_generator.NginxConfigGenerator() |
| err = cgr.save_configs(load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "生成配置文件失败:%s" % err})) |
| return |
|
|
| err = node_db.create_load("http", load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "保存数据失败:%s" % err})) |
| return |
| ws.send(json.dumps({"type": "end", "data": "数据保存成功"})) |
| |
| return |
|
|
| def modify_http_load(self, get): |
| """ |
| @route /ws_modsoc/node/load/modify_http_load |
| @param name str |
| @param load_id int |
| @param site_name str |
| @param ps str |
| @param http_config.proxy_next_upstream str |
| @param http_config.http_alg str |
| @param nodes.[].node_site str |
| @param nodes.[].node_site_id int |
| @param nodes.[].id int |
| @param nodes.[].port int |
| @param nodes.[].path str |
| @param nodes.[].node_status str |
| @param nodes.[].weight int |
| @param nodes.[].node_id int |
| @param nodes.[].max_conns int |
| @param nodes.[].max_fail int |
| @param nodes.[].fail_timeout int |
| @param nodes.[].ps str |
| @return ws { |
| "type": str, |
| "data": str |
| } |
| """ |
|
|
| ws: simple_websocket.Server = getattr(get, '_ws', None) |
| if not ws: |
| return json_response(False, "请使用 WebSocket 连接") |
|
|
| if get.get("load_id/d", 0) < 0: |
| ws.send(json.dumps({"type": "error", "data": "负载ID不能为空"})) |
| return |
|
|
| load, err = LoadSite.bind_http_load(get) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
|
|
| if len(get.get("nodes", [])) <= 0: |
| ws.send(json.dumps({"type": "error", "data": "负载ID不能为空"})) |
| return |
|
|
| nodes: List[HttpNode] = [] |
| for node_data in get.get("nodes", []): |
| node, err = HttpNode.bind(node_data) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
| nodes.append(node) |
|
|
| if not nodes: |
| ws.send(json.dumps({"type": "error", "data": "节点不能为空"})) |
| return |
|
|
| node_db = NodeDB() |
| if not node_db.load_id_exist(load.load_id): |
| ws.send(json.dumps({"type": "error", "data": "未找到该负载配置"})) |
| return |
|
|
| log_call = lambda x: ws.send(json.dumps({"type": "log", "data": x})) |
| err = load_check.check_http_load_data(load, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
|
|
| for n in nodes: |
| err = load_check.check_http_node(n, load.site_name, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
|
|
| log_call("生成配置文件...") |
| cgr = config_generator.NginxConfigGenerator() |
| err = cgr.save_configs(load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "生成配置文件失败:%s" % err})) |
| return |
|
|
| err = node_db.update_load("http", load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "保存数据失败:%s" % err})) |
| return |
| ws.send(json.dumps({"type": "success", "data": "数据保存成功"})) |
| return json_response(True, "负载配置创建成功") |
|
|
| def check_http_node(self, get): |
| """ |
| @route /ws_modsoc/node/load/check_http_node |
| @param node_site str |
| @param node_site_id int |
| @param port int |
| @param path str |
| @param node_status str |
| @param weight int |
| @param max_conns int |
| @param max_fail int |
| @param fail_timeout int |
| @param ps str |
| @return ws { |
| "type": str, |
| "data": str |
| } |
| """ |
| ws: simple_websocket.Server = getattr(get, '_ws', None) |
| if not ws: |
| return json_response(False, "请使用 WebSocket 连接") |
| node, err = HttpNode.bind(get) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
|
|
| log_call = lambda x: ws.send(json.dumps({"type": "log", "data": x})) |
| err = load_check.check_http_node(node, "", log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
| ws.send(json.dumps({"type": "end", "data": "测试连接成功"})) |
| return json_response(True, "测试连接成功") |
|
|
| def remove_http_load(self, get): |
| """ |
| @route /mod/node/load/delete_http_load |
| @param load_id int |
| @param { |
| "type": str, |
| "data": str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| if load_id <= 0: |
| return json_response(False, "负载ID不能为空") |
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
| load["http_config"] = json.loads(load["http_config"]) |
| load["tcp_config"] = json.loads(load["tcp_config"]) |
| load, err = LoadSite.bind_http_load(load) |
| if err: |
| return json_response(False, "解析参数错误:%s" % err) |
| err = node_db.delete(load_id) |
| if err: |
| return json_response(False, "删除失败:%s" % err) |
| cgr = config_generator.NginxConfigGenerator() |
|
|
| cgr.delete_node_conf(load) |
|
|
| return json_response(True, "删除成功") |
|
|
| def multi_remove_http_load(self, get): |
| load_ids = get.get("load_ids/s", '') |
| try: |
| load_ids = json.loads(load_ids) |
| load_ids = [int(i) for i in load_ids] |
| except: |
| return json_response(False, "参数格式错误") |
| node_db = NodeDB() |
| for load_id in load_ids: |
| if load_id <= 0: |
| continue |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
| load["http_config"] = json.loads(load["http_config"]) |
| load["tcp_config"] = json.loads(load["tcp_config"]) |
| load, err = LoadSite.bind_http_load(load) |
| if err: |
| return json_response(False, "解析参数错误:%s" % err) |
| err = node_db.delete(load_id) |
| if err: |
| return json_response(False, "删除失败:%s" % err) |
| cgr = config_generator.NginxConfigGenerator() |
|
|
| cgr.delete_node_conf(load, mutil=True) |
|
|
| NginxUtils.reload_nginx() |
| return json_response(True, "删除成功") |
|
|
| def http_load_list(self, get): |
| """ |
| @route /mod/node/load/http_load_list |
| @param page int |
| @param page_size int |
| @return { |
| "page": str, |
| "data": [ |
| { |
| "load_id": int, |
| "name": str, |
| "site_id": int, |
| "site_name": str, |
| "ps": str, |
| "http_config": { |
| "proxy_next_upstream": str, |
| "http_alg": str |
| }, |
| "created_at": int, |
| "request": int, |
| "error": int, |
| "qps": int, |
| "upstream_time": int, |
| "last_request_time": str, |
| "nodes": [ |
| { |
| "id": int, |
| "request": int, |
| "error": int, |
| "qps": int, |
| "upstream_time": int, |
| "last_request": str, |
| "load_id": int, |
| "node_site": str, |
| "node_site_id": int, |
| "port": int, |
| "path": str, |
| "node_status": str, |
| "weight": int, |
| "max_fail": int, |
| "node_id": int, |
| "max_conns": int, |
| "ps": str, |
| "created_at": int, |
| "fail_timeout": int |
| } |
| ] |
| } |
| ] |
| } |
| """ |
| node_db = NodeDB() |
| srv_db = ServerNodeDB() |
| node_name_map = srv_db.node_map() |
| page = max(get.get("page/d", 1), 1) |
| page_size = max(get.get("page_size/d", 10), 1) |
| search = get.get('search/s', "").strip() |
| count = node_db.loads_count("http", search) |
| loads = node_db.loads_list("http", (page - 1) * page_size, page_size, search) |
| err = "" |
| for load in loads: |
| la = get_log_analyze("http", load["site_name"], interval=60) |
| load["nodes"], tmp_err = node_db.get_nodes(load["load_id"], load["site_type"]) |
| if tmp_err: |
| err = tmp_err |
| continue |
| la.analyze_logs() |
| day_status = la.get_today_stats() |
| load.update(day_status.get("total")) |
| load["http_config"] = json.loads(load["http_config"]) |
| load["tcp_config"] = json.loads(load["tcp_config"]) |
| load["error_codes"] = [int(i[5:]) for i in load["http_config"]["proxy_next_upstream"].split() if i.startswith("http_")] |
| for node in load["nodes"]: |
| node_ip = ServerNode.get_node_ip(node["node_id"]) |
| tmp_status = day_status.get("nodes", {}).get(str(node_ip) + ":" + str(node["port"])) |
| if not tmp_status: |
| tmp_status = {'requests': 0, 'errors': 0, 'max_response_time': 0, |
| 'max_upstream_time': 0, 'last_update': 0, 'qps': 0} |
| node.update(tmp_status) |
| node["node_remarks"] = node_name_map.get(node["node_id"], "-") |
| data = public.get_page(count,page,page_size) |
| data["data"] = loads |
| data["err_info"] = err |
| return data |
|
|
| def log(self, get): |
| """ |
| @route /mod/node/load/log |
| @param load_id int |
| @param date str |
| @return { |
| "time": str, |
| "client_ip": str, |
| "method": str, |
| "node": str, |
| "upstream_time": int, |
| "bytes_sent": int, |
| "body_sent": int, |
| "status": int, |
| "uri": str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| date = get.get("date", "") |
| position = get.get("position/d", -1) |
| limit = get.get("limit/d", 16) |
| if not load_id: |
| return json_response(False, "参数错误") |
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
| la = get_log_analyze("http", load["site_name"], date=date, interval=60) |
| last_position, data_list = la.get_log(position, limit) |
| return json_response(True, "获取成功", data={ |
| "last_position": last_position, |
| "logs": data_list |
| }) |
|
|
| def export_log(self, get): |
| """ |
| @route /mod/node/load/export_log |
| @param load_id int |
| @param date str |
| @return { |
| "status": bool, |
| "filename": str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| date = get.get("date", "") |
| if not load_id: |
| return json_response(False, "参数错误") |
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
|
|
| la = get_log_file("http", load["site_name"], date=date) |
| if not os.path.exists(la): |
| return json_response(False, "未找到该日志文件") |
| return { |
| "status": True, |
| "filename": la |
| } |
|
|
| def set_http_load(self, get): |
| """ |
| @route /mod/node/load/set_http_load |
| @param load_id int |
| @param http_codes []int |
| @return { |
| "status": bool, |
| "msg" str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| http_codes = get.get("http_codes/s", "[]") |
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
| if not http_codes: |
| return json_response(False, "参数错误") |
| try: |
| http_codes = json.loads(http_codes) |
| except: |
| return json_response(False, "参数错误") |
|
|
| code_data = ["error", "timeout"] |
| for i in http_codes: |
| status_code = int(i) |
| if status_code > 600 or status_code < 100: |
| return json_response(False, "状态码错误") |
| code_data.append("http_{}".format(status_code)) |
| load["http_config"] = json.loads(load["http_config"]) |
| load["http_config"]["proxy_next_upstream"] = " ".join(code_data) |
| err = node_db.update_load_key(load_id, { |
| "http_config": json.dumps(load["http_config"]) |
| }) |
| if err: |
| return json_response(False, err) |
|
|
| crg = config_generator.NginxConfigGenerator() |
| err = crg.set_http_proxy_next_upstream(load["site_name"], load["http_config"]["proxy_next_upstream"]) |
| if err: |
| return json_response(False, err) |
| return json_response(True, "设置成功") |
|
|
| def set_http_cache(self, get): |
| """ |
| @route /mod/node/load/set_http_cache |
| @param load_id int |
| @param proxy_cache_status int |
| @param cache_time str |
| @param cache_suffix str |
| @return { |
| "status": bool, |
| "msg" str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| proxy_cache_status = bool(get.get("proxy_cache_status/d", 0)) |
| cache_time = get.get("cache_time/s", "1d") |
| cache_suffix = get.get("cache_suffix/s", "") |
| if not re.match(r"^[0-9]+([smhd])$", cache_time): |
| return json_response(False, "缓存时间格式错误") |
| cache_suffix_list = [] |
| for suffix in cache_suffix.split(","): |
| tmp_suffix = re.sub(r"\s", "", suffix) |
| if tmp_suffix: |
| cache_suffix_list.append(tmp_suffix) |
| real_cache_suffix = ",".join(cache_suffix_list) |
| if not real_cache_suffix: |
| real_cache_suffix = "css,js,jpg,jpeg,gif,png,webp,woff,eot,ttf,svg,ico,css.map,js.map" |
|
|
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
|
|
| load["http_config"] = json.loads(load["http_config"]) |
| load["http_config"]["proxy_cache_status"] = proxy_cache_status |
| load["http_config"]["cache_time"] = cache_time |
| load["http_config"]["cache_suffix"] = real_cache_suffix |
| err = node_db.update_load_key(load_id, { |
| "http_config": json.dumps(load["http_config"]) |
| }) |
| if err: |
| return json_response(False, err) |
|
|
| public.print_log(load) |
| load, err = LoadSite.bind_http_load(load) |
| if err: |
| return json_response(False, err) |
| node_datas, err= node_db.get_nodes(load_id,"http") |
| if err: |
| return json_response(False, err) |
| nodes = [] |
| for node in node_datas: |
| node, _ = HttpNode.bind(node) |
| if node is None: continue |
| nodes.append(node) |
|
|
| crg = config_generator.NginxConfigGenerator() |
| err = crg.set_http_proxy_cache(load.site_name, load, nodes) |
| if err: |
| return json_response(False, err) |
| return json_response(True, "设置成功") |
|
|
| def create_tcp_load(self, get): |
| """ |
| @route /ws_modsoc/node/load/create_tcp_load |
| @param name str |
| @param site_name str |
| @param ps str |
| @param tcp_config.proxy_connect_timeout int |
| @param tcp_config.proxy_timeout int |
| @param tcp_config.host str |
| @param tcp_config.port int |
| @param tcp_config.type str |
| @param nodes.[].host str |
| @param nodes.[].port int |
| @param nodes.[].node_status str |
| @param nodes.[].weight int |
| @param nodes.[].node_id int |
| @param nodes.[].max_fail int |
| @param nodes.[].fail_timeout int |
| @param nodes.[].ps str |
| @return ws { |
| "type": str, |
| "data": str |
| } |
| """ |
| ws: simple_websocket.Server = getattr(get, '_ws', None) |
| if not ws: |
| return json_response(False, "请使用 WebSocket 连接") |
| public.set_module_logs("node_create_tcp_load_9", "create_tcp_load") |
| if not NginxUtils.nginx_exists(): |
| ws.send(json.dumps({"type": "error", "data": "该功能依赖nginx,请先安装nginx服务"})) |
| return |
| name = get.get("name", "") |
| if not name: |
| name = public.GetRandomString(8) |
| get.name = name |
| load, err = LoadSite.bind_tcp_load(get) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
|
|
| nodes: List[TcpNode] = [] |
| for node_data in get.get("nodes", []): |
| node, err = TcpNode.bind(node_data) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
| nodes.append(node) |
|
|
| if len(nodes) == 0: |
| ws.send(json.dumps({"type": "error", "data": "节点不能为空"})) |
| return |
| node_db = NodeDB() |
| if node_db.name_exist(load.name): |
| ws.send(json.dumps({"type": "error", "data": "名称已存在"})) |
| return |
| log_call = lambda x: ws.send(json.dumps({"type": "log", "data": x})) |
| log_call("开始检查负载配置...") |
| err = load_check.check_tcp_load_data(load, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
| for n in nodes: |
| if load.tcp_config["type"] == "tcp": |
| err = load_check.check_tcp_node(n, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
|
|
| log_call("开始生成配置文件...") |
| cgr = config_generator.NginxConfigGenerator() |
| err = cgr.save_configs(load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "生成配置文件失败:%s" % err})) |
| return |
|
|
| err = node_db.create_load("tcp", load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "保存数据失败:%s" % err})) |
| return json_response(False, "保存数据失败:%s" % err) |
| ws.send(json.dumps({"type": "end", "data": "数据保存成功"})) |
| return json_response(True, "负载配置创建成功") |
|
|
| def modify_tcp_load(self, get): |
| """ |
| @route /ws_modsoc/node/load/modify_http_load |
| @param name str |
| @param load_id int |
| @param site_name str |
| @param ps str |
| @param http_config.proxy_next_upstream str |
| @param http_config.http_alg str |
| @param nodes.[].node_site str |
| @param nodes.[].node_site_id int |
| @param nodes.[].id int |
| @param nodes.[].port int |
| @param nodes.[].path str |
| @param nodes.[].node_status str |
| @param nodes.[].weight int |
| @param nodes.[].node_id int |
| @param nodes.[].max_conns int |
| @param nodes.[].max_fail int |
| @param nodes.[].fail_timeout int |
| @param nodes.[].ps str |
| @return ws { |
| "type": str, |
| "data": str |
| } |
| """ |
|
|
| ws: simple_websocket.Server = getattr(get, '_ws', None) |
| if not ws: |
| return json_response(False, "请使用 WebSocket 连接") |
|
|
| if get.get("load_id/d", 0) < 0: |
| ws.send(json.dumps({"type": "error", "data": "负载ID不能为空"})) |
| return |
|
|
| load, err = LoadSite.bind_tcp_load(get) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
|
|
| if len(get.get("nodes", [])) <= 0: |
| ws.send(json.dumps({"type": "error", "data": "负载ID不能为空"})) |
| return |
|
|
| nodes: List[TcpNode] = [] |
| for node_data in get.get("nodes", []): |
| node, err = TcpNode.bind(node_data) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
| nodes.append(node) |
|
|
| if not nodes: |
| ws.send(json.dumps({"type": "error", "data": "节点不能为空"})) |
| return |
|
|
| node_db = NodeDB() |
| if not node_db.load_id_exist(load.load_id): |
| ws.send(json.dumps({"type": "error", "data": "未找到该负载配置"})) |
| return |
|
|
| log_call = lambda x: ws.send(json.dumps({"type": "log", "data": x})) |
| err = load_check.check_tcp_load_data(load, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
| for n in nodes: |
| if load.tcp_config["type"] == "tcp": |
| err = load_check.check_tcp_node(n, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
|
|
| log_call("生成配置文件...") |
| cgr = config_generator.NginxConfigGenerator() |
| err = cgr.save_configs(load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "生成配置文件失败:%s" % err})) |
| return |
|
|
| err = node_db.update_load("tcp", load, nodes) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "保存数据失败:%s" % err})) |
| return |
| ws.send(json.dumps({"type": "success", "data": "数据保存成功"})) |
| return json_response(True, "负载配置更新成功") |
|
|
| def check_tcp_node(self, get): |
| """ |
| @route /ws_modsoc/node/load/check_tcp_node |
| @param type str |
| @param host str |
| @param port int |
| @param node_status str |
| @param weight int |
| @param max_fail int |
| @param fail_timeout int |
| @param ps str |
| @return ws { |
| "type": str, |
| "data": str |
| } |
| """ |
| ws: simple_websocket.Server = getattr(get, '_ws', None) |
| if not ws: |
| return json_response(False, "请使用 WebSocket 连接") |
| node, err = TcpNode.bind(get) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": "解析参数错误:%s" % err})) |
| return |
|
|
| log_call = lambda x: ws.send(json.dumps({"type": "log", "data": x})) |
| err = load_check.check_tcp_node(node, log_call) |
| if err: |
| ws.send(json.dumps({"type": "error", "data": err})) |
| return |
| ws.send(json.dumps({"type": "end", "data": "测试连接成功"})) |
| return json_response(True, "测试连接成功") |
|
|
| def remove_tcp_load(self, get): |
| """ |
| @route /mod/node/load/delete_http_load |
| @param load_id int |
| @param { |
| "type": str, |
| "data": str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| if load_id <= 0: |
| return json_response(False, "负载ID不能为空") |
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
| load["http_config"] = json.loads(load["http_config"]) |
| load["tcp_config"] = json.loads(load["tcp_config"]) |
| load, err = LoadSite.bind_tcp_load(load) |
| if err: |
| return json_response(False, "解析参数错误:%s" % err) |
| err = node_db.delete(load_id) |
| if err: |
| return json_response(False, "删除失败:%s" % err) |
| cgr = config_generator.NginxConfigGenerator() |
|
|
| cgr.delete_node_conf(load) |
|
|
| return json_response(True, "删除成功") |
|
|
| def multi_remove_tcp_load(self, get): |
| load_ids = get.get("load_ids/s", '') |
| try: |
| load_ids = json.loads(load_ids) |
| load_ids = [int(i) for i in load_ids] |
| except: |
| return json_response(False, "参数格式错误") |
|
|
| node_db = NodeDB() |
| for load_id in load_ids: |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
| load["http_config"] = json.loads(load["http_config"]) |
| load["tcp_config"] = json.loads(load["tcp_config"]) |
| load, err = LoadSite.bind_tcp_load(load) |
| if err: |
| return json_response(False, "解析参数错误:%s" % err) |
| err = node_db.delete(load_id) |
| if err: |
| return json_response(False, "删除失败:%s" % err) |
| cgr = config_generator.NginxConfigGenerator() |
|
|
| cgr.delete_node_conf(load, mutil=True) |
|
|
| NginxUtils.reload_nginx() |
| return json_response(True, "删除成功") |
|
|
|
|
| def tcp_load_list(self, get): |
| """ |
| @route /mod/node/load/http_load_list |
| @param page int |
| @param page_size int |
| @return { |
| "page": str, |
| "data": [ |
| { |
| "load_id": int, |
| "name": str, |
| "site_id": int, |
| "site_name": str, |
| "ps": str, |
| "http_config": { |
| "proxy_next_upstream": str, |
| "http_alg": str |
| }, |
| "created_at": int, |
| "request": int, |
| "error": int, |
| "qps": int, |
| "upstream_time": int, |
| "last_request_time": str, |
| "nodes": [ |
| { |
| "id": int, |
| "request": int, |
| "error": int, |
| "qps": int, |
| "upstream_time": int, |
| "last_request": str, |
| "load_id": int, |
| "node_site": str, |
| "node_site_id": int, |
| "port": int, |
| "path": str, |
| "node_status": str, |
| "weight": int, |
| "max_fail": int, |
| "node_id": int, |
| "max_conns": int, |
| "ps": str, |
| "created_at": int, |
| "fail_timeout": int |
| } |
| ] |
| } |
| ] |
| } |
| """ |
| node_db = NodeDB() |
| page = max(get.get("page/d", 1), 1) |
| page_size = max(get.get("page_size/d", 10), 1) |
| search = get.get('search/s', "").strip() |
| count = node_db.loads_count("tcp", search) |
| loads = node_db.loads_list("tcp", (page - 1) * page_size, page_size, search) |
| err = "" |
| for load in loads: |
| la = get_log_analyze("tcp", load["name"], interval=60) |
| load["nodes"], tmp_err = node_db.get_nodes(load["load_id"], load["site_type"]) |
| if tmp_err: |
| err = tmp_err |
| continue |
| la.analyze_logs() |
| day_status = la.get_today_stats() |
| load.update(day_status.get("total")) |
| load["http_config"] = json.loads(load["http_config"]) |
| load["tcp_config"] = json.loads(load["tcp_config"]) |
| for node in load["nodes"]: |
| tmp_status = day_status["nodes"].get(node["host"] + ":" + str(node["port"])) |
| if not tmp_status: |
| tmp_status = {'requests': 0, 'errors': 0, 'max_response_time': 0, |
| 'max_upstream_time': 0, 'last_update': 0, 'qps': 0} |
| node.update(tmp_status) |
| data = public.get_page(count,page,page_size) |
| data["data"] = loads |
| data["err_info"] = err |
| return data |
|
|
| def tcp_log(self, get): |
| """ |
| @route /mod/node/load/log |
| @param load_id int |
| @param date str |
| @return { |
| "time": str, |
| "client_ip": str, |
| "method": str, |
| "node": str, |
| "upstream_time": int, |
| "bytes_sent": int, |
| "body_sent": int, |
| "status": int, |
| "uri": str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| date = get.get("date", "") |
| position = get.get("position/d", -1) |
| limit = get.get("limit/d", 16) |
| if not load_id: |
| return json_response(False, "参数错误") |
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
| la = get_log_analyze("tcp", load["name"], date=date, interval=60) |
| last_position, data_list = la.get_log(position, limit) |
| return json_response(True, "获取成功", data={ |
| "last_position": last_position, |
| "logs": data_list |
| }) |
|
|
| def export_tcp_log(self, get): |
| """ |
| @route /mod/node/load/export_log |
| @param load_id int |
| @param date str |
| @return { |
| "status": bool, |
| "filename": str |
| } |
| """ |
|
|
| load_id = get.get("load_id/d", 0) |
| date = get.get("date", "") |
| if not load_id: |
| return json_response(False, "参数错误") |
| node_db = NodeDB() |
| load, err = node_db.get_load(load_id) |
| if err: |
| return json_response(False, "未找到该负载配置") |
|
|
| la = get_log_file("tcp", load["name"], date=date) |
| return { |
| "status": True, |
| "filename": la |
| } |