Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import os | |
| import tempfile | |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter # Fixed import | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_core.runnables import RunnablePassthrough | |
| from langchain_core.output_parsers import StrOutputParser | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from langchain.agents import initialize_agent, Tool, AgentType | |
| from operator import itemgetter | |
# --- Page Configuration ---
# Keep the page settings in one mapping so they can be read at a glance and
# handed to Streamlit in a single call.
_PAGE_SETTINGS = {
    "page_title": "Chatbot Excel",
    "page_icon": "📊",
    "layout": "centered",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# --- Custom Styles ---
# Stylesheet injected into the page: button colours, the gradient greeting
# banner (class "greeting-text") and the sidebar background.
_CUSTOM_CSS = """
<style>
.stButton > button {
background-color: #007bff;
color: white;
border: none;
border-radius: 5px;
padding: 0.5em 1em;
font-size: 1em;
font-weight: 600;
}
.greeting-text {
font-size: 2.5em;
color: transparent;
background-image: linear-gradient(90deg, #00529B, #00A9E0);
-webkit-background-clip: text;
font-weight: 600;
text-align: center !important;
}
.sidebar .sidebar-content {
background-color: #f0f2f6;
}
</style>
"""
# unsafe_allow_html is required, otherwise Streamlit escapes the <style> tag.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# --- Helper function to format chat history ---
def format_chat_history(chat_messages_list):
    """Render the earlier chat turns as plain text for an LLM prompt.

    The final entry of ``chat_messages_list`` is always left out (the caller
    appends the current user message before calling this).  Each remaining
    message is a mapping with ``"role"`` and ``"content"`` keys; any role other
    than ``"user"`` is labelled ``Assistant``.

    Returns:
        One ``"<Label>: <content>"`` line per earlier message joined with
        newlines, or a fixed placeholder sentence when there are none.
    """
    earlier_turns = chat_messages_list[:-1]
    if not earlier_turns:
        return "No conversation history available."

    def speaker(turn):
        if turn["role"] == "user":
            return "User"
        return "Assistant"

    return "\n".join(
        f"{speaker(turn)}: {turn['content']}" for turn in earlier_turns
    )
# --- Bing Web Search Function ---
def bing_search_tool_function(query: str) -> str:
    """Scrape the first Bing results page for ``query`` and summarise the hits.

    The function never raises: network failures and parsing problems are
    reported as a human-readable string, because the return value is fed
    straight back to the agent / chat UI.

    Args:
        query: Free-text search phrase.

    Returns:
        Up to five results formatted as ``Title: ...`` / ``Description: ...``
        paragraphs (each description ends with `` url:<link>``), or a
        sentence explaining why no results are available.
    """
    if not query or not query.strip():
        return "No search query was provided."

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    quoted_query = requests.utils.quote(query)
    search_url = f"https://www.bing.com/search?q={quoted_query}&qs=HS&pq=se&sc=10-2&cvid=C9D3906F723C49862C937B28F8106C8C&FORM=QBLH&sp=1&lq=0"

    no_title = "No title"
    no_link = "No link"
    no_description = "No description available."
    results_list = []
    try:
        response = requests.get(search_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        for item in soup.find_all("li", attrs={"class": "b_algo"}):
            title_tag = item.find("h2")
            title = title_tag.get_text().strip() if title_tag else no_title

            link_tag = title_tag.find("a") if title_tag else None
            # .get() instead of ["href"]: one anchor without an href must not
            # raise and throw away every result collected so far.
            link = link_tag.get("href", no_link) if link_tag else no_link

            # Prefer the caption container; fall back to the first paragraph.
            description_tag = item.find(class_="b_caption")
            if description_tag is None:
                description_tag = item.find("p")
            description_text = (
                description_tag.get_text().strip()
                if description_tag is not None
                else no_description
            )

            # Decide whether the hit is empty BEFORE appending the URL;
            # otherwise the description never equals the placeholder and
            # blank hits slip through.
            if title == no_title and description_text == no_description:
                continue

            results_list.append(
                {"title": title, "description": f"{description_text} url:{link}"}
            )

        if not results_list:
            return "Unfortunately, I couldn't find any results on the web for this search."

        return "\n\n".join(
            f"Title: {res['title']}\nDescription: {res['description']}"
            for res in results_list[:5]
        )
    except requests.exceptions.RequestException as e:
        return f"Network error in web search: {e}"
    except Exception as e:
        # Best-effort scraper: report unexpected markup problems to the caller
        # as text rather than crashing the chat.
        return f"Error parsing search results: {e}"
# --- Main function for Q&A from data and web search ---
# Prompt prefixes that mark a message as a web-search request.  "search for"
# is listed before "search" so the longer phrase is stripped first.
_SEARCH_KEYWORDS = ("search for", "look up", "find", "search", "web search")

# Prompt used when answering from the uploaded file.
_RAG_PROMPT_TEMPLATE = """Based on the previous conversation history and the text below extracted from the uploaded file, answer the user's question.
If the information is not available in the text, state that the information was not found in the provided data. Do not make up an answer.
Conversation History:
{chat_history}
Extracted Text:
{context}
Current User Question: {question}
Answer:"""


def _extract_search_query(prompt):
    """Return ``(is_search_request, search_query)`` for a chat prompt.

    A prompt is a search request only when it starts with one of
    ``_SEARCH_KEYWORDS`` as a whole phrase, so "finding outliers" or
    "searching..." are not mistaken for "find"/"search" commands.  For a
    non-search prompt the query is the prompt itself.
    """
    stripped = prompt.strip()
    lowered = stripped.lower()
    for keyword in _SEARCH_KEYWORDS:
        if not lowered.startswith(keyword):
            continue
        remainder = stripped[len(keyword):]
        # Require a word boundary right after the keyword.
        if not remainder or not remainder[0].isalnum():
            return True, remainder.strip()
    return False, prompt


def chat_with_data_and_web(api_key, base_url):
    """Render the chat page: file upload, file-based Q&A and web search.

    Uploaded Excel/CSV rows are embedded into a FAISS index whose retriever is
    kept in ``st.session_state``; questions are answered from it, while
    prompts starting with a search keyword are routed to a Bing-backed agent.

    Args:
        api_key: API key passed to the chat and embedding models.
        base_url: Base URL of the OpenAI-compatible endpoint.

    Session-state keys used:
        chat_messages: list of ``{"role", "content"}`` dicts shown in the chat.
        retriever: retriever built from the current file (only on success).
        rag_initialized_for_file: file name on success, or a prefixed marker
            when the file yielded nothing usable.
        rag_attempted_for_file: name of the file whose processing was last
            attempted, whatever the outcome.
        uploader_key_suffix_counter: bumped to give the uploader a fresh key.
    """
    st.write('<div class="greeting-text">Hello! Welcome to Chatbot Excel.</div>', unsafe_allow_html=True)

    def clear_session_keys(keys):
        """Delete each of ``keys`` from the session state if present."""
        for key in keys:
            if key in st.session_state:
                del st.session_state[key]

    # --- Function to completely reset chat and RAG state ---
    def reset_all_chat_and_rag_state():
        clear_session_keys(
            ('rag_initialized_for_file', 'rag_attempted_for_file', 'retriever', 'chat_messages')
        )
        st.session_state.chat_messages = [{"role": "assistant", "content": "Hello! Upload a file to get started or ask me to search the web."}]
        # Reset file_uploader by changing its key
        if 'uploader_key_suffix_counter' not in st.session_state:
            st.session_state.uploader_key_suffix_counter = 0
        st.session_state.uploader_key_suffix_counter += 1
        st.rerun()

    # --- Function to reset RAG state on file change (without clearing chat history) ---
    def reset_rag_on_file_change():
        # Clearing the "attempted" marker lets a re-upload of a file with the
        # same name be processed again.
        clear_session_keys(('rag_initialized_for_file', 'rag_attempted_for_file', 'retriever'))
        if "chat_messages" not in st.session_state:
            st.session_state.chat_messages = []
        st.session_state.chat_messages.append({
            "role": "assistant",
            "content": "New file detected. Preparing for Q&A..."
        })

    # --- "Start New Chat" button in sidebar ---
    if st.sidebar.button("Start New Chat", key="new_chat_btn"):
        reset_all_chat_and_rag_state()

    # Initialize uploader key counter if not exists
    if 'uploader_key_suffix_counter' not in st.session_state:
        st.session_state.uploader_key_suffix_counter = 0
    current_uploader_key = f"main_file_uploader_{st.session_state.uploader_key_suffix_counter}"
    uploaded_file = st.file_uploader(
        "Upload Excel or CSV file (optional for web search):",
        type=["xlsx", "csv"],
        key=current_uploader_key,
        on_change=reset_rag_on_file_change
    )

    if "chat_messages" not in st.session_state:
        st.session_state.chat_messages = [{"role": "assistant", "content": "Upload a file to ask questions about its content, or ask me to search the web."}]
    for msg in st.session_state.chat_messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    # --- File processing logic ---
    # Gate on "attempted", not "initialized": after an empty file or an error
    # the initialized marker never equals the file name, so gating on it would
    # reprocess the file (and append another message) on every rerun, forever.
    if uploaded_file is not None and st.session_state.get('rag_attempted_for_file') != uploaded_file.name:
        st.session_state.rag_attempted_for_file = uploaded_file.name
        with st.spinner(f"Processing file {uploaded_file.name} for Q&A..."):
            tmp_file_path = None
            try:
                suffix = os.path.splitext(uploaded_file.name)[1]
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                    tmp_file.write(uploaded_file.getvalue())
                    tmp_file_path = tmp_file.name

                # Compare extensions case-insensitively so "DATA.CSV" is accepted.
                lowered_name = uploaded_file.name.lower()
                if lowered_name.endswith(".xlsx"):
                    df = pd.read_excel(tmp_file_path)
                elif lowered_name.endswith(".csv"):
                    df = pd.read_csv(tmp_file_path, encoding='utf-8')
                else:
                    st.error("File type not supported.")
                    clear_session_keys(('rag_initialized_for_file', 'retriever'))
                    return

                st.write("### Data Preview (first 5 rows):")
                st.dataframe(df.head())

                if df.empty:
                    st.warning("The uploaded file is empty. Q&A from file will not be effective.")
                    st.session_state.rag_initialized_for_file = "empty_file_" + uploaded_file.name
                    st.session_state.chat_messages.append({"role": "assistant", "content": f"File '{uploaded_file.name}' is empty. Would you like me to search the web?"})
                    st.rerun()
                    return

                # One text document per row, skipping missing cells.
                documents_for_rag = [
                    f"Row {idx}: " + ", ".join(
                        f"{col}: {str(val)}" for col, val in row.items() if pd.notna(val)
                    )
                    for idx, row in df.iterrows()
                ]
                if not documents_for_rag:
                    st.warning("No data extracted from file for Q&A.")
                    st.session_state.rag_initialized_for_file = "no_docs_" + uploaded_file.name
                    st.session_state.chat_messages.append({"role": "assistant", "content": f"No data extracted from file '{uploaded_file.name}' for processing."})
                    st.rerun()
                    return

                text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
                all_splits = text_splitter.create_documents(documents_for_rag)
                if not all_splits:
                    st.warning("Text splitting resulted in no chunks. File may be too small for Q&A.")
                    st.session_state.rag_initialized_for_file = "no_splits_" + uploaded_file.name
                    st.session_state.chat_messages.append({"role": "assistant", "content": f"File '{uploaded_file.name}' was too small for splitting and processing."})
                    st.rerun()
                    return

                embeddings_model = OpenAIEmbeddings(
                    openai_api_key=api_key,
                    model="text-embedding-3-small",
                    base_url=base_url
                )
                vectorstore = FAISS.from_documents(all_splits, embeddings_model)
                st.session_state.retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
                st.session_state.rag_initialized_for_file = uploaded_file.name
                st.session_state.chat_messages.append({
                    "role": "assistant",
                    "content": f"File '{uploaded_file.name}' successfully processed. You can now ask questions about its content or request web searches."
                })
                st.rerun()
            except Exception as e:
                st.error(f"Error processing file for Q&A: {e}")
                clear_session_keys(('rag_initialized_for_file', 'retriever'))
                st.session_state.chat_messages.append({"role": "assistant", "content": f"Error processing file: {e}"})
                st.rerun()
            finally:
                # Runs on every exit path, including the rerun request.
                if tmp_file_path and os.path.exists(tmp_file_path):
                    os.remove(tmp_file_path)

    if prompt := st.chat_input("Ask your question or tell me to search the web:"):
        st.session_state.chat_messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            message_placeholder.markdown("Thinking...")
            is_search_request, search_query = _extract_search_query(prompt)
            response_text = ""
            try:
                if not api_key:
                    error_msg = "OpenAI API key not provided. Please enter it in the sidebar."
                    message_placeholder.error(error_msg)
                    st.session_state.chat_messages.append({"role": "assistant", "content": error_msg})
                    return

                file_ready = bool(
                    uploaded_file
                    and 'retriever' in st.session_state
                    and st.session_state.get('rag_initialized_for_file') == uploaded_file.name
                )

                if is_search_request and search_query:
                    message_placeholder.markdown(f"Searching the web for: '{search_query}' using agent...")
                    llm_for_agent = ChatOpenAI(
                        model="gpt-4o-mini",
                        temperature=0.7,
                        openai_api_key=api_key,
                        base_url=base_url
                    )
                    tools = [
                        Tool(
                            name="BingSearch",
                            func=bing_search_tool_function,
                            description="Search the web using Bing and provide detailed results"
                        )
                    ]
                    # The selector keyword is `agent`, not `agent_type`.  The
                    # docstore ReAct agent only accepts tools named "Search"
                    # and "Lookup", so use the general-purpose ReAct agent.
                    agent = initialize_agent(
                        tools,
                        llm_for_agent,
                        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                        verbose=True,
                        handle_parsing_errors=True
                    )
                    try:
                        response_text = agent.run(search_query)
                    except Exception as agent_exc:
                        st.error(f"Error running search agent: {agent_exc}")
                        response_text = f"Unfortunately, there was a problem processing the web search: {agent_exc}"
                elif file_ready:
                    retriever = st.session_state.retriever
                    llm_rag = ChatOpenAI(
                        model="gpt-4o-mini",
                        temperature=0.2,
                        openai_api_key=api_key,
                        base_url=base_url
                    )
                    rag_prompt = PromptTemplate.from_template(_RAG_PROMPT_TEMPLATE)

                    def format_docs(docs):
                        return "\n\n".join(doc.page_content for doc in docs)

                    rag_chain = (
                        {
                            "context": itemgetter("question") | retriever | format_docs,
                            "question": itemgetter("question"),
                            "chat_history": itemgetter("chat_history")
                        }
                        | rag_prompt
                        | llm_rag
                        | StrOutputParser()
                    )
                    # The current prompt is already the last chat message;
                    # format_chat_history leaves that last entry out.
                    formatted_history = format_chat_history(st.session_state.chat_messages)
                    response_text = rag_chain.invoke({
                        "question": prompt,
                        "chat_history": formatted_history
                    })
                elif uploaded_file and st.session_state.get('rag_initialized_for_file') != uploaded_file.name:
                    response_text = f"File '{uploaded_file.name}' is still being processed or encountered an issue. Please wait or re-upload. You can also ask me to search the web."
                elif not uploaded_file and not is_search_request:
                    response_text = "Please upload a file to ask questions about its content, or ask me to search the web (e.g., 'search for...')."
                elif not search_query and is_search_request:
                    response_text = "It seems you wanted to search, but didn't specify what to search for. Please try again, e.g., 'search for latest tech news'."

                message_placeholder.markdown(response_text)
                st.session_state.chat_messages.append({"role": "assistant", "content": response_text})
            except Exception as e:
                error_msg = f"An error occurred: {e}"
                st.error(error_msg)
                if not response_text:
                    message_placeholder.markdown(f"Unfortunately, something went wrong: {e}")
                st.session_state.chat_messages.append({"role": "assistant", "content": f"Unfortunately, something went wrong: {e}"})
    elif not uploaded_file and len(st.session_state.get("chat_messages", [])) <= 1:
        st.info("Upload an Excel or CSV file to chat about its data, or tell me to search the web.")
# --- Main UI Layout ---
def main():
    """Draw the title and sidebar credential inputs, then start the chat.

    The chat page is only rendered once both the API key and the base URL
    have been entered; until then the user is prompted to fill them in.
    """
    sidebar = st.sidebar
    sidebar.info("Ask questions about uploaded data or request web searches.")
    st.title("Chatbot Excel")

    api_key = sidebar.text_input(
        "Enter your OpenAI API Key:", type="password", key="main_api_key_input"
    )
    base_url = sidebar.text_input("Enter API Base URL:", key="base_url_input")

    # Guard clause: without both credentials there is nothing to run.
    if not (api_key and base_url):
        sidebar.warning("Please enter your OpenAI API key and base URL to continue.")
        st.info("Please enter your OpenAI API key and base URL in the sidebar to use the application features.")
        return

    chat_with_data_and_web(api_key, base_url)


if __name__ == "__main__":
    main()