""" |
|
|
Subagent System Test |
|
|
|
|
|
Tests the specialized subagent orchestration system with tool filtering and routing. |
|
|
|
|
|
Usage: |
|
|
# Test subagent system (uses .env NUTHATCH_USE_STDIO setting) |
|
|
python tests/test_subagents.py |
|
|
|
|
|
# Test individual specialists |
|
|
python tests/test_subagents.py --specialist image_identifier |
|
|
python tests/test_subagents.py --specialist species_explorer |
|
|
python tests/test_subagents.py --specialist taxonomy_specialist |
|
|
|
|
|
Configuration (.env): |
|
|
# Ensure STDIO mode for testing |
|
|
NUTHATCH_USE_STDIO=true |
|
|
NUTHATCH_API_KEY=<your-key> |
|
|
|
|
|
# Required for Modal classifier |
|
|
MODAL_MCP_URL=<your-modal-url> |
|
|
BIRD_CLASSIFIER_API_KEY=<your-modal-key> |
|
|
|
|
|
# Required for LLM |
|
|
OPENAI_API_KEY=<your-key> |
|
|
""" |
|
|

import asyncio
import sys
from pathlib import Path

from dotenv import load_dotenv
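
# Make the project root importable so the langgraph_agent package resolves
# when this script is run directly from the tests/ directory.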
parent_dir = Path(__file__).parent.parent
sys.path.insert(0, str(parent_dir))

from langgraph_agent import AgentFactory
from langgraph_agent.config import AgentConfig
from langgraph_agent.subagent_config import SubAgentConfig
from langgraph_agent.subagent_factory import SubAgentFactory
from langgraph_agent.mcp_clients import MCPClientManager
from langchain_openai import ChatOpenAI

load_dotenv()


def validate_config():
    """Validate required configuration."""
    errors = []

    if not AgentConfig.MODAL_MCP_URL:
        errors.append("MODAL_MCP_URL not set")
    if not AgentConfig.BIRD_CLASSIFIER_API_KEY:
        errors.append("BIRD_CLASSIFIER_API_KEY not set")
    if not AgentConfig.NUTHATCH_API_KEY:
        errors.append("NUTHATCH_API_KEY not set")
    if not AgentConfig.OPENAI_API_KEY:
        errors.append("OPENAI_API_KEY not set")

    if not AgentConfig.NUTHATCH_USE_STDIO:
        print("\n⚠️ WARNING: NUTHATCH_USE_STDIO is False")
        print("   For this test, STDIO mode is recommended")
        print("   Set NUTHATCH_USE_STDIO=true in .env\n")

    if errors:
        print("\n❌ Configuration Errors:")
        for error in errors:
            print(f"   • {error}")
        print("\n💡 Check your .env file")
        return False

    return True


async def test_tool_filtering():
    """Test that each subagent gets the correct filtered tool set."""
    print("\n" + "="*70)
    print("TEST 1: Tool Filtering")
    print("="*70)
    client = await MCPClientManager.create_multi_server_client()
    all_tools = await MCPClientManager.get_tools(client)

    print(f"\n[ALL TOOLS]: {len(all_tools)} total tools available")
    for tool in all_tools:
        print(f"  • {tool.name}")

    definitions = SubAgentConfig.get_subagent_definitions()

    for subagent_name, config in definitions.items():
        print(f"\n[{subagent_name.upper()}]:")
        print(f"  Name: {config['name']}")
        print(f"  Expected tools: {len(config['tools'])}")
        print(f"  Tools: {', '.join(config['tools'])}")
        allowed_tool_names = set(config['tools'])
        filtered_tools = [t for t in all_tools if t.name in allowed_tool_names]

        print(f"  ✅ Filtered to: {len(filtered_tools)} tools")

        if len(filtered_tools) != len(config['tools']):
            print(f"  ⚠️ Warning: Expected {len(config['tools'])} but got {len(filtered_tools)}")


async def test_individual_subagent(subagent_name: str):
    """Test a specific subagent with a sample query."""
    print("\n" + "="*70)
    print(f"TEST 2: Individual Subagent - {subagent_name}")
    print("="*70)

    definitions = SubAgentConfig.get_subagent_definitions()

    if subagent_name not in definitions:
        print(f"\n❌ Unknown subagent: {subagent_name}")
        print(f"Available: {', '.join(definitions.keys())}")
        return

    config = definitions[subagent_name]
    print("\n[CONFIG]:")
    print(f"  Name: {config['name']}")
    print(f"  Description: {config['description']}")
    print(f"  Tools: {', '.join(config['tools'])}")
    llm = ChatOpenAI(
        model=AgentConfig.DEFAULT_OPENAI_MODEL,
        temperature=config['temperature'],
        streaming=True
    )

    client = await MCPClientManager.create_multi_server_client()
    all_tools = await MCPClientManager.get_tools(client)

    print("\n[CREATING SUBAGENT]...")
    subagent = await SubAgentFactory.create_subagent(
        subagent_name, all_tools, llm
    )
    print("✅ Subagent created successfully")
    test_queries = {
        "image_identifier": "What bird is in this image?",
        "species_explorer": "Tell me about Northern Cardinals",
        "taxonomy_specialist": "What birds are in the Cardinalidae family?"
    }

    query = test_queries.get(subagent_name, "Help me identify birds")

    print(f"\n[TEST QUERY]: {query}")
    print("[RESPONSE]:")
    print("-" * 70)
    try:
        result = await subagent.ainvoke({
            "messages": [{"role": "user", "content": query}]
        })

        if result and "messages" in result:
            for msg in result["messages"]:
                if hasattr(msg, 'content'):
                    print(msg.content)
        else:
            print(result)

    except Exception as e:
        print(f"⚠️ Test query failed (expected for image_identifier without an image): {e}")


async def test_router():
    """Test the routing logic."""
    print("\n" + "="*70)
    print("TEST 3: Router Logic")
    print("="*70)

    test_cases = [
        ("What bird is this?", "image_identifier"),
        ("Identify this photo", "image_identifier"),
        ("Tell me about cardinals", "species_explorer"),
        ("Find birds with red feathers", "species_explorer"),
        ("Show me audio of a robin", "species_explorer"),
        ("What families exist?", "taxonomy_specialist"),
        ("Show me endangered birds", "taxonomy_specialist"),
    ]

    print("\n[ROUTING TESTS]:")
    print(f"Testing {len(test_cases)} queries...")
    for query, expected_route in test_cases:
        content = query.lower()

        if any(word in content for word in ["identify", "what bird", "classify", "image", "photo"]):
            route = "image_identifier"
        elif any(word in content for word in ["audio", "sound", "call", "song", "find", "search"]):
            route = "species_explorer"
        elif any(word in content for word in ["family", "families", "conservation", "endangered", "taxonomy"]):
            route = "taxonomy_specialist"
        else:
            route = "species_explorer"

        status = "✅" if route == expected_route else "❌"
        print(f"\n  {status} Query: '{query}'")
        print(f"     Expected: {expected_route}")
        print(f"     Got: {route}")


async def test_full_orchestrator():
    """Test the full subagent orchestrator."""
    print("\n" + "="*70)
    print("TEST 4: Full Orchestrator")
    print("="*70)
    SubAgentConfig.USE_SUBAGENTS = True

    print("\n[CONFIG]:")
    print(f"  Subagents enabled: {SubAgentConfig.USE_SUBAGENTS}")
    print(f"  OpenAI model: {AgentConfig.DEFAULT_OPENAI_MODEL}")
    print(f"  Temperature: {AgentConfig.OPENAI_TEMPERATURE}")

    print("\n[CREATING ORCHESTRATOR]...")
    try:
        orchestrator = await AgentFactory.create_subagent_orchestrator(
            model=AgentConfig.DEFAULT_OPENAI_MODEL,
            api_key=AgentConfig.OPENAI_API_KEY,
            provider="openai",
            mode="Specialized Subagents (3 Specialists)"
        )
        print("✅ Orchestrator created successfully")
        print(f"  Type: {type(orchestrator)}")

    except Exception as e:
        print(f"❌ Orchestrator creation failed: {e}")
        import traceback
        traceback.print_exc()


async def run_all_tests():
    """Run all subagent tests."""
    print("\n" + "="*70)
    print("SUBAGENT SYSTEM TEST SUITE")
    print("="*70)

    if not validate_config():
        print("\n❌ Test suite aborted due to configuration errors")
        return
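
    # Run the four stages in order: tool filtering, one pass per specialist,
    # routing, then full orchestrator creation.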
    try:
        await test_tool_filtering()

        for subagent_name in ["image_identifier", "species_explorer", "taxonomy_specialist"]:
            await test_individual_subagent(subagent_name)

        await test_router()

        await test_full_orchestrator()

        print("\n" + "="*70)
        print("✅ ALL TESTS COMPLETED")
        print("="*70)

    except Exception as e:
        print(f"\n❌ Test suite failed: {e}")
        import traceback
        traceback.print_exc()
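

# Simple argv-based dispatch (kept argparse-free): --specialist <name>,
# --router, or --tools runs one test; no arguments runs the full suite.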
if __name__ == "__main__":
    if len(sys.argv) > 1:
        if sys.argv[1] == "--specialist" and len(sys.argv) > 2:
            specialist_name = sys.argv[2]
            asyncio.run(test_individual_subagent(specialist_name))
        elif sys.argv[1] == "--router":
            asyncio.run(test_router())
        elif sys.argv[1] == "--tools":
            asyncio.run(test_tool_filtering())
        else:
            print("Usage:")
            print("  python tests/test_subagents.py                      # Run all tests")
            print("  python tests/test_subagents.py --specialist <name>  # Test one specialist")
            print("  python tests/test_subagents.py --router             # Test routing only")
            print("  python tests/test_subagents.py --tools              # Test tool filtering")
    else:
        asyncio.run(run_all_tests())