import streamlit as st
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import google.generativeai as genai
from langchain_community.vectorstores import FAISS
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
import os
import json
# NOTE(review): none of the transformers names below are referenced in the
# visible part of this file (only in the commented-out model lines further down).
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig, TextStreamer, ConversationalPipeline

####CREDIT#####
# Credit to author (Sri Laxmi) of original code reference: SriLaxmi1993
# Sri LaxmiGithub Link: https://github.com/SriLaxmi1993/Document-Genie-using-RAG-Framwork
# Sri Laxmi Youtube:https://www.youtube.com/watch?v=SkY2u4UUr6M&t=112s
###############

# chunk_size / chunk_overlap values handed to RecursiveCharacterTextSplitter
# when the extracted PDF text is split into chunks.
CH_size = 450
CH_overlap = 50

# NOTE(review): this runs as a top-level statement, i.e. every time the script
# is executed, and only after the imports above have already succeeded, so it
# cannot repair a missing dependency for this file's own imports. Consider
# installing requirements as a deployment step instead.
os.system("pip install -r requirements.txt")

# some model (alternative local model, currently disabled)
#tokenizer = AutoTokenizer.from_pretrained("google/gemma-7b")
#model = AutoModelForCausalLM.from_pretrained("google/gemma-7b")

# Kept ahead of every other st.* call in the file.
# NOTE(review): older Streamlit releases require set_page_config to be the
# first Streamlit command -- confirm for the pinned version before moving it.
st.set_page_config(page_title="Gemini RAG", layout="wide")

# The API key is defined once at module level, directly below; main() reuses
# that value rather than asking for it again.
# Google Generative AI key. Read from the environment so that no credential is
# committed to source control; an empty value disables querying and indexing
# (main() shows a warning in that case).
api_key = os.environ.get("GOOGLE_API_KEY", "")

# On-disk locations shared by the indexing, query and reset code paths.
FAISS_INDEX_DIR = "faiss_index"
CHAT_HISTORY_FILE = "chat_history.json"
EMBEDDING_MODEL = "models/embedding-001"

# Default text-splitter settings (chunk_size / chunk_overlap).
CH_size = 450
CH_overlap = 50


def _remove_files_in(folder_path):
    """Delete every regular file directly inside *folder_path*.

    Sub-directories are left untouched. Returns the list of removed paths.
    Raises OSError (e.g. FileNotFoundError) if the folder cannot be listed.
    """
    removed = []
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)
        if os.path.isfile(file_path):
            os.remove(file_path)
            removed.append(file_path)
    return removed


def delete_files_in_folder(folder_path):
    """Reset the app state: remove the chat history file and all index files.

    Args:
        folder_path: Folder whose files (not sub-folders) are deleted.

    Progress and errors are printed to stdout; nothing is raised for
    file-system errors so that a failed reset never breaks the page.
    """
    try:
        if os.path.exists(CHAT_HISTORY_FILE):
            os.remove(CHAT_HISTORY_FILE)

        # A missing folder simply means no index has been built yet.
        if not os.path.isdir(folder_path):
            print(f"Nothing to delete: {folder_path} does not exist.")
            return

        for file_path in _remove_files_in(folder_path):
            print(f"Deleted file: {file_path}")
        print("All files within the folder have been deleted successfully!")
    except OSError as e:
        print(f"An error occurred: {e}")


# Rendered at module level, i.e. before main() draws the rest of the page.
if st.button("Reset Files", key="reset_button"):
    delete_files_in_folder(FAISS_INDEX_DIR)


def get_pdf_text(pdf_docs):
    """Return the concatenated text of every page of every PDF in *pdf_docs*.

    Args:
        pdf_docs: Iterable of file-like objects / paths accepted by PdfReader.
            ``None`` or an empty iterable yields an empty string.
    """
    page_texts = []
    for pdf in pdf_docs or []:
        for page in PdfReader(pdf).pages:
            # Guard against a page that yields no text (e.g. scanned images).
            page_texts.append(page.extract_text() or "")
    return "".join(page_texts)


