chat / app.py
Paul Magee
fix for shadow header
775d896
raw
history blame
16.7 kB
"""
Frontend Streamlit application for the chatbot.
"""
import os
# Point every library cache directory at /tmp. These assignments are placed
# ahead of the streamlit/backend imports below so the variables are already
# set by the time those modules are loaded.
# NOTE(review): presumably the hosting environment only permits writes under
# /tmp ("the cloud bug") - confirm against the deployment target.
os.environ["XDG_CACHE_HOME"] = "/tmp" # generic XDG cache root
os.environ["HF_HOME"] = "/tmp" # Hugging Face cache root
os.environ["TRANSFORMERS_CACHE"] = "/tmp" # transformers model cache
os.environ["LLAMA_INDEX_CACHE_DIR"] = "/tmp" # LlamaIndex cache
import streamlit as st
import time
import logging
import backend
import config
from config import APP_NAME, PRIMARY_COLOR, SECONDARY_COLOR
from utils.logging_config import setup_logging
from utils.feedback import feedback_manager # Import the feedback manager
import hashlib
from datetime import datetime
# Configure logging first so every later step can emit log records.
setup_logging()
logger = logging.getLogger(__name__)

# Upper bound on the number of chat messages kept in session state;
# add_message() trims the history down to this many entries.
MAX_MESSAGES = 50

# Page-level metadata for the Streamlit app.
st.set_page_config(
    page_title=APP_NAME,
    # U+1F4AC SPEECH BALLOON. Written as an escape sequence so the icon
    # survives any re-encoding of this source file (the literal emoji had
    # been corrupted into mojibake, which is not a valid page icon).
    page_icon="\U0001F4AC",
    layout="centered",
    initial_sidebar_state="collapsed",
)
# Inject one global stylesheet into the page. It:
#   * hides Streamlit's default chrome (deploy button, toolbar, main menu, footer),
#   * pins the sidebar to 120px with a dark background and right border,
#   * fixes the sidebar collapse button in place at the sidebar's edge,
#   * styles buttons rendered inside the sidebar,
#   * defines .feedback-* helper classes and a flex layout for radio groups.
# The markup is a raw <style> element, hence unsafe_allow_html=True.
# NOTE(review): no code visible in this file emits the .feedback-buttons /
# .feedback-button / .feedback-form classes - confirm whether they are still used.
st.markdown("""
<style>
/* Hide default elements */
button[kind="deploy"],
[data-testid="stToolbar"],
.stDeployButton,
#MainMenu,
footer {
display: none !important;
}
/* Style the sidebar */
[data-testid="stSidebar"] {
width: 120px !important;
background-color: #0e1117 !important;
border-right: 1px solid #1e1e1e !important;
}
/* Style the collapse button to prevent movement and maintain size */
button[kind="menuButton"] {
left: 120px !important;
margin-left: 0 !important;
position: fixed !important;
transform: translateX(0) !important;
transition: none !important;
background-color: #0e1117 !important;
color: #fff !important;
z-index: 999 !important;
}
/* Button styling */
[data-testid="stSidebar"] [data-testid="stButton"] button {
background-color: #262730 !important;
color: white !important;
padding: 8px 10px !important;
width: 100px !important;
font-size: 0.9rem !important;
margin: 1rem auto !important;
display: block !important;
border-radius: 4px !important;
white-space: nowrap !important;
}
[data-testid="stSidebar"] [data-testid="stButton"] button p {
text-align: center !important;
white-space: nowrap !important;
overflow: visible !important;
}
[data-testid="stSidebar"] [data-testid="stButton"] button:hover {
background-color: #1E2130 !important;
}
/* Feedback button styles */
.feedback-buttons {
display: flex;
gap: 10px;
margin-top: 5px;
}
.feedback-button {
border: none;
background: none;
cursor: pointer;
padding: 5px;
border-radius: 4px;
transition: background-color 0.3s;
}
.feedback-button:hover {
background-color: #f0f0f0;
}
.feedback-form {
margin-top: 10px;
padding: 10px;
border-radius: 4px;
background-color: #f7f7f7;
}
/* Custom radio buttons */
.stRadio > div {
display: flex;
gap: 10px;
}
.stRadio > div > div {
flex: 1;
}
</style>
""", unsafe_allow_html=True)
# Helper function to add messages and maintain history length
def add_message(role, content):
    """Append a message to the session chat history and trim old entries.

    Each message is stored as a dict with ``role``, ``content`` and ``id``.
    The ID has the form ``"<role>_<8 hex chars>"`` and is used elsewhere in
    this file as the key for feedback state and for widget keys.

    The suffix is random rather than derived from ``content``: a content
    hash gives two messages with identical text the same ID, which makes
    their feedback widgets collide on the same key and share feedback state.

    Args:
        role: Speaker of the message (the callers pass "user" or "assistant").
        content: Message text.
    """
    # Function-scope import keeps this fix self-contained in the helper.
    import uuid

    message_id = f"{role}_{uuid.uuid4().hex[:8]}"
    st.session_state.messages.append({
        "role": role,
        "content": content,
        "id": message_id,  # stored with the message so it is stable across reruns
    })

    # Keep only the most recent MAX_MESSAGES entries to bound memory use.
    if len(st.session_state.messages) > MAX_MESSAGES:
        st.session_state.messages = st.session_state.messages[-MAX_MESSAGES:]

    logger.info(f"Added {role} message with ID {message_id}. History length: {len(st.session_state.messages)}")
