|
""" |
|
This module handles the chat display components and rendering. |
|
""" |
|
|
|
import streamlit as st |
|
import os |
|
import re |
|
from utils import has_meaningful_content, remove_reasoning_and_sources, clean_explanation, get_image_base64 |
|
from session_state import get_full_history |
|
|
|
|
|
def get_avatars(): |
|
""" |
|
Get avatar images for the user and assistant. |
|
|
|
Returns: |
|
tuple: (user_avatar, assistant_avatar) - URLs or emoji fallbacks |
|
""" |
|
|
|
user_logo_path = "src/assets/icon3.png" |
|
assistant_logo_path = "src/assets/logo.png" |
|
|
|
|
|
user_logo_base64 = get_image_base64(user_logo_path) |
|
assistant_logo_base64 = get_image_base64(assistant_logo_path) |
|
|
|
|
|
user_avatar = f"data:image/png;base64,{user_logo_base64}" if user_logo_base64 else "π€" |
|
assistant_avatar = f"data:image/png;base64,{assistant_logo_base64}" if assistant_logo_base64 else "π€" |
|
|
|
return user_avatar, assistant_avatar |
|
|
|
|
|
def extract_follow_up_questions(content): |
|
""" |
|
Extract follow-up questions from the main content. |
|
|
|
Args: |
|
content (str): The message content |
|
|
|
Returns: |
|
str: Extracted follow-up questions or empty string if none found |
|
""" |
|
|
|
patterns = [ |
|
r'(?i)follow[ -]?up questions:?\s*(.*?)(?=\n+\s*(?:#{1,3}|reasoning:|sources:|\Z))', |
|
r'(?i)#{1,3}\s*follow[ -]?up questions:?\s*(.*?)(?=\n+\s*(?:#{1,3}|reasoning:|sources:|\Z))', |
|
r'(?i)important questions to ask:?\s*(.*?)(?=\n+\s*(?:#{1,3}|reasoning:|sources:|\Z))', |
|
r'(?i)clarifying questions:?\s*(.*?)(?=\n+\s*(?:#{1,3}|reasoning:|sources:|\Z))', |
|
r'(?i)additional questions:?\s*(.*?)(?=\n+\s*(?:#{1,3}|reasoning:|sources:|\Z))', |
|
r'(?i)please clarify(?:[ a-z]+)?:?\s*(.*?)(?=\n+\s*(?:#{1,3}|reasoning:|sources:|\Z))', |
|
r'(?i)additional information needed:?\s*(.*?)(?=\n+\s*(?:#{1,3}|reasoning:|sources:|\Z))' |
|
] |
|
|
|
|
|
for pattern in patterns: |
|
follow_up_match = re.search(pattern, content, re.DOTALL) |
|
if follow_up_match: |
|
follow_up_text = follow_up_match.group(1).strip() |
|
|
|
|
|
intro_text = "" |
|
questions = [] |
|
|
|
|
|
intro_match = re.match(r'(.*?)(?=\n\s*[-β’*]|\n\s*\d+\.)', follow_up_text, re.DOTALL) |
|
if intro_match and intro_match.group(1).strip(): |
|
intro_text = intro_match.group(1).strip() |
|
|
|
|
|
if re.search(r'\n\s*[-β’*]', follow_up_text): |
|
|
|
bullet_points = re.findall(r'(?:\n|\A)\s*[-β’*]\s*(.*?)(?=\n\s*[-β’*]|\Z)', follow_up_text, re.DOTALL) |
|
if bullet_points: |
|
questions = [point.strip() for point in bullet_points if point.strip()] |
|
elif re.search(r'\n\s*\d+\.', follow_up_text): |
|
|
|
numbered_points = re.findall(r'(?:\n|\A)\s*\d+\.\s*(.*?)(?=\n\s*\d+\.|\Z)', follow_up_text, re.DOTALL) |
|
if numbered_points: |
|
questions = [point.strip() for point in numbered_points if point.strip()] |
|
else: |
|
|
|
lines = follow_up_text.split('\n') |
|
questions = [line.strip() for line in lines if line.strip()] |
|
|
|
if intro_text and questions and questions[0] == intro_text: |
|
questions = questions[1:] |
|
|
|
|
|
result = "" |
|
if intro_text: |
|
result += f"{intro_text}\n\n" |
|
|
|
|
|
if questions: |
|
for i, question in enumerate(questions): |
|
|
|
clean_question = re.sub(r'^\s*(?:\d+\.|\-|\β’|\*)\s*', '', question) |
|
result += f"{i+1}. {clean_question}\n" |
|
else: |
|
|
|
result += follow_up_text |
|
|
|
return result.strip() |
|
|
|
return "" |
|
|
|
|
|
def display_chat_history(): |
|
""" |
|
Display the chat history from the database. |
|
""" |
|
user_avatar, assistant_avatar = get_avatars() |
|
|
|
|
|
history = get_full_history() |
|
|
|
|
|
display_history = history |
|
|
|
for message in display_history: |
|
if message["role"] == "user": |
|
|
|
with st.container(): |
|
col1, col2 = st.columns([2, 10]) |
|
with col2: |
|
with st.chat_message("user", avatar=user_avatar): |
|
st.write(message["content"]) |
|
else: |
|
|
|
with st.container(): |
|
col1, col2 = st.columns([10, 2]) |
|
with col1: |
|
with st.chat_message("assistant", avatar=assistant_avatar): |
|
|
|
cleaned_response = remove_reasoning_and_sources(message["content"]) |
|
st.markdown(cleaned_response) |
|
|
|
|
|
if message.get("follow_up_questions") and message["follow_up_questions"].strip(): |
|
print(f"Found follow-up questions in message: {message['follow_up_questions']}") |
|
with st.expander("Additional Questions"): |
|
st.markdown(message["follow_up_questions"]) |
|
else: |
|
print(f"No follow-up questions found in message keys: {list(message.keys())}") |
|
|
|
|
|
if message.get("explanation") and has_meaningful_content(message.get("explanation")): |
|
|
|
cleaned_explanation = clean_explanation(message["explanation"]) |
|
|
|
|
|
|
|
cleaned_explanation = re.sub(r'(?i)(\n+\s*sources:|\n+\s*references:|\n+\s*\*{0,2}sources\*{0,2}:?|\n+\s*\*{0,2}references\*{0,2}:?|\n+\s*#{1,3}\s*sources|\n+\s*#{1,3}\s*references).*', '', cleaned_explanation, flags=re.DOTALL) |
|
|
|
cleaned_explanation = re.sub(r'#{1,3}\s+Sources.*', '', cleaned_explanation, flags=re.DOTALL) |
|
|
|
with st.expander("Show Reasoning"): |
|
st.markdown(cleaned_explanation) |
|
|
|
|
|
if message.get("evidence") and len(message.get("evidence", [])) > 0: |
|
with st.expander("Show Sources"): |
|
st.markdown("**Medical sources used:**") |
|
for i, source in enumerate(message.get("evidence", [])): |
|
title = source.get("title", "No title") |
|
url = source.get("url", "#") |
|
source_type = source.get("source_type", "Unknown Source") |
|
is_open_access = source.get("is_open_access", False) |
|
|
|
access_icon = "π " if is_open_access else "" |
|
|
|
citation_text = source.get('citation', title).strip() |
|
|
|
markdown_string = ( |
|
f"{i+1}. {access_icon}{citation_text}\n" |
|
f" [[Access Article]]({url}) - *{source_type}*" |
|
) |
|
st.markdown(markdown_string) |
|
|
|
st.markdown("---") |
|
st.markdown("**Legend:** π = Open Access (full text available)") |
|
|
|
|
|
def show_typing_indicator(): |
|
""" |
|
Display a typing indicator when the system is processing a response. |
|
""" |
|
if st.session_state.processing: |
|
user_avatar, assistant_avatar = get_avatars() |
|
with st.container(): |
|
col1, col2 = st.columns([10, 2]) |
|
with col1: |
|
with st.chat_message("assistant", avatar=assistant_avatar): |
|
st.markdown(""" |
|
<div class="typing-indicator"> |
|
<span></span> |
|
<span></span> |
|
<span></span> |
|
</div> |
|
<p style="margin: 0; color: rgba(255,255,255,0.6) !important; font-size: 0.8rem;">Analyzing your query...</p> |
|
""", unsafe_allow_html=True) |
|
|
|
|
|
def display_legal_disclaimer(): |
|
""" |
|
Display the legal disclaimer at the bottom of the page. |
|
""" |
|
st.markdown(""" |
|
<div class="footer-text"> |
|
For informational purposes only. Not a substitute for professional medical advice. |
|
</div> |
|
""", unsafe_allow_html=True) |
|
|