# Chatbot_Excel / app.py
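# Streamlit chatbot: answers questions about an uploaded Excel/CSV file through a
# FAISS + OpenAI-embeddings RAG pipeline, or runs a Bing-backed web search via a
# LangChain agent when the user asks for one.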
import streamlit as st
import pandas as pd
import os
import tempfile
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter # Fixed import
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
import requests
from bs4 import BeautifulSoup
from langchain.agents import initialize_agent, Tool, AgentType
from operator import itemgetter
# --- Page Configuration ---
st.set_page_config(
page_title="Chatbot Excel",
page_icon="📊",
layout="centered",
initial_sidebar_state="expanded"
)
# --- Custom Styles ---
st.markdown(
"""
<style>
.stButton > button {
background-color: #007bff;
color: white;
border: none;
border-radius: 5px;
padding: 0.5em 1em;
font-size: 1em;
font-weight: 600;
}
.greeting-text {
font-size: 2.5em;
color: transparent;
background-image: linear-gradient(90deg, #00529B, #00A9E0);
-webkit-background-clip: text;
font-weight: 600;
text-align: center !important;
}
.sidebar .sidebar-content {
background-color: #f0f2f6;
}
</style>
""",
unsafe_allow_html=True
)
# --- Helper function to format chat history ---
def format_chat_history(chat_messages_list):
"""Formats chat history for LLM prompt, excluding the last user message."""
history_for_prompt = chat_messages_list[:-1]
if not history_for_prompt:
return "No conversation history available."
formatted_history = []
for msg in history_for_prompt:
role_label = "User" if msg["role"] == "user" else "Assistant"
formatted_history.append(f"{role_label}: {msg['content']}")
return "\n".join(formatted_history)
# --- Bing Web Search Function ---
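# Scrapes the Bing HTML results page directly (no search API key required) and
# returns up to five title/description entries (with URLs) as plain text for the agent tool.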
def bing_search_tool_function(query: str) -> str:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
quoted_query = requests.utils.quote(query)
search_url = f"https://www.bing.com/search?q={quoted_query}&qs=HS&pq=se&sc=10-2&cvid=C9D3906F723C49862C937B28F8106C8C&FORM=QBLH&sp=1&lq=0"
results_list = []
try:
response = requests.get(search_url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")
for item in soup.find_all("li", attrs={"class": "b_algo"}):
title_tag = item.find("h2")
title = title_tag.get_text().strip() if title_tag else "No title"
link_tag = title_tag.find("a") if title_tag else None
link = link_tag["href"] if link_tag else "No link"
description_text = "No description available."
description_tag = item.find(class_="b_caption")
if description_tag:
description_text = description_tag.get_text().strip()
elif item.find("p"):
caption_div = item.find("p")
if caption_div:
description_text = caption_div.get_text().strip()
description_text = description_text + " url:" + link
if title != "No title" or description_text != "No description available.":
results_list.append({"title": title, "description": description_text})
if not results_list:
return "Unfortunately, I couldn't find any results on the web for this search."
formatted_output = "\n\n".join([
f"Title: {res['title']}\nDescription: {res['description']}"
for res in results_list[:5]
])
return formatted_output
except requests.exceptions.RequestException as e:
return f"Network error in web search: {e}"
except Exception as e:
return f"Error parsing search results: {e}"
# --- Main function for Q&A from data and web search ---
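# Session-state keys used below: 'chat_messages' (conversation history),
# 'retriever' (FAISS retriever for the indexed file), 'rag_initialized_for_file'
# (name of the file already indexed), and 'uploader_key_suffix_counter'
# (incremented to force a fresh file_uploader widget on "Start New Chat").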
def chat_with_data_and_web(api_key, base_url):
st.write('<div class="greeting-text">Hello! Welcome to Chatbot Excel.</div>', unsafe_allow_html=True)
# --- Function to completely reset chat and RAG state ---
def reset_all_chat_and_rag_state():
keys_to_clear = ['rag_initialized_for_file', 'retriever', 'chat_messages']
for key in keys_to_clear:
if key in st.session_state:
del st.session_state[key]
st.session_state.chat_messages = [{"role": "assistant", "content": "Hello! Upload a file to get started or ask me to search the web."}]
# Reset file_uploader by changing its key
if 'uploader_key_suffix_counter' not in st.session_state:
st.session_state.uploader_key_suffix_counter = 0
st.session_state.uploader_key_suffix_counter += 1
st.rerun()
# --- Function to reset RAG state on file change (without clearing chat history) ---
def reset_rag_on_file_change():
keys_to_clear = ['rag_initialized_for_file', 'retriever']
for key in keys_to_clear:
if key in st.session_state:
del st.session_state[key]
if "chat_messages" not in st.session_state:
st.session_state.chat_messages = []
st.session_state.chat_messages.append({
"role": "assistant",
"content": "New file detected. Preparing for Q&A..."
})
# --- "Start New Chat" button in sidebar ---
if st.sidebar.button("Start New Chat", key="new_chat_btn"):
reset_all_chat_and_rag_state()
# Initialize uploader key counter if not exists
if 'uploader_key_suffix_counter' not in st.session_state:
st.session_state.uploader_key_suffix_counter = 0
current_uploader_key = f"main_file_uploader_{st.session_state.uploader_key_suffix_counter}"
uploaded_file = st.file_uploader(
"Upload Excel or CSV file (optional for web search):",
type=["xlsx", "csv"],
key=current_uploader_key,
on_change=reset_rag_on_file_change
)
if "chat_messages" not in st.session_state:
st.session_state.chat_messages = [{"role": "assistant", "content": "Upload a file to ask questions about its content, or ask me to search the web."}]
for msg in st.session_state.chat_messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"])
# --- File processing logic ---
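    # Each DataFrame row is serialized to a "Row N: col: value, ..." string, split into
    # chunks, embedded with text-embedding-3-small, and indexed in FAISS; the file name
    # is stored in 'rag_initialized_for_file' so Streamlit reruns skip reprocessing.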
if uploaded_file is not None and st.session_state.get('rag_initialized_for_file') != uploaded_file.name:
with st.spinner(f"Processing file {uploaded_file.name} for Q&A..."):
tmp_file_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_file:
tmp_file.write(uploaded_file.getvalue())
tmp_file_path = tmp_file.name
if uploaded_file.name.endswith(".xlsx"):
df = pd.read_excel(tmp_file_path)
elif uploaded_file.name.endswith(".csv"):
df = pd.read_csv(tmp_file_path, encoding='utf-8')
else:
st.error("File type not supported.")
if 'rag_initialized_for_file' in st.session_state: del st.session_state['rag_initialized_for_file']
if 'retriever' in st.session_state: del st.session_state.retriever
return
st.write("### Data Preview (first 5 rows):")
st.dataframe(df.head())
if df.empty:
st.warning("The uploaded file is empty. Q&A from file will not be effective.")
st.session_state.rag_initialized_for_file = "empty_file_" + uploaded_file.name
st.session_state.chat_messages.append({"role": "assistant", "content": f"File '{uploaded_file.name}' is empty. Would you like me to search the web?"})
st.rerun()
return
documents_for_rag = [f"Row {idx}: " + ", ".join([f"{col}: {str(val)}" for col, val in row.items() if pd.notna(val)]) for idx, row in df.iterrows()]
if not documents_for_rag:
st.warning("No data extracted from file for Q&A.")
st.session_state.rag_initialized_for_file = "no_docs_" + uploaded_file.name
st.session_state.chat_messages.append({"role": "assistant", "content": f"No data extracted from file '{uploaded_file.name}' for processing."})
st.rerun()
return
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
all_splits = text_splitter.create_documents(documents_for_rag)
if not all_splits:
st.warning("Text splitting resulted in no chunks. File may be too small for Q&A.")
st.session_state.rag_initialized_for_file = "no_splits_" + uploaded_file.name
st.session_state.chat_messages.append({"role": "assistant", "content": f"File '{uploaded_file.name}' was too small for splitting and processing."})
st.rerun()
return
embeddings_model = OpenAIEmbeddings(
openai_api_key=api_key,
model="text-embedding-3-small",
base_url=base_url
)
vectorstore = FAISS.from_documents(all_splits, embeddings_model)
st.session_state.retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
st.session_state.rag_initialized_for_file = uploaded_file.name
st.session_state.chat_messages.append({
"role": "assistant",
"content": f"File '{uploaded_file.name}' successfully processed. You can now ask questions about its content or request web searches."
})
st.rerun()
except Exception as e:
st.error(f"Error processing file for Q&A: {e}")
if 'rag_initialized_for_file' in st.session_state: del st.session_state['rag_initialized_for_file']
if 'retriever' in st.session_state: del st.session_state.retriever
st.session_state.chat_messages.append({"role": "assistant", "content": f"Error processing file: {e}"})
st.rerun()
finally:
if tmp_file_path and os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
if prompt := st.chat_input("Ask your question or tell me to search the web:"):
st.session_state.chat_messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
with st.chat_message("assistant"):
message_placeholder = st.empty()
message_placeholder.markdown("Thinking...")
search_keywords = ["search for", "look up", "find", "search", "web search"]
is_search_request = any(prompt.lower().startswith(kw) for kw in search_keywords)
search_query = prompt
for kw in search_keywords:
if prompt.lower().startswith(kw):
search_query = prompt[len(kw):].strip()
break
response_text = ""
try:
if not api_key:
error_msg = "OpenAI API key not provided. Please enter it in the sidebar."
message_placeholder.error(error_msg)
st.session_state.chat_messages.append({"role": "assistant", "content": error_msg})
return
llm_rag = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.2,
openai_api_key=api_key,
base_url=base_url
)
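                # Two gpt-4o-mini instances: temperature 0.2 for grounded answers from the
                # file, 0.7 for the more free-form search agent created below.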
if is_search_request and search_query:
message_placeholder.markdown(f"Searching the web for: '{search_query}' using agent...")
llm_for_agent = ChatOpenAI(
model="gpt-4o-mini",
temperature=0.7,
openai_api_key=api_key,
base_url=base_url
)
tools = [
Tool(
name="BingSearch",
func=bing_search_tool_function,
description="Search the web using Bing and provide detailed results"
)
]
agent = initialize_agent(
tools,
llm_for_agent,
                        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,  # initialize_agent takes 'agent='; REACT_DOCSTORE requires exactly two tools named "Search" and "Lookup"
verbose=True,
handle_parsing_errors=True
)
try:
response_text = agent.run(search_query)
except Exception as agent_exc:
st.error(f"Error running search agent: {agent_exc}")
response_text = f"Unfortunately, there was a problem processing the web search: {agent_exc}"
elif uploaded_file and 'retriever' in st.session_state and st.session_state.get('rag_initialized_for_file') == uploaded_file.name:
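                    # RAG branch: retrieve the top-5 most relevant row chunks from FAISS and
                    # answer with the conversation history supplied as extra context.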
retriever = st.session_state.retriever
rag_prompt_template = """Based on the previous conversation history and the text below extracted from the uploaded file, answer the user's question.
If the information is not available in the text, state that the information was not found in the provided data. Do not make up an answer.
Conversation History:
{chat_history}
Extracted Text:
{context}
Current User Question: {question}
Answer:"""
rag_prompt = PromptTemplate.from_template(rag_prompt_template)
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
rag_chain = (
{
"context": itemgetter("question") | retriever | format_docs,
"question": itemgetter("question"),
"chat_history": itemgetter("chat_history")
}
| rag_prompt
| llm_rag
| StrOutputParser()
)
formatted_history = format_chat_history(st.session_state.chat_messages)
response_text = rag_chain.invoke({
"question": prompt,
"chat_history": formatted_history
})
elif uploaded_file and st.session_state.get('rag_initialized_for_file') != uploaded_file.name:
response_text = f"File '{uploaded_file.name}' is still being processed or encountered an issue. Please wait or re-upload. You can also ask me to search the web."
elif not uploaded_file and not is_search_request:
response_text = "Please upload a file to ask questions about its content, or ask me to search the web (e.g., 'search for...')."
elif not search_query and is_search_request:
response_text = "It seems you wanted to search, but didn't specify what to search for. Please try again, e.g., 'search for latest tech news'."
message_placeholder.markdown(response_text)
st.session_state.chat_messages.append({"role": "assistant", "content": response_text})
except Exception as e:
error_msg = f"An error occurred: {e}"
st.error(error_msg)
if not response_text:
message_placeholder.markdown(f"Unfortunately, something went wrong: {e}")
st.session_state.chat_messages.append({"role": "assistant", "content": f"Unfortunately, something went wrong: {e}"})
elif not uploaded_file and len(st.session_state.get("chat_messages", [])) <= 1:
st.info("Upload an Excel or CSV file to chat about its data, or tell me to search the web.")
# --- Main UI Layout ---
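# The chat UI is only rendered once both an API key and a base URL have been entered.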
def main():
st.sidebar.info("Ask questions about uploaded data or request web searches.")
st.title("Chatbot Excel")
api_key = st.sidebar.text_input("Enter your OpenAI API Key:", type="password", key="main_api_key_input")
base_url = st.sidebar.text_input("Enter API Base URL:", key="base_url_input")
if api_key and base_url:
chat_with_data_and_web(api_key, base_url)
else:
st.sidebar.warning("Please enter your OpenAI API key and base URL to continue.")
st.info("Please enter your OpenAI API key and base URL in the sidebar to use the application features.")
if __name__ == "__main__":
main()