climate-policy-tracker / app_helpers.py
umangchaudhry's picture
Update app_helpers.py
9c3ebda verified
import os
import re
from tempfile import NamedTemporaryFile
import streamlit as st
import pandas as pd
import folium
from streamlit_folium import st_folium
import anthropic
# Import necessary modules from LangChain and community extensions
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.docstore.document import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_community.vectorstores import FAISS
# =============================================================================
# DATA LOADING UTILITIES
# =============================================================================
@st.cache_data
def load_pickle_data(file_path):
    """
    Read a pickled pandas object from disk, with the result cached by Streamlit.

    Args:
        file_path (str): Location of the pickle file.

    Returns:
        Whatever ``pandas.read_pickle`` produces for that file.
    """
    # Unpickling can execute arbitrary code, so only point this at trusted files.
    loaded = pd.read_pickle(file_path)
    return loaded
# Module-level DataFrames read by add_city_markers(). They are loaded when this
# module is imported, from paths relative to the current working directory.
city_mapping_df = load_pickle_data("./maps_helpers/city_mapping_df.pkl")
city_plans_df = load_pickle_data("./maps_helpers/city_plans_df.pkl")
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def remove_code_blocks(text):
    """
    Strip a Markdown code fence that wraps the entire text.

    Only a fence enclosing the whole input is removed: an opening line of three
    backticks with an optional info string (e.g. ``markdown`` or ``c++``) and a
    closing line of three backticks. Whitespace around the fence is ignored.
    Text that merely contains a fenced snippet somewhere inside is returned
    unchanged.

    Args:
        text (str): The text to clean.

    Returns:
        str: The fenced content with surrounding whitespace stripped, or the
        original text when it is not wrapped in a single fence.
    """
    # The info string may be any run of characters other than a newline or a
    # backtick, so tags such as "c++" or "objective-c" are accepted too.
    code_block_pattern = r"^```[^\n`]*\n(.*?)\n```$"
    # Model replies often carry stray leading/trailing blank lines; match
    # against the trimmed text so those do not defeat the anchors.
    match = re.match(code_block_pattern, text.strip(), re.DOTALL)
    if match:
        return match.group(1).strip()
    return text
def load_documents_from_pdf(file):
    """
    Load documents from an uploaded PDF file.

    The upload is copied to a temporary ``.pdf`` file so that ``PyPDFLoader``
    can read it from a path. The temporary file is always removed afterwards,
    including when writing or parsing fails.

    Args:
        file: Uploaded file object exposing ``name`` and ``read()``.

    Returns:
        list: Documents returned by ``PyPDFLoader.load()``.

    Raises:
        ValueError: If the file name does not end in ``.pdf`` (case-insensitive).
    """
    # Accept ".PDF" / ".Pdf" as well; only the extension is checked here.
    if not file.name.lower().endswith(".pdf"):
        raise ValueError("The uploaded file is not a PDF. Please upload a PDF file.")

    # delete=False because the loader reopens the file by path after we close it.
    temp_pdf = NamedTemporaryFile(delete=False, suffix=".pdf")
    try:
        with temp_pdf:
            temp_pdf.write(file.read())
        return PyPDFLoader(temp_pdf.name).load()
    finally:
        # Runs on success and on failure, so no temp file is left behind.
        os.remove(temp_pdf.name)
def load_vector_store_from_path(path):
    """
    Open a FAISS vector store that was previously saved to a directory.

    Args:
        path (str): Directory containing the saved vector store.

    Returns:
        FAISS: The loaded store, bound to the ``text-embedding-3-large``
        OpenAI embedding model.
    """
    embedding_model = OpenAIEmbeddings(model="text-embedding-3-large")
    # NOTE(review): the flag below opts in to "dangerous" deserialization, so
    # `path` should only ever point at stores this project created itself.
    store = FAISS.load_local(
        path,
        embedding_model,
        allow_dangerous_deserialization=True,
    )
    return store
def list_vector_store_documents():
    """
    Collect display names for the stores in 'Individual_All_Vectorstores'.

    Every directory entry whose name ends in ``_vectorstore`` is turned into a
    display name by dropping ``_vectorstore`` and replacing underscores with
    spaces. Entries are returned in the order ``os.listdir`` yields them.

    Returns:
        list: Display names of the available documents.

    Raises:
        FileNotFoundError: If the directory does not exist.
    """
    suffix = "_vectorstore"
    directory_path = "Individual_All_Vectorstores"
    if os.path.exists(directory_path):
        names = []
        for entry in os.listdir(directory_path):
            if not entry.endswith(suffix):
                continue
            names.append(entry.replace(suffix, "").replace("_", " "))
        return names
    raise FileNotFoundError(
        f"The directory '{directory_path}' does not exist. "
        "Run `create_and_save_individual_vector_stores()` to create it."
    )
