# Hugging Face Space status banner ("Spaces: Sleeping") — page-scrape artifact,
# kept here as a comment so the module parses.
########################### | |
# UI for Meeting RAG Q&A. # | |
########################### | |
##################### Imports ##################### | |
import uuid | |
import threading | |
import gradio as gr | |
import spaces | |
import os | |
from utilities.setup import get_files | |
from connections.pinecone import PineconeConnector | |
from connections.model import InferencePipeline | |
from services.embed_service.embed import EmbeddingService | |
from services.embed_service.utils import VTTTranscriptLoader, clean_text | |
from services.qa_service.qna import QAService | |
#################### Functions #################### | |
def process_transcripts(files, context, session_key):
    """Embed the uploaded transcripts and store them under the user's namespace.

    Args:
        files: Paths of the uploaded .vtt transcripts (from the UploadButton).
        context: Extra user-provided analysis context (not used here yet).
        session_key: Per-user key that names the Pinecone namespace.

    Returns:
        A status string displayed in the UI when loading completes.
    """
    # `conf` and `pc_connector` are module-level globals set in __main__.
    with EmbeddingService(conf,
                          pinecone=pc_connector,
                          session_key=session_key) as embedder:
        # Return value intentionally discarded: embedding/storage is the
        # side effect we care about (the original bound it to an unused `f`).
        embedder.run(files)
    return "Completed Loading Data"
def retrieve_answer(question, goals, session_key):
    """Run the RAG Q&A pipeline for `question`.

    Args:
        question: The user's question text.
        goals: Analysis goals passed through to the QA service.
        session_key: Session key identifying the user's Pinecone namespace.

    Returns:
        Tuple of (answer text, retrieved context) from the QA service.
    """
    has_namespace = namespace_check(session_key)
    service = QAService(conf,
                        pinecone=pc_connector,
                        model_pipeline=pipelines,
                        question=question,
                        goals=goals,
                        session_key=session_key,
                        keycheck=has_namespace)
    with service as qa:
        return qa.run()
def return_stored_transcripts(conf):
    """Load and clean the demo meeting transcripts bundled with the app.

    Args:
        conf: App configuration; conf["layout"]["stored_meetings"] is the
            directory holding the pre-stored .vtt transcripts.

    Returns:
        Tuple of (list of cleaned transcript texts, list of transcript
        file names), index-aligned with each other.
    """
    print("Returning stored transcript")
    store = conf["layout"]["stored_meetings"]
    meeting_paths = os.listdir(store)
    # os.path.join tolerates a missing trailing separator on `store`,
    # unlike the original bare string concatenation (store + name).
    combined_paths = [os.path.join(store, name) for name in meeting_paths]
    loader = VTTTranscriptLoader(combined_paths)
    results = loader.load()
    meetings = [clean_text(doc.text) for doc in results]
    return meetings, meeting_paths
def namespace_check(arg):
    """Return True if `arg` is an existing, non-empty Pinecone namespace.

    This piece of code should be moved to the backend QA Service.
    """
    print("Checking namespace")
    client = pc_connector._connection()
    index = client.Index(conf["embeddings"]["index_name"])
    stats = index.describe_index_stats()
    # Collect every namespace name except the blank default one.
    known_namespaces = {name for name in stats['namespaces'].keys() if name != ''}
    return arg in known_namespaces
def drop_namespace(arg):
    """Delete the Pinecone namespace `arg` unless override mode is on.

    Invoked by the expiry timer started in generate_key() when a user
    session times out.

    Args:
        arg: The namespace (session key) to delete.
    """
    if conf["embeddings"]["override"]:
        # Demo/override mode: never delete the shared demo namespace.
        # (Original code had a redundant `pass` before this print.)
        print("Maintained Namespace: " + conf["embeddings"]["demo_namespace"])
    else:
        if namespace_check(arg):
            pc = pc_connector._connection()
            index = pc.Index(conf["embeddings"]["index_name"])
            index.delete(namespace=arg, delete_all=True)
            print("Deleted namespace: " + arg)
def generate_key():
    """Create a unique session key and schedule its namespace for deletion.

    Returns:
        A key of the form 'User_<uuid>'. A background timer calls
        drop_namespace(key) after conf["session"]["user_timeout"] seconds.
    """
    print("Generating key")
    # uuid4 is random; uuid1 (used previously) embeds the host MAC address
    # and a timestamp, making session keys both guessable and privacy-leaking.
    unique_key = 'User_' + str(uuid.uuid4())
    timer = threading.Timer(conf["session"]["user_timeout"],
                            drop_namespace,
                            [unique_key])
    timer.start()
    return unique_key
def b_clicked(o):
    """Return an interactive Button update (enables the upload button)."""
    enabled_button = gr.Button(interactive=True)
    return enabled_button
