|
import streamlit as st |
|
from PyPDF2 import PdfReader |
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain_google_genai import GoogleGenerativeAIEmbeddings |
|
import google.generativeai as genai |
|
from langchain_community.vectorstores import FAISS |
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
from langchain.chains.question_answering import load_qa_chain |
|
from langchain.prompts import PromptTemplate |
|
import os |
|
import json |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig, TextStreamer, ConversationalPipeline |
|
|
|
|
|
|
|
|
|
|
|
|
|
os.system("pip install -r requirements.txt") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.set_page_config(page_title="Gemini RAG", layout="wide") |
|
|
|
|
|
api_key = 'AIzaSyCvXRggpO2yNwIpZmoMy_5Xhm2bDyD-pOo' |
|
|
|
|
|
|
|
|
|
|
|
def delete_files_in_folder(folder_path): |
|
try: |
|
|
|
chat_history_file = "chat_history.json" |
|
if os.path.exists(chat_history_file): |
|
os.remove(chat_history_file) |
|
for file_name in os.listdir(folder_path): |
|
file_path = os.path.join(folder_path, file_name) |
|
if os.path.isfile(file_path): |
|
os.remove(file_path) |
|
print(f"Deleted file: {file_path}") |
|
print("All files within the folder have been deleted successfully!") |
|
except Exception as e: |
|
print(f"An error occurred: {e}") |
|
|
|
|
|
if st.button("Reset Files", key="reset_button"): |
|
folder_path = 'faiss_index' |
|
delete_files_in_folder(folder_path) |
|
|
|
CH_size = 450 |
|
CH_overlap = 50 |
|
|
|
|
|
def get_pdf_text(pdf_docs): |
|
text = "" |
|
for pdf in pdf_docs: |
|
pdf_reader = PdfReader(pdf) |
|
for page in pdf_reader.pages: |
|
text += page.extract_text() |
|
return text |
|
|
|
|
|
def get_text_chunks(text): |
|
text_splitter = RecursiveCharacterTextSplitter(chunk_size=CH_size, chunk_overlap=CH_overlap) |
|
chunks = text_splitter.split_text(text) |
|
return chunks |
|
|
|
|
|
def get_vector_store(text_chunks, api_key): |
|
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key) |
|
vector_store = FAISS.from_texts(text_chunks, embedding=embeddings) |
|
vector_store.save_local("faiss_index") |
|
|
|
|
|
def get_conversational_chain(): |
|
prompt_template = """ |
|
Answer the question as detailed as possible from the provided context, make sure to provide all the details, if the answer is not in |
|
provided context just say, "answer is not available in the context", don't provide the wrong answer. When giving an answer, try to include all mentionings of the subject being asked and include this within your response\n\n |
|
Context:\n {context}?\n |
|
Question: \n{question}\n |
|
|
|
Answer: |
|
""" |
|
model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.2, google_api_key=api_key) |
|
prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"]) |
|
chain = load_qa_chain(model, chain_type="stuff", prompt=prompt) |
|
return chain |
|
|
|
|
|
|
|
def update_chat_history(question, reply): |
|
|
|
chat_history_file = "chat_history.json" |
|
if os.path.exists(chat_history_file): |
|
|
|
with open(chat_history_file, "r") as file: |
|
chat_history = json.load(file) |
|
else: |
|
|
|
chat_history = {"conversations": []} |
|
|
|
|
|
chat_history["conversations"].append({"question": question, "reply": reply}) |
|
|
|
|
|
with open(chat_history_file, "w") as file: |
|
json.dump(chat_history, file, indent=4) |
|
|
|
st.subheader("Chat History") |
|
for conversation in chat_history["conversations"]: |
|
st.write(f"**Question:** {conversation['question']}") |
|
st.write(f"**Reply:** {conversation['reply']}") |
|
st.write("---") |
|
|
|
|
|
|
|
def user_input(user_question, api_key): |
|
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key) |
|
new_db = FAISS.load_local("faiss_index", embeddings,allow_dangerous_deserialization=True) |
|
docs = new_db.similarity_search(user_question) |
|
chain = get_conversational_chain() |
|
response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True) |
|
st.write("Reply: ", response["output_text"]) |
|
|
|
|
|
update_chat_history(user_question, response["output_text"]) |
|
|
|
'''''''''''''''''' |
|
|
|
def clear_faiss_index(folder_path): |
|
try: |
|
if os.path.exists(folder_path): |
|
for file_name in os.listdir(folder_path): |
|
file_path = os.path.join(folder_path, file_name) |
|
if os.path.isfile(file_path): |
|
os.remove(file_path) |
|
st.write("Existing FAISS index files cleared successfully!") |
|
else: |
|
st.write("No existing FAISS index files found.") |
|
except Exception as e: |
|
st.error(f"An error occurred while clearing FAISS index files: {e}") |
|
|
|
|
|
|
|
def recreate_faiss_index(pdf_docs, chunk_size, chunk_overlap, api_key): |
|
try: |
|
|
|
clear_faiss_index("faiss_index") |
|
|
|
|
|
text = "" |
|
for pdf in pdf_docs: |
|
pdf_reader = PdfReader(pdf) |
|
for page in pdf_reader.pages: |
|
text += page.extract_text() |
|
|
|
|
|
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) |
|
chunks = text_splitter.split_text(text) |
|
|
|
|
|
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key) |
|
vector_store = FAISS.from_texts(chunks, embedding=embeddings) |
|
|
|
|
|
vector_store.save_local("faiss_index") |
|
|
|
st.success("FAISS index recreated successfully!") |
|
except Exception as e: |
|
st.error(f"An error occurred while recreating FAISS index: {e}") |
|
|
|
|
|
def main(): |
|
st.header("RAG based LLM Application") |
|
|
|
user_question = st.text_input("Ask a Question from the PDF Files", key="user_question") |
|
|
|
if user_question and api_key: |
|
user_input(user_question, api_key) |
|
|
|
with st.sidebar: |
|
st.title("Menu:") |
|
|
|
pdf_docs = st.file_uploader("Upload your PDF Files and Click on the Submit & Process Button", |
|
accept_multiple_files=True, key="pdf_uploader") |
|
if st.button("Submit & Process", key="process_button") and api_key: |
|
with st.spinner("Processing..."): |
|
recreate_faiss_index(pdf_docs, CH_size, CH_overlap, api_key) |
|
|
|
raw_text = get_pdf_text(pdf_docs) |
|
text_chunks = get_text_chunks(raw_text) |
|
get_vector_store(text_chunks, api_key) |
|
st.success("Done") |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|
|
|