Spaces:
Running
Running
""" | |
Frontend Streamlit application for the chatbot. | |
""" | |
import os | |
os.environ["XDG_CACHE_HOME"] = "/tmp" # Fix the cloud bug | |
os.environ["HF_HOME"] = "/tmp" # Fix the cloud bug | |
os.environ["TRANSFORMERS_CACHE"] = "/tmp" # Fix the cloud bug | |
os.environ["LLAMA_INDEX_CACHE_DIR"] = "/tmp" # Fix the cloud bug | |
import streamlit as st | |
import time | |
import logging | |
import backend | |
import config | |
from config import APP_NAME, PRIMARY_COLOR, SECONDARY_COLOR | |
from utils.logging_config import setup_logging | |
from utils.feedback import feedback_manager # Import the feedback manager | |
import hashlib | |
from datetime import datetime | |
# Set up logging | |
setup_logging() | |
logger = logging.getLogger(__name__) | |
# Constants for memory optimization | |
MAX_MESSAGES = 50 | |
# Set page config | |
st.set_page_config( | |
page_title=APP_NAME, | |
page_icon="π¬", | |
layout="centered", | |
initial_sidebar_state="collapsed" | |
) | |
# Hide Streamlit's default elements and style sidebar | |
st.markdown(""" | |
<style> | |
/* Hide default elements */ | |
button[kind="deploy"], | |
[data-testid="stToolbar"], | |
.stDeployButton, | |
#MainMenu, | |
footer { | |
display: none !important; | |
} | |
/* Style the sidebar */ | |
[data-testid="stSidebar"] { | |
width: 120px !important; | |
background-color: #0e1117 !important; | |
border-right: 1px solid #1e1e1e !important; | |
} | |
/* Style the collapse button to prevent movement and maintain size */ | |
button[kind="menuButton"] { | |
left: 120px !important; | |
margin-left: 0 !important; | |
position: fixed !important; | |
transform: translateX(0) !important; | |
transition: none !important; | |
background-color: #0e1117 !important; | |
color: #fff !important; | |
z-index: 999 !important; | |
} | |
/* Button styling */ | |
[data-testid="stSidebar"] [data-testid="stButton"] button { | |
background-color: #262730 !important; | |
color: white !important; | |
padding: 8px 10px !important; | |
width: 100px !important; | |
font-size: 0.9rem !important; | |
margin: 1rem auto !important; | |
display: block !important; | |
border-radius: 4px !important; | |
white-space: nowrap !important; | |
} | |
[data-testid="stSidebar"] [data-testid="stButton"] button p { | |
text-align: center !important; | |
white-space: nowrap !important; | |
overflow: visible !important; | |
} | |
[data-testid="stSidebar"] [data-testid="stButton"] button:hover { | |
background-color: #1E2130 !important; | |
} | |
/* Feedback button styles */ | |
.feedback-buttons { | |
display: flex; | |
gap: 10px; | |
margin-top: 5px; | |
} | |
.feedback-button { | |
border: none; | |
background: none; | |
cursor: pointer; | |
padding: 5px; | |
border-radius: 4px; | |
transition: background-color 0.3s; | |
} | |
.feedback-button:hover { | |
background-color: #f0f0f0; | |
} | |
.feedback-form { | |
margin-top: 10px; | |
padding: 10px; | |
border-radius: 4px; | |
background-color: #f7f7f7; | |
} | |
/* Custom radio buttons */ | |
.stRadio > div { | |
display: flex; | |
gap: 10px; | |
} | |
.stRadio > div > div { | |
flex: 1; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# Helper function to add messages and maintain history length | |
def add_message(role, content): | |
"""Add message to session state and trim if needed.""" | |
# Generate a unique ID for this message based on content | |
message_id = f"{role}_{hashlib.md5(content.encode()).hexdigest()[:8]}" | |
st.session_state.messages.append({ | |
"role": role, | |
"content": content, | |
"id": message_id # Store the ID with the message | |
}) | |
# Trim message history if it gets too large | |
if len(st.session_state.messages) > MAX_MESSAGES: | |
# Keep the most recent messages | |
st.session_state.messages = st.session_state.messages[-MAX_MESSAGES:] | |
logger.info(f"Added {role} message with ID {message_id}. History length: {len(st.session_state.messages)}") | |
# Initialize session state for chat history and UI control | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
if "clear_clicked" not in st.session_state: | |
st.session_state.clear_clicked = False | |
if "processing" not in st.session_state: | |
st.session_state.processing = False | |
if "feedback" not in st.session_state: | |
st.session_state.feedback = {} # Store feedback data by message index | |
# Create a sidebar with button options | |
with st.sidebar: | |
def clear_chat(): | |
# Clear messages in session state | |
st.session_state.messages = [] | |
# Also clear the chatbot's memory if it exists | |
if "chatbot" in st.session_state and hasattr(st.session_state.chatbot, "reset_chat_history"): | |
st.session_state.chatbot.reset_chat_history() | |
# Set flag to true to trigger page refresh | |
st.session_state.clear_clicked = True | |
logger.info("Chat history cleared via sidebar button") | |
# Define stop processing function | |
def stop_processing(): | |
st.session_state.processing = False | |
logger.info("User requested to stop processing") | |
st.rerun() | |
st.button("Clear Chat", on_click=clear_chat) | |
# Only show Stop button when processing | |
if st.session_state.processing: | |
st.button("Stop", on_click=stop_processing, type="primary") | |
# Handle the clear button click by refreshing the page | |
if st.session_state.clear_clicked: | |
st.session_state.clear_clicked = False | |
st.rerun() | |
# Initialize cached resources for all sessions | |
def initialize_resources(): | |
"""Initialize shared resources once for all sessions.""" | |
logger.info("Initializing shared resources...") | |
# Get configuration from config module | |
chatbot_config = config.get_chatbot_config() | |
api_key = os.getenv("ANTHROPIC_API_KEY") | |
# Load models using cache - using the backend module directly | |
llm = backend.load_llm_model( | |
api_key, | |
chatbot_config.get("model", "claude-3-7-sonnet-20250219"), | |
chatbot_config.get("temperature", 0.1), | |
chatbot_config.get("max_tokens", 2048) | |
) | |
embed_model = backend.load_embedding_model( | |
chatbot_config.get("embedding_model", "sentence-transformers/all-MiniLM-L6-v2"), | |
chatbot_config.get("device", "cpu"), | |
chatbot_config.get("embed_batch_size", 8) | |
) | |
# Load or create index (shared across sessions) | |
index = backend.load_or_create_index() | |
return { | |
"config": chatbot_config, | |
"llm": llm, | |
"embed_model": embed_model, | |
"index": index | |
} | |
# Get shared resources | |
resources = initialize_resources() | |
# Initialize chatbot only in session state if it doesn't exist | |
if "chatbot" not in st.session_state: | |
with st.spinner("Initializing chatbot..."): | |
logger.info("Initializing chatbot with shared resources...") | |
st.session_state.chatbot = backend.Chatbot( | |
resources["config"], | |
resources["llm"], | |
resources["embed_model"], | |
resources["index"] | |
) | |
# Initialize chat engine instead of query engine | |
st.session_state.chatbot.initialize_chat_engine() | |
logger.info("Chatbot initialized successfully") | |
# Only show header/description if there are no user messages yet | |
if not any(msg["role"] == "user" for msg in st.session_state.messages): | |
st.title("Paul's Chatbot v0.5") | |
st.markdown(""" | |
This chatbot can answer questions about your documents. | |
Ask any question about the content in your documents! This demo is focused on a psychology book. Ask it questions about how the human brain works, how to stick with a habit, or how we are easily fooled. | |
""") | |
# Debug information - only show if DEBUG is enabled | |
if config.DEBUG: | |
st.sidebar.write("Debug Info:") | |
st.sidebar.write(f"Number of messages: {len(st.session_state.messages)}") | |
# Show message IDs | |
message_ids = [msg.get("id", "no-id") for msg in st.session_state.messages if msg["role"] == "assistant"] | |
st.sidebar.write(f"Assistant message IDs: {message_ids}") | |
# Show feedback state in a more readable format | |
if st.session_state.feedback: | |
st.sidebar.write("Feedback state:") | |
for msg_id, feedback in st.session_state.feedback.items(): | |
st.sidebar.write(f"- {msg_id}: {feedback}") | |
else: | |
st.sidebar.write("No feedback recorded yet") | |
st.sidebar.write(f"Processing: {st.session_state.processing}") | |
# Function to handle feedback with the feedback manager | |
def handle_feedback(message_id, feedback_type, query, response): | |
logger.info(f"Handling {feedback_type} feedback for message ID: {message_id}") | |
if feedback_type == "positive": | |
feedback_data = { | |
"user_id": "", # Add user tracking if available | |
"query": query, | |
"normalized_query": query.lower().strip() if query else "", | |
"question_hash": hashlib.md5(query.lower().strip().encode()).hexdigest()[:8] if query else "", | |
"response": response, | |
"rating": "positive", | |
"category": "", | |
"comment": "", | |
"tags": "", | |
"status": "open", | |
"admin_note": "", | |
"assigned_to": "", | |
"document_id": "", | |
"source": "user", | |
"priority": "low", | |
"reviewed": False, | |
"timestamp": datetime.now().isoformat() | |
} | |
success = feedback_manager.save_feedback_supabase(feedback_data) | |
if not success: | |
logger.error(f"Failed to save positive feedback for message {message_id}") | |
st.session_state.feedback[message_id] = { | |
"rating": "positive", | |
"submitted": True | |
} | |
logger.info(f"Positive feedback saved for message {message_id}") | |
else: | |
st.session_state.feedback[message_id] = { | |
"rating": "negative", | |
"show_form": True, | |
"submitted": False | |
} | |
logger.info(f"Negative feedback started for message {message_id}, showing form") | |
logger.info(f"Current feedback state: {st.session_state.feedback}") | |
# Function to submit detailed negative feedback | |
def submit_negative_feedback(message_id, category, comment, query, response): | |
feedback_data = { | |
"user_id": "", # Add user tracking if available | |
"query": query, | |
"normalized_query": query.lower().strip() if query else "", | |
"question_hash": hashlib.md5(query.lower().strip().encode()).hexdigest()[:8] if query else "", | |
"response": response, | |
"rating": "negative", | |
"category": category, | |
"comment": comment, | |
"tags": category.lower().replace(" ", "_") if category else "", | |
"status": "open", | |
"admin_note": "", | |
"assigned_to": "", | |
"document_id": "", | |
"source": "user", | |
"priority": "medium", | |
"reviewed": False, | |
"timestamp": datetime.now().isoformat() | |
} | |
success = feedback_manager.save_feedback_supabase(feedback_data) | |
if not success: | |
logger.error(f"Failed to save negative feedback for message {message_id}") | |
st.session_state.feedback[message_id] = { | |
"rating": "negative", | |
"submitted": True, | |
"show_form": False | |
} | |
logger.info(f"Negative feedback with category '{category}' submitted for message {message_id}") | |
# Function to generate visible feedback UI for a message | |
def display_feedback_ui(message, index): | |
"""Generate and display feedback UI for an assistant message""" | |
message_id = message.get("id", f"msg_{index}") # Fallback for old messages | |
# Check if feedback exists for THIS specific message | |
has_feedback = message_id in st.session_state.feedback | |
feedback_data = st.session_state.feedback.get(message_id, {}) | |
# Only show thank you message if feedback was explicitly submitted for THIS message | |
if has_feedback and feedback_data.get("submitted") == True: | |
# Show thank you message if feedback was submitted | |
st.success("Thank you for your feedback!") | |
else: | |
# Add a little vertical space | |
st.write("") | |
# Create more spaced columns for the feedback buttons | |
feedback_cols = st.columns([0.7, 0.7, 8.6]) | |
with feedback_cols[0]: | |
# Only disable button if positive feedback was given for THIS message | |
thumbs_up_disabled = has_feedback and feedback_data.get("rating") == "positive" | |
if st.button("π", key=f"up_{message_id}", disabled=thumbs_up_disabled): | |
handle_feedback(message_id, "positive", | |
st.session_state.messages[index-1]["content"] if index > 0 else "", | |
message["content"]) | |
# Force a rerun to show the thank you message | |
st.rerun() | |
with feedback_cols[1]: | |
# Only disable button if negative feedback was given for THIS message | |
thumbs_down_disabled = has_feedback and feedback_data.get("rating") == "negative" | |
if st.button("π", key=f"down_{message_id}", disabled=thumbs_down_disabled): | |
handle_feedback(message_id, "negative", | |
st.session_state.messages[index-1]["content"] if index > 0 else "", | |
message["content"]) | |
# Force a rerun to show the feedback form | |
st.rerun() | |
# Show feedback form if thumbs down was clicked for THIS message | |
if has_feedback and feedback_data.get("show_form") == True: | |
with st.expander("Please tell us why this response wasn't helpful", expanded=True): | |
# Add form for detailed feedback | |
category = st.radio( | |
"What was the issue with this response?", | |
["Incorrect information", "Incomplete answer", "Irrelevant to question", "Other"], | |
key=f"category_{message_id}" | |
) | |
comment = st.text_area( | |
"Additional comments (optional):", | |
key=f"comment_{message_id}", | |
height=100 | |
) | |
if st.button("Submit Feedback", key=f"submit_{message_id}"): | |
submit_negative_feedback(message_id, category, comment, | |
st.session_state.messages[index-1]["content"] if index > 0 else "", | |
message["content"]) | |
# Force a rerun to show the thank you message | |
st.rerun() | |
# Display chat messages | |
for i, message in enumerate(st.session_state.messages): | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"]) | |
# Add feedback buttons only for assistant messages | |
if message["role"] == "assistant": | |
display_feedback_ui(message, i) | |
# Chat input | |
if prompt := st.chat_input("What would you like to know?", disabled=st.session_state.processing): | |
# Add user message to chat history | |
add_message("user", prompt) | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
# Get chatbot response | |
with st.chat_message("assistant"): | |
# Set processing flag to true | |
st.session_state.processing = True | |
# Create containers for the response and status | |
status_container = st.empty() | |
response_container = st.empty() | |
# Process the query with Streamlit's built-in spinner | |
try: | |
logger.info(f"User query: {prompt}") | |
# Show spinner with "Processing..." text | |
with status_container: | |
with st.spinner("Processing your question..."): | |
# Execute the query | |
response = st.session_state.chatbot.query(prompt) | |
# Clear the status container | |
status_container.empty() | |
# Display the response | |
response_container.markdown(response) | |
# Add assistant message with a unique ID | |
add_message("assistant", response) | |
logger.info("Response provided to user") | |
# Reset processing flag | |
st.session_state.processing = False | |
# Force a rerun to properly display feedback buttons | |
st.rerun() | |
except Exception as e: | |
logger.error(f"Error during query processing: {e}") | |
status_container.empty() | |
response_container.error(f"Sorry, I encountered an error while processing your request: {str(e)}") | |
# Reset processing flag when done | |
st.session_state.processing = False |