import json
import re
from typing import Any, Dict, List, Tuple, Union

import gradio as gr
import pandas as pd
from pymongo import MongoClient
from smolagents import ToolCallingAgent

from second_brain_online.config import settings


class CustomGradioUI:
    """Custom Gradio UI for better formatting of agent responses with source attribution."""

    def __init__(self, agent: Union[ToolCallingAgent, Any]):
        """Initialize the UI with either a ToolCallingAgent or AgentWrapper.

        Args:
            agent: Either a raw ToolCallingAgent or an AgentWrapper that wraps it.
        """
        self.agent = agent
        self.mongodb_client = None
        self.database = None
        self.conversation_collection = None
        self.setup_mongodb()
        self.setup_ui()

    def setup_mongodb(self):
        """Set up the MongoDB connection."""
        try:
            self.mongodb_client = MongoClient(settings.MONGODB_URI)
            self.database = self.mongodb_client[settings.MONGODB_DATABASE_NAME]
            self.conversation_collection = self.database["test_conversation_documents"]
            print("✅ MongoDB connection established successfully")
        except Exception as e:
            print(f"❌ Failed to connect to MongoDB: {e}")
            self.mongodb_client = None
            self.database = None
            self.conversation_collection = None

    def setup_ui(self):
        """Set up the Gradio interface with custom components."""
        with gr.Blocks(
            title="Second Brain AI Assistant",
            theme=gr.themes.Soft(),
            css="""
            .source-card {
                border: 1px solid #e0e0e0;
                border-radius: 8px;
                padding: 12px;
                margin: 8px 0;
                background-color: #f8f9fa;
            }
            .source-title {
                font-weight: bold;
                color: #2c3e50;
                margin-bottom: 4px;
            }
            .source-date {
                font-size: 0.9em;
                color: #6c757d;
                margin-bottom: 8px;
            }
            .answer-section {
                background-color: #ffffff;
                border: 1px solid #dee2e6;
                border-radius: 8px;
                padding: 16px;
                margin-bottom: 16px;
            }
            .tool-usage {
                background-color: #e3f2fd;
                border-left: 4px solid #2196f3;
                padding: 8px 12px;
                margin: 8px 0;
                border-radius: 4px;
                font-size: 0.9em;
            }
            """,
        ) as self.interface:
| gr.Markdown("# π§ Second Brain AI Assistant") | |
| gr.Markdown("Ask questions about your documents and get AI-powered insights with source attribution.") | |
| self.query_input = gr.Textbox( | |
| label="Ask a question", | |
| placeholder="What pricing objections were raised in the meetings?", | |
| lines=2 | |
| ) | |
| self.submit_btn = gr.Button("Ask", variant="primary", size="lg") | |
| with gr.Row(): | |
| with gr.Column(): | |
| self.answer_output = gr.HTML(label="Answer") | |
| with gr.Accordion("π Conversations", open=False): | |
| with gr.Row(): | |
| self.conversation_search = gr.Textbox( | |
| label="Search Conversations", | |
| placeholder="Search by conversation ID, customer info, summary, or key findings...", | |
| scale=4 | |
| ) | |
| self.clear_search_btn = gr.Button("Clear", scale=1) | |
| self.conversation_table = gr.Dataframe( | |
| headers=["Conversation ID", "Customer Info", "Summary", "Key Findings", "Follow-up Email"], | |
| datatype=["str", "str", "str", "str", "str"], | |
| interactive=False, | |
| label="Available Conversations", | |
| wrap=True, | |
| max_height=400, | |
| value=self.load_conversations() | |
| ) | |
| with gr.Accordion("π Sources", open=False): | |
| self.sources_output = gr.HTML(label="Sources") | |
| with gr.Accordion("π οΈ Tools Used", open=False): | |
| self.tools_output = gr.HTML(label="Tools Used") | |
| with gr.Accordion("π Debug: Raw Response", open=False): | |
| self.debug_output = gr.Textbox( | |
| label="Raw Agent Response", | |
| lines=10, | |
| max_lines=20, | |
| interactive=False | |
| ) | |

            # Event handlers
            self.submit_btn.click(
                fn=self.process_query,
                inputs=[self.query_input],
                outputs=[
                    self.answer_output,
                    self.sources_output,
                    self.tools_output,
                    self.debug_output,
                    self.conversation_table,
                ],
                show_progress="full",  # Show a progress indicator during the long agent run
            )
            self.query_input.submit(
                fn=self.process_query,
                inputs=[self.query_input],
                outputs=[
                    self.answer_output,
                    self.sources_output,
                    self.tools_output,
                    self.debug_output,
                    self.conversation_table,
                ],
                show_progress="full",
            )

            # Conversation search handlers
            self.conversation_search.change(
                fn=self.filter_conversations,
                inputs=[self.conversation_search],
                outputs=[self.conversation_table],
            )
            self.clear_search_btn.click(
                fn=self.clear_conversation_search,
                inputs=[],
                outputs=[self.conversation_search, self.conversation_table],
            )

    def process_query(self, query: str, progress=gr.Progress()) -> Tuple[str, str, str, str, pd.DataFrame]:
        """Process the user query and return formatted response components."""
        if not query.strip():
            # Clear all outputs when the query is empty
            return "", "", "", "", self.load_conversations()

        try:
            # Show a progress indicator with a descriptive message
            progress(0, desc="🚀 Starting query processing...")

            # Run the agent (this takes 30-60 seconds).
            # Use None for indeterminate progress during the long operation.
            progress(None, desc="🔍 Searching knowledge base and retrieving documents...")
            result = self.agent.run(query)

            # Quick post-processing steps
            progress(0.8, desc="✨ Displaying results...")

            # Convert the result to a string
            result_str = str(result)

            # Debug information
            print("\n" + "=" * 80)
            print("DEBUG: RAW AGENT RESULT")
            print("=" * 80)
            print(f"Type: {type(result)}")
            print(f"Is string? {isinstance(result, str)}")
            print(f"Has 📚 Sources? {'📚 Sources' in result_str}")
            print(f"Full content:\n{result_str}")
            print("=" * 80)

            # Extract the tools used from the agent logs (for the Tools Used section)
            agent_logs = getattr(self.agent, 'logs', []) or []
            tools_used = []
            for step in agent_logs:
                if hasattr(step, 'tool_calls') and step.tool_calls:
                    for tool_call in step.tool_calls:
                        if hasattr(tool_call, 'name'):
                            tools_used.append(tool_call.name)
            tools_used = list(set(tools_used))  # Remove duplicates

            # Format the raw answer with proper HTML structure (no parsing, just formatting)
            answer_html = self._format_raw_answer(result_str)

            # Leave the Sources section empty (the sources are already in the answer)
            sources_html = ""

            # Format the tools
            tools_html = self.format_tools(tools_used)

            # Debug text
            debug_text = result_str

            # Show all conversations (no filtering, since we are not parsing sources here)
            progress(0.95, desc="📋 Loading conversations...")
            all_conversations = self.load_conversations()

            progress(1.0, desc="✅ Complete!")
            return answer_html, sources_html, tools_html, debug_text, all_conversations

        except Exception as e:
            error_msg = (
                "<div style='color: #dc3545; padding: 12px; border: 1px solid #f5c6cb; "
                f"border-radius: 4px; background-color: #f8d7da;'>Error: {e}</div>"
            )
            return error_msg, "", "", str(e), self.load_conversations()

    def _format_raw_answer(self, answer: str) -> str:
        """Format the raw answer with basic HTML structure, without parsing.

        Just converts markdown-style formatting to HTML and preserves the structure.
        """
        if not answer:
            return "<div class='answer-section'><p>No answer provided.</p></div>"

        # Convert markdown bold to HTML bold
        answer = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', answer)

        # Convert line breaks to HTML
        answer = answer.replace('\n', '<br>')

        # Collapse runs of three or more line breaks into two
        answer = re.sub(r'(<br>){3,}', '<br><br>', answer)

        return f"""
        <div class='answer-section'>
            <div style='line-height: 1.8; font-size: 16px; white-space: pre-wrap;'>{answer}</div>
        </div>
        """

    def _parse_sources_from_text(self, sources_text: str) -> List[Dict]:
        """Parse sources from the formatted text output.

        Expected format:
            Doc 1: Title (Date)
            Source: Type | Document ID: ID | URL | User ID
            Summary: ...
            Key Findings:
            - [Type/Impact] Finding
        """
        sources = []

        # Split on the "Doc X:" pattern
        doc_pattern = r'Doc\s+(\d+):\s*([^\n]+)'
        doc_matches = re.finditer(doc_pattern, sources_text)

        for match in doc_matches:
            doc_num = match.group(1)
            title_line = match.group(2).strip()

            # Find the next "Doc X:" header or the end of the string
            start_pos = match.end()
            next_match = re.search(r'Doc\s+\d+:', sources_text[start_pos:])
            if next_match:
                end_pos = start_pos + next_match.start()
                doc_content = sources_text[start_pos:end_pos]
            else:
                doc_content = sources_text[start_pos:]

            # Extract the title and date from the title line
            title_date_match = re.match(r'(.+?)\s*\(([^)]+)\)', title_line)
            if title_date_match:
                title = title_date_match.group(1).strip()
                date = title_date_match.group(2).strip()
            else:
                title = title_line
                date = ""

            # Extract the document ID
            doc_id = ""
            id_match = re.search(r'Document ID:\s*([a-zA-Z0-9]+)', doc_content)
            if id_match:
                doc_id = id_match.group(1)

            # Extract the summary
            summary = ""
            summary_match = re.search(r'Summary:\s*([^\n]+)', doc_content)
            if summary_match:
                summary = summary_match.group(1).strip()

            # Extract the key findings
            key_findings = []
            findings_section = re.search(r'Key Findings:\s*(.+?)(?=\n\nDoc\s+\d+:|$)', doc_content, re.DOTALL)
            if findings_section:
                findings_text = findings_section.group(1)
                # Extract each finding line
                finding_lines = re.findall(r'-\s*\[([^\]]+)\]\s*([^\n]+)', findings_text)
                for finding_type, finding_text in finding_lines:
                    key_findings.append(f"[{finding_type}] {finding_text.strip()}")

            sources.append({
                "id": doc_id,
                "title": title,
                "date": date,
                "summary": summary,
                "key_findings": key_findings,
                "quotes": [],  # Quotes are not used in the new format
            })

        return sources
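
    # Hypothetical illustration of the text shape _parse_sources_from_text expects
    # (the title, ID, and finding below are made up for the example):
    #
    #   Doc 1: Q3 Pricing Call (2024-05-01)
    #   Source: Call | Document ID: abc123 | https://example.com | user_42
    #   Summary: Customer raised pricing objections.
    #   Key Findings:
    #   - [Objection/High] Price considered too high for small teams
    #
    # would parse into:
    #
    #   [{"id": "abc123", "title": "Q3 Pricing Call", "date": "2024-05-01",
    #     "summary": "Customer raised pricing objections.",
    #     "key_findings": ["[Objection/High] Price considered too high for small teams"],
    #     "quotes": []}]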

    def parse_agent_response(self, result: Any, agent_logs: List = None) -> Tuple[str, List[Dict], List[str]]:
        """Parse the agent response to extract the answer, sources, and tools used."""
        answer = ""
        sources = []
        tools_used = []

        # Convert the result to a string if it is not one already
        result_str = str(result)

        # Extract the answer from the result.
        # Pattern 1: JSON format with an "answer" key
        json_match = re.search(r'{"answer":\s*"([^"]+)"}', result_str)
        if json_match:
            answer = json_match.group(1)
            # Unescape the JSON string
            answer = answer.replace('\\n', '\n').replace('\\"', '"')
        else:
            # Pattern 2: Look for "Final answer:" followed by content
            final_answer_match = re.search(r'Final answer:\s*(.+?)(?=\n\n|\Z)', result_str, re.DOTALL)
            if final_answer_match:
                answer = final_answer_match.group(1).strip()
                # Try to extract JSON from the final answer
                json_in_final = re.search(r'{"answer":\s*"([^"]+)"}', answer)
                if json_in_final:
                    answer = json_in_final.group(1).replace('\\n', '\n').replace('\\"', '"')
            else:
                # Pattern 3: Use the entire result as the answer if no specific pattern matches
                answer = result_str

        # Split the answer from the sources section. The "📚 Sources:" marker must
        # match what the agent emits in its final answer.
        sources_split = re.split(r'📚\s*Sources:?', answer, maxsplit=1, flags=re.IGNORECASE)
        if len(sources_split) == 2:
            # We found a Sources section
            answer_only = sources_split[0].strip()
            sources_text = sources_split[1].strip()
            # Parse the sources from the text
            sources = self._parse_sources_from_text(sources_text)
            # Keep only the answer part
            answer = answer_only
        # Otherwise no Sources section was found and the answer stays as-is.

        # If we have agent logs, extract the tools and sources from them
        if agent_logs:
            for step in agent_logs:
                # Extract tool calls
                if hasattr(step, 'tool_calls') and step.tool_calls:
                    for tool_call in step.tool_calls:
                        if hasattr(tool_call, 'name'):
                            tools_used.append(tool_call.name)

                # Extract sources from the observations
                if hasattr(step, 'observations') and step.observations:
                    print(f"DEBUG: Processing observations: {step.observations[:500]}...")

                    # Look for complete document blocks with all content
                    document_pattern = r'<document id="(\d+)">\s*<title>(.*?)</title>\s*<date>(.*?)</date>\s*<contextual_summary>(.*?)</contextual_summary>\s*<marketing_insights>(.*?)</marketing_insights>\s*<content>(.*?)</content>'
                    document_matches = re.findall(document_pattern, step.observations, re.DOTALL)
                    print(f"DEBUG: Found {len(document_matches)} document matches with full pattern")

                    for doc_id, doc_title, doc_date, contextual_summary, marketing_insights, content in document_matches:
                        # Clean up the basic fields
                        clean_title = doc_title.strip()
                        clean_date = doc_date.strip()
                        clean_summary = contextual_summary.strip()

                        # Extract key findings from the marketing insights
                        key_findings = []
                        key_findings_match = re.search(r'<key_findings>(.*?)</key_findings>', marketing_insights, re.DOTALL)
                        if key_findings_match:
                            key_findings_text = key_findings_match.group(1).strip()
                            # Split into lines and keep only bullet entries
                            key_findings = [
                                line.strip()
                                for line in key_findings_text.split('\n')
                                if line.strip() and line.strip().startswith('-')
                            ]

                        # Extract quotes from the marketing insights
                        quotes = []
                        quotes_match = re.search(r'<quotes>(.*?)</quotes>', marketing_insights, re.DOTALL)
                        if quotes_match:
                            quotes_text = quotes_match.group(1).strip()
                            # Split into lines and keep only bullet entries
                            quotes = [
                                line.strip()
                                for line in quotes_text.split('\n')
                                if line.strip() and line.strip().startswith('-')
                            ]

                        sources.append({
                            "id": doc_id,
                            "title": clean_title,
                            "date": clean_date,
                            "summary": clean_summary,
                            "key_findings": key_findings,
                            "quotes": quotes,
                        })

                    # Fallback: look for simpler document patterns if the full pattern did not match
                    if not document_matches:
                        print("DEBUG: Trying fallback document patterns...")

                        # Pattern 1: Simple document with an ID and title
                        simple_pattern = r'<document id="(\d+)">\s*<title>(.*?)</title>'
                        simple_matches = re.findall(simple_pattern, step.observations, re.DOTALL)
                        print(f"DEBUG: Found {len(simple_matches)} simple document matches")

                        for doc_id, doc_title in simple_matches:
                            sources.append({
                                "id": doc_id,
                                "title": doc_title.strip(),
                                "date": "",
                                "summary": "",
                                "key_findings": [],
                                "quotes": [],
                            })

                        # Pattern 2: Look for conversation IDs in the content
                        conv_id_pattern = r'conversation[_\s]*id[:\s]*(\d+)'
                        conv_id_matches = re.findall(conv_id_pattern, step.observations, re.IGNORECASE)
                        print(f"DEBUG: Found {len(conv_id_matches)} conversation ID matches: {conv_id_matches}")

                        for conv_id in conv_id_matches:
                            sources.append({
                                "id": conv_id,
                                "title": f"Conversation {conv_id}",
                                "date": "",
                                "summary": "",
                                "key_findings": [],
                                "quotes": [],
                            })

        # Fallback: try to extract from the result string if no logs were provided
        if not agent_logs:
            # Extract the tool usage from the result first.
            # Pattern 1: 🛠️ Used tool toolname
            tool_matches1 = re.findall(r'🛠️ Used tool (\w+)', result_str)
            # Pattern 2: Calling tool: 'toolname' (with single quotes)
            tool_matches2 = re.findall(r"Calling tool:\s*'([^']+)'", result_str)
            # Pattern 3: Calling tool: "toolname" (with double quotes)
            tool_matches3 = re.findall(r'Calling tool:\s*"([^"]+)"', result_str)
            # Pattern 4: Calling tool: toolname (without quotes)
            tool_matches4 = re.findall(r'Calling tool:\s*([a-zA-Z_][a-zA-Z0-9_]*)', result_str)

            # Combine all patterns and remove duplicates
            tools_used = list(set(tool_matches1 + tool_matches2 + tool_matches3 + tool_matches4))

            # Extract sources from the structured search_results format:
            # look for <document> tags in the search results
            document_pattern = r'<document id="(\d+)">\s*<title>(.*?)</title>\s*<date>(.*?)</date>'
            document_matches = re.findall(document_pattern, result_str, re.DOTALL)

            for doc_id, doc_title, doc_date in document_matches:
                # Clean up the title and date
                sources.append({
                    "id": doc_id,
                    "title": doc_title.strip(),
                    "date": doc_date.strip(),
                })

        # Remove duplicates based on the document ID (keep all unique documents)
        unique_sources = []
        seen = set()
        for source in sources:
            # Use the document ID as the unique key; fall back to title+date when the ID is missing or empty
            key = source.get("id") or f"{source.get('title', '')}_{source.get('date', '')}"
            if key not in seen:
                seen.add(key)
                unique_sources.append(source)

        # Remove duplicate tools
        tools_used = list(set(tools_used))

        return answer, unique_sources, tools_used

    def format_answer(self, answer: str) -> str:
        """Format the answer with proper HTML structure."""
        if not answer:
            return "<div class='answer-section'><p>No answer provided.</p></div>"

        # If the answer is a JSON string, extract the actual answer from it
        if answer.strip().startswith('{"answer":') and answer.strip().endswith('}'):
            try:
                answer_data = json.loads(answer)
                if isinstance(answer_data, dict) and 'answer' in answer_data:
                    answer = answer_data['answer']
            except (json.JSONDecodeError, KeyError):
                # If JSON parsing fails, use the original answer
                pass

        # Remove source references from the answer text for a cleaner display
        answer = re.sub(r'\(Document:[^)]+\)', '', answer)

        # Clean up extra whitespace but preserve intentional line breaks
        answer = re.sub(r'[ \t]+', ' ', answer)  # Collapse runs of spaces/tabs into a single space
        answer = re.sub(r' *\n *', '\n', answer)  # Strip spaces around newlines

        # Format numbered lists and bullet points
        answer = re.sub(r'\n\s*(\d+)\.\s*', r'\n\n<strong>\1.</strong> ', answer)  # Bold the numbers of numbered lists
        answer = re.sub(r'\n\s*•\s*', '\n• ', answer)  # Bullet points
        answer = re.sub(r'\n\s*-\s*', '\n• ', answer)  # Dash points

        # Format bold text (markdown style)
        answer = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', answer)

        # Convert line breaks to HTML
        answer = answer.replace('\n', '<br>')

        # Collapse runs of three or more line breaks into two
        answer = re.sub(r'(<br>){3,}', '<br><br>', answer)

        return f"""
        <div class='answer-section'>
            <h3>📝 Answer</h3>
            <div style='line-height: 1.6; font-size: 16px;'>{answer}</div>
        </div>
        """

    def format_sources(self, sources: List[Dict]) -> str:
        """Format the sources with rich information, including key findings and marketing insights."""
        if not sources:
            return "<div><p>No sources found.</p></div>"

        sources_html = "<div>"
        for i, source in enumerate(sources, 1):
            title = source.get("title", "Unknown")
            date = source.get("date", "Unknown")
            doc_id = source.get("id", "")
            summary = source.get("summary", "")
            key_findings = source.get("key_findings", [])
            quotes = source.get("quotes", [])

            sources_html += f"""
            <div class='source-card' style='margin-bottom: 20px; padding: 15px; border: 1px solid #e0e0e0; border-radius: 8px; background-color: #f9f9f9;'>
                <div class='source-title' style='font-weight: bold; font-size: 16px; margin-bottom: 8px;'>{i}. {title}</div>
                <div class='source-meta' style='color: #666; margin-bottom: 10px;'>
                    📅 {date}
                    {f" | ID: {doc_id}" if doc_id else ""}
                </div>
            """

            if summary:
                sources_html += f"""
                <div class='source-summary' style='margin-bottom: 10px;'>
                    <strong>Summary:</strong> {summary}
                </div>
                """

            if key_findings:
                sources_html += """
                <div class='source-findings' style='margin-bottom: 10px;'>
                    <strong>Key Findings:</strong>
                    <ul style='margin: 5px 0; padding-left: 20px;'>
                """
                for finding in key_findings:
                    clean_finding = finding.lstrip('- ').strip()
                    sources_html += f"<li style='margin-bottom: 3px;'>{clean_finding}</li>"
                sources_html += "</ul></div>"

            if quotes:
                sources_html += """
                <div class='source-quotes' style='margin-bottom: 10px;'>
                    <strong>Key Quotes:</strong>
                    <ul style='margin: 5px 0; padding-left: 20px;'>
                """
                for quote in quotes:
                    clean_quote = quote.lstrip('- ').strip()
                    sources_html += f"<li style='margin-bottom: 3px; font-style: italic; color: #555;'>{clean_quote}</li>"
                sources_html += "</ul></div>"

            sources_html += "</div>"

        sources_html += "</div>"
        return sources_html

    def format_tools(self, tools_used: List[str]) -> str:
        """Format the tools used with proper HTML structure."""
        if not tools_used:
            return "<div><p>No tools used.</p></div>"

        tools_html = "<div>"
        for tool in tools_used:
            tools_html += f"""
            <div class='tool-usage'>
                🔧 {tool.replace('_', ' ').title()}
            </div>
            """
        tools_html += "</div>"
        return tools_html

    def load_conversations(self, limit: int = 50) -> pd.DataFrame:
        """Load conversations from MongoDB and format them for display."""
        if self.conversation_collection is None:
            return pd.DataFrame(columns=["Conversation ID", "Customer Info", "Summary", "Key Findings", "Follow-up Email"])

        try:
            # Query for documents that have a conversation_analysis
            pipeline = [
                {"$match": {"conversation_analysis": {"$exists": True}}},
                {"$limit": limit},
                {"$project": {
                    "conversation_id": "$metadata.properties.conversation_id",
                    "user_id": "$metadata.properties.user_id",
                    "icp_region": "$metadata.properties.icp_region",
                    "icp_country": "$metadata.properties.icp_country",
                    "team_size": "$metadata.properties.team_size",
                    "summary": "$conversation_analysis.aggregated_contextual_summary",
                    "key_findings": "$conversation_analysis.aggregated_marketing_insights.key_findings",
                    "follow_up_email": "$conversation_analysis.follow_up_email",
                }},
            ]
            docs = list(self.conversation_collection.aggregate(pipeline))

            data = []
            for doc in docs:
                conversation_id = doc.get("conversation_id", "Unknown")
                user_id = doc.get("user_id", "N/A")
                icp_region = doc.get("icp_region", "N/A")
                icp_country = doc.get("icp_country", "N/A")
                team_size = doc.get("team_size", "N/A")
                summary = doc.get("summary", "No summary available")
                follow_up_email = doc.get("follow_up_email", "No follow-up email available")

                # Format the customer info into a single column
                customer_info_parts = []
                if user_id != "N/A":
                    customer_info_parts.append(f"User: {user_id}")
                if icp_region != "N/A":
                    customer_info_parts.append(f"Region: {icp_region}")
                if icp_country != "N/A":
                    customer_info_parts.append(f"Country: {icp_country}")
                if team_size != "N/A":
                    customer_info_parts.append(f"Team Size: {team_size}")
                customer_info = "\n".join(customer_info_parts) if customer_info_parts else "No customer info available"

                # Format the key findings (limit to three)
                key_findings = doc.get("key_findings", [])
                if key_findings and isinstance(key_findings, list):
                    findings_text = "\n".join(f"• {finding.get('finding', '')}" for finding in key_findings[:3])
                    if len(key_findings) > 3:
                        findings_text += f"\n... and {len(key_findings) - 3} more"
                else:
                    findings_text = "No key findings available"

                # Truncate the summary for table display
                if len(summary) > 200:
                    summary = summary[:200] + "..."

                # Truncate the follow-up email for table display
                if len(follow_up_email) > 150:
                    follow_up_email = follow_up_email[:150] + "..."

                data.append({
                    "Conversation ID": conversation_id,
                    "Customer Info": customer_info,
                    "Summary": summary,
                    "Key Findings": findings_text,
                    "Follow-up Email": follow_up_email,
                })

            return pd.DataFrame(data)
        except Exception as e:
            print(f"Error loading conversations: {e}")
            return pd.DataFrame(columns=["Conversation ID", "Customer Info", "Summary", "Key Findings", "Follow-up Email"])
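
    # Assumed shape of a conversation document, inferred from the $project stage
    # in load_conversations above (not verified against the live collection):
    #
    #   {
    #       "metadata": {"properties": {
    #           "conversation_id": ..., "user_id": ...,
    #           "icp_region": ..., "icp_country": ..., "team_size": ...,
    #       }},
    #       "conversation_analysis": {
    #           "aggregated_contextual_summary": "...",
    #           "aggregated_marketing_insights": {"key_findings": [{"finding": "..."}, ...]},
    #           "follow_up_email": "...",
    #       },
    #   }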

    def filter_conversations_by_sources(self, sources: List[Dict]) -> pd.DataFrame:
        """Filter conversations to show only those used in the current query."""
        if not sources or self.conversation_collection is None:
            return self.load_conversations()

        try:
            # Extract the conversation IDs from the sources
            source_conversation_ids = set()
            print(f"DEBUG: Filtering conversations based on {len(sources)} sources")

            for source in sources:
                print(f"DEBUG: Processing source: {source}")
                # Try to extract the conversation ID from various possible fields
                doc_id = source.get("id", "")
                title = source.get("title", "")

                # Method 1: Try to extract the conversation ID from the title (if it contains one)
                if title and "conversation" in title.lower():
                    # Look for a conversation ID pattern in the title
                    conv_id_match = re.search(r'conversation[_\s]*(\d+)', title, re.IGNORECASE)
                    if conv_id_match:
                        conv_id = conv_id_match.group(1)
                        source_conversation_ids.add(conv_id)
                        print(f"DEBUG: Found conversation ID from title: {conv_id}")
                        continue

                # Method 2: Query the RAG collection to find the conversation ID for this document
                if doc_id:
                    try:
                        # Use the collection that holds the RAG data
                        rag_collection = self.database["rag_conversations"]

                        # Try different query patterns
                        doc = None

                        # Try by _id if the ID is numeric
                        if doc_id.isdigit():
                            doc = rag_collection.find_one({"_id": int(doc_id)})
                        if not doc:
                            # Try by properties.conversation_id
                            doc = rag_collection.find_one({"properties.conversation_id": doc_id})

                        if doc and "properties" in doc and "conversation_id" in doc["properties"]:
                            conv_id = doc["properties"]["conversation_id"]
                            if conv_id:
                                source_conversation_ids.add(str(conv_id))
                                print(f"DEBUG: Found conversation ID from RAG query: {conv_id}")
                        else:
                            print(f"DEBUG: No conversation ID found for doc_id: {doc_id}")
                    except Exception as e:
                        print(f"DEBUG: Error querying RAG collection for doc_id {doc_id}: {e}")

            print(f"DEBUG: Found {len(source_conversation_ids)} unique conversation IDs: {source_conversation_ids}")

            if not source_conversation_ids:
                print("DEBUG: No conversation IDs found, returning all conversations")
                return self.load_conversations()

            # Query for conversations that match the source conversation IDs
            pipeline = [
                {"$match": {
                    "conversation_analysis": {"$exists": True},
                    "metadata.properties.conversation_id": {"$in": list(source_conversation_ids)},
                }},
                {"$project": {
                    "conversation_id": "$metadata.properties.conversation_id",
                    "user_id": "$metadata.properties.user_id",
                    "icp_region": "$metadata.properties.icp_region",
                    "icp_country": "$metadata.properties.icp_country",
                    "team_size": "$metadata.properties.team_size",
                    "summary": "$conversation_analysis.aggregated_contextual_summary",
                    "key_findings": "$conversation_analysis.aggregated_marketing_insights.key_findings",
                    "follow_up_email": "$conversation_analysis.follow_up_email",
                }},
            ]
            docs = list(self.conversation_collection.aggregate(pipeline))
            print(f"DEBUG: Found {len(docs)} matching conversation documents")

            data = []
            for doc in docs:
                conversation_id = doc.get("conversation_id", "Unknown")
                user_id = doc.get("user_id", "N/A")
                icp_region = doc.get("icp_region", "N/A")
                icp_country = doc.get("icp_country", "N/A")
                team_size = doc.get("team_size", "N/A")
                summary = doc.get("summary", "No summary available")
                follow_up_email = doc.get("follow_up_email", "No follow-up email available")

                # Format the customer info into a single column
                customer_info_parts = []
                if user_id != "N/A":
                    customer_info_parts.append(f"User: {user_id}")
                if icp_region != "N/A":
                    customer_info_parts.append(f"Region: {icp_region}")
                if icp_country != "N/A":
                    customer_info_parts.append(f"Country: {icp_country}")
                if team_size != "N/A":
                    customer_info_parts.append(f"Team Size: {team_size}")
                customer_info = "\n".join(customer_info_parts) if customer_info_parts else "No customer info available"

                # Format the key findings (limit to three)
                key_findings = doc.get("key_findings", [])
                if key_findings and isinstance(key_findings, list):
                    findings_text = "\n".join(f"• {finding.get('finding', '')}" for finding in key_findings[:3])
                    if len(key_findings) > 3:
                        findings_text += f"\n... and {len(key_findings) - 3} more"
                else:
                    findings_text = "No key findings available"

                # Truncate the summary for table display
                if len(summary) > 200:
                    summary = summary[:200] + "..."

                # Truncate the follow-up email for table display
                if len(follow_up_email) > 150:
                    follow_up_email = follow_up_email[:150] + "..."

                data.append({
                    "Conversation ID": conversation_id,
                    "Customer Info": customer_info,
                    "Summary": summary,
                    "Key Findings": findings_text,
                    "Follow-up Email": follow_up_email,
                })

            print(f"DEBUG: Returning {len(data)} filtered conversations")
            return pd.DataFrame(data)
        except Exception as e:
            print(f"Error filtering conversations: {e}")
            import traceback
            traceback.print_exc()
            return self.load_conversations()

    def filter_conversations(self, search_term: str) -> pd.DataFrame:
        """Filter conversations based on a search term."""
        if not search_term or not search_term.strip():
            return self.load_conversations()

        try:
            # Load a larger batch of conversations to filter over
            all_conversations = self.load_conversations(limit=1000)
            if all_conversations.empty:
                return all_conversations

            # Lowercase the search term for case-insensitive matching
            search_lower = search_term.lower().strip()

            # Keep rows where the search term matches any searchable field
            filtered_data = []
            for _, row in all_conversations.iterrows():
                # Search in the conversation ID, customer info, summary, key findings, and follow-up email
                conversation_id = str(row.get("Conversation ID", "")).lower()
                customer_info = str(row.get("Customer Info", "")).lower()
                summary = str(row.get("Summary", "")).lower()
                key_findings = str(row.get("Key Findings", "")).lower()
                follow_up_email = str(row.get("Follow-up Email", "")).lower()

                if (search_lower in conversation_id
                        or search_lower in customer_info
                        or search_lower in summary
                        or search_lower in key_findings
                        or search_lower in follow_up_email):
                    filtered_data.append(row.to_dict())

            return pd.DataFrame(filtered_data)
        except Exception as e:
            print(f"Error filtering conversations: {e}")
            return self.load_conversations()

    def clear_conversation_search(self) -> Tuple[str, pd.DataFrame]:
        """Clear the search and show all conversations."""
        return "", self.load_conversations()

    def reset_ui_state(self) -> Tuple[str, str, str, str, pd.DataFrame]:
        """Reset the UI state to show all conversations and clear the outputs."""
        return "", "", "", "", self.load_conversations()

    def launch(self, **kwargs):
        """Launch the Gradio interface."""
        return self.interface.launch(**kwargs)
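

# Minimal usage sketch (an assumption, not part of this module): how the UI might
# be wired up and launched. `build_agent()` is a hypothetical helper standing in
# for however the ToolCallingAgent is actually constructed in this project.
#
#   if __name__ == "__main__":
#       agent = build_agent()  # hypothetical: returns a ToolCallingAgent or AgentWrapper
#       ui = CustomGradioUI(agent)
#       ui.launch(server_name="0.0.0.0", server_port=7860)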