|
|
"""Execution Agent - Handles code execution and computational tasks""" |
|
|
from typing import Dict, Any, List

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_groq import ChatGroq

from code_agent import run_agent
from src.tracing import get_langfuse_callback_handler
|
|
@tool
def run_python(input: str) -> str:
    """Execute Python code in a restricted sandbox (code-interpreter).

    Pass **any** coding or file-manipulation task here and the agent will
    compute the answer by running Python. Not all of the standard library is
    available, and heavy networking is disabled. Suitable for: math,
    data-frames, small file parsing, and algorithmic questions.
    """
    return run_agent(input)
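# Illustrative call only (assumes the sandbox behind code_agent.run_agent is configured):
#   run_python.invoke({"input": "print(sum(range(10)))"})  # returns the agent's answer as text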
|
|
|
|
|
|
|
|
def load_execution_prompt() -> str:
    """Load the execution prompt from file"""
    try:
        with open("./prompts/execution_prompt.txt", "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return (
            "You are a specialized execution agent. Use the run_python tool "
            "to execute code and solve computational problems."
        )
|
|
def get_execution_tools() -> List:
    """Get list of tools available to the execution agent"""
    return [run_python]
|
|
def execute_tool_calls(tool_calls: list, tools: list) -> list:
    """Execute tool calls and return the results as ToolMessages"""
    tool_messages = []
    # Use `t` rather than `tool` to avoid shadowing the imported @tool decorator.
    tool_map = {t.name: t for t in tools}

    # Each tool call is a dict of the form {"name": ..., "args": {...}, "id": ...};
    # the id is echoed back in the ToolMessage so the model can pair results with requests.
    for tool_call in tool_calls:
        tool_name = tool_call['name']
        tool_args = tool_call['args']
        tool_call_id = tool_call['id']

        if tool_name in tool_map:
            try:
                print(f"Execution Agent: Executing {tool_name} with args: {str(tool_args)[:200]}...")
                result = tool_map[tool_name].invoke(tool_args)
                tool_messages.append(
                    ToolMessage(
                        content=str(result),
                        tool_call_id=tool_call_id
                    )
                )
            except Exception as e:
                print(f"Error executing {tool_name}: {e}")
                tool_messages.append(
                    ToolMessage(
                        content=f"Error executing {tool_name}: {e}",
                        tool_call_id=tool_call_id
                    )
                )
        else:
            tool_messages.append(
                ToolMessage(
                    content=f"Unknown tool: {tool_name}",
                    tool_call_id=tool_call_id
                )
            )

    return tool_messages
|
|
def needs_code_execution(query: str) -> bool:
    """Heuristic to determine if a query requires code execution"""
    code_indicators = [
        "calculate", "compute", "algorithm", "fibonacci", "math", "data",
        "programming", "code", "function", "sort", "csv", "json", "pandas",
        "plot", "graph", "analyze", "process", "file", "manipulation"
    ]
    query_lower = query.lower()
    return any(indicator in query_lower for indicator in code_indicators)
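# Illustrative behaviour of the heuristic:
#   needs_code_execution("Calculate the 15th Fibonacci number")  -> True  ("calculate", "fibonacci")
#   needs_code_execution("Who wrote Hamlet?")                    -> False (no indicator matches)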
|
|
|
|
|
|
|
|
def execution_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """Execution agent that handles computational and code execution tasks."""
    print("Execution Agent: Processing computational request")

    try:
        execution_prompt = load_execution_prompt()

        llm = ChatGroq(model="qwen-qwq-32b", temperature=0.1)
        tools = get_execution_tools()
        llm_with_tools = llm.bind_tools(tools)

        # Attach the Langfuse callback handler when tracing is configured.
        callback_handler = get_langfuse_callback_handler()
        callbacks = [callback_handler] if callback_handler else []

        messages = state.get("messages", [])

        execution_messages = [SystemMessage(content=execution_prompt)]

        # Find the most recent user query.
        user_query = None
        for msg in reversed(messages):
            if msg.type == "human":
                user_query = msg.content
                break

        # If the query looks computational, add explicit guidance to use run_python.
        if user_query and needs_code_execution(user_query):
            guidance_msg = HumanMessage(
                content=f"""Task requiring code execution: {user_query}

Please analyze this computational task and use the run_python tool to solve it step by step.
Break down complex problems into smaller steps and provide clear explanations."""
            )
            execution_messages.append(guidance_msg)

        # Carry over the prior conversation, excluding any earlier system prompts.
        for msg in messages:
            if msg.type != "system":
                execution_messages.append(msg)

        response = llm_with_tools.invoke(execution_messages, config={"callbacks": callbacks})

        if response.tool_calls:
            print(f"Execution Agent: LLM requested {len(response.tool_calls)} tool calls")

            tool_messages = execute_tool_calls(response.tool_calls, tools)

            execution_messages.extend([response] + tool_messages)

            # Ask the plain LLM (no tools bound) to summarize the tool results.
            final_response = llm.invoke(execution_messages, config={"callbacks": callbacks})

            return {
                **state,
                "messages": execution_messages + [final_response],
                "agent_response": final_response,
                "current_step": "verification"
            }
        else:
            return {
                **state,
                "messages": execution_messages + [response],
                "agent_response": response,
                "current_step": "verification"
            }

    except Exception as e:
        print(f"Execution Agent Error: {e}")
        error_response = AIMessage(content=f"I encountered an error while processing your computational request: {e}")
        return {
            **state,
            "messages": state.get("messages", []) + [error_response],
            "agent_response": error_response,
            "current_step": "verification"
        }
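

if __name__ == "__main__":
    # Minimal smoke test, not part of the LangGraph workflow. A sketch only: it
    # assumes GROQ_API_KEY is set and that code_agent / src.tracing are importable.
    demo_state = {"messages": [HumanMessage(content="Calculate the sum of the first 100 primes.")]}
    result = execution_agent(demo_state)
    print(result["agent_response"].content)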