# slackdemo / app.py
import csv
import json
import logging
import os
import re
from functools import lru_cache

import gradio as gr
from llama_index import Document, GPTListIndex, ServiceContext, VectorStoreIndex
from llama_index.llms import OpenAI
from whoosh import fields, index
from whoosh.qparser import QueryParser
from whoosh.query import Or, Term
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=os.environ.get("LOGLEVEL", "INFO"))

# Metadata key carrying the Slack thread timestamp on llama_index documents.
THREAD_ID = "thread_id"
# Per-dataset Whoosh indexes: one over whole threads, one over individual comments.
thread_index = {}
comment_index = {}
# Most recently built llama_index for each dataset, reused by follow-up queries.
llama_cache = {}

chatgpt = OpenAI(temperature=0, model="gpt-4")
service_context = ServiceContext.from_defaults(llm=chatgpt, chunk_size=1024)
def passive_topics(index_name, query, topic, summary_type):
    """Search the thread index for keyword matches and extract negative topics via the LLM."""
    # Look up the Whoosh index for this dataset; passing the raw dataset name here was a bug.
    resp = search_keyword_matches(thread_index[index_name], query)
    if resp is not None:
        print(f"Found {len(resp)} matches for {query}")
        # Each match is an [id, content] pair; index only the message text.
        docs_list = [Document(text=match[1]) for match in resp]
        last_llama_index = GPTListIndex.from_documents(documents=docs_list, service_context=service_context)
        llama_cache[index_name] = last_llama_index
        resp = last_llama_index.as_query_engine().query(
            "What are the key negative topics from the discussion? Limit each topic to 30 characters")
        dynamic_topics = resp.response.split('\n')
        return dynamic_topics
    return []
def load_data(data_sets):
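    """Build (or load) the thread- and comment-level Whoosh indexes for every dataset."""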
for data_set in data_sets:
create_thread_index(data_set)
create_comment_index(data_set)
def create_thread_index(data_set):
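    """Index each Slack thread as a single document, keyed by its thread_ts."""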
# Define a schema for the index
schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))
index_path = f"./text_index/{data_set}"
# Create the index directory if it doesn't exist
if not os.path.exists(index_path):
os.makedirs(index_path)
build_index = True
else:
build_index = False
print("Loading from existing thread index " + data_set)
if build_index:
print("Building thread index for " + data_set)
# Create an index under "indexdir"
write_ix = index.create_in(index_path, schema)
# Create a writer object to add documents to the index
writer = write_ix.writer()
# Read the CSV file and add documents to the index
with open(f'csv/{data_set}.csv', 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
writer.add_document(id=row['thread_ts'], content=row['messages_json'])
# Commit the writer and close it
writer.commit()
write_ix.close()
# Open the index
read_ix = index.open_dir(index_path)
thread_index[data_set] = read_ix
def create_comment_index(data_set):
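    """Index every individual comment as its own document, keyed by the parent thread_ts."""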
# Define a schema for the index
schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))
index_path = f"./text_index/{data_set}_comments"
# Create the index directory if it doesn't exist
if not os.path.exists(index_path):
        # Use makedirs so the parent ./text_index directory is created if needed.
        os.makedirs(index_path)
build_index = True
else:
build_index = False
print("Loading from existing comments index " + data_set)
if build_index:
print("Building comments index for " + data_set)
# Create an index under "indexdir"
write_ix = index.create_in(index_path, schema)
# Create a writer object to add documents to the index
writer = write_ix.writer()
# Read the CSV file and add documents to the index
count = 0
with open(f'csv/{data_set}.csv', 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
comments = json.loads(row['messages_json'])
for comment in comments:
writer.add_document(id=row['thread_ts'], content=comment["content"])
count += 1
# Commit the writer and close it
writer.commit()
write_ix.close()
# Open the index
read_ix = index.open_dir(index_path)
comment_index[data_set] = read_ix
def search_keyword_matches(ix, keyword_query):
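    """Parse a free-text keyword query against the content field and run it."""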
# Create a query parser
query_parser = QueryParser("content", ix.schema)
query = query_parser.parse(keyword_query)
return execute_text_search(ix, query)
def search_thread_id_matches(ix, thread_id_list):
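    """Look up documents whose id matches any of the given thread ids."""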
# Create a query parser
query = Or([Term('id', id_) for id_ in thread_id_list])
return execute_text_search(ix, query)
def execute_text_search(ix, q):
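    """Run a Whoosh query and return up to 20 [id, content] pairs, or None if nothing matched."""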
# Search the index
with ix.searcher() as searcher:
results = searcher.search(q, limit=20)
if len(results) > 0:
matches = []
for result in results:
matches.append([result['id'], result['content']])
return matches
else:
return None
def gen_insights(index_name, topic, summary_type):
    """UI callback: generate thread-level insights once a topic has been selected."""
    if topic is not None and len(topic) > 0:
        return generate_insights(index_name, topic, summary_type)
    # Nothing selected yet; keep the output box empty instead of returning None.
    return ""
@lru_cache(maxsize=50)
def generate_insights(index_name, topic, summary_type):
    """Query the cached llama_index for a summary, action items, or follow-up questions."""
    # Use .get() so a dataset without a cached index does not raise KeyError.
    if llama_cache.get(index_name) is None:
        return None
    if summary_type == "None":
        return ""
    query = f"What is the executive summary for the topic \"{topic}\"? Highlight negative aspects in 100 words"
    if summary_type == "Actions":
        query = f"What are the recommended action items for the topic \"{topic}\"? Limit response to 100 words using bullet points"
    elif summary_type == "Followup":
        query = f"What are the recommended questions to ask the team for more clarity and the latest status for the topic \"{topic}\"?"
    return llama_cache[index_name].as_query_engine().query(query).response
def generate_comment_insights(index_name, topic, summary_type):
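    """UI callback: render comment-level details for the selected topic and view type."""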
if summary_type == "None":
return ""
if summary_type == "Show Comments":
return show_docs(index_name, topic)
if summary_type == "Show Threads":
return show_threads(index_name, topic)
if summary_type == "Show Summary":
return show_thread_summaries(index_name, topic)
return "Not yet implemented"
def retrieve_llama_nodes(index_name, topic):
    """Retrieve the top matching nodes for a topic from the cached llama_index, if one exists."""
    llama = llama_cache.get(index_name)
    if llama is None:
        return None
    retriever = llama.as_retriever()
    return retriever.retrieve(topic)
def show_docs(index_name, topic):
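    """Show the raw comments retrieved for the topic."""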
nodes = retrieve_llama_nodes(index_name, topic)
if nodes is None:
return "No matching documents found for the topic " + topic
text_list = [node_with_score.node.text for node_with_score in nodes]
return f"Total Matched Comments {len(text_list)}\n" + "\n\n==============\n".join(text_list)
def find_matching_threads(index_name, topic):
    """Map matching comment nodes back to their full Slack threads."""
    nodes = retrieve_llama_nodes(index_name, topic)
    if nodes is None:
        return None
    thread_ids_list = [node_with_score.node.metadata[THREAD_ID] for node_with_score in nodes]
    matches = search_thread_id_matches(thread_index[index_name], thread_ids_list)
    if matches is None:
        return None
    threads = []
    for thread in matches:
        comments = json.loads(thread[1])
        thread_content = []
        for comment in comments:
            thread_content.append(comment["content"])
        threads.append("\n ->->-> \n ".join(thread_content))
    return threads
def show_threads(index_name, topic):
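    """Show the full threads that contain comments matching the topic."""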
threads = find_matching_threads(index_name, topic)
if threads is None:
return "No matching documents found for the topic " + topic
return f"Total Threads {len(threads)}\n" + "\n\n==============\n".join(threads)
@lru_cache(maxsize=50)
def show_thread_summaries(index_name, topic):
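    """Summarize the matching threads using a per-query VectorStoreIndex."""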
threads = find_matching_threads(index_name, topic)
if threads is None:
return "No matching documents found for the topic " + topic
docs_list = []
for thread in threads:
docs_list.append(Document(text=thread))
llama_idx = VectorStoreIndex.from_documents(documents=docs_list, service_context=service_context)
query = f"What is the executive summary for the topic \"{topic}\"? Limit response to 100 words"
resp = llama_idx.as_query_engine().query(query)
return resp.response
def remove_leading_numbers(text):
    # Strip "1." or "1)" style numbering from the start of each line of the LLM response.
    return re.sub(r'^\d+[.)]\s*', '', text, flags=re.M)
def find_topics_with_llama(index_name, query, matches):
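    """Build a VectorStoreIndex over the matches, cache it, and ask the LLM for negative topics."""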
print(f"Found {len(matches)} matches for {query}")
docs_list = []
for match in matches:
metadata = {THREAD_ID: match[0]}
docs_list.append(Document(text=match[1], metadata=metadata))
last_llama_index = VectorStoreIndex.from_documents(documents=docs_list, service_context=service_context)
llama_cache[index_name] = last_llama_index
resp = last_llama_index.as_query_engine().query(
"What are the key negative topics from the discussion? Limit each topic to 30 characters")
result_topics = resp.response.split('\n')
clean_topics = [remove_leading_numbers(topic) for topic in result_topics]
return clean_topics
def find_topics_by_thread(index_name, query, topic, summary_type):
    """UI callback: search whole threads for the keywords and list negative topics."""
    resp = search_keyword_matches(thread_index[index_name], query)
    if resp is not None:
        result_topics = find_topics_with_llama(index_name, query, resp)
        return gr.Dropdown.update(choices=result_topics, value=result_topics[0]), gr.Radio.update(value="None")
    # No matches: return one update per wired output so Gradio does not error out.
    return gr.Dropdown.update(choices=[], value=None), gr.Radio.update(value="None")
def find_topics_by_comments(index_name, query, topic, summary_type):
    """UI callback: search individual comments for the keywords and list negative topics."""
    resp = search_keyword_matches(comment_index[index_name], query)
    if resp is not None:
        result_topics = find_topics_with_llama(index_name, query, resp)
        return gr.Dropdown.update(choices=result_topics, value=result_topics[0]), gr.Radio.update(value="None")
    # No matches: return one update per wired output so Gradio does not error out.
    return gr.Dropdown.update(choices=[], value=None), gr.Radio.update(value="None")
def main_demo():
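    """Build the Gradio UI with a thread-level tab and a comment-level tab, then launch it."""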
demo = gr.Blocks()
with demo:
data_sets = ["platform-engg_messages", "apps-ui_messages", "ux-reviews_messages", "paloaltonetworks_messages"]
load_data(data_sets)
with gr.Tab("Thread"):
data_sets_dd = gr.Dropdown(data_sets,
type="value", value=data_sets[0], label="Select Data Source")
keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search", placeholder='CISO, auth0')
find_topics_button = gr.Button("Find Negative Topics")
topics_dd = gr.Dropdown([],
type="value", label="Select Topic with Negative Sentiment", allow_custom_value=True)
show_details = gr.Radio(["None", "Summary", "Actions", "Followup"], label="Show Details")
out_box = gr.Textbox(lines=11, label="Response")
find_topics_button.click(find_topics_by_thread,
inputs=[data_sets_dd, keyword_txt, find_topics_button, topics_dd],
outputs=[topics_dd, show_details])
show_details.change(gen_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
topics_dd.change(gen_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
with gr.Tab("Comment"):
data_sets_dd = gr.Dropdown(data_sets,
type="value", value=data_sets[0], label="Select Data Source")
keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search", placeholder='CISO, auth0')
find_topics_button = gr.Button("Find Negative Topics")
topics_dd = gr.Dropdown([],
type="value", label="Select Topic with Negative Sentiment", allow_custom_value=True)
show_details = gr.Radio(["None", "Show Comments", "Show Threads", "Show Summary"], label="Show Details")
out_box = gr.Textbox(lines=11, label="Response")
find_topics_button.click(find_topics_by_comments,
inputs=[data_sets_dd, keyword_txt, find_topics_button, topics_dd],
outputs=[topics_dd, show_details])
show_details.change(generate_comment_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
topics_dd.change(generate_comment_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
if 'LOGIN_PASS' in os.environ:
demo.launch(auth=('axiamatic', os.environ['LOGIN_PASS']),
auth_message='For access, please check my Slack profile or contact me in Slack.',
share=False)
else:
demo.launch(share=False)
if __name__ == "__main__":
    main_demo()