chinmayjha's picture
Improve agent output formatting with inline citations and full sources
8c6064d unverified
import json
import re
from typing import Any, Dict, List, Tuple, Optional, Union
from datetime import datetime
import gradio as gr
import pandas as pd
from pymongo import MongoClient
from smolagents import ToolCallingAgent
from second_brain_online.config import settings
class CustomGradioUI:
"""Custom Gradio UI for better formatting of agent responses with source attribution."""
def __init__(self, agent: Union[ToolCallingAgent, Any]):
"""Initialize the UI with either a ToolCallingAgent or AgentWrapper.
Args:
agent: Either a raw ToolCallingAgent or an AgentWrapper that wraps it.
"""
self.agent = agent
self.mongodb_client = None
self.database = None
self.conversation_collection = None
self.setup_mongodb()
self.setup_ui()
def setup_mongodb(self):
"""Setup MongoDB connection."""
try:
self.mongodb_client = MongoClient(settings.MONGODB_URI)
self.database = self.mongodb_client[settings.MONGODB_DATABASE_NAME]
self.conversation_collection = self.database["test_conversation_documents"]
print("βœ… MongoDB connection established successfully")
except Exception as e:
print(f"❌ Failed to connect to MongoDB: {e}")
self.mongodb_client = None
self.database = None
self.conversation_collection = None
def setup_ui(self):
"""Setup the Gradio interface with custom components."""
with gr.Blocks(
title="Second Brain AI Assistant",
theme=gr.themes.Soft(),
css="""
.source-card {
border: 1px solid #e0e0e0;
border-radius: 8px;
padding: 12px;
margin: 8px 0;
background-color: #f8f9fa;
}
.source-title {
font-weight: bold;
color: #2c3e50;
margin-bottom: 4px;
}
.source-date {
font-size: 0.9em;
color: #6c757d;
margin-bottom: 8px;
}
.answer-section {
background-color: #ffffff;
border: 1px solid #dee2e6;
border-radius: 8px;
padding: 16px;
margin-bottom: 16px;
}
.tool-usage {
background-color: #e3f2fd;
border-left: 4px solid #2196f3;
padding: 8px 12px;
margin: 8px 0;
border-radius: 4px;
font-size: 0.9em;
}
"""
) as self.interface:
gr.Markdown("# 🧠 Second Brain AI Assistant")
gr.Markdown("Ask questions about your documents and get AI-powered insights with source attribution.")
self.query_input = gr.Textbox(
label="Ask a question",
placeholder="What pricing objections were raised in the meetings?",
lines=2
)
self.submit_btn = gr.Button("Ask", variant="primary", size="lg")
with gr.Row():
with gr.Column():
self.answer_output = gr.HTML(label="Answer")
with gr.Accordion("πŸ“Š Conversations", open=False):
with gr.Row():
self.conversation_search = gr.Textbox(
label="Search Conversations",
placeholder="Search by conversation ID, customer info, summary, or key findings...",
scale=4
)
self.clear_search_btn = gr.Button("Clear", scale=1)
self.conversation_table = gr.Dataframe(
headers=["Conversation ID", "Customer Info", "Summary", "Key Findings", "Follow-up Email"],
datatype=["str", "str", "str", "str", "str"],
interactive=False,
label="Available Conversations",
wrap=True,
max_height=400,
value=self.load_conversations()
)
with gr.Accordion("πŸ“š Sources", open=False):
self.sources_output = gr.HTML(label="Sources")
with gr.Accordion("πŸ› οΈ Tools Used", open=False):
self.tools_output = gr.HTML(label="Tools Used")
with gr.Accordion("πŸ” Debug: Raw Response", open=False):
self.debug_output = gr.Textbox(
label="Raw Agent Response",
lines=10,
max_lines=20,
interactive=False
)
# Event handlers
self.submit_btn.click(
fn=self.process_query,
inputs=[self.query_input],
outputs=[self.answer_output, self.sources_output, self.tools_output, self.debug_output, self.conversation_table],
show_progress="full" # Show progress indicator
)
self.query_input.submit(
fn=self.process_query,
inputs=[self.query_input],
outputs=[self.answer_output, self.sources_output, self.tools_output, self.debug_output, self.conversation_table],
show_progress="full" # Show progress indicator
)
# Conversation search handlers
self.conversation_search.change(
fn=self.filter_conversations,
inputs=[self.conversation_search],
outputs=[self.conversation_table]
)
self.clear_search_btn.click(
fn=self.clear_conversation_search,
inputs=[],
outputs=[self.conversation_search, self.conversation_table]
)
def process_query(self, query: str, progress=gr.Progress()) -> Tuple[str, str, str, str, pd.DataFrame]:
"""Process the user query and return formatted response components."""
if not query.strip():
# Clear all outputs when query is empty
return "", "", "", "", self.load_conversations()
try:
# Show progress indicator with descriptive message
progress(0, desc="πŸ” Starting query processing...")
# Run the agent (this takes 30-60 seconds)
# Use None for indeterminate progress during long operation
progress(None, desc="πŸ” Searching knowledge base and retrieving documents...")
result = self.agent.run(query)
# Quick post-processing steps
progress(0.8, desc="✨ Displaying results...")
# CRITICAL DEBUG: Print what result actually is
print("\n" + "="*80)
print("DEBUG: WHAT IS RESULT?")
print("="*80)
print(f"Type: {type(result)}")
print(f"Is string? {isinstance(result, str)}")
print(f"Has πŸ“š Sources? {'πŸ“š Sources' in str(result) if result else False}")
print(f"First 1500 chars of result:\n{str(result)[:1500]}")
print("="*80)
# Convert result to string
result_str = str(result)
# Debug information
print("\n" + "="*80)
print("DEBUG: RAW AGENT RESULT")
print("="*80)
print(f"Type: {type(result)}")
print(f"Full Content:\n{result_str}")
print("="*80)
# Extract tools used from agent logs (for Tools Used section)
agent_logs = getattr(self.agent, 'logs', []) if hasattr(self.agent, 'logs') else []
tools_used = []
if agent_logs:
for step in agent_logs:
if hasattr(step, 'tool_calls') and step.tool_calls:
for tool_call in step.tool_calls:
if hasattr(tool_call, 'name'):
tools_used.append(tool_call.name)
tools_used = list(set(tools_used)) # Remove duplicates
# Format the raw answer with proper HTML structure (no parsing, just formatting)
answer_html = self._format_raw_answer(result_str)
# Leave Sources section empty (already in the answer)
sources_html = ""
# Format tools
tools_html = self.format_tools(tools_used)
# Debug text
debug_text = result_str
# Show all conversations (no filtering since we're not parsing sources)
progress(0.95, desc="πŸ“Š Loading conversations...")
all_conversations = self.load_conversations()
progress(1.0, desc="βœ… Complete!")
return answer_html, sources_html, tools_html, debug_text, all_conversations
except Exception as e:
error_msg = f"<div style='color: #dc3545; padding: 12px; border: 1px solid #f5c6cb; border-radius: 4px; background-color: #f8d7da;'>Error: {str(e)}</div>"
return error_msg, "", "", str(e), self.load_conversations()
def _format_raw_answer(self, answer: str) -> str:
"""Format the raw answer with basic HTML structure without parsing.
Just converts markdown-style formatting to HTML and preserves the structure.
"""
if not answer:
return "<div class='answer-section'><p>No answer provided.</p></div>"
# Convert markdown bold to HTML bold
answer = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', answer)
# Convert line breaks to HTML
answer = answer.replace('\n', '<br>')
# Clean up multiple line breaks
answer = re.sub(r'(<br>){3,}', '<br><br>', answer)
return f"""
<div class='answer-section'>
<div style='line-height: 1.8; font-size: 16px; white-space: pre-wrap;'>{answer}</div>
</div>
"""
def _parse_sources_from_text(self, sources_text: str) -> List[Dict]:
"""Parse sources from the formatted text output.
Expected format:
Doc 1: Title (Date)
Source: Type | Document ID: ID | URL | User ID
Summary: ...
Key Findings:
- [Type/Impact] Finding
"""
sources = []
# Split by "Doc X:" pattern
doc_pattern = r'Doc\s+(\d+):\s*([^\n]+)'
doc_matches = re.finditer(doc_pattern, sources_text)
for match in doc_matches:
doc_num = match.group(1)
title_line = match.group(2).strip()
# Find the next Doc or end of string
start_pos = match.end()
next_match = re.search(r'Doc\s+\d+:', sources_text[start_pos:])
if next_match:
end_pos = start_pos + next_match.start()
doc_content = sources_text[start_pos:end_pos]
else:
doc_content = sources_text[start_pos:]
# Extract title and date from title line
title_date_match = re.match(r'(.+?)\s*\(([^)]+)\)', title_line)
if title_date_match:
title = title_date_match.group(1).strip()
date = title_date_match.group(2).strip()
else:
title = title_line
date = ""
# Extract document ID
doc_id = ""
id_match = re.search(r'Document ID:\s*([a-zA-Z0-9]+)', doc_content)
if id_match:
doc_id = id_match.group(1)
# Extract summary
summary = ""
summary_match = re.search(r'Summary:\s*([^\n]+)', doc_content)
if summary_match:
summary = summary_match.group(1).strip()
# Extract key findings
key_findings = []
findings_section = re.search(r'Key Findings:\s*(.+?)(?=\n\nDoc\s+\d+:|$)', doc_content, re.DOTALL)
if findings_section:
findings_text = findings_section.group(1)
# Extract each finding line
finding_lines = re.findall(r'-\s*\[([^\]]+)\]\s*([^\n]+)', findings_text)
for finding_type, finding_text in finding_lines:
key_findings.append(f"[{finding_type}] {finding_text.strip()}")
sources.append({
"id": doc_id,
"title": title,
"date": date,
"summary": summary,
"key_findings": key_findings,
"quotes": [] # Not using quotes in new format
})
return sources
def parse_agent_response(self, result: Any, agent_logs: List = None) -> Tuple[str, List[Dict], List[str]]:
"""Parse the agent response to extract answer, sources, and tools used."""
answer = ""
sources = []
tools_used = []
# Convert result to string if it's not already
result_str = str(result)
# Extract the answer from the result
# Pattern 1: JSON format with "answer" key
json_match = re.search(r'{"answer":\s*"([^"]+)"}', result_str)
if json_match:
answer = json_match.group(1)
# Unescape the JSON string
answer = answer.replace('\\n', '\n').replace('\\"', '"')
else:
# Pattern 2: Look for "Final answer:" followed by content
final_answer_match = re.search(r'Final answer:\s*(.+?)(?=\n\n|\Z)', result_str, re.DOTALL)
if final_answer_match:
answer = final_answer_match.group(1).strip()
# Try to extract JSON from final answer
json_in_final = re.search(r'{"answer":\s*"([^"]+)"}', answer)
if json_in_final:
answer = json_in_final.group(1).replace('\\n', '\n').replace('\\"', '"')
else:
# Pattern 3: Use the entire result as answer if no specific pattern matches
answer = result_str
# NEW: Split answer and sources section
# Look for the Sources section marker (πŸ“š Sources:)
sources_split = re.split(r'πŸ“š\s*Sources:?', answer, maxsplit=1, flags=re.IGNORECASE)
if len(sources_split) == 2:
# We found a Sources section
answer_only = sources_split[0].strip()
sources_text = sources_split[1].strip()
# Parse sources from the text
sources = self._parse_sources_from_text(sources_text)
# Update answer to only include the answer part
answer = answer_only
else:
# No sources section found, answer remains as-is
pass
# If we have agent logs, extract tools and sources from them
if agent_logs:
for step in agent_logs:
# Extract tool calls
if hasattr(step, 'tool_calls') and step.tool_calls:
for tool_call in step.tool_calls:
if hasattr(tool_call, 'name'):
tools_used.append(tool_call.name)
# Extract sources from observations
if hasattr(step, 'observations') and step.observations:
print(f"DEBUG: Processing observations: {step.observations[:500]}...")
# Look for complete document blocks with all content
document_pattern = r'<document id="(\d+)">\s*<title>(.*?)</title>\s*<date>(.*?)</date>\s*<contextual_summary>(.*?)</contextual_summary>\s*<marketing_insights>(.*?)</marketing_insights>\s*<content>(.*?)</content>'
document_matches = re.findall(document_pattern, step.observations, re.DOTALL)
print(f"DEBUG: Found {len(document_matches)} document matches with full pattern")
for doc_id, doc_title, doc_date, contextual_summary, marketing_insights, content in document_matches:
# Clean up the basic fields
clean_title = doc_title.strip()
clean_date = doc_date.strip()
clean_summary = contextual_summary.strip()
# Extract key findings from marketing insights
key_findings = []
key_findings_pattern = r'<key_findings>(.*?)</key_findings>'
key_findings_match = re.search(key_findings_pattern, marketing_insights, re.DOTALL)
if key_findings_match:
key_findings_text = key_findings_match.group(1).strip()
# Split by lines and clean up
key_findings = [line.strip() for line in key_findings_text.split('\n') if line.strip() and line.strip().startswith('-')]
# Extract quotes from marketing insights
quotes = []
quotes_pattern = r'<quotes>(.*?)</quotes>'
quotes_match = re.search(quotes_pattern, marketing_insights, re.DOTALL)
if quotes_match:
quotes_text = quotes_match.group(1).strip()
# Split by lines and clean up
quotes = [line.strip() for line in quotes_text.split('\n') if line.strip() and line.strip().startswith('-')]
sources.append({
"id": doc_id,
"title": clean_title,
"date": clean_date,
"summary": clean_summary,
"key_findings": key_findings,
"quotes": quotes
})
# Fallback: Look for simpler document patterns if the full pattern didn't match
if not document_matches:
print("DEBUG: Trying fallback document patterns...")
# Pattern 1: Simple document with ID and title
simple_pattern = r'<document id="(\d+)">\s*<title>(.*?)</title>'
simple_matches = re.findall(simple_pattern, step.observations, re.DOTALL)
print(f"DEBUG: Found {len(simple_matches)} simple document matches")
for doc_id, doc_title in simple_matches:
sources.append({
"id": doc_id,
"title": doc_title.strip(),
"date": "",
"summary": "",
"key_findings": [],
"quotes": []
})
# Pattern 2: Look for conversation IDs in the content
conv_id_pattern = r'conversation[_\s]*id[:\s]*(\d+)'
conv_id_matches = re.findall(conv_id_pattern, step.observations, re.IGNORECASE)
print(f"DEBUG: Found {len(conv_id_matches)} conversation ID matches: {conv_id_matches}")
for conv_id in conv_id_matches:
sources.append({
"id": conv_id,
"title": f"Conversation {conv_id}",
"date": "",
"summary": "",
"key_findings": [],
"quotes": []
})
# Fallback: Try to extract from result string if no logs provided
if not agent_logs:
# Extract tool usage from the result first
# Pattern 1: πŸ› οΈ Used tool toolname
tool_pattern1 = r'πŸ› οΈ Used tool (\w+)'
tool_matches1 = re.findall(tool_pattern1, result_str)
# Pattern 2: Calling tool: 'toolname' (with single quotes)
tool_pattern2 = r"Calling tool:\s*'([^']+)'"
tool_matches2 = re.findall(tool_pattern2, result_str)
# Pattern 3: Calling tool: 'toolname' (with double quotes)
tool_pattern3 = r'Calling tool:\s*"([^"]+)"'
tool_matches3 = re.findall(tool_pattern3, result_str)
# Pattern 4: Calling tool: toolname (without quotes)
tool_pattern4 = r'Calling tool:\s*([a-zA-Z_][a-zA-Z0-9_]*)'
tool_matches4 = re.findall(tool_pattern4, result_str)
# Combine all patterns
all_tool_matches = tool_matches1 + tool_matches2 + tool_matches3 + tool_matches4
tools_used = list(set(all_tool_matches)) # Remove duplicates
# Extract sources from the structured search_results format
# Look for <document> tags in the search results
document_pattern = r'<document id="(\d+)">\s*<title>(.*?)</title>\s*<date>(.*?)</date>'
document_matches = re.findall(document_pattern, result_str, re.DOTALL)
for doc_id, doc_title, doc_date in document_matches:
# Clean up the title and date
clean_title = doc_title.strip()
clean_date = doc_date.strip()
sources.append({
"id": doc_id,
"title": clean_title,
"date": clean_date
})
# Remove duplicates based on document ID (keep all unique documents)
unique_sources = []
seen = set()
for source in sources:
# Use document ID as the unique key, fallback to title+date if no ID
key = source.get("id", f"{source['title']}_{source['date']}")
if key not in seen:
seen.add(key)
unique_sources.append(source)
# Remove duplicate tools
tools_used = list(set(tools_used))
return answer, unique_sources, tools_used
def format_answer(self, answer: str) -> str:
"""Format the answer with proper HTML structure."""
if not answer:
return "<div class='answer-section'><p>No answer provided.</p></div>"
# Check if the answer is a JSON string and extract the actual answer
if answer.strip().startswith('{"answer":') and answer.strip().endswith('}'):
try:
import json
answer_data = json.loads(answer)
if isinstance(answer_data, dict) and 'answer' in answer_data:
answer = answer_data['answer']
except (json.JSONDecodeError, KeyError):
# If JSON parsing fails, use the original answer
pass
# Remove source references from the answer text for cleaner display
answer = re.sub(r'\(Document:[^)]+\)', '', answer)
# Clean up extra whitespace but preserve intentional line breaks
answer = re.sub(r'[ \t]+', ' ', answer) # Replace multiple spaces/tabs with single space
answer = re.sub(r' *\n *', '\n', answer) # Clean up spaces around newlines
# Format numbered lists and bullet points
answer = re.sub(r'\n\s*\d+\.\s*', '\n\n<strong>', answer) # Numbered lists
answer = re.sub(r'\n\s*β€’\s*', '\nβ€’ ', answer) # Bullet points
answer = re.sub(r'\n\s*-\s*', '\nβ€’ ', answer) # Dash points
# Format bold text (markdown style)
answer = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', answer)
# Convert line breaks to HTML
answer = answer.replace('\n', '<br>')
# Clean up multiple line breaks
answer = re.sub(r'(<br>){3,}', '<br><br>', answer)
return f"""
<div class='answer-section'>
<h3>πŸ“ Answer</h3>
<div style='line-height: 1.6; font-size: 16px;'>{answer}</div>
</div>
"""
def format_sources(self, sources: List[Dict]) -> str:
"""Format the sources with rich information including key findings and marketing insights."""
if not sources:
return "<div><p>No sources found.</p></div>"
sources_html = "<div>"
for i, source in enumerate(sources, 1):
title = source.get("title", "Unknown")
date = source.get("date", "Unknown")
doc_id = source.get("id", "")
summary = source.get("summary", "")
key_findings = source.get("key_findings", [])
quotes = source.get("quotes", [])
sources_html += f"""
<div class='source-card' style='margin-bottom: 20px; padding: 15px; border: 1px solid #e0e0e0; border-radius: 8px; background-color: #f9f9f9;'>
<div class='source-title' style='font-weight: bold; font-size: 16px; margin-bottom: 8px;'>{i}. {title}</div>
<div class='source-meta' style='color: #666; margin-bottom: 10px;'>
πŸ“… {date}
{f" | ID: {doc_id}" if doc_id else ""}
</div>
"""
if summary:
sources_html += f"""
<div class='source-summary' style='margin-bottom: 10px;'>
<strong>Summary:</strong> {summary}
</div>
"""
if key_findings:
sources_html += """
<div class='source-findings' style='margin-bottom: 10px;'>
<strong>Key Findings:</strong>
<ul style='margin: 5px 0; padding-left: 20px;'>
"""
for finding in key_findings:
clean_finding = finding.lstrip('- ').strip()
sources_html += f"<li style='margin-bottom: 3px;'>{clean_finding}</li>"
sources_html += "</ul></div>"
if quotes:
sources_html += """
<div class='source-quotes' style='margin-bottom: 10px;'>
<strong>Key Quotes:</strong>
<ul style='margin: 5px 0; padding-left: 20px;'>
"""
for quote in quotes:
clean_quote = quote.lstrip('- ').strip()
sources_html += f"<li style='margin-bottom: 3px; font-style: italic; color: #555;'>{clean_quote}</li>"
sources_html += "</ul></div>"
sources_html += "</div>"
sources_html += "</div>"
return sources_html
def format_tools(self, tools_used: List[str]) -> str:
"""Format the tools used with proper HTML structure."""
if not tools_used:
return "<div><p>No tools used.</p></div>"
tools_html = "<div>"
for tool in tools_used:
tools_html += f"""
<div class='tool-usage'>
πŸ”§ {tool.replace('_', ' ').title()}
</div>
"""
tools_html += "</div>"
return tools_html
def load_conversations(self, limit: int = 50) -> pd.DataFrame:
"""Load conversations from MongoDB and format for display."""
if self.conversation_collection is None:
return pd.DataFrame(columns=["Conversation ID", "Customer Info", "Summary", "Key Findings", "Follow-up Email"])
try:
# Query for documents with conversation_analysis
pipeline = [
{"$match": {"conversation_analysis": {"$exists": True}}},
{"$limit": limit},
{"$project": {
"conversation_id": "$metadata.properties.conversation_id",
"user_id": "$metadata.properties.user_id",
"icp_region": "$metadata.properties.icp_region",
"icp_country": "$metadata.properties.icp_country",
"team_size": "$metadata.properties.team_size",
"summary": "$conversation_analysis.aggregated_contextual_summary",
"key_findings": "$conversation_analysis.aggregated_marketing_insights.key_findings",
"follow_up_email": "$conversation_analysis.follow_up_email"
}}
]
docs = list(self.conversation_collection.aggregate(pipeline))
data = []
for doc in docs:
conversation_id = doc.get("conversation_id", "Unknown")
user_id = doc.get("user_id", "N/A")
icp_region = doc.get("icp_region", "N/A")
icp_country = doc.get("icp_country", "N/A")
team_size = doc.get("team_size", "N/A")
summary = doc.get("summary", "No summary available")
follow_up_email = doc.get("follow_up_email", "No follow-up email available")
# Format customer info into a single column
customer_info_parts = []
if user_id != "N/A":
customer_info_parts.append(f"User: {user_id}")
if icp_region != "N/A":
customer_info_parts.append(f"Region: {icp_region}")
if icp_country != "N/A":
customer_info_parts.append(f"Country: {icp_country}")
if team_size != "N/A":
customer_info_parts.append(f"Team Size: {team_size}")
customer_info = "\n".join(customer_info_parts) if customer_info_parts else "No customer info available"
# Format key findings
key_findings = doc.get("key_findings", [])
if key_findings and isinstance(key_findings, list):
findings_text = "\n".join([f"β€’ {finding.get('finding', '')}" for finding in key_findings[:3]]) # Limit to 3 findings
if len(key_findings) > 3:
findings_text += f"\n... and {len(key_findings) - 3} more"
else:
findings_text = "No key findings available"
# Truncate summary for table display
if len(summary) > 200:
summary = summary[:200] + "..."
# Truncate follow-up email for table display
if len(follow_up_email) > 150:
follow_up_email = follow_up_email[:150] + "..."
data.append({
"Conversation ID": conversation_id,
"Customer Info": customer_info,
"Summary": summary,
"Key Findings": findings_text,
"Follow-up Email": follow_up_email
})
return pd.DataFrame(data)
except Exception as e:
print(f"Error loading conversations: {e}")
return pd.DataFrame(columns=["Conversation ID", "Customer Info", "Summary", "Key Findings", "Follow-up Email"])
def filter_conversations_by_sources(self, sources: List[Dict]) -> pd.DataFrame:
"""Filter conversations to show only those used in the current query."""
if not sources or self.conversation_collection is None:
return self.load_conversations()
try:
# Extract conversation IDs from sources
source_conversation_ids = set()
print(f"DEBUG: Filtering conversations based on {len(sources)} sources")
for source in sources:
print(f"DEBUG: Processing source: {source}")
# Try to extract conversation ID from various possible fields
doc_id = source.get("id", "")
title = source.get("title", "")
# Method 1: Try to extract conversation ID from title (if it contains conversation ID)
if title and "conversation" in title.lower():
# Look for conversation ID pattern in title
import re
conv_id_match = re.search(r'conversation[_\s]*(\d+)', title, re.IGNORECASE)
if conv_id_match:
conv_id = conv_id_match.group(1)
source_conversation_ids.add(conv_id)
print(f"DEBUG: Found conversation ID from title: {conv_id}")
continue
# Method 2: Query the RAG collection to find the conversation ID for this document
if doc_id:
try:
# Use the correct collection name for RAG data
rag_collection = self.database["rag_conversations"]
# Try different query patterns
doc = None
# Try by _id if it's a valid ObjectId
if doc_id.isdigit():
doc = rag_collection.find_one({"_id": int(doc_id)})
if not doc:
# Try by properties.conversation_id
doc = rag_collection.find_one({"properties.conversation_id": doc_id})
if not doc:
# Try by conversation_id in properties
doc = rag_collection.find_one({"properties.conversation_id": str(doc_id)})
if doc and "properties" in doc and "conversation_id" in doc["properties"]:
conv_id = doc["properties"]["conversation_id"]
if conv_id:
source_conversation_ids.add(str(conv_id))
print(f"DEBUG: Found conversation ID from RAG query: {conv_id}")
else:
print(f"DEBUG: No conversation ID found for doc_id: {doc_id}")
except Exception as e:
print(f"DEBUG: Error querying RAG collection for doc_id {doc_id}: {e}")
print(f"DEBUG: Found {len(source_conversation_ids)} unique conversation IDs: {source_conversation_ids}")
if not source_conversation_ids:
print("DEBUG: No conversation IDs found, returning all conversations")
return self.load_conversations()
# Query for conversations that match the source conversation IDs
pipeline = [
{"$match": {
"conversation_analysis": {"$exists": True},
"metadata.properties.conversation_id": {"$in": list(source_conversation_ids)}
}},
{"$project": {
"conversation_id": "$metadata.properties.conversation_id",
"user_id": "$metadata.properties.user_id",
"icp_region": "$metadata.properties.icp_region",
"icp_country": "$metadata.properties.icp_country",
"team_size": "$metadata.properties.team_size",
"summary": "$conversation_analysis.aggregated_contextual_summary",
"key_findings": "$conversation_analysis.aggregated_marketing_insights.key_findings",
"follow_up_email": "$conversation_analysis.follow_up_email"
}}
]
docs = list(self.conversation_collection.aggregate(pipeline))
print(f"DEBUG: Found {len(docs)} matching conversation documents")
data = []
for doc in docs:
conversation_id = doc.get("conversation_id", "Unknown")
user_id = doc.get("user_id", "N/A")
icp_region = doc.get("icp_region", "N/A")
icp_country = doc.get("icp_country", "N/A")
team_size = doc.get("team_size", "N/A")
summary = doc.get("summary", "No summary available")
follow_up_email = doc.get("follow_up_email", "No follow-up email available")
# Format customer info into a single column
customer_info_parts = []
if user_id != "N/A":
customer_info_parts.append(f"User: {user_id}")
if icp_region != "N/A":
customer_info_parts.append(f"Region: {icp_region}")
if icp_country != "N/A":
customer_info_parts.append(f"Country: {icp_country}")
if team_size != "N/A":
customer_info_parts.append(f"Team Size: {team_size}")
customer_info = "\n".join(customer_info_parts) if customer_info_parts else "No customer info available"
# Format key findings
key_findings = doc.get("key_findings", [])
if key_findings and isinstance(key_findings, list):
findings_text = "\n".join([f"β€’ {finding.get('finding', '')}" for finding in key_findings[:3]])
if len(key_findings) > 3:
findings_text += f"\n... and {len(key_findings) - 3} more"
else:
findings_text = "No key findings available"
# Truncate summary for table display
if len(summary) > 200:
summary = summary[:200] + "..."
# Truncate follow-up email for table display
if len(follow_up_email) > 150:
follow_up_email = follow_up_email[:150] + "..."
data.append({
"Conversation ID": conversation_id,
"Customer Info": customer_info,
"Summary": summary,
"Key Findings": findings_text,
"Follow-up Email": follow_up_email
})
print(f"DEBUG: Returning {len(data)} filtered conversations")
return pd.DataFrame(data)
except Exception as e:
print(f"Error filtering conversations: {e}")
import traceback
traceback.print_exc()
return self.load_conversations()
def filter_conversations(self, search_term: str) -> pd.DataFrame:
"""Filter conversations based on search term."""
if not search_term or not search_term.strip():
return self.load_conversations()
try:
# Load all conversations first
all_conversations = self.load_conversations(limit=1000) # Load more for filtering
if all_conversations.empty:
return all_conversations
# Convert search term to lowercase for case-insensitive search
search_lower = search_term.lower().strip()
# Filter conversations based on search term
filtered_data = []
for _, row in all_conversations.iterrows():
# Search in conversation ID, customer info, summary, key findings, and follow-up email
conversation_id = str(row.get("Conversation ID", "")).lower()
customer_info = str(row.get("Customer Info", "")).lower()
summary = str(row.get("Summary", "")).lower()
key_findings = str(row.get("Key Findings", "")).lower()
follow_up_email = str(row.get("Follow-up Email", "")).lower()
# Check if search term matches any field
if (search_lower in conversation_id or
search_lower in customer_info or
search_lower in summary or
search_lower in key_findings or
search_lower in follow_up_email):
filtered_data.append(row.to_dict())
return pd.DataFrame(filtered_data)
except Exception as e:
print(f"Error filtering conversations: {e}")
return self.load_conversations()
def clear_conversation_search(self) -> Tuple[str, pd.DataFrame]:
"""Clear the search and show all conversations."""
return "", self.load_conversations()
def reset_ui_state(self) -> Tuple[str, str, str, str, pd.DataFrame]:
"""Reset the UI state to show all conversations and clear outputs."""
return "", "", "", "", self.load_conversations()
def launch(self, **kwargs):
"""Launch the Gradio interface."""
return self.interface.launch(**kwargs)