# InsightFlow AI — Chainlit application entry module.
import asyncio
import base64
import datetime
import os
import random
import string
import time
from pathlib import Path
from typing import TypedDict, List, Dict, Optional, Any
# NOTE: redundant with the typing import above, but kept (last import wins,
# preserving the original name resolution order).
from typing_extensions import List, TypedDict

import chainlit as cl
import fpdf
import requests
from dotenv import load_dotenv
# Re-enable the Tavily search tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage
from langchain_openai import ChatOpenAI
# from langchain_core.language_models import FakeListLLM # Add FakeListLLM for testing
from langgraph.graph import StateGraph, END
from openai import OpenAI, AsyncOpenAI

# Import InsightFlow components
from insight_state import InsightFlowState
from utils.persona import PersonaFactory, PersonaReasoning
# Load environment variables
load_dotenv()

# Async OpenAI client — used only for DALL-E image generation below.
openai_client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# --- INITIALIZE CORE COMPONENTS ---
# Re-enable search tool initialization
tavily_tool = TavilySearchResults(max_results=3)

# Initialize LLMs with optimized settings for speed.
# One ChatOpenAI handle per persona; the temperature roughly tracks how
# "creative" each persona should be (planner lowest at 0.1, metaphorical
# highest at 0.6). All share a 20s request timeout to keep the UI responsive.
llm_planner = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1, request_timeout=20)
llm_analytical = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2, request_timeout=20)
llm_scientific = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3, request_timeout=20)
llm_philosophical = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.4, request_timeout=20)
llm_factual = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3, request_timeout=20)
llm_metaphorical = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.6, request_timeout=20)
llm_futuristic = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.5, request_timeout=20)
llm_synthesizer = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2, request_timeout=20)

# Direct mode LLM — same model, slightly longer timeout (25s) for fuller answers.
llm_direct = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3, request_timeout=25)
# --- SYSTEM PROMPTS ---

# Planner: decomposes the user query into per-persona research tasks.
PLANNER_SYSPROMPT = """You are an expert planner agent that coordinates research across multiple personas.
Given a user query, your task is to create a research plan with specific sub-tasks for each selected persona.
Break down complex queries into specific tasks that leverage each persona's unique perspective.
"""

# Synthesizer: merges the individual persona answers into one unified response.
SYNTHESIZER_SYSPROMPT = """You are a synthesis expert that combines multiple perspectives into a coherent response.
Given different persona perspectives on the same query, create a unified response that:
1. Highlights unique insights from each perspective
2. Notes areas of agreement and divergence
3. Organizes information logically for the user
Present the final response in a cohesive format that integrates all perspectives.
"""

# Direct mode: single-LLM answer that bypasses the persona pipeline entirely.
DIRECT_SYSPROMPT = """You are a highly intelligent AI assistant that provides clear, direct, and helpful answers.
Your responses should be accurate, concise, and well-reasoned.
"""
# --- LANGGRAPH NODES FOR INSIGHTFLOW AI --- | |
async def run_planner_agent(state: InsightFlowState) -> InsightFlowState:
    """Plan the research approach for multiple personas.

    MVP behaviour: every selected persona receives the original query
    unchanged; a full planner would craft custom sub-tasks per persona.
    Advances the state machine to the persona-execution step and returns
    the (mutated) state.
    """
    print(f"Planning research for query: {state['query']}")
    print(f"Selected personas: {state['selected_personas']}")
    state["current_step_name"] = "execute_persona_tasks"
    return state
async def execute_persona_tasks(state: InsightFlowState) -> InsightFlowState:
    """Generate a perspective from every selected persona.

    Each persona's generation is scheduled as a real asyncio task up front so
    the generations genuinely overlap, then results are collected in selection
    order so the progress updates stay deterministic.

    Populates ``state["persona_responses"]`` (error strings on failure) and
    advances the step name to ``synthesize_responses``.
    """
    query = state["query"]
    selected_personas = state["selected_personas"]
    persona_factory = cl.user_session.get("persona_factory")

    # Initialize responses dict if not exists
    if "persona_responses" not in state:
        state["persona_responses"] = {}

    print(f"Executing persona tasks for {len(selected_personas)} personas")

    # Get progress message if it exists
    progress_msg = cl.user_session.get("progress_msg")
    total_personas = len(selected_personas)

    # Schedule all perspective generations up front.
    # BUG FIX: the original stored bare coroutines and awaited them one-by-one,
    # so despite the "parallel" comment the personas ran sequentially.
    # asyncio.create_task starts each generation immediately so they overlap.
    persona_tasks = []
    for persona_id in selected_personas:
        persona = persona_factory.create_persona(persona_id)
        if persona:
            # Per-persona progress feedback for the user
            await cl.Message(content=f"Generating insights from {persona_id} perspective...").send()
            task = asyncio.create_task(generate_perspective_with_timeout(persona, query))
            persona_tasks.append((persona_id, task))

    # Collect results in order, updating the progress bar as we go.
    completed = 0
    for persona_id, task in persona_tasks:
        try:
            if progress_msg:
                # Map completion onto the 40%-80% band of the overall progress bar
                percent_done = 40 + int((completed / total_personas) * 40)
                await update_message(
                    progress_msg,
                    f"⏳ Generating perspective from {persona_id} ({percent_done}%)..."
                )
            response = await task
            state["persona_responses"][persona_id] = response
            print(f"Perspective generated for {persona_id}")
            completed += 1
        except Exception as e:
            print(f"Error getting {persona_id} perspective: {e}")
            state["persona_responses"][persona_id] = f"Could not generate perspective: {str(e)}"
            # Still count the persona so the progress percentage stays correct
            completed += 1

    state["current_step_name"] = "synthesize_responses"
    return state
