import os
import PyPDF2
import random
import itertools
import streamlit as st
from io import StringIO
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.retrievers import SVMRetriever
from langchain.chains import QAGenerationChain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.base import CallbackManager
from langchain.embeddings import HuggingFaceEmbeddings
# Browser-tab title and icon for the app.
st.set_page_config(
    page_title="PDF Analyzer",
    page_icon=":shark:",
)
@st.cache_data
def load_docs(files):
    """Read the uploaded PDF/TXT files and return their combined text.

    Args:
        files: Iterable of uploaded file objects. Each must expose ``.name``.
            PDF files are passed to ``PyPDF2.PdfReader``; TXT files must
            expose ``.getvalue()`` returning UTF-8 encoded bytes.

    Returns:
        One string holding the text of every supported file, concatenated in
        input order. Files with any other extension contribute nothing and
        trigger a Streamlit warning.
    """
    st.info("`Reading doc ...`")
    parts = []
    for file_path in files:
        # Compare case-insensitively so an upper-case extension such as
        # ".PDF" is handled like ".pdf" instead of being rejected.
        file_extension = os.path.splitext(file_path.name)[1].lower()
        if file_extension == ".pdf":
            pdf_reader = PyPDF2.PdfReader(file_path)
            # `or ""` keeps the join safe should a page yield no text
            # (defensive: a falsy result is treated as empty).
            parts.extend(page.extract_text() or "" for page in pdf_reader.pages)
        elif file_extension == ".txt":
            parts.append(file_path.getvalue().decode("utf-8"))
        else:
            st.warning('Please provide txt or pdf.', icon="⚠️")
    # Single join instead of repeated string `+=` in the loops.
    return "".join(parts)
@st.cache_resource
def create_retriever(_embeddings, splits, retriever_type):
    """Build a retriever over ``splits`` for the selected retrieval strategy.

    Args:
        _embeddings: Embedding model handed to the vector store / SVM
            retriever. The leading underscore follows Streamlit's convention
            for cache arguments that should not be hashed.
        splits: List of text chunks to index.
        retriever_type: ``"SIMILARITY SEARCH"`` (FAISS vector store) or
            ``"SUPPORT VECTOR MACHINES"`` (``SVMRetriever``).

    Returns:
        The retriever, or ``None`` if the FAISS index could not be built (an
        error is shown in the UI in that case).

    Raises:
        ValueError: If ``retriever_type`` is not one of the supported values.
    """
    if retriever_type == "SIMILARITY SEARCH":
        try:
            vectorstore = FAISS.from_texts(splits, _embeddings)
        except (IndexError, ValueError) as e:
            st.error(f"Error creating vectorstore: {e}")
            return None
        # The number of results is configured through `search_kwargs`;
        # a bare `k=5` keyword is not the documented way to set it.
        return vectorstore.as_retriever(search_kwargs={"k": 5})
    if retriever_type == "SUPPORT VECTOR MACHINES":
        return SVMRetriever.from_texts(splits, _embeddings)
    # Previously an unknown type fell through to an unbound local variable.
    raise ValueError(f"Unsupported retriever type: {retriever_type!r}")
@st.cache_resource
def split_texts(text, chunk_size, overlap, split_method):
    """Split ``text`` into chunks with ``RecursiveCharacterTextSplitter``.

    Args:
        text: The full document text.
        chunk_size: Maximum chunk size passed to the splitter.
        overlap: Chunk overlap passed to the splitter.
        split_method: Accepted for interface compatibility but currently
            ignored; ``RecursiveCharacterTextSplitter`` is always used.

    Returns:
        List of string chunks. If splitting produces nothing, an error is
        shown and the Streamlit script run is stopped via ``st.stop()``.
    """
    st.info("`Splitting doc ...`")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=overlap)
    splits = text_splitter.split_text(text)
    if not splits:
        st.error("Failed to split document")
        st.stop()
    return splits
@st.cache_data
def generate_eval(text, N, chunk):
    """Generate sample question/answer pairs from random windows of ``text``.

    Args:
        text: Document text to draw context windows from.
        N: Number of windows to sample (one generation call per window).
        chunk: Window length in characters.

    Returns:
        Flat list of the QA items produced by ``QAGenerationChain`` across
        all windows. Windows whose generation fails are skipped with a
        warning in the UI.
    """
    st.info("`Generating sample questions ...`")
    # Clamp the upper bound: when the text is shorter than `chunk`,
    # `len(text) - chunk` is negative and randint would raise ValueError.
    last_start = max(0, len(text) - chunk)
    starting_indices = [random.randint(0, last_start) for _ in range(N)]
    sub_sequences = [text[i:i + chunk] for i in starting_indices]
    chain = QAGenerationChain.from_llm(ChatOpenAI(temperature=0))
    eval_set = []
    for i, b in enumerate(sub_sequences):
        try:
            qa = chain.run(b)
        except Exception:
            # Best-effort: skip this window. `Exception` (not a bare except)
            # so KeyboardInterrupt/SystemExit still propagate.
            st.warning(f'Error generating question {i + 1}.', icon="⚠️")
        else:
            eval_set.append(qa)
            st.write("Creating Question:", i + 1)
    # Each chain run yields a list of QA items; flatten them into one list.
    return list(itertools.chain.from_iterable(eval_set))
