Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import streamlit as st | |
| import google.generativeai as genai | |
| from dotenv import load_dotenv | |
| from langchain_community.document_loaders import TextLoader | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain.docstore.document import Document | |
| from langchain import PromptTemplate | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| # Loading Google Gemini API Key from Environment Variables | |
| load_dotenv() | |
| genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) | |
| # Display user Error, Warning or Success Message | |
| def fn_display_user_messages(lv_text, lv_type, mv_processing_message): | |
| """Display user Info, Error, Warning or Success Message""" | |
| if lv_type == "Success": | |
| with mv_processing_message.container(): | |
| st.success(lv_text) | |
| elif lv_type == "Error": | |
| with mv_processing_message.container(): | |
| st.error(lv_text) | |
| elif lv_type == "Warning": | |
| with mv_processing_message.container(): | |
| st.warning(lv_text) | |
| else: | |
| with mv_processing_message.container(): | |
| st.info(lv_text) | |
| # Upload pdf file into 'pdf-data' folder if it does not exist | |
| def fn_upload_pdf(mv_pdf_input_file, mv_processing_message): | |
| """Upload pdf file into 'pdf-data' folder if it does not exist""" | |
| lv_file_name = mv_pdf_input_file.name | |
| if not os.path.exists("pdf-data"): | |
| os.makedirs("pdf-data") | |
| lv_temp_file_path = os.path.join("pdf-data",lv_file_name) | |
| if os.path.exists(lv_temp_file_path): | |
| print("Step1: File already available") | |
| fn_display_user_messages("Step1: File already available","Warning", mv_processing_message) | |
| else: | |
| with open(lv_temp_file_path,"wb") as lv_file: | |
| lv_file.write(mv_pdf_input_file.getbuffer()) | |
| print("Step1: PDF uploaded successfully at -> " + lv_temp_file_path) | |
| fn_display_user_messages("Step1: PDF uploaded successfully at -> " + lv_temp_file_path, "Info", mv_processing_message) | |
| # Extract uploaded pdf data | |
| def fn_extract_pdf_data(mv_pdf_input_file, mv_processing_message): | |
| """Extract uploaded pdf data""" | |
| lv_temp_pdf_file_path = os.path.join("pdf-data",mv_pdf_input_file.name) | |
| # -- Loading PDF Data | |
| lv_pdf_loader = PyPDFLoader(lv_temp_pdf_file_path) | |
| lv_pdf_content = lv_pdf_loader.load() | |
| # -- Define patterns with flexibility | |
| pattern1 = r"(\w+)-\n(\w+)" # Match hyphenated words separated by a line break | |
| pattern2 = r"(?<!\n\s)\n(?!\s\n)" # Match line breaks not surrounded by whitespace | |
| pattern3 = r"\n\s*\n" # Match multiple line breaks with optional whitespace | |
| lv_pdf_formatted_content = [] | |
| for lv_page in lv_pdf_content: | |
| # -- Apply substitutions with flexibility | |
| lv_pdf_page_content = re.sub(pattern1, r"\1\2", lv_page.page_content) | |
| lv_pdf_page_content = re.sub(pattern2, " ", lv_pdf_page_content.strip()) | |
| lv_pdf_page_content = re.sub(pattern3, " ", lv_pdf_page_content) | |
| lv_pdf_page_content = re.sub("\n", " ", lv_pdf_page_content) | |
| lv_pdf_formatted_content.append( | |
| Document( page_content= lv_pdf_page_content, | |
| metadata= lv_page.metadata | |
| ) | |
| ) | |
| # print("Page Details of "+str(lv_page.metadata)+" is - "+lv_pdf_page_content) | |
| print("Step2: PDF content extracted") | |
| fn_display_user_messages("Step2: PDF content extracted", "Info", mv_processing_message) | |
| return lv_pdf_formatted_content | |
| # Load PDF data as Text File | |
| def fn_process_pf_data(mv_pdf_input_file, mv_processing_message): | |
| """Load PDF data as Text File""" | |
| # -- Create txt folder inside vectordb folder if it does not exist | |
| if not os.path.exists(os.path.join("vectordb","txt")): | |
| os.makedirs(os.path.join("vectordb","txt")) | |
| lv_file_name = mv_pdf_input_file.name[:-4] + ".txt" | |
| lv_temp_file_path = os.path.join(os.path.join("vectordb","txt"),lv_file_name) | |
| if os.path.isfile(lv_temp_file_path): | |
| print("Step2: Processed file details exists") | |
| fn_display_user_messages("Step2: Processed file details exists", "Warning", mv_processing_message) | |
| else: | |
| lv_pdf_formatted_content = fn_extract_pdf_data(mv_pdf_input_file, mv_processing_message) | |
| lv_text_data = "" | |
| for lv_page in lv_pdf_formatted_content: | |
| # print(lv_page.page_content) | |
| lv_text_data = lv_text_data + lv_page.page_content | |
| # print(lv_text_data) | |
| f = open(lv_temp_file_path, "w") | |
| f.write(lv_text_data) | |
| f.close() | |
| # Return QA Response | |
| def fn_generate_QnA_response(mv_user_question, mv_pdf_input_file, mv_processing_message): | |
| """Returns QA Response""" | |
| print("Step4: Generating LLM response") | |
| fn_display_user_messages("Step4: Generating LLM response","Info", mv_processing_message) | |
| lv_template = """Instruction: | |
| You are an AI assistant for answering questions about the provided context. | |
| You are given the following extracted parts of a long document and a question. Provide a detailed answer. | |
| If you don't know the answer, just say "Hmm, I'm not sure." Don't try to make up an answer. | |
| ======= | |
| {context} | |
| ======= | |
| Question: {question} | |
| Output:\n""" | |
| lv_qa_prompt = PromptTemplate( | |
| template=lv_template, | |
| input_variables=["question", "context"] | |
| ) | |
| # lv_model = ChatGoogleGenerativeAI(model="gemini-pro", | |
| # temperature=0.7, top_p=0.85) | |
| lv_model = genai.GenerativeModel('gemini-pro') | |
| lv_file_name = mv_pdf_input_file.name[:-4] + ".txt" | |
| lv_temp_file_path = os.path.join(os.path.join("vectordb","txt"),lv_file_name) | |
| lv_text_loader = TextLoader(lv_temp_file_path) | |
| lv_pdf_formatted_content = lv_text_loader.load() | |
| lv_text_data = "" | |
| for lv_page in lv_pdf_formatted_content: | |
| lv_text_data = lv_text_data + lv_page.page_content | |
| lv_qa_formatted_prompt = lv_qa_prompt.format( | |
| question=mv_user_question, | |
| context=lv_text_data | |
| ) | |
| # lv_llm_response = lv_model.invoke(lv_qa_formatted_prompt).content | |
| lv_llm_response = lv_model.generate_content(lv_qa_formatted_prompt).text | |
| print("Step5: LLM response generated") | |
| fn_display_user_messages("Step5: LLM response generated","Info", mv_processing_message) | |
| return lv_llm_response | |
| # Main Program | |
| def main(): | |
| # -- Streamlit Settings | |
| st.set_page_config("Chat With Your Product User Manual") | |
| st.header("Chat With Your Product User Manual💁") | |
| st.text("") | |
| st.text("") | |
| st.text("") | |
| # -- Display Processing Details | |
| mv_processing_message = st.empty() | |
| st.text("") | |
| st.text("") | |
| # -- Setting Chat History | |
| if "messages" not in st.session_state: | |
| st.session_state["messages"] = [] | |
| # -- Read User Manuals for Q&A | |
| with st.sidebar: | |
| mv_pdf_input_file = st.file_uploader("Choose a UM PDF file:", type=["pdf"]) | |
| st.text("") | |
| st.text("") | |
| # -- Process Uploaded User Manual PDF | |
| col1, col2, col3 = st.columns(3) | |
| if col1.button("Submit"): | |
| if mv_pdf_input_file is not None: | |
| fn_upload_pdf(mv_pdf_input_file, mv_processing_message) | |
| lv_pdf_page_content = fn_process_pf_data(mv_pdf_input_file, mv_processing_message) | |
| else: | |
| fn_display_user_messages("Upload PDF file before clicking on Submit", "Error", mv_processing_message) | |
| # -- Clear Chat History | |
| if col2.button("Reset"): | |
| st.session_state["messages"] = [] | |
| # -- Creating Chat Details | |
| mv_user_question = st.chat_input("Pass your input here") | |
| # -- Recording Chat Input and Generating Response | |
| if mv_user_question: | |
| # -- Saving User Input | |
| st.session_state.messages.append({"role": "user", "content": mv_user_question}) | |
| # -- Generating LLM Response | |
| lv_response = fn_generate_QnA_response(mv_user_question, mv_pdf_input_file, mv_processing_message) | |
| # -- Saving LLM Response | |
| st.session_state.messages.append( | |
| {"role": "agent", "content": lv_response} | |
| ) | |
| # -- Display chat messages from history | |
| for message in st.session_state.messages: | |
| with st.chat_message(message["role"]): | |
| st.markdown(message["content"]) | |
| # Loading Main | |
| if __name__ == "__main__": | |
| main() |