# =============================================================================
# SUMMARY AND QA FUNCTIONS
# =============================================================================
def summary_generation(api_key, uploaded_file, questions_path, prompt_path, display_placeholder):
    """
    Generate a summary from a PDF file by running a fixed list of questions over it.

    Steps:
        - Read the system prompt and the questions file (fails fast if missing).
        - Save the uploaded PDF temporarily, load it, and delete the temp file.
        - Split the pages into chunks and index them in a FAISS vector store.
        - For each question, run retrieval + QA and refresh the placeholder.

    Args:
        api_key (str): OpenAI API key; exported as ``OPENAI_API_KEY``.
        uploaded_file: Uploaded PDF file object exposing ``read()``.
        questions_path (str): Path to a text file with one question per line.
        prompt_path (str): Path to the system prompt file.
        display_placeholder: Streamlit placeholder used to display results.

    Returns:
        list: Formatted Q&A strings, one per question.

    Raises:
        FileNotFoundError: If the prompt file or the questions file is missing.
    """
    os.environ["OPENAI_API_KEY"] = api_key

    # Check the cheap text inputs first so a bad path is reported before any
    # embedding or model calls are made.
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")
    with open(prompt_path, "r") as file:
        system_prompt = file.read()

    if not os.path.exists(questions_path):
        raise FileNotFoundError(f"The specified file was not found: {questions_path}")
    with open(questions_path, "r") as file:
        # Blank lines are skipped; remaining lines are trimmed.
        questions = [line.strip() for line in file if line.strip()]

    # delete=False because the loader reopens the file by path. load() returns
    # the full list of pages, so the file can be removed right after it.
    temp_pdf = NamedTemporaryFile(delete=False, suffix=".pdf")
    try:
        with temp_pdf:
            temp_pdf.write(uploaded_file.read())
        docs = PyPDFLoader(temp_pdf.name).load()
    finally:
        os.remove(temp_pdf.name)

    # Split the pages and index the chunks for retrieval.
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=500)
    splits = text_splitter.split_documents(docs)
    vectorstore = FAISS.from_documents(splits, embedding=OpenAIEmbeddings(model="text-embedding-3-large"))
    retriever = vectorstore.as_retriever(search_kwargs={"k": 10})

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}"),
    ])
    llm = ChatOpenAI(model="gpt-4o")
    question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context")
    rag_chain = create_retrieval_chain(retriever, question_answer_chain)

    qa_results = []
    for question in questions:
        result = rag_chain.invoke({"input": question})
        answer = remove_code_blocks(result["answer"])
        qa_results.append(f"### Question: {question}\n**Answer:**\n{answer}\n")
        # Re-render everything gathered so far so the page fills in progressively.
        display_placeholder.markdown("\n".join(qa_results), unsafe_allow_html=True)
    return qa_results
def multi_plan_qa(api_key, input_text, display_placeholder):
    """
    Answer a question against the 'Combined_Summary_Vectorstore' index.

    The 50 closest chunks are retrieved, stuffed into the multi-document QA
    prompt, and the model's answer is written to the placeholder as Markdown.

    Args:
        api_key (str): OpenAI API key; exported as ``OPENAI_API_KEY``.
        input_text (str): The question to ask.
        display_placeholder: Streamlit placeholder for displaying the answer.

    Raises:
        FileNotFoundError: If the system prompt file is missing.
    """
    os.environ["OPENAI_API_KEY"] = api_key

    # Open the combined store and expose it as a retriever.
    store = FAISS.load_local(
        "Combined_Summary_Vectorstore",
        OpenAIEmbeddings(model="text-embedding-3-large"),
        allow_dangerous_deserialization=True,
    )
    retriever = store.as_retriever(search_kwargs={"k": 50})

    # Read the system prompt shared by the multi-document QA flows.
    prompt_path = "Prompts/multi_document_qa_system_prompt.md"
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")
    with open(prompt_path, "r") as prompt_file:
        system_prompt = prompt_file.read()

    chat_prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}"),
    ])
    model = ChatOpenAI(model="gpt-4o")
    combine_chain = create_stuff_documents_chain(model, chat_prompt, document_variable_name="context")
    response = create_retrieval_chain(retriever, combine_chain).invoke({"input": input_text})

    answer = response["answer"]
    display_placeholder.markdown(f"**Answer:**\n{answer}")