async def generate_perspective_with_timeout(persona, query):
    """Run a persona's perspective generation with a bounded wall-clock time.

    Returns the generated text, or a human-readable error string when the
    call times out or raises — callers never see an exception.
    """
    try:
        # cl.make_async wraps the (synchronous) persona call so it can be
        # awaited; wait_for caps it to keep overall latency bounded.
        return await asyncio.wait_for(
            cl.make_async(persona.generate_perspective)(query),
            timeout=30  # 30-second timeout (reduced for speed)
        )
    except asyncio.TimeoutError:
        return f"The perspective generation timed out. This may be due to high API traffic or complexity of the query."
    except Exception as e:
        return f"Error generating perspective: {str(e)}"
async def synthesize_responses(state: InsightFlowState) -> InsightFlowState:
    """Merge the individual persona answers into one unified response.

    Feeds every collected perspective to the synthesizer LLM (30s cap);
    stores the result — or a fallback/error string — in
    ``state["synthesized_response"]`` and advances to visualization.
    """
    query = state["query"]
    persona_responses = state["persona_responses"]

    # Nothing to merge — short-circuit straight to presentation.
    if not persona_responses:
        state["synthesized_response"] = "No persona perspectives were generated."
        state["current_step_name"] = "present_results"
        return state

    print(f"Synthesizing responses from {len(persona_responses)} personas")
    await cl.Message(content="Synthesizing insights from all perspectives...").send()

    # Concatenate every perspective into a single labelled text blob.
    perspectives_text = "".join(
        f"\n\n{pid.capitalize()} Perspective:\n{answer}"
        for pid, answer in persona_responses.items()
    )

    prompt = [
        SystemMessage(content=SYNTHESIZER_SYSPROMPT),
        HumanMessage(content=f"Query: {query}\n\nPerspectives:{perspectives_text}\n\nPlease synthesize these perspectives into a coherent response.")
    ]
    try:
        result = await asyncio.wait_for(
            llm_synthesizer.ainvoke(prompt),
            timeout=30  # 30-second timeout (reduced for speed)
        )
        state["synthesized_response"] = result.content
        print("Synthesis complete")
    except asyncio.TimeoutError:
        state["synthesized_response"] = "The synthesis of perspectives timed out. Here are the individual perspectives instead."
        print("Synthesis timed out")
    except Exception as e:
        print(f"Error synthesizing perspectives: {e}")
        state["synthesized_response"] = f"Error synthesizing perspectives: {str(e)}"

    state["current_step_name"] = "generate_visualization"
    return state
async def generate_dalle_image(prompt: str) -> Optional[str]:
    """Ask DALL-E 3 for a sketch-style visualization of *prompt*.

    Returns the hosted image URL, or None if the API call fails for any
    reason (failures are logged, never raised).
    """
    # Wrap the caller's prompt in styling instructions for a hand-drawn,
    # annotated notebook look.
    full_prompt = f"Create a hand-drawn style visual note or sketch that represents: {prompt}. Make it look like a thoughtful drawing with annotations and key concepts highlighted. Include multiple perspectives connected together in a coherent visualization. Style: thoughtful hand-drawn sketch, notebook style with labels."
    try:
        result = await openai_client.images.generate(
            model="dall-e-3",
            prompt=full_prompt,
            size="1024x1024",
            quality="standard",
            n=1
        )
    except Exception as e:
        print(f"DALL-E image generation failed: {e}")
        return None
    return result.data[0].url
async def generate_visualization(state: InsightFlowState) -> InsightFlowState:
    """Build a Mermaid concept map and a DALL-E sketch for the synthesis.

    Both artifacts are produced only when the user has visualizations enabled
    (``show_visualization`` or ``visual_only_mode``). Results land in
    ``state["visualization_code"]`` / ``state["visualization_image_url"]``
    (None on failure or when disabled); always advances to present_results.
    """
    # Get progress message if available and update it
    progress_msg = cl.user_session.get("progress_msg")
    if progress_msg:
        await update_message(progress_msg, "⏳ Generating visual representation (90%)...")

    # Skip if no synthesized response or no personas
    if not state.get("synthesized_response") or not state.get("persona_responses"):
        state["current_step_name"] = "present_results"
        return state

    # Get visualization settings
    show_visualization = cl.user_session.get("show_visualization", True)
    visual_only_mode = cl.user_session.get("visual_only_mode", False)
    should_visualize = show_visualization or visual_only_mode

    if should_visualize:
        # --- Mermaid concept map ---
        try:
            # Keep the diagram as simple as possible so it always renders
            query = state.get("query", "Query")
            query_short = query[:20] + "..." if len(query) > 20 else query
            mermaid_text = f"""graph TD
    Q["{query_short}"]
    S["Synthesized View"]"""
            # One node per persona, fanning out from the query into the synthesis
            for i, persona in enumerate(state.get("persona_responses", {}).keys()):
                persona_short = persona.capitalize()
                node_id = f"P{i+1}"
                mermaid_text += f"""
    {node_id}["{persona_short}"]
    Q --> {node_id}
    {node_id} --> S"""
            state["visualization_code"] = mermaid_text
            print("Visualization generation complete with simplified diagram")
        except Exception as e:
            print(f"Error generating visualization: {e}")
            state["visualization_code"] = None

        # --- DALL-E sketch ---
        # BUG FIX: this block previously ran unconditionally, paying for an
        # image generation even when visualizations were disabled and the UI
        # would never show it. It is now guarded by should_visualize.
        try:
            if progress_msg:
                await update_message(progress_msg, "⏳ Generating hand-drawn visualization (92%)...")
            # Prompt = truncated synthesis plus the query for context
            image_prompt = state.get("synthesized_response", "")
            if len(image_prompt) > 500:
                image_prompt = image_prompt[:500]  # Limit prompt length
            image_prompt = f"Query: {state.get('query', '')}\n\nSynthesis: {image_prompt}"
            image_url = await generate_dalle_image(image_prompt)
            state["visualization_image_url"] = image_url
            print("DALL-E visualization generated successfully")
        except Exception as e:
            print(f"Error generating DALL-E image: {e}")
            state["visualization_image_url"] = None
    else:
        # Visualizations disabled: make the absence explicit for present_results
        state["visualization_image_url"] = None

    state["current_step_name"] = "present_results"
    return state
