# Tools / db_diag /api.py
# ZackBradshaw's picture
# Upload folder using huggingface_hub
# e67043b verified
import json
import os
import requests
import numpy as np
import openai
import paramiko
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet, stopwords
from nltk.tokenize import word_tokenize
import nltk
from ..tool import Tool
from swarms.tools.tools.database.utils.db_parser import get_conf
from swarms.tools.tools.database.utils.database import DBArgs, Database
from swarms.tools.models.customllm import CustomLLM
from swarms.tools.knowledge.knowledge_extraction import KnowledgeExtraction
from swarms.tools.tools.db_diag.anomaly_detection import detect_anomalies
from swarms.tools.tools.db_diag.anomaly_detection import prometheus
from swarms.tools.tools.db_diag.example_generate import bm25
import warnings
def obtain_values_of_metrics(start_time, end_time, metrics, step=3):
    """Return the peak sampled value of each Prometheus metric in a window.

    Every metric is fetched with a ``query_range`` request sampled every
    ``step`` seconds. Only the first series of each response is used.

    Args:
        start_time: Window start, sent to Prometheus as ``start``. It is
            subtracted from ``end_time``, so it must be numeric.
        end_time: Window end, sent to Prometheus as ``end``.
        metrics: Iterable of query strings. Each one is used both as the
            query and as the key in the returned mapping.
        step: Sampling interval sent to Prometheus as ``step``. Defaults
            to 3, the value that was previously hard-coded.

    Returns:
        A dict mapping each query string to the maximum of its sampled
        values (a NumPy float).

    Raises:
        Exception: If a query returns no series for the window.
    """
    # Maximum resolution of 11,000 points per timeseries; the widest window
    # that fits therefore scales with the sampling step.
    max_points_per_series = 11000
    if end_time - start_time > max_points_per_series * step:
        # Deliberately only a warning: the query is still attempted.
        warnings.warn(
            "The time range ({}, {}) is too large, please reduce the time range".format(
                start_time, end_time
            )
        )

    print(" ====> metrics: ", metrics)

    required_values = {}
    for metric in metrics:
        response = prometheus(
            "api/v1/query_range",
            {
                "query": metric,
                "start": start_time,
                "end": end_time,
                "step": str(step),
            },
        )
        series = response["data"]["result"]
        if not series:
            raise Exception("No metric values found for the given time range")

        # Each sample is a (timestamp, value) pair; keep the peak value.
        samples = series[0]["values"]
        required_values[metric] = np.max(
            np.array([float(value) for _, value in samples])
        )

    return required_values
def find_abnormal_metrics(start_time, end_time, monitoring_metrics, resource):
    """Pick the monitored metrics that look anomalous for one resource.

    Each metric is queried from Prometheus over the window padded by five
    minutes on both sides. A metric is kept when ``detect_anomalies``
    flags the samples of its first series and its name does not mention a
    resource keyword ("memory", "cpu", "disk", "network") other than
    ``resource``. Metrics whose query returns no series are skipped.

    Args:
        start_time: Window start; five minutes are subtracted before querying.
        end_time: Window end; five minutes are added before querying.
        monitoring_metrics: Iterable of Prometheus query strings to inspect.
        resource: Resource keyword whose metrics should be kept.

    Returns:
        List of the query strings that passed both checks, in input order.
    """
    padding_seconds = 5 * 60
    foreign_resources = [
        keyword
        for keyword in ("memory", "cpu", "disk", "network")
        if keyword != resource
    ]

    flagged = []
    for metric_name in monitoring_metrics:
        response = prometheus(
            "api/v1/query_range",
            {
                "query": metric_name,
                "start": start_time - padding_seconds,
                "end": end_time + padding_seconds,
                "step": "3",
            },
        )
        series = response["data"]["result"]
        if series == []:
            continue

        samples = np.array([float(sample) for _, sample in series[0]["values"]])
        if not detect_anomalies(samples):
            continue

        # Drop metrics that belong to a different resource family.
        if any(keyword in metric_name for keyword in foreign_resources):
            continue

        flagged.append(metric_name)

    return flagged