def multi_plan_qa_multi_vectorstore(api_key, input_text, display_placeholder):
    """
    Perform multi-plan Q&A using multiple individual vector stores.

    Every sub-directory of 'Individual_Summary_Vectorstores' is loaded as a
    FAISS store, the top 2 chunks for the question are taken from each, and the
    pooled chunks are passed to the model as context in one call.

    Args:
        api_key (str): OpenAI API key; exported as ``OPENAI_API_KEY``.
        input_text (str): The question to ask.
        display_placeholder: Streamlit placeholder for displaying results.

    Raises:
        FileNotFoundError: If the system prompt file is missing.
    """
    os.environ["OPENAI_API_KEY"] = api_key

    vectorstore_directory = "Individual_Summary_Vectorstores"
    vectorstore_names = [
        d for d in os.listdir(vectorstore_directory)
        if os.path.isdir(os.path.join(vectorstore_directory, d))
    ]

    # One embeddings client is enough for every store; no need to rebuild it
    # on each loop iteration.
    embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
    all_retrieved_chunks = []
    for vectorstore_name in vectorstore_names:
        vectorstore_path = os.path.join(vectorstore_directory, vectorstore_name)
        vector_store = FAISS.load_local(vectorstore_path, embeddings, allow_dangerous_deserialization=True)
        retriever = vector_store.as_retriever(search_kwargs={"k": 2})
        all_retrieved_chunks.extend(retriever.invoke(input_text))

    # Load system prompt for multi-document QA
    prompt_path = "Prompts/multi_document_qa_system_prompt.md"
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")
    with open(prompt_path, "r") as file:
        system_prompt = file.read()

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}"),
    ])
    llm = ChatOpenAI(model="gpt-4o")
    question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context")
    result = question_answer_chain.invoke({
        "input": input_text,
        "context": all_retrieved_chunks
    })

    # The chain may hand back a plain string. `"answer" in result` on a string
    # is a substring test, so any reply containing the word "answer" would then
    # be indexed with a str key and raise TypeError. Branch on the type instead.
    answer = result["answer"] if isinstance(result, dict) else result
    display_placeholder.markdown(f"**Answer:**\n{answer}")
def comparison_qa(api_key, focus_input, comparison_inputs, input_text, display_placeholder):
    """
    Compare a focus document against multiple comparison documents using a one-to-many query.

    The top 5 chunks for the question are retrieved from the focus document and
    from each comparison document; all chunks are then passed to the model
    together, focus chunks first.

    Args:
        api_key (str): OpenAI API key; exported as ``OPENAI_API_KEY``.
        focus_input: Focus document (uploaded PDF or vector store directory path).
        comparison_inputs: List of comparison documents (uploaded PDFs or
            vector store directory paths).
        input_text (str): The comparison question to ask.
        display_placeholder: Streamlit placeholder for displaying results.

    Raises:
        ValueError: If an input is neither an uploaded file nor an existing directory.
        FileNotFoundError: If the comparison prompt file is missing.
    """
    os.environ["OPENAI_API_KEY"] = api_key
    uploaded_file_type = st.runtime.uploaded_file_manager.UploadedFile

    # Load focus document or vector store and set up retriever
    if isinstance(focus_input, uploaded_file_type):
        focus_pages = load_documents_from_pdf(focus_input)
        splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=500)
        focus_splits = splitter.split_documents(focus_pages)
        focus_vector_store = FAISS.from_documents(focus_splits, OpenAIEmbeddings(model="text-embedding-3-large"))
    elif isinstance(focus_input, str) and os.path.isdir(focus_input):
        focus_vector_store = load_vector_store_from_path(focus_input)
    else:
        raise ValueError("Invalid focus input type. Must be a PDF file or a path to a vector store.")
    focus_retriever = focus_vector_store.as_retriever(search_kwargs={"k": 5})
    focus_docs = focus_retriever.invoke(input_text)

    # Process each comparison input
    comparison_chunks = []
    for comparison_input in comparison_inputs:
        if isinstance(comparison_input, uploaded_file_type):
            comparison_pages = load_documents_from_pdf(comparison_input)
            # NOTE(review): comparison PDFs are split at 1000 characters while
            # the focus PDF uses 3000 — confirm the difference is intentional.
            splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=500)
            comparison_splits = splitter.split_documents(comparison_pages)
            comparison_vector_store = FAISS.from_documents(comparison_splits, OpenAIEmbeddings(model="text-embedding-3-large"))
        elif isinstance(comparison_input, str) and os.path.isdir(comparison_input):
            comparison_vector_store = load_vector_store_from_path(comparison_input)
        else:
            raise ValueError("Invalid comparison input type. Must be a PDF file or a path to a vector store.")
        comparison_retriever = comparison_vector_store.as_retriever(search_kwargs={"k": 5})
        comparison_chunks.extend(comparison_retriever.invoke(input_text))

    combined_context = focus_docs + comparison_chunks

    # Load system prompt for comparison QA
    prompt_path = "Prompts/comparison_prompt.md"
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")
    with open(prompt_path, "r") as file:
        system_prompt = file.read()

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}")
    ])
    llm = ChatOpenAI(model="gpt-4o")
    question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context")
    result = question_answer_chain.invoke({
        "context": combined_context,
        "input": input_text
    })

    # The chain may hand back a plain string. `"answer" in result` on a string
    # is a substring test, so any reply containing the word "answer" would then
    # be indexed with a str key and raise TypeError. Branch on the type instead.
    answer = result["answer"] if isinstance(result, dict) else result
    display_placeholder.markdown(f"**Answer:**\n{answer}")
