# slackdemo / app.py
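# Gradio demo that indexes exported Slack threads and comments with Whoosh,
# then uses llama_index + GPT-4 to surface negative topics and generate summaries.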
import json
import logging
import os
import re
from functools import lru_cache
from llama_index.llms import OpenAI
from whoosh.query import Or, Term
import csv
import gradio as gr
from llama_index import ServiceContext, \
Document, GPTListIndex, VectorStoreIndex
from whoosh import fields, index
from whoosh.qparser import QueryParser
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=os.environ.get("LOGLEVEL", "INFO"))
THREAD_ID = "thread_id"
thread_index = {}
comment_index = {}
llama_cache = {}
chatgpt = OpenAI(temperature=0, model="gpt-4")
service_context = ServiceContext.from_defaults(llm=chatgpt, chunk_size=1024)
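# Search the data set for the keyword query, build a GPTListIndex over the matching
# threads, cache it, and ask the LLM for the key negative topics.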
def passive_topics(index_name, query, topic, summary_type):
    # search_keyword_matches expects a Whoosh index object, so resolve the data set name first
    resp = search_keyword_matches(thread_index[index_name], query)
if resp is not None:
print(f"Found {len(resp)} matches for {query}")
        # Each match is an [id, content] pair; only the content goes into the document
        docs_list = [Document(text=match[1]) for match in resp]
last_llama_index = GPTListIndex.from_documents(documents=docs_list, service_context=service_context)
llama_cache[index_name] = last_llama_index
resp = last_llama_index.as_query_engine().query(
"What are the key negative topics from the discussion? Limit each topic to 30 characters")
dynamic_topics = resp.response.split('\n')
return dynamic_topics
return []
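# Build (or load) the Whoosh thread and comment indexes for every configured data set.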
def load_data(data_sets):
for data_set in data_sets:
create_thread_index(data_set)
create_comment_index(data_set)
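# Index one whole Slack thread per document, keyed by its thread_ts.
# The CSV under csv/<data_set>.csv is expected to look roughly like this
# (column names come from the code below; the row values are illustrative only):
#   thread_ts,messages_json
#   1689012345.0001,"[{""content"": ""Deploy failed again""}, {""content"": ""Looking into it""}]"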
def create_thread_index(data_set):
# Define a schema for the index
schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))
index_path = f"./text_index/{data_set}"
# Create the index directory if it doesn't exist
if not os.path.exists(index_path):
os.makedirs(index_path)
build_index = True
else:
build_index = False
print("Loading from existing thread index " + data_set)
if build_index:
print("Building thread index for " + data_set)
# Create an index under "indexdir"
write_ix = index.create_in(index_path, schema)
# Create a writer object to add documents to the index
writer = write_ix.writer()
# Read the CSV file and add documents to the index
with open(f'csv/{data_set}.csv', 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
writer.add_document(id=row['thread_ts'], content=row['messages_json'])
# Commit the writer and close it
writer.commit()
write_ix.close()
# Open the index
read_ix = index.open_dir(index_path)
thread_index[data_set] = read_ix
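# Same as create_thread_index, but every individual comment becomes its own
# Whoosh document so keyword hits can be narrowed to single messages.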
def create_comment_index(data_set):
# Define a schema for the index
schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))
index_path = f"./text_index/{data_set}_comments"
# Create the index directory if it doesn't exist
if not os.path.exists(index_path):
        os.makedirs(index_path)
build_index = True
else:
build_index = False
print("Loading from existing comments index " + data_set)
if build_index:
print("Building comments index for " + data_set)
# Create an index under "indexdir"
write_ix = index.create_in(index_path, schema)
# Create a writer object to add documents to the index
writer = write_ix.writer()
# Read the CSV file and add documents to the index
count = 0
with open(f'csv/{data_set}.csv', 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
comments = json.loads(row['messages_json'])
for comment in comments:
writer.add_document(id=row['thread_ts'], content=comment["content"])
count += 1
# Commit the writer and close it
writer.commit()
write_ix.close()
# Open the index
read_ix = index.open_dir(index_path)
comment_index[data_set] = read_ix
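# Thin wrappers around Whoosh: parse a free-text keyword query, or match a set
# of thread IDs exactly, then run the search with a result cap of 20.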
def search_keyword_matches(ix, keyword_query):
# Create a query parser
query_parser = QueryParser("content", ix.schema)
query = query_parser.parse(keyword_query)
return execute_text_search(ix, query)
def search_thread_id_matches(ix, thread_id_list):
    # Build an OR query of exact Term matches on the id field
query = Or([Term('id', id_) for id_ in thread_id_list])
return execute_text_search(ix, query)
def execute_text_search(ix, q):
# Search the index
with ix.searcher() as searcher:
results = searcher.search(q, limit=20)
if len(results) > 0:
matches = []
for result in results:
matches.append([result['id'], result['content']])
return matches
else:
return None
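# UI callback for the "Thread" tab: only query the LLM when a topic is selected.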
def gen_insights(index_name, topic, summary_type):
if topic is not None and len(topic) > 0:
resp = generate_insights(index_name, topic, summary_type)
return resp
@lru_cache(maxsize=50)
def generate_insights(index_name, topic, summary_type):
    if llama_cache.get(index_name) is None:
return None
query = f"What is the executive summary for the topic \"{topic}\"? Highlight negative aspects in 100 words"
if summary_type == "None":
return ""
if summary_type == "Actions":
query = f"What are the recommended action items for the topic \"{topic}\"? Limit response to 100 words using bullet points"
elif summary_type == "Followup":
query = f"What are the recommended questions to ask team for more clarity and latest status for the topic \"{topic}\"?"
return llama_cache[index_name].as_query_engine().query(query).response
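# UI callback for the "Comment" tab: dispatch on the selected detail view.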
def generate_comment_insights(index_name, topic, summary_type):
if summary_type == "None":
return ""
if summary_type == "Show Comments":
return show_docs(index_name, topic)
if summary_type == "Show Threads":
return show_threads(index_name, topic)
if summary_type == "Show Summary":
return show_thread_summaries(index_name, topic)
return "Not yet implemented"
def retrieve_llama_nodes(index_name, topic):
    llama = llama_cache.get(index_name)
if llama is None:
return None
retriever = llama.as_retriever()
return retriever.retrieve(topic)
def show_docs(index_name, topic):
nodes = retrieve_llama_nodes(index_name, topic)
if nodes is None:
return "No matching documents found for the topic " + topic
text_list = [node_with_score.node.text for node_with_score in nodes]
return f"Total Matched Comments {len(text_list)}\n" + "\n\n==============\n".join(text_list)
def find_matching_threads(index_name, topic):
nodes = retrieve_llama_nodes(index_name, topic)
if nodes is None:
return None
thread_ids_list = [node_with_score.node.metadata[THREAD_ID] for node_with_score in nodes]
    matches = search_thread_id_matches(thread_index[index_name], thread_ids_list)
    if matches is None:
        return None
threads = []
for thread in matches:
comments = json.loads(thread[1])
thread_content = []
for comment in comments:
thread_content.append(comment["content"])
threads.append("\n ->->-> \n ".join(thread_content))
return threads
def show_threads(index_name, topic):
threads = find_matching_threads(index_name, topic)
if threads is None:
return "No matching documents found for the topic " + topic
return f"Total Threads {len(threads)}\n" + "\n\n==============\n".join(threads)
@lru_cache(maxsize=50)
def show_thread_summaries(index_name, topic):
threads = find_matching_threads(index_name, topic)
if threads is None:
return "No matching documents found for the topic " + topic
docs_list = []
for thread in threads:
docs_list.append(Document(text=thread))
llama_idx = VectorStoreIndex.from_documents(documents=docs_list, service_context=service_context)
query = f"What is the executive summary for the topic \"{topic}\"? Limit response to 100 words"
resp = llama_idx.as_query_engine().query(query)
return resp.response
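# The LLM tends to return topics as a numbered list ("1. Slow builds"); strip
# that prefix so the dropdown shows clean topic names.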
def remove_leading_numbers(text):
    # Use re.sub to strip a leading "1." or "1)" style prefix at the beginning of each line.
return re.sub(r'^\d+[.)]\s*', '', text, flags=re.M)
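# Build a vector index over the keyword matches (tagging each document with its
# thread_id), cache it for the follow-up insight queries, and extract the topics.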
def find_topics_with_llama(index_name, query, matches):
print(f"Found {len(matches)} matches for {query}")
docs_list = []
for match in matches:
metadata = {THREAD_ID: match[0]}
docs_list.append(Document(text=match[1], metadata=metadata))
last_llama_index = VectorStoreIndex.from_documents(documents=docs_list, service_context=service_context)
llama_cache[index_name] = last_llama_index
resp = last_llama_index.as_query_engine().query(
"What are the key negative topics from the discussion? Limit each topic to 30 characters")
result_topics = resp.response.split('\n')
clean_topics = [remove_leading_numbers(topic) for topic in result_topics]
return clean_topics
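# Gradio handlers for the "Find Negative Topics" buttons; they update the topic
# dropdown and reset the detail radio in one go.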
def find_topics_by_thread(index_name, query, topic, summary_type):
resp = search_keyword_matches(thread_index[index_name], query)
if resp is not None:
result_topics = find_topics_with_llama(index_name, query, resp)
return gr.Dropdown.update(choices=result_topics, value=result_topics[0]), gr.Radio.update(value="None")
return "No matches found" if resp is None else resp
def find_topics_by_comments(index_name, query, topic, summary_type):
resp = search_keyword_matches(comment_index[index_name], query)
if resp is not None:
result_topics = find_topics_with_llama(index_name, query, resp)
return gr.Dropdown.update(choices=result_topics, value=result_topics[0]), gr.Radio.update(value="None")
return "No matches found" if resp is None else resp
def main_demo():
demo = gr.Blocks()
with demo:
data_sets = ["platform-engg_messages", "apps-ui_messages", "ux-reviews_messages", "paloaltonetworks_messages"]
load_data(data_sets)
with gr.Tab("Thread"):
data_sets_dd = gr.Dropdown(data_sets,
type="value", value=data_sets[0], label="Select Data Source")
keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search", placeholder='CISO, auth0')
find_topics_button = gr.Button("Find Negative Topics")
topics_dd = gr.Dropdown([],
type="value", label="Select Topic with Negative Sentiment", allow_custom_value=True)
show_details = gr.Radio(["None", "Summary", "Actions", "Followup"], label="Show Details")
out_box = gr.Textbox(lines=11, label="Response")
find_topics_button.click(find_topics_by_thread,
                                 inputs=[data_sets_dd, keyword_txt, topics_dd, show_details],
outputs=[topics_dd, show_details])
show_details.change(gen_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
topics_dd.change(gen_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
with gr.Tab("Comment"):
data_sets_dd = gr.Dropdown(data_sets,
type="value", value=data_sets[0], label="Select Data Source")
keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search", placeholder='CISO, auth0')
find_topics_button = gr.Button("Find Negative Topics")
topics_dd = gr.Dropdown([],
type="value", label="Select Topic with Negative Sentiment", allow_custom_value=True)
show_details = gr.Radio(["None", "Show Comments", "Show Threads", "Show Summary"], label="Show Details")
out_box = gr.Textbox(lines=11, label="Response")
find_topics_button.click(find_topics_by_comments,
                                 inputs=[data_sets_dd, keyword_txt, topics_dd, show_details],
outputs=[topics_dd, show_details])
show_details.change(generate_comment_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
topics_dd.change(generate_comment_insights, inputs=[data_sets_dd, topics_dd, show_details],
outputs=out_box)
if 'LOGIN_PASS' in os.environ:
demo.launch(auth=('axiamatic', os.environ['LOGIN_PASS']),
auth_message='For access, please check my Slack profile or contact me in Slack.',
share=False)
else:
demo.launch(share=False)
main_demo()