# ...
def main():
    """Render the PDF Analyzer page and run the document Q&A workflow.

    Steps, in order:
      1. Draw the static page chrome and the sidebar options.
      2. Obtain the OpenAI API key (from session state or a masked input)
         and export it as ``OPENAI_API_KEY``; return early until one is given.
      3. Once files are uploaded: load and split the text, build the
         retriever and the ``RetrievalQA`` chain, generate sample
         question/answer pairs (stored in ``st.session_state.eval_set``) and
         show them in the sidebar.
      4. Answer the question typed by the user.
    """
    foot = """
"""
    st.markdown(foot, unsafe_allow_html=True)
    # Add custom CSS
    st.markdown(
        """
""",
        unsafe_allow_html=True,
    )
    st.sidebar.image("img/logo1.png")
    st.write(
        """
PDF Analyzer
beta
""",
        unsafe_allow_html=True,
    )
    st.sidebar.title("Menu")
    embedding_option = st.sidebar.radio(
        "Choose Embeddings",
        ["OpenAI Embeddings", "HuggingFace Embeddings(slower)"])
    retriever_type = st.sidebar.selectbox(
        "Choose Retriever", ["SIMILARITY SEARCH", "SUPPORT VECTOR MACHINES"])
    # Use RecursiveCharacterTextSplitter as the default and only text splitter
    splitter_type = "RecursiveCharacterTextSplitter"

    if 'openai_api_key' not in st.session_state:
        # type="password" masks the secret instead of echoing it on screen.
        openai_api_key = st.text_input(
            'Please enter your OpenAI API key or [get one here](https://platform.openai.com/account/api-keys)',
            value="",
            placeholder="Enter the OpenAI API key which begins with sk-",
            type="password")
        if not openai_api_key:
            # Nothing else can run without a key; wait for the next rerun.
            return
        st.session_state.openai_api_key = openai_api_key
    os.environ["OPENAI_API_KEY"] = st.session_state.openai_api_key

    uploaded_files = st.file_uploader(
        "Upload a PDF or TXT Document",
        type=["pdf", "txt"],
        accept_multiple_files=True)
    if not uploaded_files:
        return

    # A different set of uploads invalidates the previously generated
    # sample questions.
    if ('last_uploaded_files' not in st.session_state
            or st.session_state.last_uploaded_files != uploaded_files):
        st.session_state.last_uploaded_files = uploaded_files
        if 'eval_set' in st.session_state:
            del st.session_state['eval_set']

    # Load and process the uploaded PDF or TXT files.
    loaded_text = load_docs(uploaded_files)
    st.write("Documents uploaded and processed.")

    # Split the document into chunks
    splits = split_texts(loaded_text, chunk_size=1000,
                         overlap=0, split_method=splitter_type)

    # Display the number of text chunks
    num_chunks = len(splits)
    st.write(f"Number of text chunks: {num_chunks}")

    # Embed using OpenAI embeddings or HuggingFace embeddings
    if embedding_option == "OpenAI Embeddings":
        embeddings = OpenAIEmbeddings()
    else:
        # "HuggingFace Embeddings(slower)": uses the library's default model.
        embeddings = HuggingFaceEmbeddings()

    retriever = create_retriever(embeddings, splits, retriever_type)
    if retriever is None:
        # create_retriever has already shown the error; the QA chain cannot
        # be built without a retriever.
        return

    # Initialize the RetrievalQA chain with streaming output
    callback_handler = StreamingStdOutCallbackHandler()
    callback_manager = CallbackManager([callback_handler])
    chat_openai = ChatOpenAI(
        streaming=True, callback_manager=callback_manager, verbose=True,
        temperature=0)
    qa = RetrievalQA.from_chain_type(
        llm=chat_openai, retriever=retriever, chain_type="stuff", verbose=True)

    # Generate the sample question-answer pairs only once per upload set
    if 'eval_set' not in st.session_state:
        num_eval_questions = 10  # Number of question-answer pairs to generate
        st.session_state.eval_set = generate_eval(
            loaded_text, num_eval_questions, 3000)

    # Display the question-answer pairs in the sidebar
    # NOTE(review): generated text is rendered with unsafe_allow_html=True;
    # consider escaping it if uploaded documents are untrusted.
    for i, qa_pair in enumerate(st.session_state.eval_set):
        st.sidebar.markdown(
            f"""
Question {i + 1}
{qa_pair['question']}
{qa_pair['answer']}
""",
            unsafe_allow_html=True,
        )

    st.write("Ready to answer questions.")

    # Question and answering
    user_question = st.text_input("Enter your question:")
    if user_question:
        answer = qa.run(user_question)
        st.write("Answer:", answer)
# Start the app only when this file is executed as the main script.
if __name__ == "__main__":
    main()