def build_db_diag_tool(config) -> Tool:
    """Build the "Database Diagnosis" tool and register its endpoints.

    Database and benchmark-server settings are read from ``my_config.ini``
    located next to this module, and the list of monitoring metrics is read
    from a file under the current working directory.

    Registered GET endpoints:
        * ``/obtain_start_and_end_time_of_anomaly``
        * ``/whether_is_abnormal_metric``
        * ``/cpu_diagnosis_agent``
        * ``/memory_diagnosis_agent``

    Args:
        config: Tool configuration supplied by the caller. It is accepted for
            interface compatibility but is not read by this function.

    Returns:
        The configured ``Tool`` instance.
    """
    tool = Tool(
        "Database Diagnosis",
        "Diagnose the bottlenecks of a database based on relevant metrics",
        name_for_model="db_diag",
        description_for_model="Plugin for diagnosing the bottlenecks of a database based on relevant metrics",
        logo_url="https://commons.wikimedia.org/wiki/File:Postgresql_elephant.svg",
        contact_email="hello@contact.com",
        legal_info_url="hello@legal.com",
    )

    # Prometheus queries used by the endpoints, keyed by logical metric name.
    # "*_usage" entries are single queries; "*_metrics" entries are lists.
    prometheus_metrics = {
        "cpu_usage": 'avg(rate(process_cpu_seconds_total{instance="172.27.58.65:9187"}[5m]) * 1000)',
        "cpu_metrics": [
            'node_scrape_collector_duration_seconds{instance="172.27.58.65:9100"}',
            'node_procs_running{instance="172.27.58.65:9100"}',
            'node_procs_blocked{instance="172.27.58.65:9100"}',
            'node_entropy_available_bits{instance="172.27.58.65:9100"}',
            'node_load1{instance="172.27.58.65:9100"}',
            'node_load5{instance="172.27.58.65:9100"}',
            'node_load15{instance="172.27.58.65:9100"}',
        ],
        "memory_usage": 'node_memory_MemTotal_bytes{instance=~"172.27.58.65:9100"} - (node_memory_Cached_bytes{instance=~"172.27.58.65:9100"} + node_memory_Buffers_bytes{instance=~"172.27.58.65:9100"} + node_memory_MemFree_bytes{instance=~"172.27.58.65:9100"})',
        "memory_metrics": [
            'node_memory_Inactive_anon_bytes{instance="172.27.58.65:9100"}',
            'node_memory_MemFree_bytes{instance="172.27.58.65:9100"}',
            'node_memory_Dirty_bytes{instance="172.27.58.65:9100"}',
            'pg_stat_activity_count{datname=~"(imdbload|postgres|sysbench|template0|template1|tpcc|tpch)", instance=~"172.27.58.65:9187", state="active"} !=0',
        ],
        "network_metrics": [
            'node_sockstat_TCP_tw{instance="172.27.58.65:9100"}',
            'node_sockstat_TCP_orphan{instance="172.27.58.65:9100"}',
        ],
    }

    # Load the knowledge extractor.
    # NOTE(review): this absolute path ("/swarms.tools/...") looks like the
    # result of a package rename; confirm the file really lives there.
    knowledge_matcher = KnowledgeExtraction(
        "/swarms.tools/tools/db_diag/root_causes_dbmind.jsonl"
    )

    # Load database settings from the ini file next to this module. A separate
    # local name is used so the ``config`` parameter is not shadowed.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    db_config = get_conf(script_dir + "/my_config.ini", "postgresql")
    dbargs = DBArgs("postgresql", config=db_config)  # todo assign database name

    # NOTE(review): this handle is not used below (memory_diagnosis_agent
    # opens its own); kept in case constructing it has connection side
    # effects callers rely on. Confirm and remove if not.
    db = Database(dbargs, timeout=-1)

    server_config = get_conf(script_dir + "/my_config.ini", "benchserver")

    # NOTE(review): eval() executes whatever this file contains. If it only
    # holds a literal list, ast.literal_eval would be the safe replacement.
    # The path is relative to the current working directory.
    with open(
        str(os.getcwd()) + "/swarms/tools/db_diag/database_monitoring_metrics", "r"
    ) as f:
        monitoring_metrics = eval(f.read())

    @tool.get("/obtain_start_and_end_time_of_anomaly")
    def obtain_start_and_end_time_of_anomaly():
        """Read the latest anomaly window from the benchmark server over SFTP.

        Among the files in the configured remote directory whose name
        contains "trigger_time_log" and starts with a numeric prefix (the
        text before the first "_"), the one with the largest prefix is read.
        Its first line is expected to be "<start>;<end>".

        Returns:
            dict with "start_time" and "end_time" as the strings found in
            the file.

        Raises:
            FileNotFoundError: If no matching log file exists.
        """
        ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys silently.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        # Initialised up front so the cleanup below is safe even when the
        # connection itself fails.
        sftp = None
        start_time = 0
        end_time = 0
        try:
            ssh.connect(
                server_config["server_address"],
                username=server_config["username"],
                password=server_config["password"],
            )
            sftp = ssh.open_sftp()

            remote_directory = server_config["remote_directory"]
            sftp.chdir(remote_directory)

            # Find the trigger log with the largest numeric prefix.
            required_file_name = ""
            required_tp = -1
            for filename in sftp.listdir():
                if "trigger_time_log" not in filename:
                    continue
                prefix = filename.split("_")[0]
                if prefix.isdigit() and required_tp < int(prefix):
                    required_tp = int(prefix)
                    required_file_name = filename

            if not required_file_name:
                raise FileNotFoundError(
                    "No trigger_time_log file found in " + str(remote_directory)
                )

            with sftp.open(remote_directory + "/" + required_file_name) as remote_file:
                file_content = remote_file.read().decode()

            tps = file_content.split("\n")[0]
            start_time = tps.split(";")[0]
            end_time = tps.split(";")[1]
        finally:
            # Close the SFTP session (if it was opened) and the SSH connection.
            if sftp is not None:
                sftp.close()
            ssh.close()

        return {"start_time": start_time, "end_time": end_time}

    @tool.get("/whether_is_abnormal_metric")
    def whether_is_abnormal_metric(
        start_time: int, end_time: int, metric_name: str = "cpu_usage"
    ):
        """Report whether a named metric is abnormal in the given window.

        The metric is looked up in ``prometheus_metrics`` and queried over
        the window padded by five minutes on both sides.

        Raises:
            KeyError: If ``metric_name`` is not a key of ``prometheus_metrics``.
            Exception: If Prometheus returns no series for the window.
        """
        padding_seconds = 5 * 60
        metric_values = prometheus(
            "api/v1/query_range",
            {
                "query": prometheus_metrics[metric_name],
                "start": start_time - padding_seconds,
                "end": end_time + padding_seconds,
                "step": "3",
            },
        )
        if not metric_values["data"]["result"]:
            raise Exception("No metric values found for the given time range")

        # NOTE(review): the verdict is hard-coded. The detect_anomalies() call
        # on the fetched samples was commented out in the original, so every
        # metric with data is reported as abnormal and the "normal" branch is
        # unreachable. Behaviour is kept as-is; confirm whether detection
        # should be re-enabled.
        is_abnormal = True
        if is_abnormal:
            return "The metric is abnormal"
        return "The metric is normal"

    @tool.get("/cpu_diagnosis_agent")
    def cpu_diagnosis_agent(start_time: int, end_time: int):
        """Ask the LLM for likely causes of high CPU usage in the window.

        Returns:
            dict with "diagnose" (the LLM output) and "knowledge" (the
            matched knowledge text included in the prompt).
        """
        cpu_metrics = prometheus_metrics["cpu_metrics"]
        print("==== cpu_metrics", cpu_metrics)

        detailed_cpu_metrics = obtain_values_of_metrics(
            start_time, end_time, cpu_metrics
        )
        docs_str = knowledge_matcher.match(detailed_cpu_metrics)

        prompt = """The CPU metric is abnormal. Then obtain the CPU relevant metric values from Prometheus: {}.
Next output the analysis of potential causes of the high CPU usage based on the CPU relevant metric values,
{}""".format(
            detailed_cpu_metrics, docs_str
        )
        print(prompt)

        llm = CustomLLM()
        output_analysis = llm(prompt)
        return {"diagnose": output_analysis, "knowledge": docs_str}

    @tool.get("/memory_diagnosis_agent")
    def memory_diagnosis_agent(start_time: int, end_time: int):
        """Ask the LLM for likely causes of high memory usage in the window.

        The prompt combines the memory metric values, the historical slow
        queries reported by the database, and matched knowledge text.

        Returns:
            dict with "diagnose" (the LLM output) and "knowledge" (the
            matched knowledge text included in the prompt).

        Raises:
            KeyError: If the OPENAI_API_KEY environment variable is not set.
        """
        memory_metrics = prometheus_metrics["memory_metrics"]
        detailed_memory_metrics = obtain_values_of_metrics(
            start_time, end_time, memory_metrics
        )

        openai.api_key = os.environ["OPENAI_API_KEY"]

        db = Database(dbargs, timeout=-1)
        slow_queries = db.obtain_historical_slow_queries()
        # Numbered list, one query per line, each line newline-terminated.
        slow_query_state = "".join(
            str(index) + ". " + str(query) + "\n"
            for index, query in enumerate(slow_queries, start=1)
        )
        print(slow_query_state)

        # TODO: need a similarity match function to match the top-K examples
        # 1. get the categories of incoming metrics. Such as "The abnormal metrics include A, B, C, D"
        # 2. embedding the metrics
        #    note: could the metric embeddings be precomputed, if the number
        #    of metric kinds (combinations) is limited?
        # 3. match the top-K examples (embedding)
        #    note: without embeddings, how can the examples most relevant to
        #    the current metrics be selected effectively? Could they be
        #    enumerated, e.g. if we know which metrics each example involves?
        #    How do we decide whether a metric is related to a piece of text?
        #    Could a model tell which metrics a text involves? Retraining one
        #    would probably need many samples.
        #    Could keyword counts be used instead?
        docs_str = knowledge_matcher.match(detailed_memory_metrics)

        prompt = """The memory metric is abnormal. Then obtain the memory metric values from Prometheus: {}. The slow queries are:
{}
Output the analysis of potential causes of the high memory usage based on the memory metric values and slow queries, e.g.,
{}
Note: include the important slow queries in the output.
""".format(
            detailed_memory_metrics, slow_query_state, docs_str
        )

        llm = CustomLLM()
        output_analysis = llm(prompt)
        return {"diagnose": output_analysis, "knowledge": docs_str}

    return tool