|
import json |
|
import logging |
|
import os |
|
import re |
|
from functools import lru_cache |
|
|
|
from llama_index.llms import OpenAI |
|
from whoosh.query import Or, Term |
|
|
|
import csv |
|
|
|
import gradio as gr |
|
from llama_index import ServiceContext, \ |
|
Document, GPTListIndex, VectorStoreIndex |
|
from whoosh import fields, index |
|
from whoosh.qparser import QueryParser |
|
|
|
# Log format/level configurable via the LOGLEVEL env var (defaults to INFO).
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=os.environ.get("LOGLEVEL", "INFO"))

# Metadata key used to tag each llama_index Document with its Slack thread id.
THREAD_ID = "thread_id"

# Per-data-set whoosh indexes: one over whole threads, one over individual comments.
thread_index = {}
comment_index = {}
# Last llama_index built per data set; populated lazily by the topic searches.
llama_cache = {}

# Deterministic (temperature=0) GPT-4 client shared by all query engines.
chatgpt = OpenAI(temperature=0, model="gpt-4")
service_context = ServiceContext.from_defaults(llm=chatgpt, chunk_size=1024)
|
|
|
|
|
def passive_topics(index_name, query, topic, summary_type):
    """Search threads for `query` and extract negative topics with the LLM.

    Builds a GPTListIndex over the matching thread texts, caches it in
    llama_cache under index_name, and returns the LLM's topic list (one
    topic per line).  Returns [] when the keyword search finds nothing.
    The `topic` and `summary_type` parameters are accepted for signature
    parity with the other gradio handlers but are unused here.
    """
    # Fix: search_keyword_matches needs the whoosh index object (it reads
    # ix.schema), not the data-set name — resolve via thread_index like the
    # sibling handlers do.
    matches = search_keyword_matches(thread_index[index_name], query)
    if matches is None:
        return []

    print(f"Found {len(matches)} matches for {query}")
    # Fix: each match is an [thread_id, content] pair from execute_text_search;
    # only the content string belongs in the Document.
    docs_list = [Document(text=match[1]) for match in matches]
    last_llama_index = GPTListIndex.from_documents(documents=docs_list, service_context=service_context)
    llama_cache[index_name] = last_llama_index
    resp = last_llama_index.as_query_engine().query(
        "What are the key negative topics from the discussion? Limit each topic to 30 characters")
    return resp.response.split('\n')
|
|
|
|
|
def load_data(data_sets):
    """Build or open the thread and comment whoosh indexes for every data set."""
    for name in data_sets:
        create_thread_index(name)
        create_comment_index(name)