async def present_results(state: InsightFlowState) -> InsightFlowState:
    """Render the synthesized answer, visualizations and (optionally) the
    individual persona perspectives to the Chainlit UI.

    Honors the session flags ``visual_only_mode``, ``show_visualization`` and
    ``show_perspectives``; falls back to the text response whenever requested
    visualizations are unavailable or fail to render. Marks the state END.
    """
    synthesized_response = state.get("synthesized_response", "No synthesized response available.")
    print("Presenting results to user")

    # Ensure progress is at 100% before showing results
    progress_msg = cl.user_session.get("progress_msg")
    if progress_msg:
        await update_message(progress_msg, "✅ Process complete (100%)")

    # Get visualization settings
    visual_only_mode = cl.user_session.get("visual_only_mode", False)
    show_visualization = cl.user_session.get("show_visualization", True)
    visualization_enabled = visual_only_mode or show_visualization

    # Determine panel mode label for headings
    panel_mode = "Research Assistant" if state["panel_type"] == "research" else "Multi-Persona Discussion"

    # Which visualizations are actually available?
    has_mermaid = state.get("visualization_code") is not None
    has_dalle_image = state.get("visualization_image_url") is not None
    has_any_visualization = has_mermaid or has_dalle_image

    # Send text response if we're not in visual-only mode OR if no visualizations are available
    if not visual_only_mode or (visual_only_mode and not has_any_visualization):
        panel_indicator = f"**{panel_mode} Insights:**\n\n"
        # In visual-only mode with no visualizations, add an explanation
        if visual_only_mode and not has_any_visualization:
            panel_indicator = f"**{panel_mode} Insights (No visualizations available):**\n\n"
        await cl.Message(content=panel_indicator + synthesized_response).send()

    # Display DALL-E generated image if available and visualizations are enabled
    if has_dalle_image and visualization_enabled:
        try:
            if visual_only_mode:
                image_title = f"**Hand-drawn Visualization of {panel_mode} Insights:**"
            else:
                image_title = "**Hand-drawn Visualization:**"
            await cl.Message(content=image_title).send()
            image_url = state["visualization_image_url"]
            # BUG FIX: the markdown previously rendered as an empty f-string,
            # so image_url was computed but never shown. Embed it properly.
            image_markdown = f"![Hand-drawn Visualization]({image_url})"
            await cl.Message(content=image_markdown).send()
        except Exception as e:
            print(f"Error displaying DALL-E image: {e}")
            # Visual-only mode with no diagram either: fall back to text once
            if visual_only_mode and not has_mermaid and state.get("text_fallback_shown", False) is not True:
                panel_indicator = f"**{panel_mode} Insights (Image generation failed):**\n\n"
                await cl.Message(content=panel_indicator + synthesized_response).send()
                state["text_fallback_shown"] = True

    # Display Mermaid diagram if available and visualizations are enabled
    if has_mermaid and visualization_enabled:
        try:
            if visual_only_mode:
                diagram_title = f"**Concept Map of {panel_mode} Insights:**"
            else:
                diagram_title = "**Concept Map:**"
            await cl.Message(content=diagram_title).send()
            try:
                mermaid_code = state['visualization_code']
                # Fallback to a guaranteed working diagram if the stored one is degenerate
                if not mermaid_code or len(mermaid_code) < 10:
                    mermaid_code = """graph TD
    A[Query] --> B[Analysis]
    B --> C[Result]"""
                # Each line needs to be separate without extra indentation
                mermaid_block = "```mermaid\n"
                for line in mermaid_code.split('\n'):
                    mermaid_block += line.strip() + "\n"
                mermaid_block += "```"
                await cl.Message(content=mermaid_block).send()
            except Exception as diagram_err:
                print(f"Error rendering diagram: {diagram_err}")
                # Last-resort diagram that cannot fail to parse
                ultra_simple = """```mermaid
graph TD
A[Start] --> B[End]
```"""
                await cl.Message(content=ultra_simple).send()
            # Send the footer only if we have visualizations
            if has_any_visualization:
                await cl.Message(content="_Visualizations represent the key relationships between concepts from different perspectives._").send()
        except Exception as e:
            print(f"Error displaying visualization: {e}")
            if visual_only_mode and not has_dalle_image and state.get("text_fallback_shown", False) is not True:
                panel_indicator = f"**{panel_mode} Insights (Visualization failed):**\n\n"
                await cl.Message(content=panel_indicator + synthesized_response).send()
                # Mark that we showed the fallback text to avoid duplicates
                state["text_fallback_shown"] = True

    # Show individual perspectives unless disabled or in visual-only mode
    if cl.user_session.get("show_perspectives", True) and not visual_only_mode:
        for persona_id, response in state["persona_responses"].items():
            persona_name = persona_id.capitalize()
            # Prefer the display name from the persona config when available
            persona_factory = cl.user_session.get("persona_factory")
            if persona_factory:
                config = persona_factory.get_config(persona_id)
                if config and "name" in config:
                    persona_name = config["name"]
            perspective_message = f"**{persona_name}'s Perspective:**\n\n{response}"
            await cl.Message(content=perspective_message).send()

    state["current_step_name"] = "END"
    return state
