"""Streamlit app for asking questions about uploaded PDF or TXT documents.

The script reads the uploaded files, splits their text into chunks, builds a
retriever over those chunks (FAISS similarity search or an SVM retriever),
generates a set of sample question/answer pairs for the sidebar, and answers
free-form user questions through a LangChain ``RetrievalQA`` chain.
"""
import itertools
import os
import random
from io import StringIO

import PyPDF2
import streamlit as st
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.retrievers import SVMRetriever
from langchain.chains import QAGenerationChain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.base import CallbackManager
from langchain.embeddings import HuggingFaceEmbeddings

st.set_page_config(page_title="PDF Analyzer",page_icon=':shark:')


@st.cache_data
def load_docs(files):
    """Extract and concatenate the text of every uploaded file.

    Args:
        files: Iterable of uploaded file objects. Each must expose ``name``;
            ``.txt`` files must also expose ``getvalue()`` returning bytes,
            and ``.pdf`` files are handed to ``PyPDF2.PdfReader``.

    Returns:
        One string holding the text of all supported files in the order
        given. Files with any other extension only produce a warning.
    """
    st.info("`Reading doc ...`")
    parts = []
    for uploaded in files:
        # Lower-case the extension so "REPORT.PDF" is treated like "report.pdf".
        extension = os.path.splitext(uploaded.name)[1].lower()
        if extension == ".pdf":
            reader = PyPDF2.PdfReader(uploaded)
            # `or ""` keeps the join safe if a page yields no text at all.
            parts.extend(page.extract_text() or "" for page in reader.pages)
        elif extension == ".txt":
            stringio = StringIO(uploaded.getvalue().decode("utf-8"))
            parts.append(stringio.read())
        else:
            st.warning('Please provide txt or pdf.', icon="⚠️")
    # Join once instead of growing a string with repeated `+=`.
    return "".join(parts)


@st.cache_resource
def create_retriever(_embeddings, splits, retriever_type):
    """Build a retriever over the text chunks.

    Args:
        _embeddings: Embedding model passed to the retriever factory. The
            leading underscore follows Streamlit's convention for excluding
            an argument from cache hashing.
        splits: List of text chunks to index.
        retriever_type: ``"SIMILARITY SEARCH"`` (FAISS vector store) or
            ``"SUPPORT VECTOR MACHINES"`` (``SVMRetriever``).

    Returns:
        The retriever, or ``None`` when the FAISS vector store could not be
        created (an error message is shown in that case).

    Raises:
        ValueError: If ``retriever_type`` is not one of the supported values.
    """
    if retriever_type == "SIMILARITY SEARCH":
        try:
            vectorstore = FAISS.from_texts(splits, _embeddings)
        except (IndexError, ValueError) as e:
            st.error(f"Error creating vectorstore: {e}")
            return None
        # The number of documents to fetch is configured through
        # `search_kwargs`; a bare `k=5` keyword is not the documented channel.
        return vectorstore.as_retriever(search_kwargs={"k": 5})
    if retriever_type == "SUPPORT VECTOR MACHINES":
        return SVMRetriever.from_texts(splits, _embeddings)
    # Previously an unknown type fell through to an unbound local variable.
    raise ValueError(f"Unknown retriever type: {retriever_type!r}")


@st.cache_resource
def split_texts(text, chunk_size, overlap, split_method):
    """Split ``text`` into chunks with ``RecursiveCharacterTextSplitter``.

    Args:
        text: The full document text.
        chunk_size: Maximum chunk size passed to the splitter.
        overlap: Chunk overlap passed to the splitter.
        split_method: Accepted for interface compatibility; the recursive
            character splitter is the only splitter used.

    Returns:
        List of string chunks. If splitting yields nothing, an error is shown
        and the Streamlit script is stopped.
    """
    st.info("`Splitting doc ...`")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=overlap)
    splits = text_splitter.split_text(text)
    if not splits:
        st.error("Failed to split document")
        st.stop()
    return splits


@st.cache_data
def generate_eval(text, N, chunk):
    """Generate sample question/answer pairs from random slices of ``text``.

    Args:
        text: The full document text.
        N: Number of random slices to draw (one generation attempt each).
        chunk: Length, in characters, of each slice.

    Returns:
        Flat list of the items produced by ``QAGenerationChain`` across all
        successful attempts. Failed attempts are reported with a warning and
        skipped.
    """
    st.info("`Generating sample questions ...`")
    if not text:
        # Nothing to sample from.
        return []
    # Clamp the upper bound: for documents shorter than `chunk`,
    # `len(text) - chunk` is negative and random.randint would raise.
    last_start = max(0, len(text) - chunk)
    starting_indices = [random.randint(0, last_start) for _ in range(N)]
    sub_sequences = [text[i:i + chunk] for i in starting_indices]
    chain = QAGenerationChain.from_llm(ChatOpenAI(temperature=0))
    eval_set = []
    for i, passage in enumerate(sub_sequences):
        try:
            qa = chain.run(passage)
        except Exception:
            # Best effort: one failed generation must not abort the rest.
            # `Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            st.warning(f'Error generating question {i + 1}.', icon="⚠️")
        else:
            eval_set.append(qa)
            st.write("Creating Question:", i + 1)
    return list(itertools.chain.from_iterable(eval_set))


