mjschock's picture
Enhance AgentRunner and graph functionality by introducing memory management and improved state handling. Update __call__ method to support both question input and resuming from interrupts, while adding new memory-related fields to track context, actions, and success/error counts. Refactor step callback logic for better user interaction and state management.
9bd791c unverified
raw
history blame
7.35 kB
"""Define the agent graph and its components."""
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional, TypedDict, Union
import yaml
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, StateGraph
from langgraph.types import interrupt
from smolagents import CodeAgent, LiteLLMModel
from configuration import Configuration
from tools import tools
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Enable LiteLLM debug logging only if environment variable is set
import litellm
if os.getenv("LITELLM_DEBUG", "false").lower() == "true":
litellm.set_verbose = True
logger.setLevel(logging.DEBUG)
else:
litellm.set_verbose = False
logger.setLevel(logging.INFO)
# Configure LiteLLM to drop unsupported parameters
litellm.drop_params = True
# Load default prompt templates from local file
current_dir = os.path.dirname(os.path.abspath(__file__))
prompts_dir = os.path.join(current_dir, "prompts")
yaml_path = os.path.join(prompts_dir, "code_agent.yaml")
with open(yaml_path, "r") as f:
prompt_templates = yaml.safe_load(f)
# Initialize the model and agent using configuration
config = Configuration()
model = LiteLLMModel(
api_base=config.api_base,
api_key=config.api_key,
model_id=config.model_id,
)
agent = CodeAgent(
add_base_tools=True,
max_steps=1, # Execute one step at a time
model=model,
prompt_templates=prompt_templates,
tools=tools,
verbosity_level=logging.DEBUG,
)
class AgentState(TypedDict):
"""State for the agent graph."""
messages: List[Union[HumanMessage, AIMessage, SystemMessage]]
question: str
answer: Optional[str]
step_logs: List[Dict]
is_complete: bool
step_count: int
# Add memory-related fields
context: Dict[str, any] # For storing contextual information
memory_buffer: List[Dict] # For storing important information across steps
last_action: Optional[str] # Track the last action taken
action_history: List[Dict] # History of actions taken
error_count: int # Track error frequency
success_count: int # Track successful operations
class AgentNode:
"""Node that runs the agent."""
def __init__(self, agent: CodeAgent):
"""Initialize the agent node with an agent."""
self.agent = agent
def __call__(
self, state: AgentState, config: Optional[RunnableConfig] = None
) -> AgentState:
"""Run the agent on the current state."""
# Log current state
logger.info("Current state before processing:")
logger.info(f"Messages: {state['messages']}")
logger.info(f"Question: {state['question']}")
logger.info(f"Answer: {state['answer']}")
# Get configuration
cfg = Configuration.from_runnable_config(config)
logger.info(f"Using configuration: {cfg}")
# Log execution start
logger.info("Starting agent execution")
try:
# Run the agent
result = self.agent.run(state["question"])
# Update memory-related fields
new_state = state.copy()
new_state["messages"].append(AIMessage(content=result))
new_state["answer"] = result
new_state["step_count"] += 1
new_state["last_action"] = "agent_response"
new_state["action_history"].append(
{
"step": state["step_count"],
"action": "agent_response",
"result": result,
}
)
new_state["success_count"] += 1
# Store important information in memory buffer
if result:
new_state["memory_buffer"].append(
{
"step": state["step_count"],
"content": result,
"timestamp": datetime.now().isoformat(),
}
)
except Exception as e:
logger.error(f"Error during agent execution: {str(e)}")
new_state = state.copy()
new_state["error_count"] += 1
new_state["action_history"].append(
{"step": state["step_count"], "action": "error", "error": str(e)}
)
raise
# Log updated state
logger.info("Updated state after processing:")
logger.info(f"Messages: {new_state['messages']}")
logger.info(f"Question: {new_state['question']}")
logger.info(f"Answer: {new_state['answer']}")
return new_state
class StepCallbackNode:
"""Node that handles step callbacks and user interaction."""
def __call__(
self, state: AgentState, config: Optional[RunnableConfig] = None
) -> AgentState:
"""Handle step callback and user interaction."""
# Get configuration
cfg = Configuration.from_runnable_config(config)
# Log the step
step_log = {
"step": state["step_count"],
"messages": [msg.content for msg in state["messages"]],
"question": state["question"],
"answer": state["answer"],
}
state["step_logs"].append(step_log)
try:
# Use interrupt for user input and unpack the tuple
interrupt_result = interrupt(
"Press 'c' to continue, 'q' to quit, or 'i' for more info: "
)
user_input = interrupt_result[0] # Get the actual user input
if user_input.lower() == "q":
state["is_complete"] = True
return state
elif user_input.lower() == "i":
logger.info(f"Current step: {state['step_count']}")
logger.info(f"Question: {state['question']}")
logger.info(f"Current answer: {state['answer']}")
return state
elif user_input.lower() == "c":
return state
else:
logger.warning("Invalid input. Please use 'c', 'q', or 'i'.")
return state
except Exception as e:
logger.warning(f"Error during interrupt: {str(e)}")
return state
def build_agent_graph(agent: AgentNode) -> StateGraph:
"""Build the agent graph."""
# Initialize the graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("agent", agent)
workflow.add_node("callback", StepCallbackNode())
# Add edges
workflow.add_edge("agent", "callback")
# Add conditional edges for callback
def should_continue(state: AgentState) -> str:
"""Determine the next node based on state."""
if state["is_complete"]:
return END
# If we have an answer and no errors, we're done
if state["answer"] and state["error_count"] == 0:
return END
# Otherwise continue to agent
return "agent"
workflow.add_conditional_edges(
"callback", should_continue, {END: END, "agent": "agent"}
)
# Set entry point
workflow.set_entry_point("agent")
return workflow.compile()
# Initialize the agent graph
agent_graph = build_agent_graph(AgentNode(agent))