# --- LANGGRAPH SETUP FOR INSIGHTFLOW AI ---
# Build the pipeline graph from the node functions defined above.
insight_graph_builder = StateGraph(InsightFlowState)

# The pipeline is a simple linear chain; register every node and wire each
# stage to its successor in one table-driven pass.
_PIPELINE = [
    ("planner_agent", run_planner_agent),
    ("execute_persona_tasks", execute_persona_tasks),
    ("synthesize_responses", synthesize_responses),
    ("generate_visualization", generate_visualization),
    ("present_results", present_results),
]
for _node_name, _node_fn in _PIPELINE:
    insight_graph_builder.add_node(_node_name, _node_fn)
for (_src, _), (_dst, _) in zip(_PIPELINE, _PIPELINE[1:]):
    insight_graph_builder.add_edge(_src, _dst)
insight_graph_builder.add_edge("present_results", END)

# The planner is always the first stage
insight_graph_builder.set_entry_point("planner_agent")

# Compile the graph
insight_flow_graph = insight_graph_builder.compile()
print("InsightFlow graph compiled successfully")
# --- DIRECT QUERY FUNCTION --- | |
async def direct_query(query: str):
    """Stream a single-LLM answer for *query*, bypassing the persona system.

    Yields response chunks as they arrive from the model; on failure yields
    exactly one error string instead of raising.
    """
    prompt = [
        SystemMessage(content=DIRECT_SYSPROMPT),
        HumanMessage(content=query)
    ]
    try:
        # Stream tokens straight through to the UI
        async for piece in llm_direct.astream(prompt):
            if piece.content:
                yield piece.content
    except Exception as e:
        yield f"Error processing direct query: {str(e)}"
# Helper function to display help information
async def display_help():
    """Send the full command reference to the chat.

    NOTE(review): keep this text in sync with the command handling in
    handle_message — the two are maintained independently.
    """
    help_text = """
# InsightFlow AI Commands
**Persona Management:**
- `/add persona_name` - Add a persona to your research team (e.g., `/add factual`)
- `/remove persona_name` - Remove a persona from your team (e.g., `/remove philosophical`)
- `/list` - Show all available personas
- `/team` - Show your current team and settings
**Speed and Mode Options:**
- `/direct on|off` - Toggle direct LLM mode (bypasses multi-persona system)
- `/quick on|off` - Toggle quick mode (uses fewer personas)
- `/perspectives on|off` - Toggle showing individual perspectives
- `/visualization on|off` - Toggle showing visualizations (Mermaid diagrams & DALL-E images)
- `/visual_only on|off` - Show only visualizations without text (faster)
**Export Options:**
- `/export_md` - Export the current insight analysis to a markdown file
- `/export_pdf` - Export the current insight analysis to a PDF file
**System Commands:**
- `/help` - Show this help message
**Available Personas:**
- analytical - Logical problem-solving
- scientific - Evidence-based reasoning
- philosophical - Meaning and implications
- factual - Practical information
- metaphorical - Creative analogies
- futuristic - Forward-looking possibilities
"""
    await cl.Message(content=help_text).send()
# Export functions
async def generate_random_id(length=8):
    """Return *length* random lowercase-alphanumeric characters.

    Async only for call-site symmetry with the other export helpers — it
    performs no awaiting itself. Not cryptographically secure (filename
    collision avoidance only).
    """
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))
async def export_to_markdown(state: InsightFlowState):
    """Write the current analysis to a markdown file under ``exports/``.

    Returns ``(filename, None)`` on success, or ``(None, error_message)``
    when there is nothing to export or the write fails.
    """
    if not state.get("synthesized_response"):
        return None, "No analysis available to export. Please run a query first."

    # Create exports directory if it doesn't exist
    Path("./exports").mkdir(exist_ok=True)

    # Unique filename: timestamp plus a short random suffix to avoid collisions
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    random_id = await generate_random_id()
    filename = f"exports/insightflow_analysis_{timestamp}_{random_id}.md"

    # Prepare content
    query = state.get("query", "No query specified")
    synthesized = state.get("synthesized_response", "No synthesized response")
    panel_mode = "Research Assistant" if state["panel_type"] == "research" else "Multi-Persona Discussion"

    # Create markdown content
    md_content = f"""# InsightFlow AI Analysis
*Generated on: {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}*
## Query
{query}
## {panel_mode} Insights
{synthesized}
"""

    # Add perspectives if available
    if state.get("persona_responses"):
        md_content += "## Individual Perspectives\n\n"
        for persona_id, response in state["persona_responses"].items():
            persona_name = persona_id.capitalize()
            md_content += f"### {persona_name}'s Perspective\n{response}\n\n"

    # Add visualization section header
    md_content += "## Visualizations\n\n"

    # Add DALL-E image if available
    if state.get("visualization_image_url"):
        md_content += f"### Hand-drawn Visual Representation\n\n"
        # BUG FIX: the image link previously rendered as an empty f-string,
        # dropping the URL from the export entirely.
        md_content += f"![Hand-drawn Visual Representation]({state['visualization_image_url']})\n\n"

    # Add mermaid concept map if available
    if state.get("visualization_code"):
        md_content += "### Concept Map\n\n```mermaid\n"
        for line in state["visualization_code"].split('\n'):
            md_content += line.strip() + "\n"
        md_content += "```\n\n"
        md_content += "*Note: The mermaid diagram will render in applications that support mermaid syntax, like GitHub or VS Code with appropriate extensions.*\n\n"

    # Add footer
    md_content += "---\n*Generated by InsightFlow AI*"

    # Write to file
    try:
        with open(filename, "w", encoding="utf-8") as f:
            f.write(md_content)
        return filename, None
    except Exception as e:
        return None, f"Error exporting to markdown: {str(e)}"