def document_qa(api_key, focus_input, user_input):
    """
    Answer a question about one document, taking the chat history into account.

    The document is either an uploaded PDF (split and indexed on the fly) or a
    path to a saved vector store. Prior turns are read from
    ``st.session_state["chat_history"]`` (empty list when absent).

    Args:
        api_key (str): OpenAI API key; exported as ``OPENAI_API_KEY``.
        focus_input: Focus document (uploaded PDF or path to vector store).
        user_input (str): The question to ask.

    Returns:
        str: The model's answer.

    Raises:
        FileNotFoundError: If the system prompt file is missing.
    """
    os.environ["OPENAI_API_KEY"] = api_key

    # Anything that is not an uploaded file is treated as a vector store path.
    uploaded_file_type = st.runtime.uploaded_file_manager.UploadedFile
    if not isinstance(focus_input, uploaded_file_type):
        vector_store = load_vector_store_from_path(focus_input)
    else:
        pages = load_documents_from_pdf(focus_input)
        chunker = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=500)
        chunks = chunker.split_documents(pages)
        vector_store = FAISS.from_documents(chunks, OpenAIEmbeddings(model="text-embedding-3-large"))

    retriever = vector_store.as_retriever(search_kwargs={"k": 5})
    # NOTE(review): these chunks are passed as "context" below. The retrieval
    # chain presumably re-fetches its own context for the answer step, so this
    # value likely only feeds the history prompt when that prompt has a
    # {context} slot — confirm before removing.
    retrieved_chunks = retriever.invoke(user_input)

    # The multi-document QA system prompt doubles as the history prompt here.
    prompt_path = "Prompts/multi_document_qa_system_prompt.md"
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")
    with open(prompt_path, "r") as prompt_file:
        system_prompt = prompt_file.read()

    model = ChatOpenAI(model="gpt-4o")

    # Retriever that can take earlier turns into account.
    history_messages = [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
    ]
    history_prompt = ChatPromptTemplate.from_messages(history_messages)
    history_retriever_chain = create_history_aware_retriever(model, retriever, history_prompt)

    # Chain that answers from the retrieved context plus the conversation.
    answer_messages = [
        ("system", "Answer the user's questions based on the below context:\n\n{context}"),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
    ]
    answer_prompt = ChatPromptTemplate.from_messages(answer_messages)
    document_chain = create_stuff_documents_chain(model, answer_prompt)

    conversation_rag_chain = create_retrieval_chain(history_retriever_chain, document_chain)
    payload = {
        "chat_history": st.session_state.get("chat_history", []),
        "input": user_input,
        "context": retrieved_chunks,
    }
    return conversation_rag_chain.invoke(payload)["answer"]
