import streamlit as st from openai.error import OpenAIError from .utils import * from typing import Text, Union multiple_files = True def query_pipeline(index:VectorStore, query:Text, stream_answer:bool=False, n_sources:int=5)->Text: """This function reproduces the querying pipeline considering a given input index.""" # retrieving the most relevant pieces of information within the knowledge base sources = search_docs(index, query=query, k=n_sources) # getting the answer, all at once answer = get_answer(sources, query=query, stream_answer=stream_answer)["output_text"] return answer def toggle_process_document(): """Toggles the greenlight for the next step in the pipeline, i.e. processing the document.""" if "processing_document_greenlight" not in st.session_state: st.session_state["processing_document_greenlight"] = True st.session_state["processing_document_greenlight"] = not st.session_state["processing_document_greenlight"] def register_new_file_name(file_name): """ Registers a new file name in the internal session state. """ if "uploaded_file_names" not in st.session_state: st.session_state["uploaded_file_names"] = [] st.session_state["uploaded_file_names"].append(file_name) def clear_index(): """ Clears the index from the internal session state. This is a non reversible operation. """ if "index" in st.session_state: del globals()["index"] def clear_session_state(): """ Clears the session state iterating over keys. This is a non reversible operation. """ for k in st.session_state.keys(): del st.session_state[k] def register_new_file(new_file): """ Registers a new file in the internal session state. """ if "uploaded_files" not in st.session_state: st.session_state["uploaded_files"] = [] st.session_state["uploaded_files"].extend(new_file) def clear_all_files(): """Removes all uploaded files from the interal session state.""" st.session_state["uploaded_files"] = [] def append_uploaded_files(file): """Appends the uploaded files to the internal session state.""" st.session_state.get("uploaded_files", []).extend(file) def set_openai_api_key(api_key:Text)->bool: """Sets the internal OpenAI API key to the given value. Args: api_key (Text): OpenAI API key """ if not check_openai_api_key(api_key=api_key): raise ValueError("Invalid OpenAI API key! Please provide a valid key.") st.session_state["OPENAI_API_KEY"] = api_key st.session_state["api_key_configured"] = True return True def parse_file(file:Union[PDFFile, DocxFile, TxtFile, CodeFile]) -> None: """Converts a file to a document using specialized parsers.""" if file.name.endswith(".pdf"): doc = parse_pdf(file) elif file.name.endswith(".docx"): doc = parse_docx(file) elif file.name.split["."][1] in [".txt", ".py", ".json", ".html", ".css", ".md" ]: doc = parse_txt(file) else: st.error("File type not yet supported! Supported files: [.pdf, .docx, .txt, .py, .json, .html, .css, .md]") doc = None return doc # this function can be used to define a single doc processing pipeline # def document_embedding_pipeline(file:Union[PDFFile, DocxFile, TxtFile, CodeFile]) -> None: def qa_main(): """Main function for the QA app.""" st.title("Chat with a file 💬📖") st.write("Just upload something using and start chatting with a version of GPT4 that has read the file!") # OpenAI API Key - TODO: consider adding a key valid for everyone # st.header("Configure OpenAI API Key") # st.warning('Please enter your OpenAI API Key!', icon='⚠️') # uncomment the following lines to add a user-specific key # user_secret = st.text_input( # "Insert your OpenAI API key here ([get your API key](https://platform.openai.com/account/api-keys)).", # type="password", # placeholder="Paste your OpenAI API key here (sk-...)", # help="You can get your API key from https://platform.openai.com/account/api-keys.", # value=st.session_state.get("OPENAI_API_KEY", ""), # ) user_secret = st.secrets["OPENAI_API_KEY"] if user_secret: if set_openai_api_key(user_secret): # removing this when the OpenAI API key is hardcoded # st.success('OpenAI API key successfully accessed!', icon='✅') # greenlight for next step, i.e. uploading the document to chat with st.session_state["upload_document_greenlight"] = True if st.session_state.get("upload_document_greenlight"): # File that needs to be queried st.header("Upload a file") st.file_uploader( "Upload a pdf, docx, or txt file (scanned documents not supported)", type=["pdf", "docx", "txt", "py", "json", "html", "css", "md"], help="Scanned documents are not supported yet 🥲", accept_multiple_files=multiple_files, #on_change=toggle_process_document, key="uploaded_file" ) documents = {} indexes = {} for file in st.session_state["uploaded_file"]: parsed_file = parse_file(file) # converts the files into a list of documents document = text_to_docs(pages=tuple(parsed_file), file_name=file.name) documents[file.name] = document with st.spinner(f"Indexing {file.name} (might take some time)"): try: # indexing the document uploaded indexes[file.name] = embed_docs(file_name=file.name, _docs=tuple(document)) except OpenAIError as e: st.error("OpenAI error encountered: ", e._message) if len(documents)>1: # documents to be indexed when providing the query st.multiselect( label="Select the documents to be indexed", options=list(documents.keys()), key="multiselect_documents_choices", ) elif len(documents)==1: st.session_state["multiselect_documents_choices"] = [list(documents.keys())[0]] # this is the code that actually performs the chat process if "messages" not in st.session_state: # checking if there is any cache history st.session_state["messages"] = [] for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"], unsafe_allow_html=True) if prompt:=st.chat_input("Ask the document something..."): if prompt=="1": prompt="What is this document about?" st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) with st.chat_message("assistant"): # full_response will store every question asked to all the document(s) considered full_response = "" message_placeholder = st.empty() # asking the same question to all of the documents considered for chat_document in st.session_state["multiselect_documents_choices"]: # keeping track of what is asked to what document full_response += \ f"Asking {chat_document} question {prompt}
" message_placeholder.markdown(full_response, unsafe_allow_html=True) with st.spinner("Querying the document..."): # retrieving the vector store associated to the chat document considered chat_index = indexes[chat_document] # producing the answer considered, live for answer_bit in query_pipeline(chat_index, prompt, stream_answer=True, n_sources=20): full_response += answer_bit message_placeholder.markdown(full_response + "▌", unsafe_allow_html=True) # appending a final entering full_response += "
" message_placeholder.markdown(full_response, unsafe_allow_html=True) # appending the final response obtained after having asked all the documents st.session_state.messages.append({"role": "assistant", "content": full_response})