async def export_to_pdf(state: InsightFlowState):
    """Export the current insight analysis to a PDF file.

    Layout: title + timestamp, the query, the synthesized insights, one
    section per persona perspective, then a fresh page of visualizations
    (the DALL-E image is downloaded to a temp file and embedded; the Mermaid
    diagram is rendered as a plain-text list of its edges).

    Returns ``(filename, None)`` on success, ``(None, error_message)`` when
    there is nothing to export or any step of PDF generation fails.

    NOTE(review): fpdf's built-in Arial font is latin-1 only — responses
    containing non-latin-1 characters may make ``pdf.output`` raise; confirm
    whether inputs need sanitizing.
    """
    if not state.get("synthesized_response"):
        return None, "No analysis available to export. Please run a query first."
    # Create exports directory if it doesn't exist
    Path("./exports").mkdir(exist_ok=True)
    # Generate a unique filename with timestamp (random suffix avoids collisions)
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    random_id = await generate_random_id()
    filename = f"exports/insightflow_analysis_{timestamp}_{random_id}.pdf"
    try:
        # Create PDF
        pdf = fpdf.FPDF()
        pdf.add_page()
        # Title and generation timestamp, centered
        pdf.set_font('Arial', 'B', 16)
        pdf.cell(0, 10, 'InsightFlow AI Analysis', 0, 1, 'C')
        pdf.set_font('Arial', 'I', 10)
        pdf.cell(0, 10, f"Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", 0, 1, 'C')
        pdf.ln(10)
        # Add query
        pdf.set_font('Arial', 'B', 12)
        pdf.cell(0, 10, 'Query:', 0, 1)
        pdf.set_font('Arial', '', 11)
        query = state.get("query", "No query specified")
        pdf.multi_cell(0, 10, query)
        pdf.ln(5)
        # Add synthesized insights under the panel-mode heading
        panel_mode = "Research Assistant" if state["panel_type"] == "research" else "Multi-Persona Discussion"
        pdf.set_font('Arial', 'B', 12)
        pdf.cell(0, 10, f'{panel_mode} Insights:', 0, 1)
        pdf.set_font('Arial', '', 11)
        synthesized = state.get("synthesized_response", "No synthesized response")
        pdf.multi_cell(0, 10, synthesized)
        pdf.ln(10)
        # Add perspectives if available (one bold sub-heading per persona)
        if state.get("persona_responses"):
            pdf.set_font('Arial', 'B', 12)
            pdf.cell(0, 10, 'Individual Perspectives:', 0, 1)
            pdf.ln(5)
            for persona_id, response in state["persona_responses"].items():
                persona_name = persona_id.capitalize()
                pdf.set_font('Arial', 'B', 11)
                pdf.cell(0, 10, f"{persona_name}'s Perspective:", 0, 1)
                pdf.set_font('Arial', '', 11)
                pdf.multi_cell(0, 10, response)
                pdf.ln(5)
        # Visualizations always start on their own page
        pdf.add_page()
        pdf.set_font('Arial', 'B', 14)
        pdf.cell(0, 10, 'Visualizations', 0, 1, 'C')
        pdf.ln(5)
        # Add DALL-E image if available
        if state.get("visualization_image_url"):
            try:
                # Add header for the visualization
                pdf.set_font('Arial', 'B', 12)
                pdf.cell(0, 10, 'Hand-drawn Visual Representation:', 0, 1)
                pdf.ln(5)
                # Download the hosted image to a temp file so fpdf can embed it
                image_url = state.get("visualization_image_url")
                image_path = f"exports/temp_image_{timestamp}_{random_id}.jpg"
                response = requests.get(image_url, stream=True)
                if response.status_code == 200:
                    with open(image_path, 'wb') as img_file:
                        for chunk in response.iter_content(1024):
                            img_file.write(chunk)
                    # Full page width (190mm on A4 with default margins)
                    pdf.image(image_path, x=10, y=None, w=190)
                    pdf.ln(5)
                    # Remove the temporary image
                    os.remove(image_path)
                else:
                    pdf.multi_cell(0, 10, "Could not download the visualization image.")
            except Exception as img_error:
                # Image problems degrade to an inline note, never abort the export
                pdf.multi_cell(0, 10, f"Error including visualization image: {str(img_error)}")
        # Add mermaid diagram if available (as text, since PDF can't render mermaid)
        if state.get("visualization_code"):
            pdf.ln(10)
            pdf.set_font('Arial', 'B', 12)
            pdf.cell(0, 10, 'Concept Map Structure:', 0, 1)
            pdf.ln(5)
            # Extract relationships from the mermaid code
            mermaid_code = state.get("visualization_code", "")
            pdf.set_font('Arial', 'I', 10)
            pdf.multi_cell(0, 10, "Below is a text representation of the concept relationships:")
            pdf.ln(5)
            # Add a text representation of the diagram
            try:
                # Parse each "A --> B" edge line into a bullet point
                relationships = []
                for line in mermaid_code.split('\n'):
                    line = line.strip()
                    if '-->' in line:
                        parts = line.split('-->')
                        if len(parts) == 2:
                            source = parts[0].strip()
                            target = parts[1].strip()
                            relationships.append(f"• {source} connects to {target}")
                if relationships:
                    pdf.set_font('Arial', '', 10)
                    for rel in relationships:
                        pdf.multi_cell(0, 8, rel)
                else:
                    # Add a simplified representation of the concept map
                    pdf.multi_cell(0, 10, "The concept map shows relationships between the query and multiple perspectives, leading to a synthesized view.")
            except Exception as diagram_error:
                pdf.multi_cell(0, 10, f"Error parsing concept map: {str(diagram_error)}")
                pdf.multi_cell(0, 10, "The concept map shows the relationships between different perspectives on the topic.")
        # Footer pinned 15mm from the bottom of the last page
        pdf.set_y(-15)
        pdf.set_font('Arial', 'I', 8)
        pdf.cell(0, 10, 'Generated by InsightFlow AI', 0, 0, 'C')
        # Output PDF
        pdf.output(filename)
        return filename, None
    except Exception as e:
        return None, f"Error exporting to PDF: {str(e)}"
