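"""Streamlit app: AI-powered PDF reader and Q&A assistant.

PyMuPDF extracts text from uploaded PDFs, a LangChain text splitter chunks
it, Chroma stores sentence-transformer embeddings, and a
ConversationalRetrievalChain backed by a Hugging Face endpoint answers
questions with source citations.
"""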
import os
import tempfile
import streamlit as st
import fitz # PyMuPDF
from typing import Any, Dict, Optional
from langchain_community.llms import HuggingFaceEndpoint
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
# Configure page
st.set_page_config(
page_title="PDF Q&A Assistant",
page_icon="πŸ“š",
layout="wide"
)
# Initialize session state variables if they don't exist
if "chat_history" not in st.session_state:
st.session_state.chat_history = []
if "conversation_chain" not in st.session_state:
st.session_state.conversation_chain = None
if "document_processed" not in st.session_state:
st.session_state.document_processed = False
if "file_names" not in st.session_state:
st.session_state.file_names = []
class PDFQAAssistant:
def __init__(self,
                 hf_token: Optional[str] = None,
model_name: str = "google/flan-t5-base", # Changed to a more accessible model
embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
"""
Initialize the PDF Q&A Assistant with Hugging Face models.
Args:
hf_token: Hugging Face API token
model_name: HF model to use for Q&A
embedding_model_name: HF model to use for embeddings
"""
self.model_name = model_name
self.embedding_model_name = embedding_model_name
self.hf_token = hf_token
# Create a temp directory for the vector store
self.persist_directory = os.path.join(tempfile.gettempdir(), "pdf_qa_vectorstore")
# Initialize LLM with Hugging Face
self.llm = HuggingFaceEndpoint(
repo_id=model_name,
huggingfacehub_api_token=hf_token,
            max_new_tokens=512,  # HuggingFaceEndpoint takes max_new_tokens, not max_length
temperature=0.5
)
# Initialize embeddings with Hugging Face
self.embeddings = HuggingFaceEmbeddings(
model_name=embedding_model_name,
model_kwargs={'device': 'cpu'}
)
# Initialize text splitter for chunking documents
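        # RecursiveCharacterTextSplitter tries paragraph, sentence, then word
        # boundaries before falling back to hard character cuts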
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=800, # Smaller chunks for better processing
chunk_overlap=150,
length_function=len
)
# Vector store and conversation chain will be initialized when documents are loaded
        self.vectorstore = None
        self.conversation_chain = None  # set in process_pdf(); checked in ask()
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            output_key="answer",  # required when return_source_documents=True
            return_messages=True
        )
# Create directories if they don't exist
os.makedirs(self.persist_directory, exist_ok=True)
def extract_text_from_pdf(self, pdf_file) -> str:
"""
Extract text from a PDF file using PyMuPDF.
Args:
pdf_file: Uploaded PDF file
Returns:
Extracted text as a string
"""
try:
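            # Streamlit uploads arrive as in-memory buffers; write the bytes
            # to a temp file so fitz.open() can read the PDF from a path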
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
tmp_file.write(pdf_file.getvalue())
tmp_path = tmp_file.name
            # Open the PDF and extract text from each page; remove the
            # temp file afterwards even if extraction fails
            try:
                with fitz.open(tmp_path) as doc:
                    text = ""
                    for page in doc:
                        text += page.get_text()
            finally:
                os.unlink(tmp_path)
            return text
except Exception as e:
st.error(f"Error extracting text from PDF: {e}")
raise
def process_pdf(self, pdf_file, document_name: str) -> None:
"""
Process a PDF file and prepare it for question answering.
Args:
pdf_file: Uploaded PDF file
document_name: Name to identify the document
"""
# Extract text from PDF
with st.status("Extracting text from PDF..."):
text = self.extract_text_from_pdf(pdf_file)
st.write(f"Extracted {len(text)} characters")
# Split text into chunks
with st.status("Splitting document into chunks..."):
chunks = self.text_splitter.split_text(text)
st.write(f"Document split into {len(chunks)} chunks")
# Create vector embeddings
with st.status("Creating vector embeddings..."):
# Create metadata for each chunk
metadatas = [{"source": document_name, "chunk": i} for i in range(len(chunks))]
# If vectorstore already exists, add to it, otherwise create a new one
if self.vectorstore is None:
self.vectorstore = Chroma.from_texts(
texts=chunks,
embedding=self.embeddings,
metadatas=metadatas,
persist_directory=self.persist_directory
)
else:
self.vectorstore.add_texts(texts=chunks, metadatas=metadatas)
# Persist the vector store
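            # (newer Chroma releases persist automatically and no longer
            # expose persist(), hence the hasattr guard)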
if hasattr(self.vectorstore, 'persist'):
self.vectorstore.persist()
# Initialize the conversation chain
with st.status("Setting up Q&A system..."):
retriever = self.vectorstore.as_retriever(
search_kwargs={"k": 4} # Retrieve top 4 most relevant chunks
)
# Create a custom prompt template that includes the source information
qa_prompt = PromptTemplate(
input_variables=["context", "question", "chat_history"],
template="""
You are an AI assistant specializing in answering questions about documents.
Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say you don't know. Don't try to make up an answer.
Always cite the specific source or page number when possible.
Context:
{context}
Chat History:
{chat_history}
Question:
{question}
Answer:
"""
)
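            # ConversationalRetrievalChain condenses each follow-up question
            # plus chat history into a standalone query, retrieves the top-k
            # chunks, and answers using the prompt above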
self.conversation_chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=retriever,
memory=self.memory,
combine_docs_chain_kwargs={"prompt": qa_prompt},
return_source_documents=True
)
# Store the conversation chain in session state
st.session_state.conversation_chain = self.conversation_chain
st.success(f"Successfully processed {document_name}")
st.session_state.document_processed = True
def ask(self, question: str) -> Dict[str, Any]:
"""
Ask a question about the loaded documents.
Args:
question: The question to ask
Returns:
Dictionary with the answer and source documents
"""
if self.conversation_chain is None:
return {"answer": "Please load a document first before asking questions.", "sources": []}
try:
result = self.conversation_chain({"question": question})
# Format sources for better readability
sources = []
if "source_documents" in result:
for doc in result["source_documents"]:
source = doc.metadata.get("source", "Unknown")
chunk = doc.metadata.get("chunk", "Unknown")
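                    # De-duplicate by source name; the first chunk seen
                    # for each source is kept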
if source not in [s["source"] for s in sources]:
sources.append({"source": source, "chunk": chunk})
return {
"answer": result["answer"],
"sources": sources
}
except Exception as e:
st.error(f"Error processing question: {e}")
return {"answer": f"Error processing your question: {e}", "sources": []}
def clear_memory(self) -> None:
"""Clear the conversation memory."""
self.memory.clear()
def get_document_summary(assistant, document_name):
"""Get a summary of the loaded document."""
st.subheader("Document Summary")
with st.status("Generating document summary..."):
questions = [
"What is the main topic of this document?",
"What are the key points from this document?",
"Could you provide a summary of this document in 3-5 bullet points?"
]
for question in questions:
result = assistant.ask(question)
st.write(f"**{question}**")
st.write(result["answer"])
st.divider()
# Main app function
def main():
st.title("πŸ“š AI-Powered PDF Reader & Q&A Assistant")
# Sidebar for settings and uploads
with st.sidebar:
st.header("Settings")
        # Get HF_TOKEN from secrets or environment. Accessing st.secrets
        # can raise when no secrets.toml exists, so guard the lookup.
        try:
            hf_token = st.secrets.get("HF_TOKEN")
        except Exception:
            hf_token = None
        if hf_token:
            token_source = "Using HF_TOKEN from app secrets"
        elif os.environ.get("HF_TOKEN"):
            hf_token = os.environ.get("HF_TOKEN")
            token_source = "Using HF_TOKEN from environment variables"
        else:
            token_source = "No HF_TOKEN found"
st.info(token_source)
# Option to manually enter token if needed
use_manual_token = st.checkbox("Enter token manually", value=not hf_token)
if use_manual_token:
hf_token = st.text_input("Enter Hugging Face API Token:", type="password")
# Model selection with open-source models
st.subheader("Model Settings")
model_name = st.selectbox(
"Select LLM model:",
[
"google/flan-t5-base", # Smaller, more accessible model
"google/flan-t5-small", # Even smaller model
"facebook/bart-large-cnn", # Good for summarization
"distilbert-base-uncased" # Lightweight model
],
index=0
)
embedding_model = st.selectbox(
"Select Embedding model:",
[
"sentence-transformers/all-MiniLM-L6-v2",
"sentence-transformers/paraphrase-MiniLM-L3-v2" # Smaller embedding model
],
index=0
)
# Document upload
st.subheader("Upload Documents")
uploaded_files = st.file_uploader("Upload PDF documents",
type="pdf",
accept_multiple_files=True)
if uploaded_files:
process_btn = st.button("Process Documents")
if process_btn:
if not hf_token:
st.error("Please provide a valid Hugging Face API token.")
else:
# Initialize the assistant
try:
assistant = PDFQAAssistant(
hf_token=hf_token,
model_name=model_name,
embedding_model_name=embedding_model
)
# Process each uploaded file
for pdf_file in uploaded_files:
file_name = pdf_file.name
if file_name not in st.session_state.file_names:
st.session_state.file_names.append(file_name)
assistant.process_pdf(pdf_file, file_name)
# Store the assistant in session state
st.session_state.assistant = assistant
except Exception as e:
st.error(f"Error initializing assistant: {e}")
st.error("Try selecting a different model or check your token permissions.")
# Document management
if st.session_state.get("document_processed", False):
st.subheader("Document Management")
if st.button("Clear Chat History"):
if "assistant" in st.session_state:
st.session_state.assistant.clear_memory()
st.session_state.chat_history = []
st.success("Chat history cleared!")
if st.button("Generate Document Summary"):
if "assistant" in st.session_state and len(st.session_state.file_names) > 0:
get_document_summary(st.session_state.assistant,
st.session_state.file_names[0])
# Main area for chat interface
if not st.session_state.get("document_processed", False):
st.info("πŸ‘ˆ Please upload and process a PDF document to get started.")
# Display demo information
st.header("How It Works")
col1, col2, col3 = st.columns(3)
with col1:
st.subheader("1. Upload PDF")
st.markdown("Upload any PDF document you want to query.")
with col2:
st.subheader("2. Process Document")
st.markdown("The AI will extract text and create searchable embeddings.")
with col3:
st.subheader("3. Ask Questions")
st.markdown("Ask any question about your document and get accurate answers.")
else:
# Chat interface
st.header("Ask Questions About Your Documents")
# Display processed files
st.caption(f"Processed Files: {', '.join(st.session_state.file_names)}")
# Display chat history
for message in st.session_state.chat_history:
if message["role"] == "user":
st.chat_message("user").write(message["content"])
else:
st.chat_message("assistant").write(message["content"])
if message.get("sources"): # Use .get() with default to avoid KeyError
with st.expander("View Sources"):
for source in message["sources"]:
st.write(f"- {source['source']} (chunk {source['chunk']})")
# Input for new question
if question := st.chat_input("Ask a question about your documents..."):
# Add user question to chat history
st.session_state.chat_history.append({
"role": "user",
"content": question
})
# Display user question
st.chat_message("user").write(question)
# Get the answer
with st.chat_message("assistant"):
with st.spinner("Thinking..."):
try:
result = st.session_state.assistant.ask(question)
st.write(result["answer"])
# Show sources if available
if result.get("sources"): # Use .get() with default to avoid KeyError
with st.expander("View Sources"):
for source in result["sources"]:
st.write(f"- {source['source']} (chunk {source['chunk']})")
# Add assistant response to chat history
st.session_state.chat_history.append({
"role": "assistant",
"content": result["answer"],
"sources": result.get("sources", []) # Use .get() with default to avoid KeyError
})
except Exception as e:
st.error(f"Error getting response: {e}")
st.error("Please try a different question or model.")
if __name__ == "__main__":
main()