|
import time |
|
from textwrap import dedent |
|
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union |
|
|
|
from langchain.agents import AgentExecutor |
|
from langchain.agents.agent import ExceptionTool |
|
from langchain.agents.tools import InvalidTool |
|
from langchain.callbacks.manager import CallbackManagerForChainRun |
|
from langchain_core.agents import AgentAction, AgentFinish, AgentStep |
|
from langchain_core.exceptions import OutputParserException |
|
from langchain_core.pydantic_v1 import root_validator |
|
from langchain_core.tools import BaseTool |
|
from langchain_core.utils.input import get_color_mapping |
|
|
|
from ..tools.cache_tools import CacheTools |
|
from .cache.cache_hit import CacheHit |
|
|
|
|
|
class CrewAgentExecutor(AgentExecutor): |
|
iterations: int = 0 |
|
max_iterations: Optional[int] = 15 |
|
force_answer_max_iterations: Optional[int] = None |
|
|
|
@root_validator() |
|
def set_force_answer_max_iterations(cls, values: Dict) -> Dict: |
|
values["force_answer_max_iterations"] = values["max_iterations"] - 2 |
|
return values |
|
|
|
def _should_force_answer(self) -> bool: |
|
return True if self.iterations == self.force_answer_max_iterations else False |
|
|
|
def _force_answer(self, output: AgentAction): |
|
return AgentStep( |
|
action=output, |
|
observation=dedent( |
|
"""\ |
|
I've used too many tools for this task. |
|
I'm going to give you my absolute BEST Final answer now and |
|
not use any more tools.""" |
|
), |
|
) |
|
|
|
def _call( |
|
self, |
|
inputs: Dict[str, str], |
|
run_manager: Optional[CallbackManagerForChainRun] = None, |
|
) -> Dict[str, Any]: |
|
"""Run text through and get agent response.""" |
|
|
|
name_to_tool_map = {tool.name: tool for tool in self.tools} |
|
|
|
color_mapping = get_color_mapping( |
|
[tool.name for tool in self.tools], excluded_colors=["green", "red"] |
|
) |
|
intermediate_steps: List[Tuple[AgentAction, str]] = [] |
|
|
|
self.iterations = 0 |
|
time_elapsed = 0.0 |
|
start_time = time.time() |
|
|
|
while self._should_continue(self.iterations, time_elapsed): |
|
next_step_output = self._take_next_step( |
|
name_to_tool_map, |
|
color_mapping, |
|
inputs, |
|
intermediate_steps, |
|
run_manager=run_manager, |
|
) |
|
if isinstance(next_step_output, AgentFinish): |
|
return self._return( |
|
next_step_output, intermediate_steps, run_manager=run_manager |
|
) |
|
|
|
intermediate_steps.extend(next_step_output) |
|
if len(next_step_output) == 1: |
|
next_step_action = next_step_output[0] |
|
|
|
tool_return = self._get_tool_return(next_step_action) |
|
if tool_return is not None: |
|
return self._return( |
|
tool_return, intermediate_steps, run_manager=run_manager |
|
) |
|
self.iterations += 1 |
|
time_elapsed = time.time() - start_time |
|
output = self.agent.return_stopped_response( |
|
self.early_stopping_method, intermediate_steps, **inputs |
|
) |
|
return self._return(output, intermediate_steps, run_manager=run_manager) |
|
|
|
def _iter_next_step( |
|
self, |
|
name_to_tool_map: Dict[str, BaseTool], |
|
color_mapping: Dict[str, str], |
|
inputs: Dict[str, str], |
|
intermediate_steps: List[Tuple[AgentAction, str]], |
|
run_manager: Optional[CallbackManagerForChainRun] = None, |
|
) -> Iterator[Union[AgentFinish, AgentAction, AgentStep]]: |
|
"""Take a single step in the thought-action-observation loop. |
|
|
|
Override this to take control of how the agent makes and acts on choices. |
|
""" |
|
try: |
|
intermediate_steps = self._prepare_intermediate_steps(intermediate_steps) |
|
|
|
|
|
output = self.agent.plan( |
|
intermediate_steps, |
|
callbacks=run_manager.get_child() if run_manager else None, |
|
**inputs, |
|
) |
|
if self._should_force_answer(): |
|
if isinstance(output, AgentAction): |
|
output = output |
|
else: |
|
output = output.action |
|
yield self._force_answer(output) |
|
return |
|
|
|
except OutputParserException as e: |
|
if isinstance(self.handle_parsing_errors, bool): |
|
raise_error = not self.handle_parsing_errors |
|
else: |
|
raise_error = False |
|
if raise_error: |
|
raise ValueError( |
|
"An output parsing error occurred. " |
|
"In order to pass this error back to the agent and have it try " |
|
"again, pass `handle_parsing_errors=True` to the AgentExecutor. " |
|
f"This is the error: {str(e)}" |
|
) |
|
text = str(e) |
|
if isinstance(self.handle_parsing_errors, bool): |
|
if e.send_to_llm: |
|
observation = str(e.observation) |
|
text = str(e.llm_output) |
|
else: |
|
observation = "Invalid or incomplete response" |
|
elif isinstance(self.handle_parsing_errors, str): |
|
observation = self.handle_parsing_errors |
|
elif callable(self.handle_parsing_errors): |
|
observation = self.handle_parsing_errors(e) |
|
else: |
|
raise ValueError("Got unexpected type of `handle_parsing_errors`") |
|
output = AgentAction("_Exception", observation, text) |
|
if run_manager: |
|
run_manager.on_agent_action(output, color="green") |
|
tool_run_kwargs = self.agent.tool_run_logging_kwargs() |
|
observation = ExceptionTool().run( |
|
output.tool_input, |
|
verbose=self.verbose, |
|
color=None, |
|
callbacks=run_manager.get_child() if run_manager else None, |
|
**tool_run_kwargs, |
|
) |
|
|
|
if self._should_force_answer(): |
|
yield self._force_answer(output) |
|
return |
|
|
|
yield AgentStep(action=output, observation=observation) |
|
return |
|
|
|
|
|
if isinstance(output, AgentFinish): |
|
yield output |
|
return |
|
|
|
|
|
if isinstance(output, CacheHit): |
|
cache = output.cache |
|
action = output.action |
|
tool = CacheTools(cache_handler=cache).tool() |
|
output = action.copy() |
|
output.tool_input = f"tool:{action.tool}|input:{action.tool_input}" |
|
output.tool = tool.name |
|
name_to_tool_map[tool.name] = tool |
|
color_mapping[tool.name] = color_mapping[action.tool] |
|
|
|
actions: List[AgentAction] |
|
actions = [output] if isinstance(output, AgentAction) else output |
|
yield from actions |
|
for agent_action in actions: |
|
if run_manager: |
|
run_manager.on_agent_action(agent_action, color="green") |
|
|
|
if agent_action.tool in name_to_tool_map: |
|
tool = name_to_tool_map[agent_action.tool] |
|
return_direct = tool.return_direct |
|
color = color_mapping[agent_action.tool] |
|
tool_run_kwargs = self.agent.tool_run_logging_kwargs() |
|
if return_direct: |
|
tool_run_kwargs["llm_prefix"] = "" |
|
|
|
observation = tool.run( |
|
agent_action.tool_input, |
|
verbose=self.verbose, |
|
color=color, |
|
callbacks=run_manager.get_child() if run_manager else None, |
|
**tool_run_kwargs, |
|
) |
|
else: |
|
tool_run_kwargs = self.agent.tool_run_logging_kwargs() |
|
observation = InvalidTool().run( |
|
{ |
|
"requested_tool_name": agent_action.tool, |
|
"available_tool_names": list(name_to_tool_map.keys()), |
|
}, |
|
verbose=self.verbose, |
|
color=None, |
|
callbacks=run_manager.get_child() if run_manager else None, |
|
**tool_run_kwargs, |
|
) |
|
yield AgentStep(action=agent_action, observation=observation) |
|
|