# --- CHAINLIT INTEGRATION --- | |
# Super simplified version with command-based persona selection | |
async def start_chat():
    """Initialize the InsightFlow AI session.

    Creates the persona factory, seeds the initial LangGraph state (research
    panel with three default personas), stores the compiled graph and all
    feature-flag defaults in the Chainlit user session, then sends the
    welcome and help messages.
    """
    print("InsightFlow AI chat started: Initializing session...")
    # Initialize persona factory and load configs
    persona_factory = PersonaFactory(config_dir="persona_configs")
    cl.user_session.set("persona_factory", persona_factory)
    # Initial pipeline state: research panel, three default personas, no query yet
    initial_state = InsightFlowState(
        panel_type="research",
        query="",
        selected_personas=["analytical", "scientific", "philosophical"],
        persona_responses={},
        synthesized_response=None,
        current_step_name="awaiting_query",
        error_message=None
    )
    # Store state and the compiled LangGraph for handle_message to pick up
    cl.user_session.set("insight_state", initial_state)
    cl.user_session.set("insight_graph", insight_flow_graph)
    # Set default options
    cl.user_session.set("direct_mode", False)  # Default to InsightFlow mode
    cl.user_session.set("show_perspectives", True)  # Default to showing all perspectives
    cl.user_session.set("quick_mode", False)  # Default to normal speed
    cl.user_session.set("show_visualization", True)  # Default to showing visualizations
    cl.user_session.set("visual_only_mode", False)  # Default to showing both text and visuals
    # Welcome message with command instructions
    welcome_message = """
# Welcome to InsightFlow AI
This assistant provides multiple perspectives on your questions using specialized personas.
**Your current research team:**
- Analytical reasoning
- Scientific reasoning
- Philosophical reasoning
Type `/help` to see all available commands.
"""
    await cl.Message(content=welcome_message).send()
    # Display help initially
    await display_help()
# Update function for Chainlit 2.5.5 compatibility
async def update_message(message, new_content):
    """Update a message in a way that's compatible with Chainlit 2.5.5.

    Newer Chainlit versions accept the new content directly in ``update()``;
    2.5.5 takes no arguments there (raising TypeError), so we fall back to
    mutating ``message.content`` and calling ``update()`` bare.
    """
    try:
        # First try the direct content update method (newer versions)
        await message.update(content=new_content)
    except TypeError:
        # Fall back to older method for Chainlit 2.5.5
        message.content = new_content
        await message.update()