##################### Process ##################### | |
def main(conf):
    """Build and launch the Gradio UI for the Meeting RAG Q&A demo.

    Reads module-level globals set in the __main__ block: `pc_connector`,
    `pipelines`, `meetings`, and `meeting_paths`.

    Args:
        conf: Application configuration dict (layout, defaults, session).
    """
    with gr.Blocks(theme=gr.themes.Soft(text_size="lg")) as demo:

        # Main page
        with gr.TabItem(conf["layout"]["page_names"][0]):
            gr.Markdown("# π€ Multi-Meeting Q&A RAG")
            gr.Markdown("![](file/emoji_meeting_resized.png)")
            gr.Markdown(get_files.load_markdown_file(conf["layout"]["about"]))

        # User config page
        with gr.TabItem(conf["layout"]["page_names"][1]):
            gr.Markdown("# π Upload your own meeting docs")
            gr.Markdown("""Be sure to retain your session key. This key is your ID to
            your stored documents and is live for 1 hour after generation""")
            create_unique_key = gr.Button("Generate session key")
            output_unique_key = gr.Textbox(label="Copy key",
                                           interactive=True,
                                           show_copy_button=True,
                                           show_label=True)
            create_unique_key.click(fn=generate_key,
                                    outputs=output_unique_key)

            ### This should not be visible until key is generated.
            load_file = gr.UploadButton(label="Upload Transcript (.vtt)",
                                        file_types=conf["session"]["filetypes"],
                                        file_count='multiple', interactive=False)
            repository = gr.Textbox(label="Progress", value="Waiting for load...", visible=True)
            gr.Markdown("## Additional context you want to provide?", visible=False)
            gr.Markdown("Try to keep this portion as concise as possible.", visible=False)
            goals = gr.Textbox(label="Analysis Goals",
                               value=conf["defaults"]["goals"],
                               visible=False)  # not incorporated yet. Will be with Q&A.
            # Uploading triggers embedding; progress text lands in `repository`.
            load_file.upload(process_transcripts, [load_file, goals, output_unique_key], repository)
            # Second handler on the same button: clicking also enables the
            # (initially non-interactive) upload button via b_clicked.
            create_unique_key.click(fn=b_clicked,
                                    inputs=create_unique_key,
                                    outputs=load_file)

        # Meeting Question & Answer Page
        with gr.TabItem(conf["layout"]["page_names"][2]):
            gr.Markdown("# β Query meeting docs")
            gr.Markdown("""Paste session key below to query your own personal docs. Otherwise, skip and ask a question to analyze the pre-stored meetings.
            After asking the question, an answer will populate along with the meeting snippets retrieved.""")
            session_key = gr.Textbox(label="Session key here",
                                     value="")
            gr.Markdown("### Query")
            question = gr.Textbox(label="Ask a Question",
                                  value=conf["defaults"]["question"])
            ask_button = gr.Button("Ask!")
            model_output = gr.Markdown("### Answer")
            context_output = gr.components.Textbox(label="Retrieved Context")
            ask_button.click(fn=retrieve_answer,
                             inputs=[question, goals, session_key],
                             outputs=[model_output, context_output])

        # Stored-meetings viewer page
        with gr.TabItem(conf["layout"]["page_names"][3]):
            gr.Markdown("# π View stored meeting docs")
            gr.Markdown("""These are just some example meeting docs of very clean, brief synthetic meetings generated by LLMs.
            Compare these to the demo outputs you get when you don't enter a token to see what contex was retrieved.""")
            # `meetings` / `meeting_paths` are globals loaded at startup by
            # return_stored_transcripts(); one accordion per transcript.
            for i in range(len(meeting_paths)):
                with gr.Accordion(meeting_paths[i], open=False):
                    gr.Textbox(value=meetings[i], lines=10)

    demo.launch(server_name="0.0.0.0", allowed_paths=["/"])
##################### Execute ##################### | |
if __name__ == "__main__":
    # Get config
    conf = get_files.json_cfg()

    # Get API keys (read here; dict keys "pinecone" and "huggingface" are
    # consumed below)
    keys = get_files.get_keys()

    # Get stored meetings shown on the viewer tab; globals read by main()
    meetings, meeting_paths = return_stored_transcripts(conf)

    # initialize pinecone connector (global: used by the handler functions)
    pc_connector = PineconeConnector(
        api_key=keys["pinecone"],
        index_name=conf["embeddings"]["index_name"],
        embedding=conf["embeddings"]["embedding"],
    )

    # initialize model connector (global: used by retrieve_answer)
    pipelines = InferencePipeline(conf,
                                  api_key=keys["huggingface"]
                                  )

    # run main
    main(conf)