# Seed the per-session state keys on the first run of a session; keys that
# already exist are left untouched.
#   messages      - chat history (list of message dicts)
#   clear_clicked - set by the "Clear Chat" callback to request a refresh
#   processing    - True while a query is being answered
#   feedback      - feedback state, keyed by message ID
for state_key, initial_value in (
    ("messages", []),
    ("clear_clicked", False),
    ("processing", False),
    ("feedback", {}),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# Create a sidebar with button options
with st.sidebar:
    def clear_chat():
        """Button callback: wipe the conversation and its associated state."""
        # Clear messages in session state
        st.session_state.messages = []
        # Drop feedback state as well; it is keyed by message ID and would
        # otherwise linger after the messages it refers to are gone.
        st.session_state.feedback = {}
        # Also clear the chatbot's memory if it exists
        if "chatbot" in st.session_state and hasattr(st.session_state.chatbot, "reset_chat_history"):
            st.session_state.chatbot.reset_chat_history()
        # Flag consumed below to trigger a page refresh
        st.session_state.clear_clicked = True
        logger.info("Chat history cleared via sidebar button")

    def stop_processing():
        """Button callback: reset the processing flag."""
        st.session_state.processing = False
        logger.info("User requested to stop processing")
        # No st.rerun() here: calling it inside a widget callback is a no-op,
        # and the script reruns after the callback returns anyway.

    st.button("Clear Chat", on_click=clear_chat)

    # Only show Stop button when processing
    if st.session_state.processing:
        st.button("Stop", on_click=stop_processing, type="primary")

# Handle the clear button click by refreshing the page
if st.session_state.clear_clicked:
    st.session_state.clear_clicked = False
    st.rerun()
# Shared, cached resources (built once via st.cache_resource)
@st.cache_resource
def initialize_resources():
    """Create the LLM, embedding model and index and return them in a dict.

    Returns:
        dict with the keys ``config`` (chatbot configuration),
        ``llm``, ``embed_model`` and ``index``.
    """
    logger.info("Initializing shared resources...")

    settings = config.get_chatbot_config()
    anthropic_key = os.getenv("ANTHROPIC_API_KEY")

    # Language model; settings missing from the configuration use the defaults below.
    model_name = settings.get("model", "claude-3-7-sonnet-20250219")
    temperature = settings.get("temperature", 0.1)
    max_tokens = settings.get("max_tokens", 2048)
    language_model = backend.load_llm_model(anthropic_key, model_name, temperature, max_tokens)

    # Embedding model.
    embedding_name = settings.get("embedding_model", "sentence-transformers/all-MiniLM-L6-v2")
    device = settings.get("device", "cpu")
    batch_size = settings.get("embed_batch_size", 8)
    embedder = backend.load_embedding_model(embedding_name, device, batch_size)

    # Index is loaded or created by the backend.
    document_index = backend.load_or_create_index()

    return dict(
        config=settings,
        llm=language_model,
        embed_model=embedder,
        index=document_index,
    )
# Fetch the cached shared resources.
resources = initialize_resources()

# Each session gets its own Chatbot object, built on the shared resources
# the first time the session runs this script.
if "chatbot" not in st.session_state:
    with st.spinner("Initializing chatbot..."):
        logger.info("Initializing chatbot with shared resources...")
        session_bot = backend.Chatbot(
            resources["config"],
            resources["llm"],
            resources["embed_model"],
            resources["index"],
        )
        # Store first, then set up the chat engine (not a query engine).
        st.session_state.chatbot = session_bot
        session_bot.initialize_chat_engine()
        logger.info("Chatbot initialized successfully")
# The title and introduction are shown only until the first user message exists.
has_user_message = any(entry["role"] == "user" for entry in st.session_state.messages)
if not has_user_message:
    st.title("Paul's Chatbot v0.5")
    st.markdown("""
This chatbot can answer questions about your documents.
Ask any question about the content in your documents! This demo is focused on a psychology book. Ask it questions about how the human brain works, how to stick with a habit, or how we are easily fooled.
""")
# Diagnostics panel in the sidebar, rendered only when config.DEBUG is truthy.
if config.DEBUG:
    debug_panel = st.sidebar
    history = st.session_state.messages

    debug_panel.write("Debug Info:")
    debug_panel.write(f"Number of messages: {len(history)}")

    # IDs of assistant messages ("no-id" for entries stored without one).
    assistant_ids = [entry.get("id", "no-id") for entry in history if entry["role"] == "assistant"]
    debug_panel.write(f"Assistant message IDs: {assistant_ids}")

    # Feedback state, one line per message ID.
    if not st.session_state.feedback:
        debug_panel.write("No feedback recorded yet")
    else:
        debug_panel.write("Feedback state:")
        for recorded_id, recorded_state in st.session_state.feedback.items():
            debug_panel.write(f"- {recorded_id}: {recorded_state}")

    debug_panel.write(f"Processing: {st.session_state.processing}")
def _build_feedback_record(query, response, rating, category="", comment="", priority="low"):
    """Assemble the feedback record passed to feedback_manager.save_feedback_supabase.

    Shared by the positive and negative feedback paths so both produce the
    same set of keys in the same order.

    Args:
        query: User question the response answered (may be empty).
        response: Assistant response text being rated.
        rating: "positive" or "negative".
        category: Issue category chosen by the user (negative feedback only).
        comment: Free-text comment (negative feedback only).
        priority: Priority label stored with the record.

    Returns:
        dict holding the feedback fields.
    """
    normalized_query = query.lower().strip() if query else ""
    return {
        "user_id": "",  # Add user tracking if available
        "query": query,
        "normalized_query": normalized_query,
        # Short hash of the normalized question.
        "question_hash": hashlib.md5(normalized_query.encode()).hexdigest()[:8] if query else "",
        "response": response,
        "rating": rating,
        "category": category,
        "comment": comment,
        # Tag derived from the category, e.g. "Incomplete answer" -> "incomplete_answer".
        "tags": category.lower().replace(" ", "_") if category else "",
        "status": "open",
        "admin_note": "",
        "assigned_to": "",
        "document_id": "",
        "source": "user",
        "priority": priority,
        "reviewed": False,
        "timestamp": datetime.now().isoformat(),
    }


# Function to handle feedback with the feedback manager
def handle_feedback(message_id, feedback_type, query, response):
    """Record a thumbs-up, or open the detail form for a thumbs-down.

    Positive feedback is saved immediately and marked as submitted. Any other
    ``feedback_type`` is treated as negative: nothing is saved yet, and the
    message is flagged so the detail form is displayed.

    Args:
        message_id: ID of the assistant message being rated.
        feedback_type: "positive" for a thumbs-up; anything else is negative.
        query: User question that preceded the response.
        response: Assistant response text.
    """
    logger.info(f"Handling {feedback_type} feedback for message ID: {message_id}")

    if feedback_type == "positive":
        record = _build_feedback_record(query, response, "positive")
        # Log success only when the save actually succeeded.
        if feedback_manager.save_feedback_supabase(record):
            logger.info(f"Positive feedback saved for message {message_id}")
        else:
            logger.error(f"Failed to save positive feedback for message {message_id}")
        # The UI is marked as submitted either way (best effort).
        st.session_state.feedback[message_id] = {
            "rating": "positive",
            "submitted": True
        }
    else:
        st.session_state.feedback[message_id] = {
            "rating": "negative",
            "show_form": True,
            "submitted": False
        }
        logger.info(f"Negative feedback started for message {message_id}, showing form")

    logger.info(f"Current feedback state: {st.session_state.feedback}")


# Function to submit detailed negative feedback
def submit_negative_feedback(message_id, category, comment, query, response):
    """Save detailed negative feedback and close the form for this message.

    Args:
        message_id: ID of the assistant message being rated.
        category: Issue category selected in the form.
        comment: Optional free-text comment.
        query: User question that preceded the response.
        response: Assistant response text.
    """
    record = _build_feedback_record(
        query,
        response,
        "negative",
        category=category,
        comment=comment,
        priority="medium",
    )
    # Log success only when the save actually succeeded.
    if feedback_manager.save_feedback_supabase(record):
        logger.info(f"Negative feedback with category '{category}' submitted for message {message_id}")
    else:
        logger.error(f"Failed to save negative feedback for message {message_id}")

    # The UI is marked as submitted either way (best effort).
    st.session_state.feedback[message_id] = {
        "rating": "negative",
        "submitted": True,
        "show_form": False
    }
# Function to generate visible feedback UI for a message
def display_feedback_ui(message, index):
    """Render the feedback controls for one assistant message.

    Shows a thank-you note once feedback has been submitted for this message.
    Otherwise shows thumbs-up / thumbs-down buttons, plus the detail form
    after a thumbs-down.

    Args:
        message: Message dict with "content" and, normally, "id".
        index: Position of the message in ``st.session_state.messages``; used
            to find the preceding message and as an ID fallback.
    """
    message_id = message.get("id", f"msg_{index}")  # Fallback for old messages
    # Empty dict when no feedback exists for THIS message, so the .get()
    # checks below are all falsy in that case.
    feedback_data = st.session_state.feedback.get(message_id, {})

    if feedback_data.get("submitted"):
        st.success("Thank you for your feedback!")
        return

    # The message just before this one is passed along as the query;
    # empty when this is the first message in the history.
    query = st.session_state.messages[index - 1]["content"] if index > 0 else ""
    response = message["content"]

    # Add a little vertical space
    st.write("")

    # Two narrow columns for the buttons; the wide third one is a spacer.
    up_col, down_col, _spacer = st.columns([0.7, 0.7, 8.6])

    with up_col:
        # "\U0001F44D" is the thumbs-up emoji, escaped so it survives re-encoding.
        thumbs_up_disabled = feedback_data.get("rating") == "positive"
        if st.button("\U0001F44D", key=f"up_{message_id}", disabled=thumbs_up_disabled):
            handle_feedback(message_id, "positive", query, response)
            # Force a rerun to show the thank you message
            st.rerun()

    with down_col:
        # "\U0001F44E" is the thumbs-down emoji, escaped so it survives re-encoding.
        thumbs_down_disabled = feedback_data.get("rating") == "negative"
        if st.button("\U0001F44E", key=f"down_{message_id}", disabled=thumbs_down_disabled):
            handle_feedback(message_id, "negative", query, response)
            # Force a rerun to show the feedback form
            st.rerun()

    # Show feedback form if thumbs down was clicked for THIS message
    if feedback_data.get("show_form"):
        with st.expander("Please tell us why this response wasn't helpful", expanded=True):
            category = st.radio(
                "What was the issue with this response?",
                ["Incorrect information", "Incomplete answer", "Irrelevant to question", "Other"],
                key=f"category_{message_id}"
            )
            comment = st.text_area(
                "Additional comments (optional):",
                key=f"comment_{message_id}",
                height=100
            )
            if st.button("Submit Feedback", key=f"submit_{message_id}"):
                submit_negative_feedback(message_id, category, comment, query, response)
                # Force a rerun to show the thank you message
                st.rerun()
# Replay the stored conversation; assistant entries also get feedback controls.
for position, entry in enumerate(st.session_state.messages):
    speaker = entry["role"]
    with st.chat_message(speaker):
        st.markdown(entry["content"])
        if speaker == "assistant":
            display_feedback_ui(entry, position)
# Chat input
if prompt := st.chat_input("What would you like to know?", disabled=st.session_state.processing):
    # Add user message to chat history and echo it immediately
    add_message("user", prompt)
    with st.chat_message("user"):
        st.markdown(prompt)

    answered = False

    # Get chatbot response
    with st.chat_message("assistant"):
        st.session_state.processing = True

        # Placeholders for the spinner and for the response / error text
        status_container = st.empty()
        response_container = st.empty()

        try:
            logger.info(f"User query: {prompt}")
            with status_container:
                with st.spinner("Processing your question..."):
                    response = st.session_state.chatbot.query(prompt)

            status_container.empty()
            response_container.markdown(response)

            # Store the assistant message (add_message assigns its ID)
            add_message("assistant", response)
            logger.info("Response provided to user")
            answered = True
        except Exception as e:
            # logger.exception records the traceback along with the message.
            logger.exception(f"Error during query processing: {e}")
            status_container.empty()
            response_container.error(f"Sorry, I encountered an error while processing your request: {str(e)}")
        finally:
            # Reset on every exit path, including interruptions that are not
            # Exception subclasses; otherwise the flag could stay True and
            # keep the chat input disabled for the rest of the session.
            st.session_state.processing = False

    # Rerun outside the try block so the stored message is rendered by the
    # history loop together with its feedback buttons.
    if answered:
        st.rerun()