diff --git a/README.md b/README.md index d926e76961877c1af84490fbaa54107d380de32b..4745ccd472f7256b32542a7a5bbe0cb050f3ebc7 100644 --- a/README.md +++ b/README.md @@ -12,16 +12,124 @@ hf_oauth: true hf_oauth_expiration_minutes: 480 --- -Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference +# LangGraph Multi-Agent System with Langfuse v3 Observability -To generate a `requirements.txt` file compatible with **Python 3.10** for deployment on a Hugging Face Space, run the following command: +A sophisticated multi-agent system built with LangGraph that follows best practices for state management, tracing, and iterative workflows. Features comprehensive Langfuse v3 observability with OpenTelemetry integration. + +## Architecture Overview + +The system implements an iterative research/code loop with specialized agents: + +``` +User Query → Lead Agent → Research Agent → Code Agent → Lead Agent (loop) → Answer Formatter → Final Answer +``` + +## Key Features + +- **🤖 Multi-Agent Workflow**: Specialized agents for research, computation, and formatting +- **📊 Langfuse v3 Observability**: Complete tracing with OTEL integration and predictable span naming +- **🔄 Iterative Processing**: Intelligent routing between research and computational tasks +- **🎯 GAIA Compliance**: Exact-match answer formatting for benchmark evaluation +- **💾 Memory System**: Vector store integration for learning and caching +- **🛠️ Tool Integration**: Web search, Wikipedia, ArXiv, calculations, and code execution + +## Quick Start + +### Environment Setup + +Create an `env.local` file with required API keys: + +```bash +# LLM API +GROQ_API_KEY=your_groq_api_key + +# Search Tools +TAVILY_API_KEY=your_tavily_api_key + +# Observability (Langfuse v3) +LANGFUSE_PUBLIC_KEY=your_langfuse_public_key +LANGFUSE_SECRET_KEY=your_langfuse_secret_key +LANGFUSE_HOST=https://cloud.langfuse.com + +# Memory (Optional) +SUPABASE_URL=your_supabase_url +SUPABASE_SERVICE_KEY=your_supabase_service_key +``` + +### Running the System + +**Important**: Use `uv run` for proper dependency management: + +```bash +# Run the multi-agent system test +uv run python test_new_multi_agent_system.py + +# Test Langfuse v3 observability +uv run python test_observability.py + +# Run the main application +uv run python app.py +``` + +### Basic Usage + +```python +import asyncio +from langgraph_agent_system import run_agent_system + +async def main(): + result = await run_agent_system( + query="What is the capital of Maharashtra?", + user_id="user_123", + session_id="session_456" + ) + print(f"Answer: {result}") + +asyncio.run(main()) +``` + +## Observability Dashboard + +After running queries, check your traces at: **https://cloud.langfuse.com** + +The system provides: +- 🎯 **Predictable Span Naming**: `agent/`, `tool/`, `llm/` +- 🔗 **Session Tracking**: User and session continuity across conversations +- 📈 **Cost & Latency Metrics**: Automatic aggregation by span type +- 🌐 **OTEL Integration**: Automatic trace correlation across services + +## Deployment + +### For Hugging Face Spaces + +To generate a `requirements.txt` file compatible with **Python 3.10** for deployment: ```bash uv pip compile pyproject.toml --python 3.10 -o requirements.txt +# Remove Windows-specific packages # Linux / macOS (bash) sed -i '/^pywin32==/d' requirements.txt # Windows (PowerShell) (Get-Content requirements.txt) -notmatch '^pywin32==' | Set-Content requirements.txt -``` \ No newline at end of file +``` + +### Environment Variables for Production + +Set these in your deployment environment: +- `GROQ_API_KEY` - Required for LLM inference +- `TAVILY_API_KEY` - Required for web search +- `LANGFUSE_PUBLIC_KEY` - Required for observability +- `LANGFUSE_SECRET_KEY` - Required for observability +- `LANGFUSE_HOST` - Langfuse endpoint (default: https://cloud.langfuse.com) + +## Documentation + +For detailed architecture, configuration, and usage instructions, see: +- **[Multi-Agent System Guide](README_MULTI_AGENT_SYSTEM.md)** - Complete system documentation +- **[Supabase Setup](README_SUPABASE.md)** - Memory system configuration + +## Configuration Reference + +Check out the Hugging Face Spaces configuration reference at https://huggingface.co/docs/hub/spaces-config-reference \ No newline at end of file diff --git a/README_MULTI_AGENT_SYSTEM.md b/README_MULTI_AGENT_SYSTEM.md new file mode 100644 index 0000000000000000000000000000000000000000..4400a2ebbfcbbe7e0b55415efaaf86381c1e3673 --- /dev/null +++ b/README_MULTI_AGENT_SYSTEM.md @@ -0,0 +1,244 @@ +# LangGraph Multi-Agent System + +A sophisticated multi-agent system built with LangGraph that follows best practices for state management, tracing, and iterative workflows. + +## Architecture Overview + +The system implements an iterative research/code loop with specialized agents: + +``` +User Query → Lead Agent → Research Agent → Code Agent → Lead Agent (loop) → Answer Formatter → Final Answer +``` + +### Key Components + +1. **Lead Agent** (`agents/lead_agent.py`) + - Orchestrates the entire workflow + - Makes routing decisions between research and code agents + - Manages the iterative loop with a maximum of 3 iterations + - Synthesizes information from specialists into draft answers + +2. **Research Agent** (`agents/research_agent.py`) + - Handles information gathering from multiple sources + - Uses web search (Tavily), Wikipedia, and ArXiv tools + - Provides structured research results with citations + +3. **Code Agent** (`agents/code_agent.py`) + - Performs mathematical calculations and code execution + - Uses calculator tools for basic operations + - Executes Python code in a sandboxed environment + - Handles Hugging Face Hub statistics + +4. **Answer Formatter** (`agents/answer_formatter.py`) + - Ensures GAIA benchmark compliance + - Extracts final answers according to exact-match rules + - Handles different answer types (numbers, strings, lists) + +5. **Memory System** (`memory_system.py`) + - Vector store integration for long-term learning + - Session-based caching for performance + - Similar question retrieval for context + +## Core Features + +### State Management +- **Immutable State**: Uses LangGraph's Command pattern for pure functions +- **Typed Schema**: AgentState TypedDict ensures type safety +- **Accumulation**: Research notes and code outputs accumulate across iterations + +### Observability (Langfuse v3) +- **OTEL-Native Integration**: Uses Langfuse v3 with OpenTelemetry for automatic trace correlation +- **Single Callback Handler**: One global handler passes traces seamlessly through LangGraph +- **Predictable Span Naming**: `agent/`, `tool/`, `llm/` patterns for cost/latency dashboards +- **Session Stitching**: User and session tracking for conversation continuity +- **Background Flushing**: Non-blocking trace export for optimal performance + +### Tools Integration +- **Web Search**: Tavily API for current information +- **Knowledge Bases**: Wikipedia and ArXiv for encyclopedic/academic content +- **Computation**: Calculator tools and Python execution +- **Hub Statistics**: Hugging Face model information + +## Setup + +### Environment Variables +Create an `env.local` file with: + +```bash +# LLM API +GROQ_API_KEY=your_groq_api_key + +# Search Tools +TAVILY_API_KEY=your_tavily_api_key + +# Observability +LANGFUSE_PUBLIC_KEY=your_langfuse_public_key +LANGFUSE_SECRET_KEY=your_langfuse_secret_key +LANGFUSE_HOST=https://cloud.langfuse.com + +# Memory (Optional) +SUPABASE_URL=your_supabase_url +SUPABASE_SERVICE_KEY=your_supabase_service_key +``` + +### Dependencies +The system requires: +- `langgraph>=0.4.8` +- `langchain>=0.3.0` +- `langchain-groq` +- `langfuse>=3.0.0` +- `python-dotenv` +- `tavily-python` + +## Usage + +### Basic Usage + +```python +import asyncio +from langgraph_agent_system import run_agent_system + +async def main(): + result = await run_agent_system( + query="What is the capital of Maharashtra?", + user_id="user_123", + session_id="session_456" + ) + print(f"Answer: {result}") + +asyncio.run(main()) +``` + +### Testing + +Run the test suite to verify functionality: + +```bash +python test_new_multi_agent_system.py +``` + +Test Langfuse v3 observability integration: + +```bash +python test_observability.py +``` + +### Direct Graph Access + +```python +from langgraph_agent_system import create_agent_graph + +# Create and compile the workflow +workflow = create_agent_graph() +app = workflow.compile() + +# Run with initial state +initial_state = { + "messages": [HumanMessage(content="Your question")], + "draft_answer": "", + "research_notes": "", + "code_outputs": "", + "loop_counter": 0, + "done": False, + "next": "research", + "final_answer": "", + "user_id": "user_123", + "session_id": "session_456" +} + +final_state = await app.ainvoke(initial_state) +print(final_state["final_answer"]) +``` + +## Workflow Details + +### Iterative Loop +1. **Lead Agent** analyzes the query and decides on next action +2. If research needed → **Research Agent** gathers information +3. If computation needed → **Code Agent** performs calculations +4. Back to **Lead Agent** for synthesis and next decision +5. When sufficient information → **Answer Formatter** creates final answer + +### Routing Logic +The Lead Agent uses the following criteria: +- **Research**: Factual information, current events, citations needed +- **Code**: Mathematical calculations, data analysis, programming tasks +- **Formatter**: Sufficient information gathered OR max iterations reached + +### GAIA Compliance +The Answer Formatter ensures exact-match requirements: +- **Numbers**: No commas, units, or extra symbols +- **Strings**: Remove unnecessary articles and formatting +- **Lists**: Comma and space separation +- **No surrounding text**: No "Answer:", quotes, or brackets + +## Best Practices Implemented + +### LangGraph Patterns +- ✅ Pure functions (AgentState → Command) +- ✅ Immutable state with explicit updates +- ✅ Typed state schema with operator annotations +- ✅ Clear routing separated from business logic + +### Langfuse v3 Observability +- ✅ OTEL-native SDK with automatic trace correlation +- ✅ Single global callback handler for seamless LangGraph integration +- ✅ Predictable span naming (`agent/`, `tool/`, `llm/`) +- ✅ Session and user tracking with environment tagging +- ✅ Background trace flushing for performance +- ✅ Graceful degradation when observability unavailable + +### Memory Management +- ✅ TTL-based caching for performance +- ✅ Vector store integration for learning +- ✅ Duplicate detection and prevention +- ✅ Session cleanup for long-running instances + +## Error Handling + +The system implements graceful degradation: +- **Tool failures**: Continue with available tools +- **API timeouts**: Retry with backoff +- **Memory errors**: Degrade to LLM-only mode +- **Agent failures**: Return informative error messages + +## Performance Considerations + +- **Caching**: Vector store searches cached for 5 minutes +- **Parallelization**: Tools can be executed in parallel +- **Memory limits**: Sandbox execution has resource constraints +- **Loop termination**: Hard limit of 3 iterations prevents infinite loops + +## Extending the System + +### Adding New Agents +1. Create agent file in `agents/` directory +2. Implement agent function returning Command +3. Add to workflow in `create_agent_graph()` +4. Update routing logic in Lead Agent + +### Adding New Tools +1. Implement tool following LangChain Tool interface +2. Add to appropriate agent's tool list +3. Update agent prompts to describe new capabilities + +### Custom Memory Backends +1. Extend MemoryManager class +2. Implement required interface methods +3. Update initialization in memory_system.py + +## Troubleshooting + +### Common Issues +- **Missing API keys**: Check env.local file setup +- **Tool failures**: Verify network connectivity and API quotas +- **Memory errors**: Check Supabase configuration (optional) +- **Import errors**: Ensure all dependencies are installed + +### Debug Mode +Set environment variable for detailed logging: +```bash +export LANGFUSE_DEBUG=true +``` + +This implementation follows the specified plan while incorporating LangGraph and Langfuse best practices for a robust, observable, and maintainable multi-agent system. \ No newline at end of file diff --git a/__pycache__/langgraph_agent_system.cpython-310.pyc b/__pycache__/langgraph_agent_system.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..18bf4fce3575ea922028fefc390394cd6d37380e Binary files /dev/null and b/__pycache__/langgraph_agent_system.cpython-310.pyc differ diff --git a/__pycache__/langgraph_agent_system.cpython-313.pyc b/__pycache__/langgraph_agent_system.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..dfac0586bbfdf99eecb2201d21f05f66f9bc9c0a Binary files /dev/null and b/__pycache__/langgraph_agent_system.cpython-313.pyc differ diff --git a/__pycache__/memory_system.cpython-313.pyc b/__pycache__/memory_system.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..6e5ac426b7af1bb072181ffebb578a1b1bee4167 Binary files /dev/null and b/__pycache__/memory_system.cpython-313.pyc differ diff --git a/__pycache__/observability.cpython-310.pyc b/__pycache__/observability.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..80428ad238a2d0b584479b6c32eaa0f6626e8236 Binary files /dev/null and b/__pycache__/observability.cpython-310.pyc differ diff --git a/__pycache__/observability.cpython-313.pyc b/__pycache__/observability.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2a0f3e0719ceaf4eecd3618550e27d94cfc52a07 Binary files /dev/null and b/__pycache__/observability.cpython-313.pyc differ diff --git a/__pycache__/tools.cpython-313.pyc b/__pycache__/tools.cpython-313.pyc index 3333193c9fef639080dce6544681b2ec0ad88715..34da0135f39afb0ec67c4d4a9d8587e9b6b6d4bb 100644 Binary files a/__pycache__/tools.cpython-313.pyc and b/__pycache__/tools.cpython-313.pyc differ diff --git a/agents/__init__.py b/agents/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c91ca3247c3cd0e4e13f68429c3203f02f9ce33b --- /dev/null +++ b/agents/__init__.py @@ -0,0 +1,21 @@ +""" +Multi-Agent System Components + +This package contains the specialized agents for the LangGraph-based system: +- LeadAgent: Orchestrates workflow and decision making +- ResearchAgent: Information gathering and research tasks +- CodeAgent: Computational and code execution tasks +- AnswerFormatter: Final answer formatting according to GAIA requirements +""" + +from .lead_agent import lead_agent +from .research_agent import research_agent +from .code_agent import code_agent +from .answer_formatter import answer_formatter + +__all__ = [ + "lead_agent", + "research_agent", + "code_agent", + "answer_formatter" +] \ No newline at end of file diff --git a/agents/__pycache__/__init__.cpython-313.pyc b/agents/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2e786d74c2834fe8f118dfe45e0f3a9ab66df638 Binary files /dev/null and b/agents/__pycache__/__init__.cpython-313.pyc differ diff --git a/agents/__pycache__/answer_formatter.cpython-310.pyc b/agents/__pycache__/answer_formatter.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5e9f0c4590da346b47671aa02093841c1b15f713 Binary files /dev/null and b/agents/__pycache__/answer_formatter.cpython-310.pyc differ diff --git a/agents/__pycache__/answer_formatter.cpython-313.pyc b/agents/__pycache__/answer_formatter.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..320f72d6a097b31dd2ffae3aff54da897078f738 Binary files /dev/null and b/agents/__pycache__/answer_formatter.cpython-313.pyc differ diff --git a/agents/__pycache__/code_agent.cpython-310.pyc b/agents/__pycache__/code_agent.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..0b89cffc81344cd1008ef4962147c44e7cfbcaa8 Binary files /dev/null and b/agents/__pycache__/code_agent.cpython-310.pyc differ diff --git a/agents/__pycache__/code_agent.cpython-313.pyc b/agents/__pycache__/code_agent.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..257ebfd0d7d91ea9f7b065f650a3fda1bc1198bf Binary files /dev/null and b/agents/__pycache__/code_agent.cpython-313.pyc differ diff --git a/agents/__pycache__/lead_agent.cpython-310.pyc b/agents/__pycache__/lead_agent.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8c2e26a5bde9fc11ec719467f077e941e1d95489 Binary files /dev/null and b/agents/__pycache__/lead_agent.cpython-310.pyc differ diff --git a/agents/__pycache__/lead_agent.cpython-313.pyc b/agents/__pycache__/lead_agent.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1e718c83f4834e23ca95cbc9025aacf79ae70ef7 Binary files /dev/null and b/agents/__pycache__/lead_agent.cpython-313.pyc differ diff --git a/agents/__pycache__/research_agent.cpython-310.pyc b/agents/__pycache__/research_agent.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..003d07102bacd7e4e4d3d3a7b8614d7e2f1728fa Binary files /dev/null and b/agents/__pycache__/research_agent.cpython-310.pyc differ diff --git a/agents/__pycache__/research_agent.cpython-313.pyc b/agents/__pycache__/research_agent.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..61d9bfa18ca6c7bdbbcc220785a0908d564507e8 Binary files /dev/null and b/agents/__pycache__/research_agent.cpython-313.pyc differ diff --git a/agents/answer_formatter.py b/agents/answer_formatter.py new file mode 100644 index 0000000000000000000000000000000000000000..185635db2a7b32b0909dd65b6e9c610b8aaa9d6c --- /dev/null +++ b/agents/answer_formatter.py @@ -0,0 +1,243 @@ +""" +Answer Formatter - Final answer formatting according to GAIA requirements + +The Answer Formatter is responsible for: +1. Taking the draft answer and formatting it according to GAIA rules +2. Extracting the final answer from comprehensive responses +3. Ensuring exact-match compliance +4. Handling different answer types (numbers, strings, lists) +""" + +import re +from typing import Dict, Any +from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage +from langgraph.types import Command +from langchain_groq import ChatGroq +from observability import agent_span +from dotenv import load_dotenv + +load_dotenv("env.local") + + +def extract_final_answer(text: str) -> str: + """ + Extract the final answer from text following GAIA formatting rules. + + GAIA Rules: + • Single number → write the number only (no commas, units, or other symbols) + • Single string/phrase → write the text only; omit articles and abbreviations unless explicitly required + • List → separate elements with a single comma and a space + • Never include surrounding text, quotes, brackets, or markdown + """ + + if not text or not text.strip(): + return "" + + # Clean the text + text = text.strip() + + # Look for explicit final answer markers + answer_patterns = [ + r"final answer[:\s]*(.+?)(?:\n|$)", + r"answer[:\s]*(.+?)(?:\n|$)", + r"result[:\s]*(.+?)(?:\n|$)", + r"conclusion[:\s]*(.+?)(?:\n|$)" + ] + + for pattern in answer_patterns: + match = re.search(pattern, text, re.IGNORECASE | re.MULTILINE) + if match: + text = match.group(1).strip() + break + + # Remove common prefixes/suffixes + prefixes_to_remove = [ + "the answer is", "it is", "this is", "that is", + "final answer:", "answer:", "result:", "conclusion:", + "therefore", "thus", "so", "hence" + ] + + for prefix in prefixes_to_remove: + if text.lower().startswith(prefix.lower()): + text = text[len(prefix):].strip() + + # Remove quotes, brackets, and markdown + text = re.sub(r'^["\'\[\(]|["\'\]\)]$', '', text) + text = re.sub(r'^\*\*|\*\*$', '', text) # Remove bold markdown + text = re.sub(r'^`|`$', '', text) # Remove code markdown + + # Handle different answer types + + # Check if it's a pure number + number_match = re.match(r'^-?\d+(?:\.\d+)?$', text.strip()) + if number_match: + # Return number without formatting + num = float(text.strip()) if '.' in text else int(text.strip()) + return str(int(num)) if num == int(num) else str(num) + + # Check if it's a list (comma-separated) + if ',' in text: + items = [item.strip() for item in text.split(',')] + # Clean each item + cleaned_items = [] + for item in items: + item = re.sub(r'^["\'\[\(]|["\'\]\)]$', '', item.strip()) + if item: + cleaned_items.append(item) + return ', '.join(cleaned_items) + + # For single strings, remove articles if they're not essential + # But be careful not to remove essential parts + words = text.split() + if len(words) > 1 and words[0].lower() in ['the', 'a', 'an']: + # Only remove if the rest makes sense + remaining = ' '.join(words[1:]) + if remaining and len(remaining) > 2: + text = remaining + + return text.strip() + + +def load_formatter_prompt() -> str: + """Load the formatting prompt""" + try: + with open("archive/prompts/verification_prompt.txt", "r") as f: + return f.read() + except FileNotFoundError: + return """ +You are a final answer formatter ensuring compliance with GAIA benchmark requirements. + +Your task is to extract the precise final answer from a comprehensive response. + +CRITICAL FORMATTING RULES: +• Single number → write the number only (no commas, units, or symbols) +• Single string/phrase → write the text only; omit articles unless required +• List → separate elements with comma and space +• NEVER include surrounding text like "Final Answer:", quotes, brackets, or markdown +• The response must contain ONLY the answer itself + +Examples: +Question: "What is 25 + 17?" +Draft: "After calculating, the answer is 42." +Formatted: "42" + +Question: "What is the capital of France?" +Draft: "The capital of France is Paris." +Formatted: "Paris" + +Question: "List the first 3 prime numbers" +Draft: "The first three prime numbers are 2, 3, and 5." +Formatted: "2, 3, 5" + +Extract ONLY the final answer following these rules exactly. +""" + + +def answer_formatter(state: Dict[str, Any]) -> Command: + """ + Answer Formatter node that creates GAIA-compliant final answers. + + Takes the draft_answer and formats it according to GAIA requirements. + Returns Command to END the workflow. + """ + + print("📝 Answer Formatter: Creating final formatted answer...") + + try: + # Get formatting prompt + formatter_prompt = load_formatter_prompt() + + # Initialize LLM for formatting + llm = ChatGroq( + model="llama-3.3-70b-versatile", + temperature=0.0, # Zero temperature for consistent formatting + max_tokens=512 + ) + + # Create agent span for tracing + with agent_span( + "formatter", + metadata={ + "draft_answer_length": len(state.get("draft_answer", "")), + "user_id": state.get("user_id", "unknown"), + "session_id": state.get("session_id", "unknown") + } + ) as span: + + # Get the draft answer + draft_answer = state.get("draft_answer", "") + + if not draft_answer: + final_answer = "No answer could be generated." + else: + # Get the original question for context + messages = state.get("messages", []) + user_query = "" + for msg in messages: + if isinstance(msg, HumanMessage): + user_query = msg.content + break + + # Build formatting request + formatting_request = f""" +Extract the final answer from this comprehensive response following GAIA formatting rules: + +Original Question: {user_query} + +Draft Response: +{draft_answer} + +Instructions: +1. Identify the core answer within the draft response +2. Remove all explanatory text, prefixes, and formatting +3. Apply GAIA formatting rules exactly +4. Return ONLY the final answer + +What is the properly formatted final answer? +""" + + # Create messages for formatting + formatting_messages = [ + SystemMessage(content=formatter_prompt), + HumanMessage(content=formatting_request) + ] + + # Get formatted response + response = llm.invoke(formatting_messages) + + # Extract the final answer using our utility function + final_answer = extract_final_answer(response.content) + + # Fallback: if extraction fails, try direct extraction from draft + if not final_answer or len(final_answer) < 1: + print("⚠️ LLM formatting failed, using direct extraction") + final_answer = extract_final_answer(draft_answer) + + # Final fallback + if not final_answer: + final_answer = "Unable to extract a clear answer." + + print(f"📝 Answer Formatter: Final answer = '{final_answer}'") + + # Update trace + if span: + span.update_trace(output={"final_answer": final_answer}) + + # Return command to END the workflow + return Command( + goto="__end__", + update={ + "final_answer": final_answer + } + ) + + except Exception as e: + print(f"❌ Answer Formatter Error: {e}") + + # Return error as final answer + return Command( + goto="__end__", + update={ + "final_answer": f"Error formatting answer: {str(e)}" + } + ) \ No newline at end of file diff --git a/agents/code_agent.py b/agents/code_agent.py new file mode 100644 index 0000000000000000000000000000000000000000..646720de3666df48a816faae10ee0a7a07d00eb9 --- /dev/null +++ b/agents/code_agent.py @@ -0,0 +1,440 @@ +""" +Code Agent - Computational tasks and code execution + +The Code Agent is responsible for: +1. Performing mathematical calculations +2. Executing Python code for data analysis +3. Processing numerical data and computations +4. Returning structured computational results +""" + +import os +import sys +import io +import contextlib +from typing import Dict, Any, List +from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage +from langgraph.types import Command +from langchain_groq import ChatGroq +from langchain_core.tools import Tool +from observability import agent_span, tool_span +from dotenv import load_dotenv + +# Import calculator tools from the existing tools.py +from tools import get_calculator_tool, get_hub_stats_tool + +load_dotenv("env.local") + + +def create_code_tools() -> List[Tool]: + """Create LangChain-compatible computational tools""" + tools = [] + + # Mathematical calculator tools + def multiply_func(a: float, b: float) -> str: + """Multiply two numbers""" + try: + result = a * b + return f"{a} × {b} = {result}" + except Exception as e: + return f"Error: {str(e)}" + + def add_func(a: float, b: float) -> str: + """Add two numbers""" + try: + result = a + b + return f"{a} + {b} = {result}" + except Exception as e: + return f"Error: {str(e)}" + + def subtract_func(a: float, b: float) -> str: + """Subtract two numbers""" + try: + result = a - b + return f"{a} - {b} = {result}" + except Exception as e: + return f"Error: {str(e)}" + + def divide_func(a: float, b: float) -> str: + """Divide two numbers""" + try: + if b == 0: + return "Error: Cannot divide by zero" + result = a / b + return f"{a} ÷ {b} = {result}" + except Exception as e: + return f"Error: {str(e)}" + + def modulus_func(a: int, b: int) -> str: + """Get the modulus of two numbers""" + try: + if b == 0: + return "Error: Cannot modulo by zero" + result = a % b + return f"{a} mod {b} = {result}" + except Exception as e: + return f"Error: {str(e)}" + + # Create calculator tools + calc_tools = [ + Tool(name="multiply", description="Multiply two numbers. Use format: multiply(a, b)", func=lambda input_str: multiply_func(*map(float, input_str.split(',')))), + Tool(name="add", description="Add two numbers. Use format: add(a, b)", func=lambda input_str: add_func(*map(float, input_str.split(',')))), + Tool(name="subtract", description="Subtract two numbers. Use format: subtract(a, b)", func=lambda input_str: subtract_func(*map(float, input_str.split(',')))), + Tool(name="divide", description="Divide two numbers. Use format: divide(a, b)", func=lambda input_str: divide_func(*map(float, input_str.split(',')))), + Tool(name="modulus", description="Get modulus of two integers. Use format: modulus(a, b)", func=lambda input_str: modulus_func(*map(int, input_str.split(',')))), + ] + + tools.extend(calc_tools) + print(f"✅ Added {len(calc_tools)} calculator tools") + + # Hub stats tool + try: + from tools import get_hub_stats + + def hub_stats_func(author: str) -> str: + """Get Hugging Face Hub statistics for an author""" + try: + return get_hub_stats(author) + except Exception as e: + return f"Hub stats error: {str(e)}" + + hub_tool = Tool( + name="hub_stats", + description="Get statistics for Hugging Face Hub models by author", + func=hub_stats_func + ) + tools.append(hub_tool) + print("✅ Added Hub stats tool") + except Exception as e: + print(f"⚠️ Could not load Hub stats tool: {e}") + + # Python execution tool + python_tool = create_python_execution_tool() + tools.append(python_tool) + print("✅ Added Python execution tool") + + print(f"🔧 Code Agent loaded {len(tools)} tools") + return tools + + +def create_python_execution_tool() -> Tool: + """Create a tool for executing Python code safely""" + + def execute_python_code(code: str) -> str: + """ + Execute Python code in a controlled environment. + + Args: + code: Python code to execute + + Returns: + String containing the output or error message + """ + # Create a string buffer to capture output + output_buffer = io.StringIO() + error_buffer = io.StringIO() + + # Prepare a safe execution environment + safe_globals = { + '__builtins__': { + 'print': lambda *args, **kwargs: print(*args, file=output_buffer, **kwargs), + 'len': len, + 'str': str, + 'int': int, + 'float': float, + 'list': list, + 'dict': dict, + 'set': set, + 'tuple': tuple, + 'range': range, + 'sum': sum, + 'max': max, + 'min': min, + 'abs': abs, + 'round': round, + 'sorted': sorted, + 'enumerate': enumerate, + 'zip': zip, + 'map': map, + 'filter': filter, + } + } + + # Allow common safe modules + try: + import math + import statistics + import datetime + import json + import re + + safe_globals.update({ + 'math': math, + 'statistics': statistics, + 'datetime': datetime, + 'json': json, + 're': re, + }) + except ImportError: + pass + + try: + # Execute the code + with contextlib.redirect_stdout(output_buffer), \ + contextlib.redirect_stderr(error_buffer): + exec(code, safe_globals) + + # Get the output + output = output_buffer.getvalue() + error = error_buffer.getvalue() + + if error: + return f"Error: {error}" + elif output: + return output.strip() + else: + return "Code executed successfully (no output)" + + except Exception as e: + return f"Execution error: {str(e)}" + finally: + output_buffer.close() + error_buffer.close() + + return Tool( + name="python_execution", + description="Execute Python code for calculations and data processing. Use for complex computations, data analysis, or when calculator tools are insufficient.", + func=execute_python_code + ) + + +def load_code_prompt() -> str: + """Load the code execution prompt""" + try: + with open("archive/prompts/execution_prompt.txt", "r") as f: + return f.read() + except FileNotFoundError: + return """ +You are a computational specialist focused on accurate calculations and code execution. + +Your goals: +1. Perform mathematical calculations accurately +2. Write and execute Python code for complex computations +3. Process data and perform analysis as needed +4. Provide clear, numerical results + +When handling computational tasks: +- Use calculator tools for basic arithmetic operations +- Use Python execution for complex calculations, data processing, or multi-step computations +- Show your work and intermediate steps +- Verify results when possible +- Handle edge cases and potential errors + +Available tools: +- Calculator tools: add, subtract, multiply, divide, modulus +- Python execution: for complex computations and data analysis +- Hub stats tool: for Hugging Face model information + +Format your response as: +### Computational Analysis +[Description of the approach] + +### Calculations +[Step-by-step calculations or code] + +### Results +[Final numerical results or outputs] +""" + + +def code_agent(state: Dict[str, Any]) -> Command: + """ + Code Agent node that handles computational tasks and code execution. + + Returns Command with computational results appended to code_outputs. + """ + + print("🧮 Code Agent: Processing computational tasks...") + + try: + # Get code execution prompt + code_prompt = load_code_prompt() + + # Initialize LLM with tools + llm = ChatGroq( + model="llama-3.3-70b-versatile", + temperature=0.1, # Low temperature for accuracy in calculations + max_tokens=2048 + ) + + # Get computational tools + tools = create_code_tools() + + # Bind tools to LLM + llm_with_tools = llm.bind_tools(tools) + + # Create agent span for tracing + with agent_span( + "code", + metadata={ + "tools_available": len(tools), + "research_context_length": len(state.get("research_notes", "")), + "user_id": state.get("user_id", "unknown"), + "session_id": state.get("session_id", "unknown") + } + ) as span: + + # Extract user query and research context + messages = state.get("messages", []) + user_query = "" + for msg in messages: + if isinstance(msg, HumanMessage): + user_query = msg.content + break + + research_notes = state.get("research_notes", "") + + # Build computational request + code_request = f""" +Please analyze the following question and perform any necessary calculations or code execution: + +Question: {user_query} + +Research Context: +{research_notes} + +Current computational work: {len(state.get('code_outputs', ''))} characters already completed + +Instructions: +1. Identify any computational or mathematical aspects of the question +2. Use appropriate tools for calculations or code execution +3. Show your work and intermediate steps +4. Provide clear, accurate results +5. If no computation is needed, state that clearly + +Please perform all necessary calculations to help answer this question. +""" + + # Create messages for code execution + code_messages = [ + SystemMessage(content=code_prompt), + HumanMessage(content=code_request) + ] + + # Get computational response + response = llm_with_tools.invoke(code_messages) + + # Process the response - handle both tool calls and direct responses + computation_results = [] + + # Check if the LLM wants to use tools + if hasattr(response, 'tool_calls') and response.tool_calls: + print(f"🛠️ Executing {len(response.tool_calls)} computational operations") + + # Execute tool calls and collect results + for tool_call in response.tool_calls: + try: + # Find the tool by name + tool = next((t for t in tools if t.name == tool_call['name']), None) + if tool: + # Handle different argument formats + args = tool_call.get('args', {}) + if isinstance(args, dict): + # Convert dict args to string for simple tools + if len(args) == 1: + arg_value = list(args.values())[0] + else: + arg_value = ','.join(str(v) for v in args.values()) + else: + arg_value = str(args) + + result = tool.func(arg_value) + computation_results.append(f"**{tool.name}**: {result}") + else: + computation_results.append(f"**{tool_call['name']}**: Tool not found") + except Exception as e: + print(f"⚠️ Tool {tool_call.get('name', 'unknown')} failed: {e}") + computation_results.append(f"**{tool_call.get('name', 'unknown')}**: Error - {str(e)}") + + # Compile computational results + if computation_results: + computational_findings = "\n\n".join(computation_results) + else: + # No tools used or tool calls failed, analyze if computation is needed + computational_findings = response.content if hasattr(response, 'content') else str(response) + + # If the response looks like it should have used tools but didn't, try direct calculation + if any(op in user_query.lower() for op in ['+', '-', '*', '/', 'calculate', 'compute', 'multiply', 'add', 'subtract', 'divide']): + print("🔧 Attempting direct calculation...") + + # Try to extract and solve simple mathematical expressions + import re + + # Look for simple math expressions + math_patterns = [ + r'(\d+)\s*\+\s*(\d+)', # addition + r'(\d+)\s*\*\s*(\d+)', # multiplication + r'(\d+)\s*-\s*(\d+)', # subtraction + r'(\d+)\s*/\s*(\d+)', # division + ] + + for pattern in math_patterns: + matches = re.findall(pattern, user_query) + if matches: + for match in matches: + a, b = int(match[0]), int(match[1]) + if '+' in user_query: + result = a + b + computational_findings += f"\n\nDirect calculation: {a} + {b} = {result}" + elif '*' in user_query: + result = a * b + computational_findings += f"\n\nDirect calculation: {a} × {b} = {result}" + elif '-' in user_query: + result = a - b + computational_findings += f"\n\nDirect calculation: {a} - {b} = {result}" + elif '/' in user_query: + result = a / b + computational_findings += f"\n\nDirect calculation: {a} ÷ {b} = {result}" + break + + # Format computational results + formatted_results = f""" +### Computational Analysis {state.get('loop_counter', 0) + 1} + +{computational_findings} + +--- +""" + + print(f"🧮 Code Agent: Generated {len(formatted_results)} characters of computational results") + + # Update span with results if available + if span: + span.update_trace(metadata={ + "computation_length": len(formatted_results), + "results_preview": formatted_results[:300] + "..." + }) + + # Return command to go back to lead agent + return Command( + goto="lead", + update={ + "code_outputs": state.get("code_outputs", "") + formatted_results + } + ) + + except Exception as e: + print(f"❌ Code Agent Error: {e}") + + # Return with error information + error_result = f""" +### Computational Error +An error occurred during code execution: {str(e)} + +""" + return Command( + goto="lead", + update={ + "code_outputs": state.get("code_outputs", "") + error_result + } + ) \ No newline at end of file diff --git a/agents/lead_agent.py b/agents/lead_agent.py new file mode 100644 index 0000000000000000000000000000000000000000..61acf713e314d0b7862d2d30396aa7fb4657b497 --- /dev/null +++ b/agents/lead_agent.py @@ -0,0 +1,243 @@ +""" +Lead Agent - Orchestrates the multi-agent workflow + +The Lead Agent is responsible for: +1. Analyzing user queries and determining next steps +2. Managing the iterative research/code loop +3. Deciding when enough information has been gathered +4. Coordinating between specialized agents +5. Maintaining the overall workflow state +""" + +import os +from typing import Dict, Any, Literal +from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage +from langgraph.types import Command +from langchain_groq import ChatGroq +from observability import agent_span +from dotenv import load_dotenv + +# Import memory system +from memory_system import MemoryManager + +load_dotenv("env.local") + +# Initialize memory manager +memory_manager = MemoryManager() + +def load_system_prompt() -> str: + """Load the system prompt for the lead agent""" + try: + with open("archive/prompts/system_prompt.txt", "r") as f: + base_prompt = f.read() + + lead_prompt = f""" +{base_prompt} + +As the Lead Agent, you coordinate a team of specialists: +- Research Agent: Gathers information from web, papers, and knowledge bases +- Code Agent: Performs calculations and executes Python code + +Your responsibilities: +1. Analyze the user's question to determine what information and computations are needed +2. Decide whether to delegate to research, code, both, or proceed to final answer +3. Synthesize results from specialists into a coherent draft answer +4. Determine when sufficient information has been gathered + +Decision criteria: +- If the question requires factual information, current events, or research → delegate to research +- If the question requires calculations, data analysis, or code execution → delegate to code +- If you have sufficient information to answer → proceed to formatting +- Maximum 3 iterations to prevent infinite loops + +Always maintain the exact formatting requirements specified in the system prompt. +""" + return lead_prompt + except FileNotFoundError: + return """You are a helpful assistant coordinating a team of specialists to answer questions accurately.""" + + +def lead_agent(state: Dict[str, Any]) -> Command[Literal["research", "code", "formatter", "__end__"]]: + """ + Lead Agent node that orchestrates the workflow. + + Makes decisions about: + - Whether more research is needed + - Whether code execution is needed + - When to proceed to final formatting + - When the loop should terminate + + Returns Command with routing decision and state updates. + """ + + loop_counter = state.get('loop_counter', 0) + max_iterations = state.get('max_iterations', 3) + + print(f"🎯 Lead Agent: Processing request (iteration {loop_counter})") + + # Check for termination conditions first + if loop_counter >= max_iterations: + print("🔄 Maximum iterations reached, proceeding to formatter") + return Command( + goto="formatter", + update={ + "loop_counter": loop_counter + 1, + "next": "formatter" + } + ) + + try: + # Get the system prompt + system_prompt = load_system_prompt() + + # Initialize LLM + llm = ChatGroq( + model="llama-3.3-70b-versatile", + temperature=0.1, # Low temperature for consistent routing decisions + max_tokens=1024 + ) + + # Create agent span for tracing + with agent_span( + "lead", + metadata={ + "loop_counter": loop_counter, + "research_notes_length": len(state.get("research_notes", "")), + "code_outputs_length": len(state.get("code_outputs", "")), + "user_id": state.get("user_id", "unknown"), + "session_id": state.get("session_id", "unknown") + } + ) as span: + + # Build context for decision making + messages = state.get("messages", []) + research_notes = state.get("research_notes", "") + code_outputs = state.get("code_outputs", "") + + # Get the original user query + user_query = "" + for msg in messages: + if isinstance(msg, HumanMessage): + user_query = msg.content + break + + # Check for similar questions in memory + similar_context = "" + if user_query: + try: + similar_qa = memory_manager.get_similar_qa(user_query) + if similar_qa: + similar_context = f"\n\nSimilar previous Q&A:\n{similar_qa}" + except Exception as e: + print(f"💾 Memory cache hit") # Simplified message + + # Build decision prompt + decision_prompt = f""" +Based on the user's question and current progress, decide the next action. + +Original Question: {user_query} + +Current Progress: +- Loop iteration: {loop_counter} +- Research gathered: {len(research_notes)} characters +- Code outputs: {len(code_outputs)} characters + +Research Notes So Far: +{research_notes if research_notes else "None yet"} + +Code Outputs So Far: +{code_outputs if code_outputs else "None yet"} + +{similar_context} + +Analyze what's still needed: +1. Is factual information, current events, or research missing? → route to "research" +2. Are calculations, data analysis, or code execution needed? → route to "code" +3. Do we have sufficient information to provide a complete answer? → route to "formatter" + +Respond with ONLY one of: research, code, formatter +""" + + # Get decision from LLM + decision_messages = [ + SystemMessage(content=system_prompt), + HumanMessage(content=decision_prompt) + ] + + response = llm.invoke(decision_messages) + decision = response.content.strip().lower() + + # Validate decision + valid_decisions = ["research", "code", "formatter"] + if decision not in valid_decisions: + print(f"⚠️ Invalid decision '{decision}', defaulting to 'research'") + decision = "research" + + # Prepare state updates + updates = { + "loop_counter": loop_counter + 1, + "next": decision + } + + # If we're done, create draft answer + if decision == "formatter": + # Create a comprehensive draft answer from gathered information + draft_prompt = f""" +Create a comprehensive answer based on all gathered information: + +Original Question: {user_query} + +Research Information: +{research_notes} + +Code Results: +{code_outputs} + +Instructions: +1. Synthesize all available information to answer the question +2. If computational results are available, include them +3. If research provides context, incorporate it +4. Provide a clear, direct answer to the user's question +5. Focus on accuracy and completeness + +What is your answer to the user's question? +""" + + draft_messages = [ + SystemMessage(content=system_prompt), + HumanMessage(content=draft_prompt) + ] + + try: + draft_response = llm.invoke(draft_messages) + draft_content = draft_response.content if hasattr(draft_response, 'content') else str(draft_response) + updates["draft_answer"] = draft_content + print(f"📝 Lead Agent: Created draft answer ({len(draft_content)} characters)") + except Exception as e: + print(f"⚠️ Error creating draft answer: {e}") + # Fallback - create a simple answer from available data + fallback_answer = f"Based on the available information:\n\nResearch: {research_notes}\nCalculations: {code_outputs}" + updates["draft_answer"] = fallback_answer + + # Log decision + print(f"🎯 Lead Agent Decision: {decision} (iteration {loop_counter + 1})") + + if span: + span.update_trace(output={"decision": decision, "updates": updates}) + + return Command( + goto=decision, + update=updates + ) + + except Exception as e: + print(f"❌ Lead Agent Error: {e}") + # On error, proceed to formatter with error message + return Command( + goto="formatter", + update={ + "draft_answer": f"I encountered an error while processing your request: {str(e)}", + "loop_counter": loop_counter + 1, + "next": "formatter" + } + ) \ No newline at end of file diff --git a/agents/research_agent.py b/agents/research_agent.py new file mode 100644 index 0000000000000000000000000000000000000000..82dcb168e532c966b6d827af06a36299d10e1d7d --- /dev/null +++ b/agents/research_agent.py @@ -0,0 +1,295 @@ +""" +Research Agent - Information gathering and research tasks + +The Research Agent is responsible for: +1. Gathering information from multiple sources (web, Wikipedia, arXiv) +2. Searching for relevant context and facts +3. Compiling research results in a structured format +4. Returning citations and source information +""" + +import os +from typing import Dict, Any, List +from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage +from langgraph.types import Command +from langchain_groq import ChatGroq +from langchain_core.tools import Tool +from observability import agent_span, tool_span +from dotenv import load_dotenv + +# Import tools from the existing tools.py +from tools import ( + get_tavily_tool, + get_wikipedia_tool, + get_arxiv_tool, + get_wikipedia_reader, + get_arxiv_reader +) + +load_dotenv("env.local") + + +def create_research_tools() -> List[Tool]: + """Create LangChain-compatible research tools""" + tools = [] + + try: + # Import LlamaIndex tools and convert them + from tools import get_tavily_tool, get_wikipedia_tool, get_arxiv_tool + + # Tavily web search + try: + tavily_spec = get_tavily_tool() + if tavily_spec: + # Convert to LangChain tool + def tavily_search(query: str) -> str: + try: + tavily_tools = tavily_spec.to_tool_list() + if tavily_tools: + result = tavily_tools[0].call({"input": query}) + return str(result) + except Exception as e: + return f"Search error: {str(e)}" + return "No search results found" + + tavily_tool = Tool( + name="web_search", + description="Search the web for current information and facts using Tavily API", + func=tavily_search + ) + tools.append(tavily_tool) + print(f"✅ Added Tavily web search tool") + except Exception as e: + print(f"⚠️ Could not load Tavily tools: {e}") + + # Wikipedia search + try: + wikipedia_tool = get_wikipedia_tool() + if wikipedia_tool: + def wikipedia_search(query: str) -> str: + try: + result = wikipedia_tool.call({"input": query}) + return str(result) + except Exception as e: + return f"Wikipedia search error: {str(e)}" + + wiki_tool = Tool( + name="wikipedia_search", + description="Search Wikipedia for encyclopedic information", + func=wikipedia_search + ) + tools.append(wiki_tool) + print("✅ Added Wikipedia tool") + except Exception as e: + print(f"⚠️ Could not load Wikipedia tool: {e}") + + # ArXiv search + try: + arxiv_tool = get_arxiv_tool() + if arxiv_tool: + def arxiv_search(query: str) -> str: + try: + result = arxiv_tool.call({"input": query}) + return str(result) + except Exception as e: + return f"ArXiv search error: {str(e)}" + + arxiv_lc_tool = Tool( + name="arxiv_search", + description="Search ArXiv for academic papers and research", + func=arxiv_search + ) + tools.append(arxiv_lc_tool) + print("✅ Added ArXiv tool") + except Exception as e: + print(f"⚠️ Could not load ArXiv tool: {e}") + + except Exception as e: + print(f"⚠️ Error setting up research tools: {e}") + + print(f"🔧 Research Agent loaded {len(tools)} tools") + return tools + + +def load_research_prompt() -> str: + """Load the research-specific prompt""" + try: + with open("archive/prompts/retrieval_prompt.txt", "r") as f: + return f.read() + except FileNotFoundError: + return """ +You are a research specialist focused on gathering accurate information. + +Your goals: +1. Search for factual, current, and relevant information +2. Use multiple sources to verify facts +3. Provide clear citations and sources +4. Structure findings in an organized manner + +When researching: +- Use web search for current information and facts +- Use Wikipedia for encyclopedic knowledge +- Use ArXiv for academic and technical topics +- Cross-reference information across sources +- Note any conflicting information found + +Format your response as: +### Research Results +- **Source 1**: [findings] +- **Source 2**: [findings] +- **Source 3**: [findings] + +### Key Facts +- Fact 1 +- Fact 2 +- Fact 3 + +### Citations +- Citation 1 +- Citation 2 +""" + + +def research_agent(state: Dict[str, Any]) -> Command: + """ + Research Agent node that gathers information using available tools. + + Returns Command with research results appended to research_notes. + """ + + print("🔍 Research Agent: Gathering information...") + + try: + # Get research prompt + research_prompt = load_research_prompt() + + # Initialize LLM with tools + llm = ChatGroq( + model="llama-3.3-70b-versatile", + temperature=0.3, # Slightly higher for research creativity + max_tokens=2048 + ) + + # Get research tools + tools = create_research_tools() + + # Bind tools to LLM if available + if tools: + llm_with_tools = llm.bind_tools(tools) + else: + llm_with_tools = llm + print("⚠️ No tools available, proceeding with LLM only") + + # Create agent span for tracing + with agent_span( + "research", + metadata={ + "tools_available": len(tools), + "user_id": state.get("user_id", "unknown"), + "session_id": state.get("session_id", "unknown") + } + ) as span: + + # Extract user query + messages = state.get("messages", []) + user_query = "" + for msg in messages: + if isinstance(msg, HumanMessage): + user_query = msg.content + break + + # Build research request + research_request = f""" +Please research the following question using available tools: + +Question: {user_query} + +Current research status: {len(state.get('research_notes', ''))} characters already gathered + +Instructions: +1. Search for factual information relevant to the question +2. Use multiple sources if possible for verification +3. Focus on accuracy and currency of information +4. Provide clear citations and sources +5. Structure your findings clearly + +Please gather comprehensive information to help answer this question. +""" + + # Create messages for research + research_messages = [ + SystemMessage(content=research_prompt), + HumanMessage(content=research_request) + ] + + # Get research response + if tools: + # Try using tools for research + response = llm_with_tools.invoke(research_messages) + + # If the response contains tool calls, execute them + if hasattr(response, 'tool_calls') and response.tool_calls: + print(f"🛠️ Executing {len(response.tool_calls)} tool calls") + + # Execute tool calls and collect results + tool_results = [] + for tool_call in response.tool_calls: + try: + # Find the tool + tool = next((t for t in tools if t.name == tool_call['name']), None) + if tool: + result = tool.run(tool_call['args']) + tool_results.append(f"**{tool.name}**: {result}") + except Exception as e: + print(f"⚠️ Tool {tool_call['name']} failed: {e}") + tool_results.append(f"**{tool_call['name']}**: Error - {str(e)}") + + # Compile research results + research_findings = "\n\n".join(tool_results) if tool_results else response.content + else: + research_findings = response.content + else: + # No tools available, use LLM knowledge only + research_findings = llm.invoke(research_messages).content + + # Format research results + formatted_results = f""" +### Research Iteration {state.get('loop_counter', 0) + 1} + +{research_findings} + +--- +""" + + print(f"📝 Research Agent: Gathered {len(formatted_results)} characters") + + # Update trace + if span: + span.update_trace(output={ + "research_length": len(formatted_results), + "findings_preview": formatted_results[:300] + "..." + }) + + # Return command to proceed back to lead agent for decision + return Command( + goto="lead", + update={ + "research_notes": formatted_results + } + ) + + except Exception as e: + print(f"❌ Research Agent Error: {e}") + + # Return with error information + error_result = f""" +### Research Error +An error occurred during research: {str(e)} + +""" + return Command( + goto="lead", + update={ + "research_notes": error_result + } + ) \ No newline at end of file diff --git a/archive/.cursor/rules/archive.mdc b/archive/.cursor/rules/archive.mdc new file mode 100644 index 0000000000000000000000000000000000000000..948753c395349c746e34dc985360362e0c5c66e1 --- /dev/null +++ b/archive/.cursor/rules/archive.mdc @@ -0,0 +1,6 @@ +--- +description: +globs: *.py,*.md +alwaysApply: false +--- +don't make any changes to these files, the files in this folder is only for reference, you can copy the file and then make changes to it, but never makes changes to the files in this folder. use the code for reference only. \ No newline at end of file diff --git a/ARCHITECTURE.md b/archive/ARCHITECTURE.md similarity index 100% rename from ARCHITECTURE.md rename to archive/ARCHITECTURE.md diff --git a/archive/README.md b/archive/README.md new file mode 100644 index 0000000000000000000000000000000000000000..45da6548516301d584016b4e7084fdbb1f8facbc --- /dev/null +++ b/archive/README.md @@ -0,0 +1 @@ +This folder contains previous attempts at agent creation which is not the correct workflow to create agents based on blogpost of https://www.anthropic.com/engineering/built-multi-agent-research-system and https://cognition.ai/blog/dont-build-multi-agents \ No newline at end of file diff --git a/prompts/critic_prompt.txt b/archive/prompts/critic_prompt.txt similarity index 100% rename from prompts/critic_prompt.txt rename to archive/prompts/critic_prompt.txt diff --git a/prompts/execution_prompt.txt b/archive/prompts/execution_prompt.txt similarity index 100% rename from prompts/execution_prompt.txt rename to archive/prompts/execution_prompt.txt diff --git a/prompts/retrieval_prompt.txt b/archive/prompts/retrieval_prompt.txt similarity index 100% rename from prompts/retrieval_prompt.txt rename to archive/prompts/retrieval_prompt.txt diff --git a/prompts/router_prompt.txt b/archive/prompts/router_prompt.txt similarity index 100% rename from prompts/router_prompt.txt rename to archive/prompts/router_prompt.txt diff --git a/prompts/system_prompt.txt b/archive/prompts/system_prompt.txt similarity index 100% rename from prompts/system_prompt.txt rename to archive/prompts/system_prompt.txt diff --git a/prompts/verification_prompt.txt b/archive/prompts/verification_prompt.txt similarity index 100% rename from prompts/verification_prompt.txt rename to archive/prompts/verification_prompt.txt diff --git a/src/__init__.py b/archive/src/__init__.py similarity index 100% rename from src/__init__.py rename to archive/src/__init__.py diff --git a/src/__pycache__/__init__.cpython-313.pyc b/archive/src/__pycache__/__init__.cpython-313.pyc similarity index 100% rename from src/__pycache__/__init__.cpython-313.pyc rename to archive/src/__pycache__/__init__.cpython-313.pyc diff --git a/src/__pycache__/langgraph_system.cpython-313.pyc b/archive/src/__pycache__/langgraph_system.cpython-313.pyc similarity index 95% rename from src/__pycache__/langgraph_system.cpython-313.pyc rename to archive/src/__pycache__/langgraph_system.cpython-313.pyc index 1f241f5082eec6f15c40caa96ed5db9a607eebeb..840dca8e4fc7893fa4595b9884785d8236b47d3c 100644 Binary files a/src/__pycache__/langgraph_system.cpython-313.pyc and b/archive/src/__pycache__/langgraph_system.cpython-313.pyc differ diff --git a/src/__pycache__/memory.cpython-313.pyc b/archive/src/__pycache__/memory.cpython-313.pyc similarity index 76% rename from src/__pycache__/memory.cpython-313.pyc rename to archive/src/__pycache__/memory.cpython-313.pyc index bc273de748cc90ee322eadcc19bb0b69e311a081..e5517f5409bde446e98e50e9e4e11d7993b5906a 100644 Binary files a/src/__pycache__/memory.cpython-313.pyc and b/archive/src/__pycache__/memory.cpython-313.pyc differ diff --git a/src/__pycache__/tracing.cpython-313.pyc b/archive/src/__pycache__/tracing.cpython-313.pyc similarity index 95% rename from src/__pycache__/tracing.cpython-313.pyc rename to archive/src/__pycache__/tracing.cpython-313.pyc index 9addbd912cb505fde5d26fb33b5b7a4b1528c81f..3b1a13a92d0ea4ab9d8a791e84cadf10b12b682a 100644 Binary files a/src/__pycache__/tracing.cpython-313.pyc and b/archive/src/__pycache__/tracing.cpython-313.pyc differ diff --git a/src/agents/__init__.py b/archive/src/agents/__init__.py similarity index 100% rename from src/agents/__init__.py rename to archive/src/agents/__init__.py diff --git a/src/agents/__pycache__/__init__.cpython-313.pyc b/archive/src/agents/__pycache__/__init__.cpython-313.pyc similarity index 90% rename from src/agents/__pycache__/__init__.cpython-313.pyc rename to archive/src/agents/__pycache__/__init__.cpython-313.pyc index f1d4bda5545d3c51ed12a62b686cb588b9102c20..4031683bc75285a43b49796fcf02ed5e50a044d3 100644 Binary files a/src/agents/__pycache__/__init__.cpython-313.pyc and b/archive/src/agents/__pycache__/__init__.cpython-313.pyc differ diff --git a/src/agents/__pycache__/critic_agent.cpython-313.pyc b/archive/src/agents/__pycache__/critic_agent.cpython-313.pyc similarity index 98% rename from src/agents/__pycache__/critic_agent.cpython-313.pyc rename to archive/src/agents/__pycache__/critic_agent.cpython-313.pyc index 40e6e03db331ec226a0cc5aed11fcea6f59a1bb1..3dd221ff31233a0319daec9b2b87a1313063d95d 100644 Binary files a/src/agents/__pycache__/critic_agent.cpython-313.pyc and b/archive/src/agents/__pycache__/critic_agent.cpython-313.pyc differ diff --git a/src/agents/__pycache__/execution_agent.cpython-313.pyc b/archive/src/agents/__pycache__/execution_agent.cpython-313.pyc similarity index 99% rename from src/agents/__pycache__/execution_agent.cpython-313.pyc rename to archive/src/agents/__pycache__/execution_agent.cpython-313.pyc index bb2b9d484fa8b82bb7752d044d9f803542ef357c..448a6664cf7ba51288347e1744a4aea53e3d201b 100644 Binary files a/src/agents/__pycache__/execution_agent.cpython-313.pyc and b/archive/src/agents/__pycache__/execution_agent.cpython-313.pyc differ diff --git a/src/agents/__pycache__/plan_node.cpython-313.pyc b/archive/src/agents/__pycache__/plan_node.cpython-313.pyc similarity index 97% rename from src/agents/__pycache__/plan_node.cpython-313.pyc rename to archive/src/agents/__pycache__/plan_node.cpython-313.pyc index 067cdc0e85137f13062fbdae4abbe16cae8ad29d..9b4174f75d618a8dd697ea0a9904c79509875b1e 100644 Binary files a/src/agents/__pycache__/plan_node.cpython-313.pyc and b/archive/src/agents/__pycache__/plan_node.cpython-313.pyc differ diff --git a/src/agents/__pycache__/retrieval_agent.cpython-313.pyc b/archive/src/agents/__pycache__/retrieval_agent.cpython-313.pyc similarity index 99% rename from src/agents/__pycache__/retrieval_agent.cpython-313.pyc rename to archive/src/agents/__pycache__/retrieval_agent.cpython-313.pyc index 1e0e51bf31b553b89b98fbb87f573ddda71ce531..eb11a317b96d5f771b8ce3d373d77272e8fb1238 100644 Binary files a/src/agents/__pycache__/retrieval_agent.cpython-313.pyc and b/archive/src/agents/__pycache__/retrieval_agent.cpython-313.pyc differ diff --git a/src/agents/__pycache__/router_node.cpython-313.pyc b/archive/src/agents/__pycache__/router_node.cpython-313.pyc similarity index 98% rename from src/agents/__pycache__/router_node.cpython-313.pyc rename to archive/src/agents/__pycache__/router_node.cpython-313.pyc index dd9de300ae7b1ea42f0b18d90b47367048919b8c..5e8c9e5937dd11da2f865e8847072cabed61314b 100644 Binary files a/src/agents/__pycache__/router_node.cpython-313.pyc and b/archive/src/agents/__pycache__/router_node.cpython-313.pyc differ diff --git a/src/agents/__pycache__/verification_node.cpython-313.pyc b/archive/src/agents/__pycache__/verification_node.cpython-313.pyc similarity index 70% rename from src/agents/__pycache__/verification_node.cpython-313.pyc rename to archive/src/agents/__pycache__/verification_node.cpython-313.pyc index f51406895f253bac7488423cb266b9db202359db..65e486e8f9885b6472ca64200811f3af102e3b09 100644 Binary files a/src/agents/__pycache__/verification_node.cpython-313.pyc and b/archive/src/agents/__pycache__/verification_node.cpython-313.pyc differ diff --git a/src/agents/critic_agent.py b/archive/src/agents/critic_agent.py similarity index 100% rename from src/agents/critic_agent.py rename to archive/src/agents/critic_agent.py diff --git a/src/agents/execution_agent.py b/archive/src/agents/execution_agent.py similarity index 100% rename from src/agents/execution_agent.py rename to archive/src/agents/execution_agent.py diff --git a/src/agents/plan_node.py b/archive/src/agents/plan_node.py similarity index 100% rename from src/agents/plan_node.py rename to archive/src/agents/plan_node.py diff --git a/src/agents/retrieval_agent.py b/archive/src/agents/retrieval_agent.py similarity index 100% rename from src/agents/retrieval_agent.py rename to archive/src/agents/retrieval_agent.py diff --git a/src/agents/router_node.py b/archive/src/agents/router_node.py similarity index 100% rename from src/agents/router_node.py rename to archive/src/agents/router_node.py diff --git a/src/agents/verification_node.py b/archive/src/agents/verification_node.py similarity index 100% rename from src/agents/verification_node.py rename to archive/src/agents/verification_node.py diff --git a/src/langgraph_system.py b/archive/src/langgraph_system.py similarity index 100% rename from src/langgraph_system.py rename to archive/src/langgraph_system.py diff --git a/src/memory.py b/archive/src/memory.py similarity index 100% rename from src/memory.py rename to archive/src/memory.py diff --git a/src/tracing.py b/archive/src/tracing.py similarity index 100% rename from src/tracing.py rename to archive/src/tracing.py diff --git a/langgraph_agent_system.py b/langgraph_agent_system.py new file mode 100644 index 0000000000000000000000000000000000000000..b8536ed860077c530a388df0289bbd2670c4381d --- /dev/null +++ b/langgraph_agent_system.py @@ -0,0 +1,197 @@ +""" +LangGraph Multi-Agent System Implementation + +This module implements a multi-agent system using LangGraph with the following components: +- LeadAgent: Orchestrates the workflow and makes decisions +- ResearchAgent: Handles information gathering and research tasks +- CodeAgent: Handles computational and code execution tasks +- AnswerFormatter: Formats final answers according to GAIA requirements +- Memory: Persistent storage for context and learning +""" + +import os +from typing import Dict, Any, TypedDict, Literal, Annotated, List +from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage +from langgraph.graph import StateGraph, START, END +from langgraph.types import Command +import operator +from dotenv import load_dotenv + +# Import our observability module +from observability import ( + start_root_span, + get_callback_handler, + flush_traces, + shutdown_observability +) + +# Load environment variables +load_dotenv("env.local") + +class AgentState(TypedDict): + """ + State schema for the multi-agent system following LangGraph best practices. + Treats every agent node as a pure function AgentState → Command. + """ + # Core conversation messages + messages: Annotated[List[BaseMessage], operator.add] + + # Working draft and evidence + draft_answer: str + research_notes: Annotated[str, operator.add] # Use add for accumulation + code_outputs: Annotated[str, operator.add] # Use add for accumulation + + # Loop control + loop_counter: int + max_iterations: int + + # Routing decisions + next: Literal["research", "code", "formatter", "__end__"] + + # Final formatted answer + final_answer: str + + # Metadata for tracing + user_id: str + session_id: str + + +# Removed setup_tracing function - now handled by observability module + + +def create_agent_graph(): + """ + Create the LangGraph workflow following the specified architecture: + lead -> research -> code -> lead (loop) -> formatter -> END + """ + from agents.lead_agent import lead_agent + from agents.research_agent import research_agent + from agents.code_agent import code_agent + from agents.answer_formatter import answer_formatter + + # Create the state graph + workflow = StateGraph(AgentState) + + # Add nodes + workflow.add_node("lead", lead_agent) + workflow.add_node("research", research_agent) + workflow.add_node("code", code_agent) + workflow.add_node("formatter", answer_formatter) + + # Add edges + workflow.add_edge(START, "lead") + + # Conditional edges from lead agent based on routing decisions + def route_from_lead(state: AgentState) -> str: + """Route from lead agent based on the 'next' field""" + # Check for termination conditions + if (state.get("loop_counter", 0) >= state.get("max_iterations", 3) or + state.get("final_answer")): + return "__end__" + return state.get("next", "research") + + workflow.add_conditional_edges( + "lead", + route_from_lead, + { + "research": "research", + "code": "code", + "formatter": "formatter", + "__end__": END + } + ) + + # Both research and code agents return to lead agent for next decision + workflow.add_edge("research", "lead") + workflow.add_edge("code", "lead") + workflow.add_edge("formatter", END) + + return workflow + + +async def run_agent_system( + query: str, + user_id: str = "default_user", + session_id: str = "default_session", + max_iterations: int = 3 +) -> str: + """ + Main entry point for the agent system. + + Args: + query: User question to answer + user_id: User identifier for tracing + session_id: Session identifier for tracing + max_iterations: Maximum number of research/code loops + + Returns: + Final formatted answer + """ + try: + # Get the global callback handler + callback_handler = get_callback_handler() + + # Create root span for the entire request + with start_root_span( + name="user-request", + user_id=user_id, + session_id=session_id, + metadata={"query": query, "max_iterations": max_iterations} + ) as root_span: + + # Create the workflow + workflow = create_agent_graph() + app = workflow.compile() + + # Initial state + initial_state: AgentState = { + "messages": [HumanMessage(content=query)], + "draft_answer": "", + "research_notes": "", + "code_outputs": "", + "loop_counter": 0, + "max_iterations": max_iterations, + "next": "research", # Start with research + "final_answer": "", + "user_id": user_id, + "session_id": session_id + } + + # Run the workflow with callback handler if available + if callback_handler: + final_state = await app.ainvoke( + initial_state, + config={"callbacks": [callback_handler]} + ) + else: + print("Warning: Running without Langfuse tracing") + final_state = await app.ainvoke(initial_state) + + # Update trace with output if span exists + if root_span: + root_span.update_trace(output={"final_answer": final_state["final_answer"]}) + + return final_state["final_answer"] + + except Exception as e: + print(f"Error in agent system: {e}") + return f"I apologize, but I encountered an error while processing your query: {str(e)}" + + finally: + # Flush traces in background + flush_traces(background=True) + + +if __name__ == "__main__": + import asyncio + + # Test the system + async def test(): + result = await run_agent_system( + "What is the capital of Maharashtra?", + user_id="test_user", + session_id="test_session" + ) + print(f"Final Answer: {result}") + + asyncio.run(test()) \ No newline at end of file diff --git a/memory_system.py b/memory_system.py new file mode 100644 index 0000000000000000000000000000000000000000..d15a862b279acce30c881fc3350aa2d5cfa907f9 --- /dev/null +++ b/memory_system.py @@ -0,0 +1,157 @@ +""" +Memory System for LangGraph Multi-Agent System + +Simplified memory management system that provides: +1. Vector store integration for long-term memory +2. Session-based caching for recent queries +3. Q&A pair storage for learning from interactions +4. Similar question retrieval for context +""" + +import os +import time +import hashlib +from typing import Optional, List, Dict, Any, Tuple +from dotenv import load_dotenv + +load_dotenv("env.local") + + +class MemoryManager: + """Manages memory for the multi-agent system""" + + def __init__(self): + self.vector_store = None + self.embeddings = None + + # In-memory caches + self.query_cache: Dict[str, Tuple[float, List]] = {} + self.processed_tasks: set[str] = set() + self.seen_hashes: set[str] = set() + + # TTL and similarity settings + self.ttl = 300 # 5 minutes cache + self.similarity_threshold = 0.85 + + self._initialize_vector_store() + + def _initialize_vector_store(self) -> None: + """Initialize vector store if credentials are available""" + try: + supabase_url = os.environ.get("SUPABASE_URL") + supabase_key = os.environ.get("SUPABASE_SERVICE_KEY") + + if not supabase_url or not supabase_key: + print("⚠️ Vector store disabled: Supabase credentials not found") + return + + # Try to import and initialize Supabase vector store + from langchain_community.vectorstores import SupabaseVectorStore + from langchain_huggingface import HuggingFaceEmbeddings + from supabase.client import Client, create_client + + self.embeddings = HuggingFaceEmbeddings( + model_name="sentence-transformers/all-mpnet-base-v2" + ) + + supabase: Client = create_client(supabase_url, supabase_key) + self.vector_store = SupabaseVectorStore( + client=supabase, + embedding=self.embeddings, + table_name="documents", + query_name="match_documents_langchain", + ) + print("✅ Vector store initialized successfully") + + except Exception as e: + print(f"⚠️ Vector store initialization failed: {e}") + + def similarity_search(self, query: str, k: int = 2) -> List[Any]: + """Search for similar questions with caching""" + if not self.vector_store: + return [] + + # Check cache first + q_hash = hashlib.sha256(query.encode()).hexdigest() + now = time.time() + + if q_hash in self.query_cache and now - self.query_cache[q_hash][0] < self.ttl: + print("💾 Memory cache hit") + return self.query_cache[q_hash][1] + + try: + print("🔍 Searching vector store for similar questions...") + similar_questions = self.vector_store.similarity_search_with_relevance_scores(query, k=k) + self.query_cache[q_hash] = (now, similar_questions) + return similar_questions + except Exception as e: + print(f"⚠️ Vector store search error: {e}") + return [] + + def should_ingest(self, query: str) -> bool: + """Determine if this Q&A should be stored in long-term memory""" + if not self.vector_store: + return False + + similar_questions = self.similarity_search(query, k=1) + top_score = similar_questions[0][1] if similar_questions else 0.0 + return top_score < self.similarity_threshold + + def ingest_qa_pair(self, question: str, answer: str, attachments: str = "") -> None: + """Store Q&A pair in long-term memory""" + if not self.vector_store: + print("⚠️ Vector store not available for storage") + return + + try: + payload = f"Question:\n{question}\n\nAnswer:\n{answer}" + if attachments: + payload += f"\n\nContext:\n{attachments}" + + hash_id = hashlib.sha256(payload.encode()).hexdigest() + if hash_id in self.seen_hashes: + print("⚠️ Duplicate Q&A pair - skipping storage") + return + + self.seen_hashes.add(hash_id) + self.vector_store.add_texts( + [payload], + metadatas=[{"hash_id": hash_id, "timestamp": str(time.time())}], + ids=[hash_id] # Use our hash as the ID to avoid UUID bigint issues + ) + print("✅ Stored Q&A pair in long-term memory") + except Exception as e: + print(f"⚠️ Error storing Q&A pair: {e}") + + def get_similar_qa(self, query: str) -> Optional[str]: + """Get similar Q&A for context""" + similar_questions = self.similarity_search(query, k=1) + if not similar_questions: + return None + + # Extract content from the search result + if isinstance(similar_questions[0], tuple): + doc, score = similar_questions[0] + if score > self.similarity_threshold: + return doc.page_content if hasattr(doc, 'page_content') else str(doc) + + return None + + def add_processed_task(self, task_id: str) -> None: + """Mark a task as processed""" + self.processed_tasks.add(task_id) + + def is_task_processed(self, task_id: str) -> bool: + """Check if a task has been processed""" + return task_id in self.processed_tasks + + def clear_session_cache(self) -> None: + """Clear session-specific caches""" + self.query_cache.clear() + self.processed_tasks.clear() + self.seen_hashes.clear() + print("🗑️ Session cache cleared") + + +# Global memory manager instance +memory_manager = MemoryManager() \ No newline at end of file diff --git a/observability.py b/observability.py new file mode 100644 index 0000000000000000000000000000000000000000..ffcb08762e402a296cfda32f64bd43c757d820ea --- /dev/null +++ b/observability.py @@ -0,0 +1,207 @@ +""" +Observability module for Langfuse v3 integration with OpenTelemetry support. + +This module provides: +- Single global CallbackHandler for LangChain integration +- Root span management for user requests +- Session and user tracking +- Background flushing for async operations +""" + +import os +import base64 +from typing import Optional, Dict, Any +from contextlib import contextmanager +from dotenv import load_dotenv + +# Langfuse v3 imports +from langfuse import get_client +from langfuse.langchain import CallbackHandler + +# OpenTelemetry imports for v3 compatibility +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +# Load environment variables +load_dotenv("env.local") + +# Global callback handler instance (singleton) +_langfuse_handler: Optional[CallbackHandler] = None +_tracer_provider: Optional[TracerProvider] = None + +def initialize_observability() -> bool: + """ + Initialize Langfuse observability with OTEL integration. + + Returns: + bool: True if initialization successful, False otherwise + """ + global _langfuse_handler, _tracer_provider + + try: + # Check required environment variables + required_vars = ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"] + missing_vars = [var for var in required_vars if not os.getenv(var)] + + if missing_vars: + print(f"Warning: Missing required environment variables: {missing_vars}") + return False + + # Setup OTEL integration for Langfuse v3 + langfuse_auth = base64.b64encode( + f"{os.getenv('LANGFUSE_PUBLIC_KEY')}:{os.getenv('LANGFUSE_SECRET_KEY')}".encode() + ).decode() + + # Configure OTEL environment + os.environ['OTEL_EXPORTER_OTLP_ENDPOINT'] = f"{os.getenv('LANGFUSE_HOST')}/api/public/otel" + os.environ['OTEL_EXPORTER_OTLP_HEADERS'] = f"Authorization=Basic {langfuse_auth}" + + # Setup OpenTelemetry tracer provider + _tracer_provider = TracerProvider() + _tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter())) + trace.set_tracer_provider(_tracer_provider) + + # Create single global callback handler + _langfuse_handler = CallbackHandler() + + print("✅ Langfuse observability initialized successfully") + return True + + except Exception as e: + print(f"❌ Failed to initialize observability: {e}") + return False + +def get_callback_handler() -> Optional[CallbackHandler]: + """ + Get the global Langfuse callback handler. + + Returns: + CallbackHandler or None if not initialized + """ + global _langfuse_handler + + if _langfuse_handler is None: + if initialize_observability(): + return _langfuse_handler + return None + + return _langfuse_handler + +@contextmanager +def start_root_span( + name: str, + user_id: str, + session_id: str, + metadata: Optional[Dict[str, Any]] = None +): + """ + Context manager for creating root spans with user and session tracking. + + Args: + name: Span name (e.g., "user-request") + user_id: User identifier for session tracking + session_id: Session identifier for conversation continuity + metadata: Optional additional metadata + + Yields: + Langfuse span context + """ + try: + # Create root span with v3 API + client = get_client() + with client.start_as_current_span(name=name) as span: + # Update trace with user and session information + span.update_trace( + user_id=user_id, + session_id=session_id, + tags=[ + os.getenv("ENV", "dev"), # Environment tag + "multi-agent-system" # System identifier + ] + ) + + # Add metadata if provided + if metadata: + span.update_trace(metadata=metadata) + + yield span + + except Exception as e: + print(f"Warning: Failed to create root span: {e}") + # Yield None so code doesn't break + yield None + +def flush_traces(background: bool = True) -> None: + """ + Flush pending traces to Langfuse. + + Args: + background: Whether to flush in background (non-blocking) + """ + try: + client = get_client() + client.flush() + except Exception as e: + print(f"Warning: Failed to flush traces: {e}") + +def shutdown_observability() -> None: + """ + Clean shutdown of observability components. + """ + global _tracer_provider + + try: + # Flush any remaining traces + flush_traces(background=False) + + # Shutdown tracer provider + if _tracer_provider: + _tracer_provider.shutdown() + + except Exception as e: + print(f"Warning: Error during observability shutdown: {e}") + +# Agent span helpers for consistent naming +@contextmanager +def agent_span(agent_name: str, metadata: Optional[Dict[str, Any]] = None): + """ + Context manager for agent-level spans. + + Args: + agent_name: Name of the agent (e.g., "lead", "research", "code") + metadata: Optional metadata for the span + """ + span_name = f"agent/{agent_name}" + + try: + client = get_client() + with client.start_as_current_span(name=span_name) as span: + if metadata: + span.update_trace(metadata=metadata) + yield span + except Exception as e: + print(f"Warning: Failed to create agent span for {agent_name}: {e}") + yield None + +@contextmanager +def tool_span(tool_name: str, metadata: Optional[Dict[str, Any]] = None): + """ + Context manager for tool-level spans. + + Args: + tool_name: Name of the tool (e.g., "tavily_search", "calculator") + metadata: Optional metadata for the span + """ + span_name = f"tool/{tool_name}" + + try: + client = get_client() + with client.start_as_current_span(name=span_name) as span: + if metadata: + span.update_trace(metadata=metadata) + yield span + except Exception as e: + print(f"Warning: Failed to create tool span for {tool_name}: {e}") + yield None \ No newline at end of file diff --git a/quick_random_agent_test.py b/quick_random_agent_test.py index 6f7d6a03b77d1e14569c518f829b369faa088c48..1db3926f0a38ce960e04583556c5d5e5e72a114e 100644 --- a/quick_random_agent_test.py +++ b/quick_random_agent_test.py @@ -2,6 +2,7 @@ import os import sys import tempfile import requests +import asyncio from dotenv import load_dotenv # Load environment variables @@ -10,21 +11,13 @@ load_dotenv() # Add the current directory to Python path sys.path.append(os.path.dirname(os.path.abspath(__file__))) -# Import the new agent system -from new_langraph_agent import run_agent, cleanup -from src.tracing import get_langfuse_callback_handler +# Import the latest agent system +from langgraph_agent_system import run_agent_system +from observability import flush_traces, shutdown_observability # Default API URL - Using the same URL as the original basic_agent.py DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" -# Initialize Langfuse CallbackHandler for LangGraph/Langchain (tracing) -try: - langfuse_handler = get_langfuse_callback_handler() - print("✅ Langfuse handler initialized successfully") -except Exception as e: - print(f"Warning: Could not initialize Langfuse handler: {e}") - langfuse_handler = None - def fetch_random_question(api_base: str = DEFAULT_API_URL): """Return JSON of /random-question.""" resp = requests.get(f"{api_base}/random-question", timeout=30) @@ -54,8 +47,8 @@ def maybe_download_file(task_id: str, api_base: str = DEFAULT_API_URL) -> str | return None -def main(): - print("Random Agent Test - New LangGraph Architecture") +async def main(): + print("Random Agent Test - Latest LangGraph Multi-Agent System") print("=" * 60) try: @@ -72,9 +65,13 @@ def main(): if attachment_path: question_text += f"\n\nAttachment available at: {attachment_path}" - # Run the new agent system - print("\n=== Running LangGraph Agent System ===") - result = run_agent(question_text) + # Run the latest agent system + print("\n=== Running Latest LangGraph Multi-Agent System ===") + result = await run_agent_system( + query=question_text, + user_id="test_user", + session_id=f"session_{task_id}" + ) print("\n=== Agent Answer ===") print(result) @@ -87,11 +84,12 @@ def main(): finally: # Cleanup try: - cleanup() + flush_traces(background=False) + shutdown_observability() print("\n✅ Agent cleanup completed") except Exception as e: print(f"⚠️ Cleanup warning: {e}") if __name__ == "__main__": - main() \ No newline at end of file + asyncio.run(main()) \ No newline at end of file diff --git a/quick_specific_agent_test.py b/quick_specific_agent_test.py index 2154cb197fe8f53988114339a0ca1a2e67d907f0..205f19a55b5468cc4e1503eca1abcf983d951114 100644 --- a/quick_specific_agent_test.py +++ b/quick_specific_agent_test.py @@ -2,6 +2,7 @@ import os import sys import tempfile import requests +import asyncio from dotenv import load_dotenv # Load environment variables @@ -10,22 +11,14 @@ load_dotenv() # Add the current directory to Python path sys.path.append(os.path.dirname(os.path.abspath(__file__))) -# Import the new agent system -from new_langraph_agent import run_agent, cleanup -from src.tracing import get_langfuse_callback_handler +# Import the latest agent system +from langgraph_agent_system import run_agent_system +from observability import flush_traces, shutdown_observability # Default API URL and Task ID - Using the same URL as the original basic_agent.py DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" DEFAULT_TASK_ID = "f918266a-b3e0-4914-865d-4faa564f1aef" -# Initialize Langfuse CallbackHandler for LangGraph/Langchain (tracing) -try: - langfuse_handler = get_langfuse_callback_handler() - print("✅ Langfuse handler initialized successfully") -except Exception as e: - print(f"Warning: Could not initialize Langfuse handler: {e}") - langfuse_handler = None - def fetch_question_by_id(task_id: str, api_base: str = DEFAULT_API_URL): """Return JSON of a question for a given task_id. @@ -69,8 +62,8 @@ def maybe_download_file(task_id: str, api_base: str = DEFAULT_API_URL) -> str | return None -def main(): - print("Specific Agent Test - New LangGraph Architecture") +async def main(): + print("Specific Agent Test - Latest LangGraph Multi-Agent System") print("=" * 60) try: @@ -93,14 +86,14 @@ def main(): if attachment_path: question_text += f"\n\nAttachment available at: {attachment_path}" - # Run the new agent system - print("\n=== Running LangGraph Agent System ===") - - # Set environment variables for user/session tracking - os.environ["USER_ID"] = "test_user" - os.environ["SESSION_ID"] = f"session_{task_id}" + # Run the latest agent system + print("\n=== Running Latest LangGraph Multi-Agent System ===") - result = run_agent(question_text) + result = await run_agent_system( + query=question_text, + user_id="test_user", + session_id=f"session_{task_id}" + ) print("\n=== Agent Answer ===") print(result) @@ -113,11 +106,12 @@ def main(): finally: # Cleanup try: - cleanup() + flush_traces(background=False) + shutdown_observability() print("\n✅ Agent cleanup completed") except Exception as e: print(f"⚠️ Cleanup warning: {e}") if __name__ == "__main__": - main() \ No newline at end of file + asyncio.run(main()) \ No newline at end of file diff --git a/test_new_multi_agent_system.py b/test_new_multi_agent_system.py new file mode 100644 index 0000000000000000000000000000000000000000..f57027d6b5c070fd1e64744e4037428387f8a495 --- /dev/null +++ b/test_new_multi_agent_system.py @@ -0,0 +1,135 @@ +""" +Test script for the new LangGraph Multi-Agent System + +This script tests the complete workflow: +- Lead Agent orchestration +- Research Agent information gathering +- Code Agent computational tasks +- Answer Formatter GAIA compliance +- Memory system integration +""" + +import asyncio +import os +from dotenv import load_dotenv +from langgraph_agent_system import run_agent_system + +# Load environment variables +load_dotenv("env.local") + + +async def test_simple_factual_question(): + """Test with a simple factual question that should primarily use research""" + print("🧪 Testing simple factual question...") + + query = "What is the capital of Maharashtra?" + result = await run_agent_system( + query=query, + user_id="test_user_1", + session_id="test_session_1" + ) + + print(f"Query: {query}") + print(f"Result: {result}") + print("-" * 50) + return result + + +async def test_computational_question(): + """Test with a computational question that should use both research and code""" + print("🧪 Testing computational question...") + + query = "What is 25 + 17 * 3?" + result = await run_agent_system( + query=query, + user_id="test_user_2", + session_id="test_session_2" + ) + + print(f"Query: {query}") + print(f"Result: {result}") + print("-" * 50) + return result + + +async def test_complex_question(): + """Test with a complex question requiring both research and computation""" + print("🧪 Testing complex question...") + + query = "How many seconds are there in a week? Show the calculation." + result = await run_agent_system( + query=query, + user_id="test_user_3", + session_id="test_session_3" + ) + + print(f"Query: {query}") + print(f"Result: {result}") + print("-" * 50) + return result + + +async def test_list_question(): + """Test with a question that should return a list""" + print("🧪 Testing list question...") + + query = "What are the first 3 prime numbers?" + result = await run_agent_system( + query=query, + user_id="test_user_4", + session_id="test_session_4" + ) + + print(f"Query: {query}") + print(f"Result: {result}") + print("-" * 50) + return result + + +async def run_all_tests(): + """Run all test cases""" + print("🚀 Starting Multi-Agent System Tests") + print("=" * 60) + + tests = [ + test_simple_factual_question, + test_computational_question, + test_complex_question, + test_list_question + ] + + results = [] + for test_func in tests: + try: + result = await test_func() + results.append(("PASS", test_func.__name__, result)) + except Exception as e: + print(f"❌ {test_func.__name__} failed: {e}") + results.append(("FAIL", test_func.__name__, str(e))) + + # Summary + print("=" * 60) + print("🏁 Test Results Summary:") + for status, test_name, result in results: + status_emoji = "✅" if status == "PASS" else "❌" + print(f"{status_emoji} {test_name}: {status}") + if status == "PASS": + print(f" Result: {result[:100]}...") + + passed = sum(1 for status, _, _ in results if status == "PASS") + total = len(results) + print(f"\n📊 Tests passed: {passed}/{total}") + + +if __name__ == "__main__": + # Check environment setup + required_env_vars = ["GROQ_API_KEY", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"] + missing_vars = [var for var in required_env_vars if not os.getenv(var)] + + if missing_vars: + print(f"❌ Missing required environment variables: {missing_vars}") + print("Please set up your environment variables in env.local") + exit(1) + + # Run tests + asyncio.run(run_all_tests()) \ No newline at end of file diff --git a/test_observability.py b/test_observability.py new file mode 100644 index 0000000000000000000000000000000000000000..e5d6b3a04e5c5adaf5133868a0aed1d4474b4e18 --- /dev/null +++ b/test_observability.py @@ -0,0 +1,191 @@ +""" +Test script for Langfuse v3 observability integration. + +This script tests the observability module and ensures that: +1. Observability can be initialized properly +2. Root spans are created correctly +3. Agent spans are nested properly +4. Tool spans work as expected +""" + +import asyncio +import os +from dotenv import load_dotenv + +# Load environment variables +load_dotenv("env.local") + +def test_observability_initialization(): + """Test that observability can be initialized""" + print("🧪 Testing observability initialization...") + + from observability import initialize_observability, get_callback_handler + + # Test initialization + success = initialize_observability() + if success: + print("✅ Observability initialized successfully") + else: + print("❌ Observability initialization failed") + return False + + # Test callback handler + handler = get_callback_handler() + if handler: + print("✅ Callback handler obtained successfully") + else: + print("❌ Failed to get callback handler") + return False + + return True + +def test_span_creation(): + """Test that spans can be created properly""" + print("\n🧪 Testing span creation...") + + from observability import start_root_span, agent_span, tool_span + + try: + # Test root span + with start_root_span( + name="test-request", + user_id="test_user", + session_id="test_session", + metadata={"test": True} + ) as root_span: + print("✅ Root span created successfully") + + # Test agent span + with agent_span("test_agent", metadata={"agent_type": "test"}) as agent_span_ctx: + print("✅ Agent span created successfully") + + # Test tool span + with tool_span("test_tool", metadata={"tool_type": "test"}) as tool_span_ctx: + print("✅ Tool span created successfully") + + print("✅ All spans created and closed successfully") + return True + + except Exception as e: + print(f"❌ Span creation failed: {e}") + return False + +async def test_agent_system_integration(): + """Test the full agent system with observability""" + print("\n🧪 Testing agent system integration...") + + try: + from langgraph_agent_system import run_agent_system + + # Test a simple question + result = await run_agent_system( + query="What is 2 + 2?", + user_id="test_user_integration", + session_id="test_session_integration", + max_iterations=1 # Keep it short for testing + ) + + if result and isinstance(result, str): + print(f"✅ Agent system ran successfully") + print(f"📝 Result: {result[:100]}...") + return True + else: + print("❌ Agent system returned invalid result") + return False + + except Exception as e: + print(f"❌ Agent system integration test failed: {e}") + return False + +def test_flush_and_cleanup(): + """Test flushing and cleanup functions""" + print("\n🧪 Testing flush and cleanup...") + + try: + from observability import flush_traces, shutdown_observability + + # Test flush + flush_traces(background=False) + print("✅ Traces flushed successfully") + + # Test shutdown + shutdown_observability() + print("✅ Observability shutdown successfully") + + return True + + except Exception as e: + print(f"❌ Flush and cleanup test failed: {e}") + return False + +async def main(): + """Run all tests""" + print("🚀 Starting Langfuse v3 observability tests...\n") + + tests = [ + ("Observability Initialization", test_observability_initialization), + ("Span Creation", test_span_creation), + ("Agent System Integration", test_agent_system_integration), + ("Flush and Cleanup", test_flush_and_cleanup) + ] + + results = [] + + for test_name, test_func in tests: + print(f"\n{'='*50}") + print(f"Running: {test_name}") + print(f"{'='*50}") + + try: + if asyncio.iscoroutinefunction(test_func): + result = await test_func() + else: + result = test_func() + results.append((test_name, result)) + except Exception as e: + print(f"❌ Test {test_name} failed with exception: {e}") + results.append((test_name, False)) + + # Summary + print(f"\n{'='*50}") + print("TEST RESULTS SUMMARY") + print(f"{'='*50}") + + passed = 0 + total = len(results) + + for test_name, result in results: + status = "✅ PASSED" if result else "❌ FAILED" + print(f"{test_name}: {status}") + if result: + passed += 1 + + print(f"\nOverall: {passed}/{total} tests passed") + + if passed == total: + print("🎉 All tests passed! Langfuse v3 observability is working correctly.") + else: + print("⚠️ Some tests failed. Check the output above for details.") + + # Check environment variables + print(f"\n{'='*50}") + print("ENVIRONMENT CHECK") + print(f"{'='*50}") + + required_env_vars = [ + "LANGFUSE_PUBLIC_KEY", + "LANGFUSE_SECRET_KEY", + "LANGFUSE_HOST" + ] + + for var in required_env_vars: + value = os.getenv(var) + if value: + print(f"✅ {var}: {'*' * min(len(value), 10)}...") + else: + print(f"❌ {var}: Not set") + + print(f"\n🔗 If tests passed, check your Langfuse dashboard at: {os.getenv('LANGFUSE_HOST', 'https://cloud.langfuse.com')}") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tools.py b/tools.py index 6576dcb8321086491abd52bef02a984c8a293e13..4e1955638bd54f8e132a4b89b4c075947888b2ed 100644 --- a/tools.py +++ b/tools.py @@ -5,6 +5,7 @@ from llama_index.readers.wikipedia import WikipediaReader from llama_index.readers.papers import ArxivReader from llama_index.core.tools.ondemand_loader_tool import OnDemandLoaderTool from huggingface_hub import list_models +from observability import tool_span # Math functions @@ -34,15 +35,28 @@ def modulus(a: int, b: int) -> int: def get_hub_stats(author: str) -> str: """Fetches the most downloaded model from a specific author on the Hugging Face Hub.""" - try: - models = list(list_models(author=author, sort="downloads", direction=-1, limit=1)) - if models: - model = models[0] - return f"The most downloaded model by {author} is {model.id} with {model.downloads:,} downloads." - else: - return f"No models found for author {author}." - except Exception as e: - return f"Error fetching models for {author}: {str(e)}" + with tool_span("huggingface_hub_stats", metadata={"author": author}) as span: + try: + models = list(list_models(author=author, sort="downloads", direction=-1, limit=1)) + if models: + model = models[0] + result = f"The most downloaded model by {author} is {model.id} with {model.downloads:,} downloads." + if span: + span.set_attribute("model_found", True) + span.set_attribute("model_id", model.id) + span.set_attribute("downloads", model.downloads) + return result + else: + result = f"No models found for author {author}." + if span: + span.set_attribute("model_found", False) + return result + except Exception as e: + error_result = f"Error fetching models for {author}: {str(e)}" + if span: + span.record_exception(e) + span.set_attribute("error", True) + return error_result # Tool initializations (as functions to be called in agent)