import os
import re
from tempfile import NamedTemporaryFile

import streamlit as st
import pandas as pd
import folium
from streamlit_folium import st_folium
import anthropic

# Import necessary modules from LangChain and community extensions
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.docstore.document import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_community.vectorstores import FAISS

# =============================================================================
# DATA LOADING UTILITIES
# =============================================================================

@st.cache_data
def load_pickle_data(file_path):
    """Load and cache a pickle file from the given path."""
    return pd.read_pickle(file_path)

city_mapping_df = load_pickle_data("./maps_helpers/city_mapping_df.pkl")
city_plans_df = load_pickle_data("./maps_helpers/city_plans_df.pkl")

# =============================================================================
# HELPER FUNCTIONS
# =============================================================================

def remove_code_blocks(text):
    """
    Remove code block markers (triple backticks) from the text.

    Args:
        text (str): The text to clean.

    Returns:
        str: Cleaned text without code block markers.
    """
    code_block_pattern = r"^```(?:\w+)?\n(.*?)\n```$"
    match = re.match(code_block_pattern, text, re.DOTALL)
    if match:
        return match.group(1).strip()
    return text

def load_documents_from_pdf(file):
    """
    Load documents from an uploaded PDF file.

    Args:
        file: Uploaded PDF file.

    Returns:
        list: List of documents extracted from the PDF.
    """
    if not file.name.endswith('.pdf'):
        raise ValueError("The uploaded file is not a PDF. Please upload a PDF file.")

    with NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
        temp_pdf.write(file.read())
        temp_pdf_path = temp_pdf.name

    loader = PyPDFLoader(temp_pdf_path)
    docs = loader.load()
    os.remove(temp_pdf_path)
    return docs

def load_vector_store_from_path(path):
    """
    Load a FAISS vector store from a given directory path.

    Args:
        path (str): Path to the vector store directory.

    Returns:
        FAISS: Loaded vector store.
    """
    embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
    return FAISS.load_local(path, embeddings, allow_dangerous_deserialization=True)
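
# Hedged usage sketch (not part of the original module): loading one of the
# saved stores and querying it ad hoc. The store path and query string are
# illustrative placeholders following this file's naming convention.
def _example_load_and_query(store_path="Individual_All_Vectorstores/Example_Plan_vectorstore"):
    store = load_vector_store_from_path(store_path)
    retriever = store.as_retriever(search_kwargs={"k": 5})
    return retriever.invoke("What are the plan's adaptation goals?")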
def list_vector_store_documents():
    """
    List available vector store documents from the 'Individual_All_Vectorstores' directory.

    Returns:
        list: List of document names.
    """
    directory_path = "Individual_All_Vectorstores"
    if not os.path.exists(directory_path):
        raise FileNotFoundError(
            f"The directory '{directory_path}' does not exist. "
            "Run `create_and_save_individual_vector_stores()` to create it."
        )
    documents = [
        f.replace("_vectorstore", "").replace("_", " ")
        for f in os.listdir(directory_path)
        if f.endswith("_vectorstore")
    ]
    return documents

# =============================================================================
# SUMMARY AND QA FUNCTIONS
# =============================================================================

def summary_generation(api_key, uploaded_file, questions_path, prompt_path, display_placeholder):
    """
    Generate a summary from a PDF file by processing its content and performing Q&A.

    Steps:
    - Save the uploaded PDF temporarily.
    - Load and split the PDF into document chunks.
    - Create a FAISS vector store and set up a retriever.
    - Load a system prompt and initialize the language model.
    - For each question in the questions file, run retrieval and QA.
    - Update the display placeholder with results.

    Args:
        api_key (str): OpenAI API key.
        uploaded_file: Uploaded PDF file.
        questions_path (str): Path to the text file with questions.
        prompt_path (str): Path to the system prompt file.
        display_placeholder: Streamlit placeholder for displaying results.

    Returns:
        list: List of formatted Q&A results.
    """
    os.environ["OPENAI_API_KEY"] = api_key

    # Save the uploaded PDF to a temporary file
    with NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
        temp_pdf.write(uploaded_file.read())
        temp_pdf_path = temp_pdf.name

    # Load and split the PDF into documents
    loader = PyPDFLoader(temp_pdf_path)
    docs = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=500)
    splits = text_splitter.split_documents(docs)

    # Create a FAISS vector store for the document splits
    vectorstore = FAISS.from_documents(splits, embedding=OpenAIEmbeddings(model="text-embedding-3-large"))
    retriever = vectorstore.as_retriever(search_kwargs={"k": 10})

    # Load the system prompt for Q&A
    if os.path.exists(prompt_path):
        with open(prompt_path, "r") as file:
            system_prompt = file.read()
    else:
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")

    # Create a prompt template with the system and human messages
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}"),
    ])

    # Initialize the language model (using GPT-4o in this case)
    llm = ChatOpenAI(model="gpt-4o")

    # Create the document chain and retrieval chain for Q&A
    question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context")
    rag_chain = create_retrieval_chain(retriever, question_answer_chain)

    # Load questions from the provided questions file
    if os.path.exists(questions_path):
        with open(questions_path, "r") as file:
            questions = [line.strip() for line in file.readlines() if line.strip()]
    else:
        raise FileNotFoundError(f"The specified file was not found: {questions_path}")

    qa_results = []
    for question in questions:
        result = rag_chain.invoke({"input": question})
        answer = remove_code_blocks(result["answer"])
        qa_text = f"### Question: {question}\n**Answer:**\n{answer}\n"
        qa_results.append(qa_text)
        # Update the display after each question so progress is visible
        display_placeholder.markdown("\n".join(qa_results), unsafe_allow_html=True)

    # Remove the temporary PDF file
    os.remove(temp_pdf_path)
    return qa_results
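
# Hedged usage sketch (not part of the original module): wiring
# summary_generation() into a Streamlit page. Widget labels and the two file
# paths are illustrative placeholders, not paths confirmed by this module.
def _example_summary_page():
    api_key = st.text_input("OpenAI API key", type="password")
    uploaded = st.file_uploader("Upload a climate action plan (PDF)", type="pdf")
    placeholder = st.empty()
    if api_key and uploaded:
        summary_generation(
            api_key,
            uploaded,
            questions_path="Prompts/summary_questions.txt",   # hypothetical path
            prompt_path="Prompts/summary_system_prompt.md",   # hypothetical path
            display_placeholder=placeholder,
        )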
""" os.environ["OPENAI_API_KEY"] = api_key # Load the combined vector store embeddings = OpenAIEmbeddings(model="text-embedding-3-large") vector_store = FAISS.load_local("Combined_Summary_Vectorstore", embeddings, allow_dangerous_deserialization=True) retriever = vector_store.as_retriever(search_kwargs={"k": 50}) # Load the multi-document Q&A system prompt prompt_path = "Prompts/multi_document_qa_system_prompt.md" if os.path.exists(prompt_path): with open(prompt_path, "r") as file: system_prompt = file.read() else: raise FileNotFoundError(f"The specified file was not found: {prompt_path}") prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), ("human", "{input}"), ]) llm = ChatOpenAI(model="gpt-4o") question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context") rag_chain = create_retrieval_chain(retriever, question_answer_chain) result = rag_chain.invoke({"input": input_text}) answer = result["answer"] display_placeholder.markdown(f"**Answer:**\n{answer}") def multi_plan_qa_multi_vectorstore(api_key, input_text, display_placeholder): """ Perform multi-plan Q&A using multiple individual vector stores. Args: api_key (str): OpenAI API key. input_text (str): The question to ask. display_placeholder: Streamlit placeholder for displaying results. """ os.environ["OPENAI_API_KEY"] = api_key vectorstore_directory = "Individual_Summary_Vectorstores" vectorstore_names = [ d for d in os.listdir(vectorstore_directory) if os.path.isdir(os.path.join(vectorstore_directory, d)) ] all_retrieved_chunks = [] for vectorstore_name in vectorstore_names: vectorstore_path = os.path.join(vectorstore_directory, vectorstore_name) embeddings = OpenAIEmbeddings(model="text-embedding-3-large") vector_store = FAISS.load_local(vectorstore_path, embeddings, allow_dangerous_deserialization=True) retriever = vector_store.as_retriever(search_kwargs={"k": 2}) retrieved_chunks = retriever.invoke(input_text) all_retrieved_chunks.extend(retrieved_chunks) # Load system prompt for multi-document QA prompt_path = "Prompts/multi_document_qa_system_prompt.md" if os.path.exists(prompt_path): with open(prompt_path, "r") as file: system_prompt = file.read() else: raise FileNotFoundError(f"The specified file was not found: {prompt_path}") prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), ("human", "{input}"), ]) llm = ChatOpenAI(model="gpt-4o") question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context") result = question_answer_chain.invoke({ "input": input_text, "context": all_retrieved_chunks }) answer = result["answer"] if "answer" in result else result display_placeholder.markdown(f"**Answer:**\n{answer}") def comparison_qa(api_key, focus_input, comparison_inputs, input_text, display_placeholder): """ Compare a focus document against multiple comparison documents using a one-to-many query. Args: api_key (str): OpenAI API key. focus_input: Focus document (uploaded PDF or vector store path). comparison_inputs: List of comparison documents (uploaded PDFs or vector store paths). input_text (str): The comparison question to ask. display_placeholder: Streamlit placeholder for displaying results. 
""" os.environ["OPENAI_API_KEY"] = api_key # Load focus document or vector store and set up retriever if isinstance(focus_input, st.runtime.uploaded_file_manager.UploadedFile): focus_docs = load_documents_from_pdf(focus_input) splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=500) focus_splits = splitter.split_documents(focus_docs) focus_vector_store = FAISS.from_documents(focus_splits, OpenAIEmbeddings(model="text-embedding-3-large")) elif isinstance(focus_input, str) and os.path.isdir(focus_input): focus_vector_store = load_vector_store_from_path(focus_input) else: raise ValueError("Invalid focus input type. Must be a PDF file or a path to a vector store.") focus_retriever = focus_vector_store.as_retriever(search_kwargs={"k": 5}) focus_docs = focus_retriever.invoke(input_text) # Process each comparison input comparison_chunks = [] for comparison_input in comparison_inputs: if isinstance(comparison_input, st.runtime.uploaded_file_manager.UploadedFile): comparison_docs = load_documents_from_pdf(comparison_input) splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=500) comparison_splits = splitter.split_documents(comparison_docs) comparison_vector_store = FAISS.from_documents(comparison_splits, OpenAIEmbeddings(model="text-embedding-3-large")) elif isinstance(comparison_input, str) and os.path.isdir(comparison_input): comparison_vector_store = load_vector_store_from_path(comparison_input) else: raise ValueError("Invalid comparison input type. Must be a PDF file or a path to a vector store.") comparison_retriever = comparison_vector_store.as_retriever(search_kwargs={"k": 5}) comparison_docs = comparison_retriever.invoke(input_text) comparison_chunks.extend(comparison_docs) combined_context = focus_docs + comparison_chunks # Load system prompt for comparison QA prompt_path = "Prompts/comparison_prompt.md" if os.path.exists(prompt_path): with open(prompt_path, "r") as file: system_prompt = file.read() else: raise FileNotFoundError(f"The specified file was not found: {prompt_path}") prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), ("human", "{input}") ]) llm = ChatOpenAI(model="gpt-4o") question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context") result = question_answer_chain.invoke({ "context": combined_context, "input": input_text }) answer = result["answer"] if "answer" in result else result display_placeholder.markdown(f"**Answer:**\n{answer}") def document_qa(api_key, focus_input, user_input): """ Query a single document (PDF or vector store) to answer a question. Args: api_key (str): OpenAI API key. focus_input: Focus document (uploaded PDF or path to vector store). user_input (str): The question to ask. Returns: str: The model's answer. 
""" os.environ["OPENAI_API_KEY"] = api_key # Create a vector store from the focus input if isinstance(focus_input, st.runtime.uploaded_file_manager.UploadedFile): docs = load_documents_from_pdf(focus_input) splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=500) splits = splitter.split_documents(docs) vector_store = FAISS.from_documents(splits, OpenAIEmbeddings(model="text-embedding-3-large")) else: vector_store = load_vector_store_from_path(focus_input) retriever = vector_store.as_retriever(search_kwargs={"k": 5}) retrieved_chunks = retriever.invoke(user_input) # Load the multi-document QA system prompt prompt_path = "Prompts/multi_document_qa_system_prompt.md" if os.path.exists(prompt_path): with open(prompt_path, "r") as file: system_prompt = file.read() else: raise FileNotFoundError(f"The specified file was not found: {prompt_path}") # Create a conversation RAG chain that incorporates chat history llm = ChatOpenAI(model="gpt-4o") history_prompt = ChatPromptTemplate.from_messages([ ("system", system_prompt), MessagesPlaceholder(variable_name="chat_history"), ("human", "{input}") ]) history_retriever_chain = create_history_aware_retriever(llm, retriever, history_prompt) answer_prompt = ChatPromptTemplate.from_messages([ ("system", "Answer the user's questions based on the below context:\n\n{context}"), MessagesPlaceholder(variable_name="chat_history"), ("human", "{input}") ]) document_chain = create_stuff_documents_chain(llm, answer_prompt) conversation_rag_chain = create_retrieval_chain(history_retriever_chain, document_chain) response = conversation_rag_chain.invoke({ "chat_history": st.session_state.get("chat_history", []), "input": user_input, "context": retrieved_chunks }) return response["answer"] def comparison_qa_long_context(api_key, anthropic_api_key, input_text, focus_input, comparison_inputs, display_placeholder): """ Compare plans using a long-context language model (e.g., Anthropic Claude). Modified to work with HuggingFace dataset. 
""" os.environ["OPENAI_API_KEY"] = api_key os.environ["ANTHROPIC_API_KEY"] = anthropic_api_key dataset_manager = get_dataset_manager() # Load focus document from PDF or file path if isinstance(focus_input, st.runtime.uploaded_file_manager.UploadedFile): focus_docs = load_documents_from_pdf(focus_input) focus_context = "\n\n".join([doc.page_content for doc in focus_docs]) elif isinstance(focus_input, str) and os.path.isdir(focus_input): # Extract plan name from vector store path plan_name = os.path.basename(focus_input).replace("_vectorstore", "") if plan_name.endswith("_Summary"): plan_name = plan_name[:-8] # Try to get PDF from HuggingFace dataset pdf_filename = f"{plan_name}.pdf" try: pdf_path = dataset_manager.get_pdf_path(pdf_filename) loader = PyPDFLoader(pdf_path) focus_docs = loader.load() focus_context = "\n\n".join([doc.page_content for doc in focus_docs]) except FileNotFoundError: # Fallback to summary file if PDF not found summary_path = os.path.join("CAPS_Summaries", f"{plan_name}_Summary.md") if os.path.exists(summary_path): with open(summary_path, 'r') as file: focus_context = file.read() else: raise FileNotFoundError(f"Neither PDF nor summary found for plan: {plan_name}") else: raise ValueError("Invalid focus input type.") # Load comparison documents (similar modification) comparison_contexts = [] for comparison_input in comparison_inputs: if isinstance(comparison_input, st.runtime.uploaded_file_manager.UploadedFile): comparison_docs = load_documents_from_pdf(comparison_input) comparison_context = "\n\n".join([doc.page_content for doc in comparison_docs]) comparison_contexts.append(comparison_context) elif isinstance(comparison_input, str) and os.path.isdir(comparison_input): plan_name = os.path.basename(comparison_input).replace("_vectorstore", "") if plan_name.endswith("_Summary"): plan_name = plan_name[:-8] pdf_filename = f"{plan_name}.pdf" try: pdf_path = dataset_manager.get_pdf_path(pdf_filename) loader = PyPDFLoader(pdf_path) comparison_docs = loader.load() comparison_context = "\n\n".join([doc.page_content for doc in comparison_docs]) comparison_contexts.append(comparison_context) except FileNotFoundError: # Fallback to summary file summary_path = os.path.join("CAPS_Summaries", f"{plan_name}_Summary.md") if os.path.exists(summary_path): with open(summary_path, 'r') as file: comparison_context = file.read() comparison_contexts.append(comparison_context) else: st.warning(f"Neither PDF nor summary found for comparison plan: {plan_name}") else: raise ValueError("Invalid comparison input type.") # Rest of the function remains the same... all_comparison_content = "\n\n---\n\n".join(comparison_contexts) client = anthropic.Anthropic(api_key=anthropic_api_key) response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[ { "role": "user", "content": f"{input_text}\n\nFocus Document:\n{focus_context}\n\nComparison Documents:\n{all_comparison_content}" } ] ) answer = response.content[0].text display_placeholder.markdown(f"**Answer:**\n{answer}", unsafe_allow_html=True) # ============================================================================= # MAP AND PLAN UTILITIES # ============================================================================= def generate_legend_html(region_colors): """ Generate HTML code for an EPA region legend. Args: region_colors (dict): Mapping of region to color. Returns: str: HTML string for the legend. """ legend_html = """
# =============================================================================
# MAP AND PLAN UTILITIES
# =============================================================================

def generate_legend_html(region_colors):
    """
    Generate HTML code for an EPA region legend.

    Args:
        region_colors (dict): Mapping of region to color.

    Returns:
        str: HTML string for the legend.
    """
    # NOTE: The original markup was lost in formatting; this is a standard
    # fixed-position Leaflet legend reconstructed from the surviving text
    # (an "EPA Regions" title and one "Region {n}" row per entry).
    legend_html = """
    <div style="position: fixed; bottom: 30px; left: 30px; z-index: 9999;
                background-color: white; padding: 10px; border: 2px solid grey;
                border-radius: 5px; font-size: 14px;">
        <b>EPA Regions</b><br>
    """
    for region, color in region_colors.items():
        legend_html += (
            f'<i style="background: {color}; width: 12px; height: 12px; '
            f'display: inline-block; margin-right: 5px;"></i>'
            f'Region {region}<br>'
        )
    legend_html += "</div>"
    return legend_html

def format_plan_name(plan, state_abbr):
    """
    Format a plan string using a state abbreviation to match vector store naming.

    Example:
        Input: "Oakland, 2020, Mitigation Primary CAP" and state_abbr "CA"
        Output: "Oakland, CA Mitigation Primary CAP 2020"

    Args:
        plan (str): Plan description.
        state_abbr (str): State abbreviation.

    Returns:
        str: Formatted plan name.
    """
    parts = [p.strip() for p in plan.split(",")]
    if len(parts) == 3:
        city, year, title = parts
        return f"{city}, {state_abbr} {title} {year}"
    return plan
def add_city_markers(map_object):
    """
    Add city markers with plan information to a Folium map.

    This function assumes the existence of two global DataFrames:
    - city_mapping_df: Contains 'City', 'State', 'Latitude', 'Longitude'
    - city_plans_df: Contains plan information per city and state.

    Args:
        map_object (folium.Map): The Folium map to add markers to.
    """
    city_markers = folium.FeatureGroup(name="City Markers", show=False)
    for _, row in city_mapping_df[['City', 'State', 'Latitude', 'Longitude']].drop_duplicates().iterrows():
        lat, lon = row["Latitude"], row["Longitude"]
        city, state = row["City"], row["State"]
        plans_row = city_plans_df[(city_plans_df["City"] == city) & (city_plans_df["State"] == state)]
        if not plans_row.empty:
            plan_list = plans_row.iloc[0]["plan_list"]
            # NOTE: The original popup markup was lost in formatting; this
            # reconstruction keeps the surviving text (bulleted plan names
            # under a bold "City, State" header).
            plan_lines = "".join([f"&nbsp;&nbsp;• {plan}<br>" for plan in plan_list])
            popup_html = f"<b>{city}, {state}</b><br>{plan_lines}"
        else:
            popup_html = f"<b>{city}, {state}</b><br>&nbsp;&nbsp;No plans found"
        popup = folium.Popup(popup_html, max_width=500)
        folium.CircleMarker(
            location=[lat, lon],
            radius=3,
            color="darkgreen",
            fill=True,
            fill_color="darkgreen",
            fill_opacity=0.7,
            popup=popup,
            tooltip=f"{city}, {state}"
        ).add_to(city_markers)
    map_object.add_child(city_markers)
    " else: popup_html = f"{city}, {state}
    No plans found" popup = folium.Popup(popup_html, max_width=500) folium.CircleMarker( location=[lat, lon], radius=3, color="darkgreen", fill=True, fill_color="darkgreen", fill_opacity=0.7, popup=popup, tooltip=f"{city}, {state}" ).add_to(city_markers) map_object.add_child(city_markers) def maps_qa(api_key, user_input, extra_context, plan_list, state_abbr, epa_region): """ Answer a user's question about maps and plans by retrieving document chunks from individual, combined, and EPA region vector stores, plus extra context and region cities. Now, the EPA region vector store is a single combined store, and an additional CSV file listing the cities in the region is also added as a document. """ # Set API key os.environ["OPENAI_API_KEY"] = api_key all_documents = [] # 1. Retrieve and aggregate individual plan vector store results individual_texts = [] for plan in plan_list: formatted_plan = format_plan_name(plan, state_abbr) vectorstore_path = os.path.join("Individual_All_Vectorstores", formatted_plan + "_vectorstore") try: embedding_model = OpenAIEmbeddings(model="text-embedding-3-large") vector_store = FAISS.load_local(vectorstore_path, embedding_model, allow_dangerous_deserialization=True) except Exception as e: st.error(f"Error loading vector store for plan '{formatted_plan}': {e}") continue retriever = vector_store.as_retriever(search_kwargs={"k": 1}) retrieved_chunks = retriever.invoke(user_input) if retrieved_chunks: # Combine all retrieved chunks for this plan into one text individual_texts.append(" ".join([chunk.page_content for chunk in retrieved_chunks])) aggregated_individual_text = "\n".join(individual_texts) if individual_texts else "" individual_doc = Document(page_content=aggregated_individual_text) all_documents.append(individual_doc) # 2. Retrieve from combined summary vector store (using k=5 and taking the first result) combined_vectorstore_path = "Combined_Summary_Vectorstore" try: embedding_model = OpenAIEmbeddings(model="text-embedding-3-large") combined_vector_store = FAISS.load_local(combined_vectorstore_path, embedding_model, allow_dangerous_deserialization=True) except Exception as e: st.error(f"Error loading combined vector store: {e}") combined_doc = Document(page_content="") else: combined_retriever = combined_vector_store.as_retriever(search_kwargs={"k": 5}) combined_retrieved_chunks = combined_retriever.invoke(user_input) combined_doc = combined_retrieved_chunks[0] if combined_retrieved_chunks else Document(page_content="") all_documents.append(combined_doc) # 3. Retrieve from combined EPA region vector store # Note: We now expect one vector store per region. region_vectorstore_path = os.path.join("Combined_By_Region_Vectorstores", f"Region_{epa_region}", f"Region_{epa_region}_vectorstore") try: embedding_model = OpenAIEmbeddings(model="text-embedding-3-large") region_vector_store = FAISS.load_local(region_vectorstore_path, embedding_model, allow_dangerous_deserialization=True) region_retriever = region_vector_store.as_retriever(search_kwargs={"k": 1}) retrieved_chunks = region_retriever.invoke(user_input) aggregated_region_text = " ".join([chunk.page_content for chunk in retrieved_chunks]) if retrieved_chunks else "" except Exception as e: st.error(f"Error loading region vector store for EPA region '{epa_region}': {e}") aggregated_region_text = "" region_doc = Document(page_content=aggregated_region_text) all_documents.append(region_doc) # 4. 
    # 4. Append the extra context as its own document
    extra_context_doc = Document(page_content=extra_context)
    all_documents.append(extra_context_doc)

    # 5. Add the cities document (read the CSV listing cities in the region)
    cities_csv_path = os.path.join("Combined_By_Region_Vectorstores", f"Region_{epa_region}", f"Region_{epa_region}_cities.csv")
    if os.path.exists(cities_csv_path):
        with open(cities_csv_path, "r") as f:
            cities_info = f.read()
    else:
        cities_info = f"No city information available for EPA region {epa_region}."
    cities_doc = Document(page_content=f"Cities in EPA Region {epa_region}:\n{cities_info}")
    all_documents.append(cities_doc)

    # Load the system prompt for maps QA
    prompt_path = "Prompts/maps_qa.md"
    if os.path.exists(prompt_path):
        with open(prompt_path, "r") as file:
            system_prompt = file.read()
    else:
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")

    # The prompt accepts both "context" and "input"
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{context}\n\nQuestion: {input}")
    ])
    llm = ChatOpenAI(model="gpt-4o")
    question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context")
    result = question_answer_chain.invoke({"input": user_input, "context": all_documents})
    # The stuff-documents chain returns a plain string; guard before indexing.
    return result["answer"] if isinstance(result, dict) and "answer" in result else result
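
# Hedged usage sketch (not part of the original module): calling maps_qa()
# for a city selected on the map. Plan strings follow the "City, Year, Title"
# convention consumed by format_plan_name(); all values below are
# illustrative placeholders.
def _example_maps_question(api_key):
    return maps_qa(
        api_key=api_key,
        user_input="Which plans in this region address coastal flooding?",
        extra_context="User clicked on Oakland, CA.",
        plan_list=["Oakland, 2020, Mitigation Primary CAP"],
        state_abbr="CA",
        epa_region=9,
    )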