|
|
|
|
|
def create_thread_index(data_set):
    """Build (or open) the whoosh index of whole threads for one data set.

    Each document is a full thread: id = thread_ts, content = the raw
    messages_json blob from csv/<data_set>.csv.  The opened index is stored
    in the module-level thread_index dict under data_set.
    """
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))
    index_path = f"./text_index/{data_set}"

    os.makedirs(index_path, exist_ok=True)
    # index.exists_in is more reliable than os.path.exists: the directory can
    # exist without containing a valid whoosh index (e.g. interrupted build),
    # in which case open_dir would fail.
    if index.exists_in(index_path):
        print("Loading from existing thread index " + data_set)
    else:
        print("Building thread index for " + data_set)
        write_ix = index.create_in(index_path, schema)
        writer = write_ix.writer()
        with open(f'csv/{data_set}.csv', 'r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                writer.add_document(id=row['thread_ts'], content=row['messages_json'])
        writer.commit()
        write_ix.close()

    thread_index[data_set] = index.open_dir(index_path)
|
|
|
|
|
def create_comment_index(data_set):
    """Build (or open) the whoosh index of individual comments for one data set.

    Each document is a single comment: id = the enclosing thread_ts, content =
    the comment's "content" field parsed out of messages_json.  The opened
    index is stored in the module-level comment_index dict under data_set.
    """
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))
    index_path = f"./text_index/{data_set}_comments"

    # makedirs (not mkdir) so a missing ./text_index parent cannot break the
    # build when this runs before create_thread_index.
    os.makedirs(index_path, exist_ok=True)
    # exists_in distinguishes a real index from a merely-existing directory.
    if index.exists_in(index_path):
        print("Loading from existing comments index " + data_set)
    else:
        print("Building comments index for " + data_set)
        write_ix = index.create_in(index_path, schema)
        writer = write_ix.writer()
        count = 0
        with open(f'csv/{data_set}.csv', 'r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                comments = json.loads(row['messages_json'])
                for comment in comments:
                    writer.add_document(id=row['thread_ts'], content=comment["content"])
                    count += 1
        writer.commit()
        write_ix.close()
        # count was previously tallied but never reported; surface it.
        print(f"Indexed {count} comments for {data_set}")

    comment_index[data_set] = index.open_dir(index_path)
|
|
|
|
|
def search_keyword_matches(ix, keyword_query):
    """Parse keyword_query against the 'content' field of ix and run it."""
    parsed = QueryParser("content", ix.schema).parse(keyword_query)
    return execute_text_search(ix, parsed)
|
|
|
|
|
def search_thread_id_matches(ix, thread_id_list):
    """Fetch documents whose id matches any thread id in thread_id_list."""
    id_terms = [Term('id', thread_id) for thread_id in thread_id_list]
    return execute_text_search(ix, Or(id_terms))
|
|
|
|
|
def execute_text_search(ix, q):
    """Run query q against ix; return up to 20 [id, content] pairs, or None
    when nothing matched (callers test for None explicitly)."""
    with ix.searcher() as searcher:
        hits = searcher.search(q, limit=20)
        if not hits:
            return None
        # Materialize inside the `with`: hit fields are tied to the searcher.
        return [[hit['id'], hit['content']] for hit in hits]
|
|
|
|
|
def gen_insights(index_name, topic, summary_type):
    """Gradio handler: produce insights for a selected topic.

    Returns None (no UI update) when no topic has been chosen yet.
    """
    if not topic:
        return None
    return generate_insights(index_name, topic, summary_type)
|
|
|
|
|
@lru_cache(maxsize=50)
def generate_insights(index_name, topic, summary_type):
    """Query the cached llama index for a summary/actions/follow-ups on topic.

    Returns None when no llama index has been built for index_name yet,
    "" for summary_type "None", otherwise the LLM's response text.
    NOTE(review): the lru_cache keys only on the arguments while the result
    depends on the mutable llama_cache contents — a re-run of the topic
    search can leave stale cached answers; confirm this is acceptable.
    """
    # .get avoids a KeyError when the user asks for details before any topic
    # search has populated llama_cache (it starts empty at module load).
    llama = llama_cache.get(index_name)
    if llama is None:
        return None
    if summary_type == "None":
        return ""
    # Default prompt covers the "Summary" radio choice.
    query = f"What is the executive summary for the topic \"{topic}\"? Highlight negative aspects in 100 words"
    if summary_type == "Actions":
        query = f"What are the recommended action items for the topic \"{topic}\"? Limit response to 100 words using bullet points"
    elif summary_type == "Followup":
        query = f"What are the recommended questions to ask team for more clarity and latest status for the topic \"{topic}\"?"
    return llama.as_query_engine().query(query).response
|
|
|
|
|
def generate_comment_insights(index_name, topic, summary_type): |
|
if summary_type == "None": |
|
return "" |
|
if summary_type == "Show Comments": |
|
return show_docs(index_name, topic) |
|
if summary_type == "Show Threads": |
|
return show_threads(index_name, topic) |
|
if summary_type == "Show Summary": |
|
return show_thread_summaries(index_name, topic) |
|
|
|
return "Not yet implemented" |
|
|
|
|
|
def retrieve_llama_nodes(index_name, topic):
    """Retrieve nodes relevant to topic from the cached llama index.

    Returns None when no llama index has been built for index_name yet.
    """
    # .get: llama_cache starts empty, so a plain [] lookup would raise
    # KeyError if details are requested before any topic search ran.
    llama = llama_cache.get(index_name)
    if llama is None:
        return None
    return llama.as_retriever().retrieve(topic)
|
|
|
|
|
def show_docs(index_name, topic):
    """Render the comment texts matching topic, separated by divider lines."""
    nodes = retrieve_llama_nodes(index_name, topic)
    if nodes is None:
        return "No matching documents found for the topic " + topic
    texts = [scored.node.text for scored in nodes]
    header = f"Total Matched Comments {len(texts)}\n"
    return header + "\n\n==============\n".join(texts)
|
|
|
|
|
def find_matching_threads(index_name, topic):
    """Expand topic-relevant comments into their full threads.

    Looks up the THREAD_ID metadata of each retrieved node, fetches the whole
    threads from the thread index, and returns each thread's comments joined
    with an arrow separator.  Returns None when no nodes matched.
    """
    nodes = retrieve_llama_nodes(index_name, topic)
    if nodes is None:
        return None
    ids = [scored.node.metadata[THREAD_ID] for scored in nodes]
    matches = search_thread_id_matches(thread_index[index_name], ids)
    threads = []
    for _, messages_json in matches:
        contents = [comment["content"] for comment in json.loads(messages_json)]
        threads.append("\n ->->-> \n ".join(contents))
    return threads
|
|
|
|
|
def show_threads(index_name, topic):
    """Render the full threads matching topic, separated by divider lines."""
    threads = find_matching_threads(index_name, topic)
    if threads is None:
        return "No matching documents found for the topic " + topic
    body = "\n\n==============\n".join(threads)
    return f"Total Threads {len(threads)}\n" + body
|
|
|
|
|
@lru_cache(maxsize=50)
def show_thread_summaries(index_name, topic):
    """Summarize the threads matching topic with a fresh vector index + LLM."""
    threads = find_matching_threads(index_name, topic)
    if threads is None:
        return "No matching documents found for the topic " + topic
    documents = [Document(text=thread) for thread in threads]
    llama_idx = VectorStoreIndex.from_documents(documents=documents, service_context=service_context)
    query = f"What is the executive summary for the topic \"{topic}\"? Limit response to 100 words"
    return llama_idx.as_query_engine().query(query).response
|
|
|
|
|
def remove_leading_numbers(text):
    """Strip list-style prefixes like "1. " or "2) " from the start of each line."""
    pattern = re.compile(r'^\d+[.)]\s*', re.MULTILINE)
    return pattern.sub('', text)
|
|
|
|
|
def find_topics_with_llama(index_name, query, matches):
    """Build a vector index over the matched texts and ask the LLM for topics.

    matches is a list of [thread_id, content] pairs; each becomes a Document
    tagged with its thread id.  The built index is cached in llama_cache and
    the LLM's topic lines are returned with leading numbering stripped.
    """
    print(f"Found {len(matches)} matches for {query}")
    docs_list = [
        Document(text=content, metadata={THREAD_ID: thread_id})
        for thread_id, content in matches
    ]
    last_llama_index = VectorStoreIndex.from_documents(documents=docs_list, service_context=service_context)
    llama_cache[index_name] = last_llama_index
    resp = last_llama_index.as_query_engine().query(
        "What are the key negative topics from the discussion? Limit each topic to 30 characters")
    return [remove_leading_numbers(line) for line in resp.response.split('\n')]
|
|
|
|
|
def find_topics_by_thread(index_name, query, topic, summary_type):
    """Gradio handler: keyword-search whole threads, then extract topics.

    On a hit, returns (dropdown update with topics, radio reset to "None").
    The `topic` and `summary_type` inputs are unused; they mirror the other
    handlers' signatures.
    """
    matches = search_keyword_matches(thread_index[index_name], query)
    if matches is None:
        # Dead-code fix: the original tail `"No matches found" if resp is None
        # else resp` could only ever take the None branch — the non-None case
        # already returned above.
        # NOTE(review): a plain string here while the outputs expect
        # (dropdown, radio) updates looks suspect — confirm against the UI wiring.
        return "No matches found"
    result_topics = find_topics_with_llama(index_name, query, matches)
    return gr.Dropdown.update(choices=result_topics, value=result_topics[0]), gr.Radio.update(value="None")
|
|
|
|
|
def find_topics_by_comments(index_name, query, topic, summary_type):
    """Gradio handler: keyword-search individual comments, then extract topics.

    On a hit, returns (dropdown update with topics, radio reset to "None").
    The `topic` and `summary_type` inputs are unused; they mirror the other
    handlers' signatures.
    """
    matches = search_keyword_matches(comment_index[index_name], query)
    if matches is None:
        # Dead-code fix: the original tail `"No matches found" if resp is None
        # else resp` could only ever take the None branch — the non-None case
        # already returned above.
        return "No matches found"
    result_topics = find_topics_with_llama(index_name, query, matches)
    return gr.Dropdown.update(choices=result_topics, value=result_topics[0]), gr.Radio.update(value="None")
|
|
|
|
|
def main_demo():
    """Build and launch the gradio UI: a Thread tab and a Comment tab, each
    wiring a keyword search to topic extraction and detail views."""
    demo = gr.Blocks()

    with demo:
        # One whoosh thread index + comment index is built/opened per data set.
        data_sets = ["platform-engg_messages", "apps-ui_messages", "ux-reviews_messages", "paloaltonetworks_messages"]
        load_data(data_sets)
        with gr.Tab("Thread"):
            data_sets_dd = gr.Dropdown(data_sets,
                                       type="value", value=data_sets[0], label="Select Data Source")
            keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search", placeholder='CISO, auth0')
            find_topics_button = gr.Button("Find Negative Topics")
            topics_dd = gr.Dropdown([],
                                    type="value", label="Select Topic with Negative Sentiment", allow_custom_value=True)

            show_details = gr.Radio(["None", "Summary", "Actions", "Followup"], label="Show Details")
            out_box = gr.Textbox(lines=11, label="Response")
            # NOTE(review): find_topics_button is passed as an *input* here,
            # while the handler's third parameter is named `topic` — looks like
            # a wiring oddity to confirm (the handler ignores that argument).
            find_topics_button.click(find_topics_by_thread,
                                     inputs=[data_sets_dd, keyword_txt, find_topics_button, topics_dd],
                                     outputs=[topics_dd, show_details])
            # Regenerate insights when either the detail view or the topic changes.
            show_details.change(gen_insights, inputs=[data_sets_dd, topics_dd, show_details],
                                outputs=out_box)
            topics_dd.change(gen_insights, inputs=[data_sets_dd, topics_dd, show_details],
                             outputs=out_box)

        with gr.Tab("Comment"):
            # Same layout as the Thread tab but searching the per-comment index;
            # the widget names deliberately shadow the Thread tab's locals.
            data_sets_dd = gr.Dropdown(data_sets,
                                       type="value", value=data_sets[0], label="Select Data Source")
            keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search", placeholder='CISO, auth0')
            find_topics_button = gr.Button("Find Negative Topics")
            topics_dd = gr.Dropdown([],
                                    type="value", label="Select Topic with Negative Sentiment", allow_custom_value=True)

            show_details = gr.Radio(["None", "Show Comments", "Show Threads", "Show Summary"], label="Show Details")
            out_box = gr.Textbox(lines=11, label="Response")
            find_topics_button.click(find_topics_by_comments,
                                     inputs=[data_sets_dd, keyword_txt, find_topics_button, topics_dd],
                                     outputs=[topics_dd, show_details])
            show_details.change(generate_comment_insights, inputs=[data_sets_dd, topics_dd, show_details],
                                outputs=out_box)
            topics_dd.change(generate_comment_insights, inputs=[data_sets_dd, topics_dd, show_details],
                             outputs=out_box)

    # Optional basic auth: set LOGIN_PASS in the environment to protect the app.
    if 'LOGIN_PASS' in os.environ:
        demo.launch(auth=('axiamatic', os.environ['LOGIN_PASS']),
                    auth_message='For access, please check my Slack profile or contact me in Slack.',
                    share=False)
    else:
        demo.launch(share=False)
|
|
|
if __name__ == "__main__":
    # Guard the entry point so importing this module does not launch the UI.
    main_demo()
|
|