def _load_plan_text_for_vectorstore(vectorstore_path, dataset_manager):
    """Return ``(plan_name, text)`` for a vector store directory; ``text`` is None when no source exists."""
    # normpath drops a trailing separator so basename is never empty.
    plan_name = os.path.basename(os.path.normpath(vectorstore_path)).replace("_vectorstore", "")
    if plan_name.endswith("_Summary"):
        plan_name = plan_name[: -len("_Summary")]

    try:
        # Preferred source: the plan's PDF, located through the dataset manager.
        pdf_path = dataset_manager.get_pdf_path(f"{plan_name}.pdf")
        pages = PyPDFLoader(pdf_path).load()
        return plan_name, "\n\n".join(page.page_content for page in pages)
    except FileNotFoundError:
        # Fallback: the Markdown summary stored next to the app.
        summary_path = os.path.join("CAPS_Summaries", f"{plan_name}_Summary.md")
        if os.path.exists(summary_path):
            with open(summary_path, 'r') as file:
                return plan_name, file.read()
        return plan_name, None


def comparison_qa_long_context(api_key, anthropic_api_key, input_text, focus_input, comparison_inputs, display_placeholder):
    """
    Compare plans by sending their text to an Anthropic model in one prompt.

    Each input is either an uploaded PDF, whose pages are joined into one
    string, or a vector store directory. For a directory, the plan name is
    derived from the directory name and the text comes from the plan's PDF
    (via the dataset manager) or, failing that, from
    ``CAPS_Summaries/<plan>_Summary.md``.

    Args:
        api_key (str): OpenAI API key; exported as ``OPENAI_API_KEY``.
        anthropic_api_key (str): Anthropic API key; exported as
            ``ANTHROPIC_API_KEY`` and passed to the client.
        input_text (str): The comparison question to ask.
        focus_input: Focus document (uploaded PDF or vector store directory path).
        comparison_inputs: Comparison documents (uploaded PDFs or vector store
            directory paths).
        display_placeholder: Streamlit placeholder for displaying the answer.

    Raises:
        ValueError: If an input is neither an uploaded file nor an existing directory.
        FileNotFoundError: If no PDF or summary exists for the focus plan. A
            missing comparison plan only produces a Streamlit warning.
    """
    os.environ["OPENAI_API_KEY"] = api_key
    os.environ["ANTHROPIC_API_KEY"] = anthropic_api_key
    # NOTE(review): get_dataset_manager is not defined or imported in the
    # visible part of this file — confirm it is in scope at runtime.
    dataset_manager = get_dataset_manager()
    uploaded_file_type = st.runtime.uploaded_file_manager.UploadedFile

    # Focus document: a missing source is fatal.
    if isinstance(focus_input, uploaded_file_type):
        focus_docs = load_documents_from_pdf(focus_input)
        focus_context = "\n\n".join(doc.page_content for doc in focus_docs)
    elif isinstance(focus_input, str) and os.path.isdir(focus_input):
        plan_name, focus_context = _load_plan_text_for_vectorstore(focus_input, dataset_manager)
        if focus_context is None:
            raise FileNotFoundError(f"Neither PDF nor summary found for plan: {plan_name}")
    else:
        raise ValueError("Invalid focus input type.")

    # Comparison documents: a missing source is reported and skipped.
    comparison_contexts = []
    for comparison_input in comparison_inputs:
        if isinstance(comparison_input, uploaded_file_type):
            comparison_docs = load_documents_from_pdf(comparison_input)
            comparison_contexts.append("\n\n".join(doc.page_content for doc in comparison_docs))
        elif isinstance(comparison_input, str) and os.path.isdir(comparison_input):
            plan_name, comparison_context = _load_plan_text_for_vectorstore(comparison_input, dataset_manager)
            if comparison_context is None:
                st.warning(f"Neither PDF nor summary found for comparison plan: {plan_name}")
            else:
                comparison_contexts.append(comparison_context)
        else:
            raise ValueError("Invalid comparison input type.")

    all_comparison_content = "\n\n---\n\n".join(comparison_contexts)

    client = anthropic.Anthropic(api_key=anthropic_api_key)
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": f"{input_text}\n\nFocus Document:\n{focus_context}\n\nComparison Documents:\n{all_comparison_content}"
            }
        ]
    )
    answer = response.content[0].text
    display_placeholder.markdown(f"**Answer:**\n{answer}", unsafe_allow_html=True)