def get_text_chunks(text, chunk_size=None, chunk_overlap=None):
    """Split *text* into overlapping chunks.

    Args:
        text: The text to split.
        chunk_size: Splitter chunk size; defaults to the module-level CH_size.
        chunk_overlap: Splitter overlap; defaults to the module-level CH_overlap.

    Returns:
        The list of chunks produced by RecursiveCharacterTextSplitter.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CH_size if chunk_size is None else chunk_size,
        chunk_overlap=CH_overlap if chunk_overlap is None else chunk_overlap,
    )
    return text_splitter.split_text(text)


def _make_embeddings(api_key):
    """Create the Google embedding client used for both indexing and querying."""
    return GoogleGenerativeAIEmbeddings(model=EMBEDDING_MODEL, google_api_key=api_key)


def _build_vector_store(text_chunks, api_key):
    """Embed *text_chunks* and return an in-memory FAISS store (nothing is saved)."""
    return FAISS.from_texts(text_chunks, embedding=_make_embeddings(api_key))


def get_vector_store(text_chunks, api_key):
    """Embed *text_chunks* and save the resulting FAISS index to FAISS_INDEX_DIR."""
    _build_vector_store(text_chunks, api_key).save_local(FAISS_INDEX_DIR)


def get_conversational_chain():
    """Build the "stuff" question-answering chain backed by the Gemini chat model.

    Uses the module-level ``api_key``. The prompt expects the ``context`` and
    ``question`` variables.
    """
    prompt_template = """
    Answer the question as detailed as possible from the provided context, make sure to provide all the details, if the answer is not in
    provided context just say, "answer is not available in the context", don't provide the wrong answer.
    When giving an answer, try to include all mentionings of the subject being asked and include this within your response\n\n
    Context:\n {context}?\n
    Question: \n{question}\n

    Answer:
    """
    model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.2, google_api_key=api_key)
    prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
    return load_qa_chain(model, chain_type="stuff", prompt=prompt)


def _load_chat_history():
    """Load the stored chat history, falling back to an empty one.

    A missing, unparsable, or unexpectedly shaped file yields a fresh
    ``{"conversations": []}`` instead of raising.
    """
    try:
        with open(CHAT_HISTORY_FILE, "r", encoding="utf-8") as file:
            history = json.load(file)
    except (FileNotFoundError, json.JSONDecodeError):
        return {"conversations": []}

    if not isinstance(history, dict) or not isinstance(history.get("conversations"), list):
        return {"conversations": []}
    return history


# chat history functionality
def update_chat_history(question, reply):
    """Append a question/reply pair to the chat history file and render it.

    Args:
        question: The user's question.
        reply: The model's answer.
    """
    chat_history = _load_chat_history()
    chat_history["conversations"].append({"question": question, "reply": reply})

    # Write the updated history back to disk.
    with open(CHAT_HISTORY_FILE, "w", encoding="utf-8") as file:
        json.dump(chat_history, file, indent=4)

    # Display the full history, oldest first.
    st.subheader("Chat History")
    for conversation in chat_history["conversations"]:
        st.write(f"**Question:** {conversation['question']}")
        st.write(f"**Reply:** {conversation['reply']}")
        st.write("---")


def _faiss_index_exists(folder_path):
    """Return True when *folder_path* exists and contains at least one entry."""
    return os.path.isdir(folder_path) and bool(os.listdir(folder_path))


def user_input(user_question, api_key):
    """Answer *user_question* from the saved FAISS index and record the exchange.

    Args:
        user_question: The question typed by the user.
        api_key: Google API key used for the query embedding.

    Shows a warning and returns early when no index has been built yet.
    """
    if not _faiss_index_exists(FAISS_INDEX_DIR):
        st.warning("No FAISS index found. Upload PDF files and click 'Submit & Process' first.")
        return

    # NOTE: load_local unpickles data from disk. That is acceptable only
    # because the index folder is written by this app itself; never point it
    # at an index obtained from an untrusted source.
    new_db = FAISS.load_local(
        FAISS_INDEX_DIR,
        _make_embeddings(api_key),
        allow_dangerous_deserialization=True,
    )
    docs = new_db.similarity_search(user_question)

    chain = get_conversational_chain()
    response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True)
    st.write("Reply: ", response["output_text"])

    # chat history
    update_chat_history(user_question, response["output_text"])


def clear_faiss_index(folder_path):
    """Delete the files of an existing FAISS index and report the outcome in the UI.

    Args:
        folder_path: Folder holding the index files.
    """
    try:
        if os.path.exists(folder_path):
            _remove_files_in(folder_path)
            st.write("Existing FAISS index files cleared successfully!")
        else:
            st.write("No existing FAISS index files found.")
    except Exception as e:
        st.error(f"An error occurred while clearing FAISS index files: {e}")


# Function to process PDF files and recreate FAISS index
def recreate_faiss_index(pdf_docs, chunk_size, chunk_overlap, api_key):
    """Rebuild the FAISS index from the uploaded PDFs.

    The new index is built in memory first; the previous index is only
    cleared once the new one is ready, so a failed or empty upload never
    destroys a working index.

    Args:
        pdf_docs: Uploaded PDF files.
        chunk_size: Splitter chunk size.
        chunk_overlap: Splitter chunk overlap.
        api_key: Google API key used for embedding.

    Returns:
        True when a new index was saved, False otherwise (the reason is
        shown in the UI).
    """
    try:
        if not pdf_docs:
            st.warning("Please upload at least one PDF file before processing.")
            return False

        chunks = get_text_chunks(get_pdf_text(pdf_docs), chunk_size, chunk_overlap)
        if not chunks:
            st.warning("No extractable text was found in the uploaded PDF files.")
            return False

        vector_store = _build_vector_store(chunks, api_key)

        # Swap the old index for the new one only after embedding succeeded.
        clear_faiss_index(FAISS_INDEX_DIR)
        vector_store.save_local(FAISS_INDEX_DIR)
        st.success("FAISS index recreated successfully!")
        return True
    except Exception as e:
        # UI boundary: report any failure instead of breaking the page.
        st.error(f"An error occurred while recreating FAISS index: {e}")
        return False


def main():
    """Render the page: question box in the main area, PDF upload in the sidebar."""
    st.header("RAG based LLM Application")

    if not api_key:
        st.warning("Set the GOOGLE_API_KEY environment variable to enable indexing and question answering.")

    user_question = st.text_input("Ask a Question from the PDF Files", key="user_question")
    if user_question and api_key:
        user_input(user_question, api_key)

    with st.sidebar:
        st.title("Menu:")
        pdf_docs = st.file_uploader(
            "Upload your PDF Files and Click on the Submit & Process Button",
            accept_multiple_files=True,
            key="pdf_uploader",
        )
        if st.button("Submit & Process", key="process_button") and api_key:
            with st.spinner("Processing..."):
                # Index once; report success only when the rebuild succeeded.
                if recreate_faiss_index(pdf_docs, CH_size, CH_overlap, api_key):
                    st.success("Done")


if __name__ == "__main__":
    main()