# Final_Assignment_Template / langgraph_agent_system.py
# Author: Humanlearning — "updated agent" (commit f844f16)
"""
LangGraph Multi-Agent System Implementation
This module implements a multi-agent system using LangGraph with the following components:
- LeadAgent: Orchestrates the workflow and makes decisions
- ResearchAgent: Handles information gathering and research tasks
- CodeAgent: Handles computational and code execution tasks
- AnswerFormatter: Formats final answers according to GAIA requirements
- Memory: Persistent storage for context and learning
"""
import os
from typing import Dict, Any, TypedDict, Literal, Annotated, List
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
import operator
from dotenv import load_dotenv
# Import our observability module
from observability import (
start_root_span,
get_callback_handler,
flush_traces,
shutdown_observability
)
# Load environment variables
load_dotenv("env.local")
class AgentState(TypedDict):
    """
    State schema for the multi-agent system following LangGraph best practices.
    Treats every agent node as a pure function AgentState → Command.

    Fields annotated with ``operator.add`` act as accumulators: partial updates
    returned by nodes are combined additively (list/string concatenation)
    instead of overwriting the previous value.
    """
    # Core conversation messages; the additive reducer appends each node's
    # new messages to the running history rather than replacing it.
    messages: Annotated[List[BaseMessage], operator.add]
    # Working draft of the answer (overwritten, not accumulated).
    draft_answer: str
    # Evidence gathered by the research agent; string concatenation on update.
    research_notes: Annotated[str, operator.add]  # Use add for accumulation
    # Output produced by the code agent; string concatenation on update.
    code_outputs: Annotated[str, operator.add]  # Use add for accumulation
    # Loop control: iterations completed so far vs. the configured cap.
    loop_counter: int
    max_iterations: int
    # Routing decision written by the lead agent, consumed by the router.
    next: Literal["research", "code", "formatter", "__end__"]
    # Final formatted answer; empty string until the formatter has run.
    final_answer: str
    # Metadata for tracing (user/session identifiers forwarded to Langfuse).
    user_id: str
    session_id: str
# Removed setup_tracing function - now handled by observability module
def create_agent_graph():
    """
    Build the LangGraph workflow for the multi-agent system.

    Topology: START -> lead; lead conditionally routes to research, code,
    formatter, or END; research and code loop back to lead; formatter -> END.

    Returns:
        An uncompiled StateGraph ready for ``.compile()``.
    """
    # Imported lazily so the graph module can load without the agent packages.
    from agents.lead_agent import lead_agent
    from agents.research_agent import research_agent
    from agents.code_agent import code_agent
    from agents.answer_formatter import answer_formatter

    graph = StateGraph(AgentState)

    # Register every agent node in one pass.
    for node_name, node_fn in (
        ("lead", lead_agent),
        ("research", research_agent),
        ("code", code_agent),
        ("formatter", answer_formatter),
    ):
        graph.add_node(node_name, node_fn)

    # Every run begins with the lead agent.
    graph.add_edge(START, "lead")

    def route_from_lead(state: AgentState) -> str:
        """Pick the next node after the lead agent, honoring termination rules."""
        iterations_done = state.get("loop_counter", 0)
        iteration_cap = state.get("max_iterations", 3)
        # Stop once the loop budget is exhausted or an answer already exists.
        # NOTE(review): reaching the cap ends the run without visiting the
        # formatter — confirm that skipping formatting here is intentional.
        if iterations_done >= iteration_cap or state.get("final_answer"):
            return "__end__"
        return state.get("next", "research")

    graph.add_conditional_edges(
        "lead",
        route_from_lead,
        {
            "research": "research",
            "code": "code",
            "formatter": "formatter",
            "__end__": END,
        },
    )

    # Worker agents hand control back to the lead for the next decision;
    # the formatter terminates the run.
    graph.add_edge("research", "lead")
    graph.add_edge("code", "lead")
    graph.add_edge("formatter", END)
    return graph
async def run_agent_system(
    query: str,
    user_id: str = "default_user",
    session_id: str = "default_session",
    max_iterations: int = 3
) -> str:
    """
    Execute the full agent workflow for a single user query.

    Args:
        query: User question to answer
        user_id: User identifier for tracing
        session_id: Session identifier for tracing
        max_iterations: Maximum number of research/code loops

    Returns:
        The final formatted answer, or an apology string if any exception
        escapes the workflow (this function never raises).
    """
    try:
        # Tracing callback may be absent if observability is not configured.
        callback_handler = get_callback_handler()
        # One root span wraps the entire request for end-to-end tracing.
        with start_root_span(
            name="user-request",
            user_id=user_id,
            session_id=session_id,
            metadata={"query": query, "max_iterations": max_iterations}
        ) as root_span:
            app = create_agent_graph().compile()

            # Seed the state: routing starts at "research" and all
            # accumulator fields begin empty.
            initial_state: AgentState = {
                "messages": [HumanMessage(content=query)],
                "draft_answer": "",
                "research_notes": "",
                "code_outputs": "",
                "loop_counter": 0,
                "max_iterations": max_iterations,
                "next": "research",
                "final_answer": "",
                "user_id": user_id,
                "session_id": session_id
            }

            if callback_handler is None or not callback_handler:
                print("Warning: Running without Langfuse tracing")
                final_state = await app.ainvoke(initial_state)
            else:
                final_state = await app.ainvoke(
                    initial_state,
                    config={"callbacks": [callback_handler]}
                )

            # Record the workflow output on the trace when a span exists.
            if root_span:
                root_span.update_trace(output={"final_answer": final_state["final_answer"]})
            return final_state["final_answer"]
    except Exception as e:
        # Top-level boundary: surface the failure as a user-facing message
        # instead of propagating.
        print(f"Error in agent system: {e}")
        return f"I apologize, but I encountered an error while processing your query: {str(e)}"
    finally:
        # Always flush pending traces, success or failure.
        flush_traces(background=True)
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        """Smoke-test the agent system with one sample question."""
        answer = await run_agent_system(
            "What is the capital of Maharashtra?",
            user_id="test_user",
            session_id="test_session"
        )
        print(f"Final Answer: {answer}")

    asyncio.run(_demo())