# =============================================================================
# MAP AND PLAN UTILITIES
# =============================================================================
def generate_legend_html(region_colors):
    """
    Build the HTML snippet for the EPA region legend.

    One colour swatch followed by ``Region <key>`` is emitted per entry, in the
    mapping's iteration order, inside a single wrapping ``<div>``.

    Args:
        region_colors (dict): Mapping of region to color.

    Returns:
        str: HTML string for the legend.
    """
    header = '\n<div style="font-size:14px; opacity: 1;">\n<b>EPA Regions</b><br>\n'
    swatches = [
        f'<i style="background:{color}; width:18px; height:18px; '
        f'display:inline-block; margin-right:5px;"></i>Region {region}<br>'
        for region, color in region_colors.items()
    ]
    return header + "".join(swatches) + "</div>"
def format_plan_name(plan, state_abbr):
    """
    Rearrange a "City, Year, Title" plan string into the vector store naming form.

    Example:
        Input: "Oakland, 2020, Mitigation Primary CAP" and state_abbr "CA"
        Output: "Oakland, CA Mitigation Primary CAP 2020"

    Args:
        plan (str): Plan description.
        state_abbr (str): State abbreviation.

    Returns:
        str: Formatted plan name, or ``plan`` unchanged when it does not split
        into exactly three comma-separated parts.
    """
    pieces = [segment.strip() for segment in plan.split(",")]
    if len(pieces) != 3:
        return plan
    city, year, title = pieces
    return f"{city}, {state_abbr} {title} {year}"
def add_city_markers(map_object):
    """
    Add city markers with plan information to a Folium map.

    This function reads two module-level DataFrames:
        - city_mapping_df: uses columns 'City', 'State', 'Latitude', 'Longitude'.
        - city_plans_df: uses columns 'City', 'State', 'plan_list'.

    One circle marker is added per distinct (City, State, Latitude, Longitude)
    row. Its popup lists the plans from the first matching row in
    city_plans_df, or "No plans found" when there is none. The markers live in
    a "City Markers" feature group that is hidden by default.

    Args:
        map_object (folium.Map): The Folium map to add markers to.
    """
    # Build the (city, state) -> plan_list lookup once instead of filtering the
    # whole plans frame for every marker. setdefault keeps the first row per key.
    plans_by_city = {}
    for plan_city, plan_state, plans in zip(
        city_plans_df["City"], city_plans_df["State"], city_plans_df["plan_list"]
    ):
        plans_by_city.setdefault((plan_city, plan_state), plans)

    city_markers = folium.FeatureGroup(name="City Markers", show=False)
    unique_cities = city_mapping_df[['City', 'State', 'Latitude', 'Longitude']].drop_duplicates()
    for _, row in unique_cities.iterrows():
        lat, lon = row["Latitude"], row["Longitude"]
        city, state = row["City"], row["State"]

        key = (city, state)
        if key in plans_by_city:
            plan_lines = "".join(f"<li>{plan}</li>" for plan in plans_by_city[key])
            popup_html = f"<b>{city}, {state}</b><br><ul>{plan_lines}</ul>"
        else:
            popup_html = f"<b>{city}, {state}</b><br>No plans found"

        popup = folium.Popup(popup_html, max_width=500)
        folium.CircleMarker(
            location=[lat, lon],
            radius=3,
            color="darkgreen",
            fill=True,
            fill_color="darkgreen",
            fill_opacity=0.7,
            popup=popup,
            tooltip=f"{city}, {state}"
        ).add_to(city_markers)
    map_object.add_child(city_markers)
