from typing import Dict, Iterator, List, Optional, Tuple, Union from langchain.agents import AgentExecutor from langchain.agents.agent import ExceptionTool from langchain.agents.tools import InvalidTool from langchain.callbacks.manager import CallbackManagerForChainRun from langchain_core.agents import AgentAction, AgentFinish, AgentStep from langchain_core.exceptions import OutputParserException from langchain_core.tools import BaseTool from ..tools.cache_tools import CacheTools from .cache.cache_hit import CacheHit class CrewAgentExecutor(AgentExecutor): def _iter_next_step( self, name_to_tool_map: Dict[str, BaseTool], color_mapping: Dict[str, str], inputs: Dict[str, str], intermediate_steps: List[Tuple[AgentAction, str]], run_manager: Optional[CallbackManagerForChainRun] = None, ) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]: """Take a single step in the thought-action-observation loop. Override this to take control of how the agent makes and acts on choices. """ try: intermediate_steps = self._prepare_intermediate_steps(intermediate_steps) # Call the LLM to see what to do. output = self.agent.plan( intermediate_steps, callbacks=run_manager.get_child() if run_manager else None, **inputs, ) except OutputParserException as e: if isinstance(self.handle_parsing_errors, bool): raise_error = not self.handle_parsing_errors else: raise_error = False if raise_error: raise ValueError( "An output parsing error occurred. " "In order to pass this error back to the agent and have it try " "again, pass `handle_parsing_errors=True` to the AgentExecutor. " f"This is the error: {str(e)}" ) text = str(e) if isinstance(self.handle_parsing_errors, bool): if e.send_to_llm: observation = str(e.observation) text = str(e.llm_output) else: observation = "Invalid or incomplete response" elif isinstance(self.handle_parsing_errors, str): observation = self.handle_parsing_errors elif callable(self.handle_parsing_errors): observation = self.handle_parsing_errors(e) else: raise ValueError("Got unexpected type of `handle_parsing_errors`") output = AgentAction("_Exception", observation, text) if run_manager: run_manager.on_agent_action(output, color="green") tool_run_kwargs = self.agent.tool_run_logging_kwargs() observation = ExceptionTool().run( output.tool_input, verbose=self.verbose, color=None, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) yield AgentStep(action=output, observation=observation) return # If the tool chosen is the finishing tool, then we end and return. if isinstance(output, AgentFinish): yield output return # Override tool usage to use CacheTools if isinstance(output, CacheHit): cache = output.cache action = output.action tool = CacheTools(cache_handler=cache).tool() output = action.copy() output.tool_input = f"tool:{action.tool}|input:{action.tool_input}" output.tool = tool.name name_to_tool_map[tool.name] = tool color_mapping[tool.name] = color_mapping[action.tool] actions: List[AgentAction] if isinstance(output, AgentAction): actions = [output] else: actions = output for agent_action in actions: yield agent_action for agent_action in actions: if run_manager: run_manager.on_agent_action(agent_action, color="green") # Otherwise we lookup the tool if agent_action.tool in name_to_tool_map: tool = name_to_tool_map[agent_action.tool] return_direct = tool.return_direct color = color_mapping[agent_action.tool] tool_run_kwargs = self.agent.tool_run_logging_kwargs() if return_direct: tool_run_kwargs["llm_prefix"] = "" # We then call the tool on the tool input to get an observation observation = tool.run( agent_action.tool_input, verbose=self.verbose, color=color, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) else: tool_run_kwargs = self.agent.tool_run_logging_kwargs() observation = InvalidTool().run( { "requested_tool_name": agent_action.tool, "available_tool_names": list(name_to_tool_map.keys()), }, verbose=self.verbose, color=None, callbacks=run_manager.get_child() if run_manager else None, **tool_run_kwargs, ) yield AgentStep(action=agent_action, observation=observation)