import csv
import json
import logging
import os
import re
from functools import lru_cache

import gradio as gr
from llama_index import Document, GPTListIndex, ServiceContext, VectorStoreIndex
from llama_index.llms import OpenAI
from whoosh import fields, index
from whoosh.qparser import QueryParser
from whoosh.query import Or, Term

logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s',
                    level=os.environ.get("LOGLEVEL", "INFO"))

THREAD_ID = "thread_id"

# Per-channel Whoosh indexes and cached LlamaIndex instances, keyed by data set name.
thread_index = {}
comment_index = {}
llama_cache = {}

chatgpt = OpenAI(temperature=0, model="gpt-4")
service_context = ServiceContext.from_defaults(llm=chatgpt, chunk_size=1024)


def passive_topics(index_name, query, topic, summary_type):
    resp = search_keyword_matches(thread_index[index_name], query)
    if resp is not None:
        print(f"Found {len(resp)} matches for {query}")
        docs_list = [Document(text=match[1]) for match in resp]
        last_llama_index = GPTListIndex.from_documents(documents=docs_list,
                                                       service_context=service_context)
        llama_cache[index_name] = last_llama_index
        resp = last_llama_index.as_query_engine().query(
            "What are the key negative topics from the discussion? Limit each topic to 30 characters")
        dynamic_topics = resp.response.split('\n')
        return dynamic_topics
    return []


def load_data(data_sets):
    for data_set in data_sets:
        create_thread_index(data_set)
        create_comment_index(data_set)


def create_thread_index(data_set):
    # Define a schema for the index
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))

    index_path = f"./text_index/{data_set}"
    # Create the index directory (and any parents) if it doesn't exist
    if not os.path.exists(index_path):
        os.makedirs(index_path)
        build_index = True
    else:
        build_index = False
        print("Loading from existing thread index " + data_set)

    if build_index:
        print("Building thread index for " + data_set)
        # Create an index in the index directory
        write_ix = index.create_in(index_path, schema)

        # Create a writer object to add documents to the index
        writer = write_ix.writer()

        # Read the CSV file and add one document per thread to the index
        with open(f'csv/{data_set}.csv', 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                writer.add_document(id=row['thread_ts'], content=row['messages_json'])

        # Commit the writes and close the index
        writer.commit()
        write_ix.close()

    # Open the index for searching
    read_ix = index.open_dir(index_path)
    thread_index[data_set] = read_ix


def create_comment_index(data_set):
    # Define a schema for the index
    schema = fields.Schema(id=fields.ID(stored=True), content=fields.TEXT(stored=True))

    index_path = f"./text_index/{data_set}_comments"
    # Create the index directory (and any parents) if it doesn't exist
    if not os.path.exists(index_path):
        os.makedirs(index_path)
        build_index = True
    else:
        build_index = False
        print("Loading from existing comments index " + data_set)

    if build_index:
        print("Building comments index for " + data_set)
        # Create an index in the index directory
        write_ix = index.create_in(index_path, schema)

        # Create a writer object to add documents to the index
        writer = write_ix.writer()

        # Read the CSV file and add one document per comment to the index
        with open(f'csv/{data_set}.csv', 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                comments = json.loads(row['messages_json'])
                for comment in comments:
                    writer.add_document(id=row['thread_ts'], content=comment["content"])

        # Commit the writes and close the index
        writer.commit()
        write_ix.close()

    # Open the index for searching
    read_ix = index.open_dir(index_path)
    comment_index[data_set] = read_ix
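# A minimal sketch of the CSV layout that create_thread_index() and
# create_comment_index() above assume (inferred from the reader code; the real
# Slack export format may differ). Each row in csv/<data_set>.csv pairs a
# thread id with that thread's messages serialized as JSON; the values shown
# below are purely illustrative:
#
#   thread_ts,messages_json
#   1690000000.000100,"[{""content"": ""auth0 rollout failed again""}, {""content"": ""CISO wants a root-cause doc""}]"
#
# create_thread_index() stores messages_json verbatim (one document per
# thread), while create_comment_index() unpacks the JSON list and indexes each
# comment's "content" field as its own document.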
def search_keyword_matches(ix, keyword_query):
    # Parse the keyword query against the "content" field
    query_parser = QueryParser("content", ix.schema)
    query = query_parser.parse(keyword_query)
    return execute_text_search(ix, query)


def search_thread_id_matches(ix, thread_id_list):
    # Match any of the given thread ids
    query = Or([Term('id', id_) for id_ in thread_id_list])
    return execute_text_search(ix, query)


def execute_text_search(ix, q):
    # Search the index and return up to 20 [id, content] pairs, or None if nothing matched
    with ix.searcher() as searcher:
        results = searcher.search(q, limit=20)
        if len(results) > 0:
            matches = []
            for result in results:
                matches.append([result['id'], result['content']])
            return matches
        else:
            return None


def gen_insights(index_name, topic, summary_type):
    if topic is not None and len(topic) > 0:
        resp = generate_insights(index_name, topic, summary_type)
        if resp is not None:
            return resp.response
    return None


@lru_cache(maxsize=50)
def generate_insights(index_name, topic, summary_type):
    if llama_cache.get(index_name) is None:
        return None
    query = f"What is the executive summary for the topic \"{topic}\"? Highlight negative aspects in 100 words"
    if summary_type == "Actions":
        query = f"What are the recommended action items for the topic \"{topic}\"? Limit response to 100 words using bullet points"
    elif summary_type == "Followup":
        query = f"What are the recommended questions to ask team for more clarity and latest status for the topic \"{topic}\"?"
    return llama_cache[index_name].as_query_engine().query(query)


def generate_comment_insights(index_name, topic, summary_type):
    if summary_type == "Show Comments":
        return show_docs(index_name, topic)
    if summary_type == "Show Threads":
        return show_threads(index_name, topic)
    if summary_type == "Show Summary":
        return show_thread_summaries(index_name, topic)
    return "Not yet implemented"


def retrieve_llama_nodes(index_name, topic):
    llama = llama_cache.get(index_name)
    if llama is None:
        return None
    retriever = llama.as_retriever()
    return retriever.retrieve(topic)


def show_docs(index_name, topic):
    nodes = retrieve_llama_nodes(index_name, topic)
    if nodes is None:
        return "No matching documents found for the topic " + topic
    text_list = [node_with_score.node.text for node_with_score in nodes]
    return f"Total Matched Comments {len(text_list)}\n" + "\n\n==============\n".join(text_list)


def find_matching_threads(index_name, topic):
    nodes = retrieve_llama_nodes(index_name, topic)
    if nodes is None:
        return None
    thread_ids_list = [node_with_score.node.metadata[THREAD_ID] for node_with_score in nodes]
    matches = search_thread_id_matches(thread_index[index_name], thread_ids_list)
    if matches is None:
        return None
    threads = []
    for thread in matches:
        comments = json.loads(thread[1])
        thread_content = []
        for comment in comments:
            thread_content.append(comment["content"])
        threads.append("\n ->->-> \n ".join(thread_content))
    return threads


def show_threads(index_name, topic):
    threads = find_matching_threads(index_name, topic)
    if threads is None:
        return "No matching documents found for the topic " + topic
    return f"Total Threads {len(threads)}\n" + "\n\n==============\n".join(threads)


@lru_cache(maxsize=50)
def show_thread_summaries(index_name, topic):
    threads = find_matching_threads(index_name, topic)
    if threads is None:
        return "No matching documents found for the topic " + topic
    docs_list = []
    for thread in threads:
        docs_list.append(Document(text=thread))
    llama_idx = VectorStoreIndex.from_documents(documents=docs_list,
                                                service_context=service_context)
    query = f"What is the executive summary for the topic \"{topic}\"? Limit response to 100 words"
    resp = llama_idx.as_query_engine().query(query)
    return resp.response
def remove_leading_numbers(text):
    # Strip leading list numbering such as "1. " or "1) " from each line
    return re.sub(r'^\d+[.)]\s*', '', text, flags=re.M)


def find_topics_with_llama(index_name, query, matches):
    print(f"Found {len(matches)} matches for {query}")
    docs_list = []
    for match in matches:
        metadata = {THREAD_ID: match[0]}
        docs_list.append(Document(text=match[1], metadata=metadata))
    last_llama_index = VectorStoreIndex.from_documents(documents=docs_list,
                                                       service_context=service_context)
    llama_cache[index_name] = last_llama_index
    resp = last_llama_index.as_query_engine().query(
        "What are the key negative topics from the discussion? Limit each topic to 30 characters")
    result_topics = resp.response.split('\n')
    clean_topics = [remove_leading_numbers(topic) for topic in result_topics]
    return clean_topics


def find_topics_by_thread(index_name, query, topic, summary_type):
    resp = search_keyword_matches(thread_index[index_name], query)
    if resp is not None:
        result_topics = find_topics_with_llama(index_name, query, resp)
        return gr.Dropdown.update(choices=result_topics, value=result_topics[0])
    return "No matches found"


def find_topics_by_comments(index_name, query, topic, summary_type):
    resp = search_keyword_matches(comment_index[index_name], query)
    if resp is not None:
        result_topics = find_topics_with_llama(index_name, query, resp)
        return gr.Dropdown.update(choices=result_topics, value=result_topics[0])
    return "No matches found"


def main_demo():
    demo = gr.Blocks()
    with demo:
        data_sets = ["platform-engg_messages", "apps-ui_messages", "ux-reviews_messages"]
        load_data(data_sets)

        with gr.Tab("Thread"):
            data_sets_dd = gr.Dropdown(data_sets, type="value", value=data_sets[0],
                                       label="Select Slack Channel")
            keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search",
                                     placeholder='CISO, auth0')
            find_topics_button = gr.Button("Find Topics")
            topics_dd = gr.Dropdown([], type="value",
                                    label="Select Topic with Negative Sentiment",
                                    allow_custom_value=True)
            show_details = gr.Radio(["Summary", "Actions", "Followup"], label="Show Details")
            find_topics_button.click(find_topics_by_thread,
                                     inputs=[data_sets_dd, keyword_txt, find_topics_button, topics_dd],
                                     outputs=topics_dd)
            show_details.change(gen_insights,
                                inputs=[data_sets_dd, topics_dd, show_details],
                                outputs=gr.Textbox(lines=11, label="Response"))

        with gr.Tab("Comment"):
            data_sets_dd = gr.Dropdown(data_sets, type="value", value=data_sets[0],
                                       label="Select Slack Channel")
            keyword_txt = gr.Textbox(lines=2, label="Enter keywords to search",
                                     placeholder='CISO, auth0')
            find_topics_button = gr.Button("Find Topics")
            topics_dd = gr.Dropdown([], type="value",
                                    label="Select Topic with Negative Sentiment",
                                    allow_custom_value=True)
            show_details = gr.Radio(["Show Comments", "Show Threads", "Show Summary"],
                                    label="Show Details")
            find_topics_button.click(find_topics_by_comments,
                                     inputs=[data_sets_dd, keyword_txt, find_topics_button, topics_dd],
                                     outputs=topics_dd)
            show_details.change(generate_comment_insights,
                                inputs=[data_sets_dd, topics_dd, show_details],
                                outputs=gr.Textbox(lines=11, label="Response"))
    demo.launch()


if __name__ == "__main__":
    main_demo()
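# Usage sketch (assumptions: OPENAI_API_KEY is exported for the llama_index
# OpenAI client, csv/ holds one <channel>_messages.csv export per Slack
# channel, and the filename slack_insights.py is illustrative):
#
#   LOGLEVEL=DEBUG python slack_insights.py
#
# demo.launch() then serves the Gradio UI on http://127.0.0.1:7860 by default.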