def maps_qa(api_key, user_input, extra_context, plan_list, state_abbr, epa_region):
    """
    Answer a question about maps and plans from several context sources.

    Five documents are assembled and passed to the model as context:
        1. The top chunk from each plan's individual vector store, joined.
        2. The first of the top 5 chunks from the combined summary store.
        3. The top chunk from the EPA region's combined vector store.
        4. ``extra_context`` as supplied by the caller.
        5. The contents of the region's cities CSV file, if present.

    A vector store that fails to load is reported with ``st.error`` and
    contributes empty text rather than aborting the call.

    Args:
        api_key (str): OpenAI API key; exported as ``OPENAI_API_KEY``.
        user_input (str): The question to ask.
        extra_context (str): Additional text to include as context.
        plan_list: Plan strings in "City, Year, Title" form.
        state_abbr (str): State abbreviation used to build store names.
        epa_region: EPA region identifier used in the region store paths.

    Returns:
        The model's answer.

    Raises:
        FileNotFoundError: If the maps QA prompt file is missing.
    """
    # Set API key
    os.environ["OPENAI_API_KEY"] = api_key
    all_documents = []

    # 1. Retrieve and aggregate individual plan vector store results
    individual_texts = []
    for plan in plan_list:
        formatted_plan = format_plan_name(plan, state_abbr)
        vectorstore_path = os.path.join("Individual_All_Vectorstores", formatted_plan + "_vectorstore")
        try:
            embedding_model = OpenAIEmbeddings(model="text-embedding-3-large")
            vector_store = FAISS.load_local(vectorstore_path, embedding_model, allow_dangerous_deserialization=True)
        except Exception as e:
            # Best effort: a plan without a usable store is reported and skipped.
            st.error(f"Error loading vector store for plan '{formatted_plan}': {e}")
            continue
        retriever = vector_store.as_retriever(search_kwargs={"k": 1})
        retrieved_chunks = retriever.invoke(user_input)
        if retrieved_chunks:
            # Combine all retrieved chunks for this plan into one text
            individual_texts.append(" ".join(chunk.page_content for chunk in retrieved_chunks))
    aggregated_individual_text = "\n".join(individual_texts)
    all_documents.append(Document(page_content=aggregated_individual_text))

    # 2. Retrieve from combined summary vector store (k=5, keeping only the first result)
    combined_vectorstore_path = "Combined_Summary_Vectorstore"
    try:
        embedding_model = OpenAIEmbeddings(model="text-embedding-3-large")
        combined_vector_store = FAISS.load_local(combined_vectorstore_path, embedding_model, allow_dangerous_deserialization=True)
    except Exception as e:
        st.error(f"Error loading combined vector store: {e}")
        combined_doc = Document(page_content="")
    else:
        combined_retriever = combined_vector_store.as_retriever(search_kwargs={"k": 5})
        combined_retrieved_chunks = combined_retriever.invoke(user_input)
        combined_doc = combined_retrieved_chunks[0] if combined_retrieved_chunks else Document(page_content="")
    all_documents.append(combined_doc)

    # 3. Retrieve from combined EPA region vector store (one store per region)
    region_dir = os.path.join("Combined_By_Region_Vectorstores", f"Region_{epa_region}")
    region_vectorstore_path = os.path.join(region_dir, f"Region_{epa_region}_vectorstore")
    try:
        embedding_model = OpenAIEmbeddings(model="text-embedding-3-large")
        region_vector_store = FAISS.load_local(region_vectorstore_path, embedding_model, allow_dangerous_deserialization=True)
        region_retriever = region_vector_store.as_retriever(search_kwargs={"k": 1})
        retrieved_chunks = region_retriever.invoke(user_input)
        aggregated_region_text = " ".join(chunk.page_content for chunk in retrieved_chunks) if retrieved_chunks else ""
    except Exception as e:
        st.error(f"Error loading region vector store for EPA region '{epa_region}': {e}")
        aggregated_region_text = ""
    all_documents.append(Document(page_content=aggregated_region_text))

    # 4. Append extra context as its own document (None is treated as empty text)
    all_documents.append(Document(page_content=extra_context or ""))

    # 5. Add the cities document (read CSV listing cities in the region)
    cities_csv_path = os.path.join(region_dir, f"Region_{epa_region}_cities.csv")
    if os.path.exists(cities_csv_path):
        with open(cities_csv_path, "r") as f:
            cities_info = f.read()
    else:
        cities_info = f"No city information available for EPA region {epa_region}."
    all_documents.append(Document(page_content=f"Cities in EPA Region {epa_region}:\n{cities_info}"))

    # Load the system prompt for maps QA
    prompt_path = "Prompts/maps_qa.md"
    if not os.path.exists(prompt_path):
        raise FileNotFoundError(f"The specified file was not found: {prompt_path}")
    with open(prompt_path, "r") as file:
        system_prompt = file.read()

    # The human message carries both the stuffed context and the question.
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{context}\n\nQuestion: {input}")
    ])
    llm = ChatOpenAI(model="gpt-4o")
    question_answer_chain = create_stuff_documents_chain(llm, prompt, document_variable_name="context")
    result = question_answer_chain.invoke({"input": user_input, "context": all_documents})

    # The chain may hand back a plain string. `"answer" in result` on a string
    # is a substring test, so any reply containing the word "answer" would then
    # be indexed with a str key and raise TypeError. Branch on the type instead.
    return result["answer"] if isinstance(result, dict) else result