async def handle_message(message: cl.Message):
    """Handle a user message from the Chainlit UI.

    Messages starting with ``/`` are treated as commands that adjust the
    research team or session settings and return immediately.  Anything else
    is a query: it is answered either via the streaming direct path (when
    direct mode is on) or by running the InsightFlow LangGraph pipeline,
    with a background task mirroring pipeline progress into the chat.
    """
    state = cl.user_session.get("insight_state")
    graph = cl.user_session.get("insight_graph")

    if not state or not graph:
        await cl.Message(content="Session error. Please refresh the page.").send()
        return

    # Check for commands to change personas or settings
    msg_content = message.content.strip()

    # Handle commands
    if msg_content.startswith('/'):
        parts = msg_content.split()
        command = parts[0].lower()

        if command == '/help':
            # Show help text
            await display_help()
            return

        elif command == '/list':
            # List available personas
            persona_list = """
**Available personas:**
- analytical - Logical problem-solving
- scientific - Evidence-based reasoning
- philosophical - Meaning and implications
- factual - Practical information
- metaphorical - Creative analogies
- futuristic - Forward-looking possibilities
"""
            await cl.Message(content=persona_list).send()
            return

        elif command == '/team':
            # Show current team and the state of every toggle
            team_list = ", ".join([p.capitalize() for p in state["selected_personas"]])
            direct_mode = "ON" if cl.user_session.get("direct_mode", False) else "OFF"
            quick_mode = "ON" if cl.user_session.get("quick_mode", False) else "OFF"
            show_perspectives = "ON" if cl.user_session.get("show_perspectives", True) else "OFF"
            show_visualization = "ON" if cl.user_session.get("show_visualization", True) else "OFF"
            visual_only_mode = "ON" if cl.user_session.get("visual_only_mode", False) else "OFF"
            status = f"""
**Your current settings:**
- Research team: {team_list}
- Direct mode: {direct_mode}
- Quick mode: {quick_mode}
- Show perspectives: {show_perspectives}
- Show visualizations: {show_visualization}
- Visual-only mode: {visual_only_mode} (Mermaid diagrams & DALL-E images)
"""
            await cl.Message(content=status).send()
            return

        elif command == '/add' and len(parts) > 1:
            # Add persona
            persona_id = parts[1].lower()
            persona_factory = cl.user_session.get("persona_factory")
            if persona_factory and persona_factory.get_config(persona_id):
                if persona_id not in state["selected_personas"]:
                    state["selected_personas"].append(persona_id)
                    cl.user_session.set("insight_state", state)
                    await cl.Message(content=f"Added {persona_id} to your research team.").send()
                else:
                    await cl.Message(content=f"{persona_id} is already in your research team.").send()
            else:
                await cl.Message(content=f"Unknown persona: {persona_id}. Use /list to see available personas.").send()
            return

        elif command == '/remove' and len(parts) > 1:
            # Remove persona
            persona_id = parts[1].lower()
            if persona_id in state["selected_personas"]:
                if len(state["selected_personas"]) > 1:  # Don't remove the last persona
                    state["selected_personas"].remove(persona_id)
                    cl.user_session.set("insight_state", state)
                    await cl.Message(content=f"Removed {persona_id} from your research team.").send()
                else:
                    await cl.Message(content="Cannot remove the last persona. You need at least one for analysis.").send()
            else:
                await cl.Message(content=f"{persona_id} is not in your research team.").send()
            return

        elif command == '/direct' and len(parts) > 1:
            # Toggle direct mode
            setting = parts[1].lower()
            if setting in ['on', 'true', '1', 'yes']:
                cl.user_session.set("direct_mode", True)
                await cl.Message(content="Direct mode enabled. Bypassing InsightFlow for faster responses.").send()
            elif setting in ['off', 'false', '0', 'no']:
                cl.user_session.set("direct_mode", False)
                await cl.Message(content="Direct mode disabled. Using full InsightFlow system.").send()
            else:
                await cl.Message(content="Invalid option. Use `/direct on` or `/direct off`.").send()
            return

        elif command == '/perspectives' and len(parts) > 1:
            # Toggle showing perspectives
            setting = parts[1].lower()
            if setting in ['on', 'true', '1', 'yes']:
                cl.user_session.set("show_perspectives", True)
                await cl.Message(content="Individual perspectives will be shown.").send()
            elif setting in ['off', 'false', '0', 'no']:
                cl.user_session.set("show_perspectives", False)
                await cl.Message(content="Individual perspectives will be hidden for concise output.").send()
            else:
                await cl.Message(content="Invalid option. Use `/perspectives on` or `/perspectives off`.").send()
            return

        elif command == '/quick' and len(parts) > 1:
            # Toggle quick mode
            setting = parts[1].lower()
            if setting in ['on', 'true', '1', 'yes']:
                cl.user_session.set("quick_mode", True)
                if len(state["selected_personas"]) > 2:
                    # In quick mode, use max 2 personas
                    state["selected_personas"] = state["selected_personas"][:2]
                    cl.user_session.set("insight_state", state)
                await cl.Message(content="Quick mode enabled. Using fewer personas for faster responses.").send()
            elif setting in ['off', 'false', '0', 'no']:
                cl.user_session.set("quick_mode", False)
                await cl.Message(content="Quick mode disabled. Using your full research team.").send()
            else:
                await cl.Message(content="Invalid option. Use `/quick on` or `/quick off`.").send()
            return

        elif command == '/visualization' and len(parts) > 1:
            # Toggle showing Mermaid diagrams
            setting = parts[1].lower()
            if setting in ['on', 'true', '1', 'yes']:
                cl.user_session.set("show_visualization", True)
                await cl.Message(content="Visual diagrams will be shown to represent insights.").send()
            elif setting in ['off', 'false', '0', 'no']:
                cl.user_session.set("show_visualization", False)
                await cl.Message(content="Visual diagrams will be hidden.").send()
            else:
                await cl.Message(content="Invalid option. Use `/visualization on` or `/visualization off`.").send()
            return

        elif command == '/visual_only' and len(parts) > 1:
            # Toggle visual-only mode
            setting = parts[1].lower()
            if setting in ['on', 'true', '1', 'yes']:
                # When enabling visual-only mode, turn off other display options
                cl.user_session.set("visual_only_mode", True)
                cl.user_session.set("show_visualization", True)  # Ensure visualization is on
                cl.user_session.set("show_perspectives", False)  # Turn off perspective display
                await cl.Message(content="Visual-only mode enabled. Only visualizations (Mermaid diagrams & DALL-E images) will be shown. Individual perspectives have been disabled.").send()
            elif setting in ['off', 'false', '0', 'no']:
                cl.user_session.set("visual_only_mode", False)
                cl.user_session.set("show_perspectives", True)  # Restore default when turning off
                await cl.Message(content="Visual-only mode disabled. Both text and visualizations will be shown.").send()
            else:
                await cl.Message(content="Invalid option. Use `/visual_only on` or `/visual_only off`.").send()
            return

        elif command == '/export_md':
            # Export to markdown
            state = cl.user_session.get("insight_state")
            if not state:
                await cl.Message(content="No analysis data available. Run a query first.").send()
                return
            await cl.Message(content="Exporting analysis to markdown...").send()
            filename, error = await export_to_markdown(state)
            if error:
                await cl.Message(content=f"Error: {error}").send()
            else:
                # Fix: report the actual exported path (was a hard-coded placeholder)
                await cl.Message(content=f"Analysis exported to: `{filename}`").send()
            return

        elif command == '/export_pdf':
            # Export to PDF
            state = cl.user_session.get("insight_state")
            if not state:
                await cl.Message(content="No analysis data available. Run a query first.").send()
                return
            await cl.Message(content="Exporting analysis to PDF...").send()
            filename, error = await export_to_pdf(state)
            if error:
                await cl.Message(content=f"Error: {error}").send()
            else:
                # Fix: report the actual exported path (was a hard-coded placeholder)
                await cl.Message(content=f"Analysis exported to: `{filename}`").send()
            return

        else:
            # Fix: unrecognized or malformed commands (e.g. `/add` with no
            # argument) previously fell through and were sent to the LLM as
            # a query; report them to the user instead.
            await cl.Message(content=f"Unrecognized command: {command}. Type `/help` to see available commands.").send()
            return

    # Process query (either direct or through InsightFlow)
    # Create streaming message for results
    answer_msg = cl.Message(content="")
    await answer_msg.send()

    # Create progress message
    progress_msg = cl.Message(content="⏳ Processing your query (0%)...")
    await progress_msg.send()

    try:
        # Check if direct mode is enabled
        if cl.user_session.get("direct_mode", False):
            # Direct mode with streaming - bypass InsightFlow
            await update_message(progress_msg, "⏳ Processing in direct mode (20%)...")

            # Stream response directly, re-rendering the message per chunk
            full_response = ""
            async for chunk in direct_query(msg_content):
                full_response += chunk
                await update_message(answer_msg, f"**Direct Answer:**\n\n{full_response}")

            # Complete the progress
            await update_message(progress_msg, "✅ Processing complete (100%)")
            return

        # Apply quick mode if enabled
        original_personas = None  # remembered so the full team can be restored afterwards
        if cl.user_session.get("quick_mode", False) and len(state["selected_personas"]) > 2:
            # Temporarily use just 2 personas for speed
            original_personas = state["selected_personas"].copy()
            state["selected_personas"] = state["selected_personas"][:2]
            await update_message(progress_msg, f"⏳ Using quick mode with personas: {', '.join(state['selected_personas'])} (10%)...")

        # Standard InsightFlow processing
        # Set query in state
        state["query"] = msg_content

        # Setup for progress tracking
        cl.user_session.set("progress_msg", progress_msg)
        cl.user_session.set("progress_steps", {
            "planner_agent": 10,
            "execute_persona_tasks": 40,
            "synthesize_responses": 80,
            "generate_visualization": 90,
            "present_results": 95,
            "END": 100
        })

        # Human-readable labels for each pipeline step (hoisted out of the
        # polling loop so the dict is built once, not on every change).
        status_messages = {
            "planner_agent": "Planning research approach",
            "execute_persona_tasks": "Generating persona perspectives",
            "synthesize_responses": "Synthesizing perspectives",
            "generate_visualization": "Generating visual representation",
            "present_results": "Finalizing results",
            "END": "Complete"
        }

        # Hook into state changes for progress
        async def state_monitor():
            """Poll the shared state and mirror step changes into the progress message.

            NOTE(review): this relies on graph nodes mutating ``state`` in
            place so that ``current_step_name`` changes are visible here —
            confirm the node implementations update the same dict.
            """
            last_step = None
            # Fix: bound the polling loop. The graph run is capped at 150s,
            # so if "END" is never reached (error/timeout) we stop polling
            # instead of leaving this task spinning forever.
            deadline = time.monotonic() + 180
            while time.monotonic() < deadline:
                current_step = state.get("current_step_name")
                if current_step != last_step:
                    progress_steps = cl.user_session.get("progress_steps", {})
                    if current_step in progress_steps:
                        progress = progress_steps[current_step]
                        status = status_messages.get(current_step, current_step)
                        await update_message(progress_msg, f"⏳ {status} ({progress}%)...")
                    last_step = current_step

                # Check if we're done
                if current_step == "END":
                    await update_message(progress_msg, "✅ Process complete (100%)")
                    break

                # Wait before checking again
                await asyncio.sleep(0.5)

        # Start the monitor in the background
        asyncio.create_task(state_monitor())

        # Run the graph with timeout protection
        thread_id = cl.user_session.get("id", "default_thread_id")
        config = {"configurable": {"thread_id": thread_id}}

        # Set an overall timeout for the entire graph execution
        final_state = await asyncio.wait_for(
            graph.ainvoke(state, config),
            timeout=150  # 2.5 minute timeout
        )
        cl.user_session.set("insight_state", final_state)

        # Update the answer message with the synthesized response
        panel_mode = "Research Assistant" if final_state["panel_type"] == "research" else "Multi-Persona Discussion"
        panel_indicator = f"**{panel_mode} Insights:**\n\n"
        await update_message(answer_msg, panel_indicator + final_state.get("synthesized_response", "No response generated."))

        # Show individual perspectives if enabled
        if cl.user_session.get("show_perspectives", True):
            # Hoisted out of the loop: the factory does not change per persona.
            persona_factory = cl.user_session.get("persona_factory")
            for persona_id, response in final_state["persona_responses"].items():
                persona_name = persona_id.capitalize()
                # Get proper display name from config if available
                if persona_factory:
                    persona_cfg = persona_factory.get_config(persona_id)
                    if persona_cfg and "name" in persona_cfg:
                        persona_name = persona_cfg["name"]
                # Send perspective as a message
                perspective_message = f"**{persona_name}'s Perspective:**\n\n{response}"
                await cl.Message(content=perspective_message).send()

        # Restore the original personas if quick mode trimmed the team
        if original_personas is not None:
            state["selected_personas"] = original_personas
            cl.user_session.set("insight_state", state)

    except asyncio.TimeoutError:
        print("Overall graph execution timed out")
        await update_message(answer_msg, "The analysis took too long and timed out. Try using `/direct on` or `/quick on` for faster responses.")
        await update_message(progress_msg, "❌ Process timed out")
    except Exception as e:
        print(f"Error in query processing: {e}")
        await update_message(answer_msg, f"I encountered an error: {e}")
        await update_message(progress_msg, f"❌ Error: {str(e)}")
print("InsightFlow AI setup complete. Ready to start.") |