def main():
    """Render the app: collect the API key and files, then run the QA loop."""
    foot = """

Made by Mehmet Balioglu

"""
    st.markdown(foot, unsafe_allow_html=True)

    # Markup slot for custom styling; the string carries no content here.
    st.markdown(
        """ """,
        unsafe_allow_html=True,
    )

    st.sidebar.image("img/logo1.png")

    st.write(
        """

PDF Analyzer

beta
""",
        unsafe_allow_html=True,
    )

    st.sidebar.title("Menu")

    embedding_option = st.sidebar.radio(
        "Choose Embeddings",
        ["OpenAI Embeddings", "HuggingFace Embeddings(slower)"])

    retriever_type = st.sidebar.selectbox(
        "Choose Retriever", ["SIMILARITY SEARCH", "SUPPORT VECTOR MACHINES"])

    # RecursiveCharacterTextSplitter is the default and only text splitter.
    splitter_type = "RecursiveCharacterTextSplitter"

    if 'openai_api_key' not in st.session_state:
        openai_api_key = st.text_input(
            'Please enter your OpenAI API key or [get one here](https://platform.openai.com/account/api-keys)',
            value="",
            placeholder="Enter the OpenAI API key which begins with sk-")
        if not openai_api_key:
            # Nothing else can run without a key; wait for the next rerun.
            return
        st.session_state.openai_api_key = openai_api_key
        os.environ["OPENAI_API_KEY"] = openai_api_key
    else:
        os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key

    uploaded_files = st.file_uploader(
        "Upload a PDF or TXT Document",
        type=["pdf", "txt"],
        accept_multiple_files=True)
    if not uploaded_files:
        return

    # A new or changed upload invalidates the previously generated questions.
    if ('last_uploaded_files' not in st.session_state
            or st.session_state.last_uploaded_files != uploaded_files):
        st.session_state.last_uploaded_files = uploaded_files
        if 'eval_set' in st.session_state:
            del st.session_state['eval_set']

    # Load and process the uploaded PDF or TXT files.
    loaded_text = load_docs(uploaded_files)
    st.write("Documents uploaded and processed.")

    # Split the document into chunks.
    splits = split_texts(loaded_text, chunk_size=1000,
                         overlap=0, split_method=splitter_type)

    # Display the number of text chunks.
    num_chunks = len(splits)
    st.write(f"Number of text chunks: {num_chunks}")

    # Embed using OpenAI embeddings or HuggingFace embeddings.
    if embedding_option == "OpenAI Embeddings":
        embeddings = OpenAIEmbeddings()
    else:
        # No model name is passed, so the library's default model is used.
        embeddings = HuggingFaceEmbeddings()

    retriever = create_retriever(embeddings, splits, retriever_type)
    if retriever is None:
        # create_retriever has already reported the failure to the user.
        return

    # Initialize the RetrievalQA chain with streaming output.
    callback_handler = StreamingStdOutCallbackHandler()
    callback_manager = CallbackManager([callback_handler])

    chat_openai = ChatOpenAI(
        streaming=True, callback_manager=callback_manager, verbose=True,
        temperature=0)
    qa = RetrievalQA.from_chain_type(
        llm=chat_openai, retriever=retriever, chain_type="stuff",
        verbose=True)

    # Generate the sample question-answer pairs once per upload.
    if 'eval_set' not in st.session_state:
        num_eval_questions = 10  # Number of generation attempts
        st.session_state.eval_set = generate_eval(
            loaded_text, num_eval_questions, 3000)

    # Display the question-answer pairs in the sidebar.
    for i, qa_pair in enumerate(st.session_state.eval_set):
        st.sidebar.markdown(
            f"""
Question {i + 1}

{qa_pair['question']}

{qa_pair['answer']}

""",
            unsafe_allow_html=True,
        )

    st.write("Ready to answer questions.")

    # Question and answering
    user_question = st.text_input("Enter your question:")
    if user_question:
        answer = qa.run(user_question)
        st.write("Answer:", answer)


if __name__ == "__main__":
    main()