File size: 4,265 Bytes
61fd43f
 
 
6e8bd08
 
 
 
 
 
 
61fd43f
 
b5d11af
 
 
6e8bd08
61fd43f
 
 
6e8bd08
 
 
 
61fd43f
 
 
6e8bd08
61fd43f
6e8bd08
61fd43f
 
 
6e8bd08
 
 
61fd43f
 
6e8bd08
 
 
61fd43f
6e8bd08
 
61fd43f
6e8bd08
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61fd43f
6e8bd08
 
 
 
 
 
 
 
 
61fd43f
6e8bd08
 
61fd43f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b5d11af
 
 
 
 
d7d2462
b5d11af
 
 
61fd43f
 
b5d11af
 
d7d2462
b5d11af
61fd43f
d7d2462
6e8bd08
d7d2462
61fd43f
b5d11af
d7d2462
61fd43f
b5d11af
6e8bd08
 
61fd43f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from langchain_mongodb import MongoDBAtlasVectorSearch
from langchain_huggingface import HuggingFaceEmbeddings
# from dotenv import load_dotenv
import os
import pymongo
import logging
import nest_asyncio
from langchain.docstore.document import Document
import redis
import threading
import asyncio
import gradio as gr
import pandas as pd
import plotly.express as px
from datetime import datetime, timedelta

# --- Configuration ---
# nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)

# MongoDB database / collection names used to look up the alert collection.
database, collection = "AlertSimAndRemediation", "alert_embed"
# Name of the Redis stream the listener reads alerts from.
stream_name = "alerts"

# Shared state: written by the alert listener, read by the dashboard callback.
latest_alert, alert_count = "No alerts yet.", 0

# Embedding model configuration: the BAAI/bge-large-en-v1.5 checkpoint, run on
# CPU, with normalized embeddings requested at encode time.
embedding_args = dict(
    model_name="BAAI/bge-large-en-v1.5",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": True},
)
embedding_model = HuggingFaceEmbeddings(**embedding_args)

# Mongo connection: the URI is read from the required MONGO_URI environment
# variable; alerts are stored in connection[database][collection].
connection = pymongo.MongoClient(os.environ["MONGO_URI"])
alert_collection = connection[database][collection]

# Redis connection: REDIS_HOST and REDIS_PWD are required environment
# variables. The port can be overridden with the optional REDIS_PORT variable
# and otherwise keeps the previous hard-coded value, 16652.
r = redis.Redis(
    host=os.environ['REDIS_HOST'],
    password=os.environ['REDIS_PWD'],
    port=int(os.environ.get('REDIS_PORT', '16652')),
)

# Preprocessing
async def create_textual_description(entry_data):
    """Build a one-sentence, human-readable description of an alert entry.

    Args:
        entry_data: Mapping of field name to value for one stream entry.
            Keys and values may be ``bytes`` (decoded with the default
            ``decode()`` codec) or already-decoded ``str``.

    Returns:
        A ``(description, entry_dict)`` tuple: the generated sentence and the
        field mapping with ``bytes`` keys/values decoded to ``str``.

    Raises:
        KeyError: If any of ``Category``, ``CreatedAt``, ``Acknowledged``,
            ``Remedy``, ``Severity``, ``Source`` or ``node`` is missing.
    """
    def _as_text(value):
        # Only bytes-like values need decoding; str passes through unchanged
        # so the function also works when the caller already decoded fields.
        return value.decode() if isinstance(value, (bytes, bytearray)) else value

    entry_dict = {_as_text(k): _as_text(v) for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    # The flag is compared as text: only the exact string "1" means acknowledged.
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = (
        f"A {severity} alert of category {category} was raised from the {source} source "
        f"for node {node} at {created_at}. The alert is {acknowledged}. "
        f"The recommended remedy is: {remedy}."
    )

    return description, entry_dict

# Saving alert doc
async def save(entry):
    """Store one alert in the alert collection through the vector store API.

    Args:
        entry: Mapping with a ``"content"`` value (used as the document's
            page content) and a ``"metadata"`` value (attached to the
            document as its metadata).

    Raises:
        KeyError: If ``entry`` lacks ``"content"`` or ``"metadata"``.
    """
    alert_doc = Document(
        page_content=entry["content"],
        metadata=entry["metadata"],
    )
    # The returned vector-store object is not needed here, so it is not bound.
    # NOTE: this call is not awaited, so it runs synchronously inside the
    # coroutine until it returns.
    MongoDBAtlasVectorSearch.from_documents(
        documents=[alert_doc],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alerts stored successfully!")

# Listening to alert stream
async def listen_to_alerts(r):
    """Read entries from the alert stream in an endless loop and process each.

    For every entry returned by the stream read, the entry is turned into a
    textual description, saved, printed, and published to the module-level
    ``latest_alert`` / ``alert_count`` globals.

    Args:
        r: Client object exposing ``xread``; the stream named by
            ``stream_name`` is read from it.
    """
    global latest_alert, alert_count
    try:
        # Start from the special '$' ID (per Redis XREAD: only entries added
        # after the read begins); afterwards resume from the last handled ID.
        cursor = '$'

        while True:
            # block=0 is passed through to XREAD (per Redis: wait without timeout).
            batches = r.xread({stream_name: cursor}, block=0, count=None)
            if not batches:
                continue

            # Only one stream is requested, so only the first result is used.
            _, fresh_entries = batches[0]

            for alert_id, raw_fields in fresh_entries:
                text, fields = await create_textual_description(raw_fields)
                await save({"content": text, "metadata": fields})
                print(text)

                # Publish the newest alert for the dashboard callback.
                latest_alert = text
                alert_count += 1

                # Advance the cursor only after the entry has been handled.
                cursor = alert_id
                await asyncio.sleep(1)

    except KeyboardInterrupt:
        print("Exiting...")

def run_alert_listener():
    """Thread target: run the ``listen_to_alerts`` coroutine via ``asyncio.run``."""
    listener = listen_to_alerts(r)
    asyncio.run(listener)

# Start the alert listener thread.
# daemon=True: the listener loops forever on a stream read with block=0, so a
# non-daemon thread would keep the interpreter alive after the main thread
# (the Gradio app) has finished.
alert_thread = threading.Thread(target=run_alert_listener, daemon=True)
alert_thread.start()

# dashboard

# Function to get the latest alert and count
def get_latest_alert():
    """Return the current ``(latest_alert, alert_count)`` pair for the dashboard."""
    # Reading module globals needs no ``global`` declaration.
    snapshot = (latest_alert, alert_count)
    return snapshot

# Dashboard layout: a title, then one row with a single column holding two
# non-interactive widgets (latest alert text, alert count) and a refresh button.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🚨 Alert Dashboard 🚨")
    
    with gr.Row():
        with gr.Column():
            latest_alert_box = gr.Textbox(label="Latest Alert", lines=3, interactive=False)
            alert_count_box = gr.Number(label="Alert Count", interactive=False)
            refresh_button = gr.Button("Refresh", variant="primary")
    
    # Manual refresh: a button click calls get_latest_alert and writes its two
    # return values into the textbox and the number box, in that order.
    refresh_button.click(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box])
    
    # Fill both widgets through the app's load event.
    app.load(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box])
    
    # Auto-refresh every 30 seconds
    # NOTE(review): this registers a second load handler with the same callback
    # and outputs as the one directly above — confirm both are needed.
    app.load(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box], every=30)

# Launch the app.
# NOTE: the `if __name__ == "__main__":` guard below is commented out, so
# launch() is a plain top-level statement and also runs when this module is
# imported, not only when it is executed directly.
# if __name__ == "__main__":
app.launch()