"""Chain that takes in an input and produces an action and action input.""" | |
from __future__ import annotations | |
import asyncio | |
import json | |
import logging | |
import time | |
from abc import abstractmethod | |
from pathlib import Path | |
from typing import ( | |
Any, | |
Callable, | |
Dict, | |
List, | |
Optional, | |
Sequence, | |
Tuple, | |
Union, | |
) | |
import yaml | |
from langchain_core.agents import ( | |
AgentAction, | |
AgentFinish, | |
) | |
from langchain_core.exceptions import ( | |
OutputParserException, | |
) | |
from langchain_core.language_models import BaseLanguageModel | |
from langchain_core.messages import BaseMessage | |
from langchain_core.output_parsers import ( | |
BaseOutputParser, | |
) | |
from langchain_core.prompts import ( | |
BasePromptTemplate, | |
) | |
from langchain_core.prompts.few_shot import FewShotPromptTemplate | |
from langchain_core.prompts.prompt import PromptTemplate | |
from langchain_core.pydantic_v1 import BaseModel, root_validator | |
from langchain_core.runnables import Runnable | |
from langchain_core.utils.input import get_color_mapping | |
from langchain.agents.agent_iterator import AgentExecutorIterator | |
from langchain.agents.agent_types import AgentType | |
from langchain.agents.tools import InvalidTool | |
from langchain.callbacks.base import BaseCallbackManager | |
from langchain.callbacks.manager import ( | |
AsyncCallbackManagerForChainRun, | |
AsyncCallbackManagerForToolRun, | |
CallbackManagerForChainRun, | |
CallbackManagerForToolRun, | |
Callbacks, | |
) | |
from langchain.chains.base import Chain | |
from langchain.chains.llm import LLMChain | |
from langchain.tools.base import BaseTool | |
from langchain.utilities.asyncio import asyncio_timeout | |
logger = logging.getLogger(__name__) | |


class BaseSingleActionAgent(BaseModel):
    """Base Single Action Agent class."""

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]

    def get_allowed_tools(self) -> Optional[List[str]]:
        return None

    @abstractmethod
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """

    @abstractmethod
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """

    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations."""
        if early_stopping_method == "force":
            # `force` just returns a constant string
            return AgentFinish(
                {"output": "Agent stopped due to iteration limit or time limit."}, ""
            )
        else:
            raise ValueError(
                f"Got unsupported early_stopping_method `{early_stopping_method}`"
            )

    @classmethod
    def from_llm_and_tools(
        cls,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        callback_manager: Optional[BaseCallbackManager] = None,
        **kwargs: Any,
    ) -> BaseSingleActionAgent:
        raise NotImplementedError

    @property
    def _agent_type(self) -> str:
        """Return Identifier of agent type."""
        raise NotImplementedError

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent."""
        _dict = super().dict()
        try:
            _type = self._agent_type
        except NotImplementedError:
            _type = None
        if isinstance(_type, AgentType):
            _dict["_type"] = str(_type.value)
        elif _type is not None:
            _dict["_type"] = _type
        return _dict

    def save(self, file_path: Union[Path, str]) -> None:
        """Save the agent.

        Args:
            file_path: Path to file to save the agent to.

        Example:
        .. code-block:: python

            # If working with agent executor
            agent.agent.save(file_path="path/agent.yaml")
        """
        # Convert file to Path object.
        if isinstance(file_path, str):
            save_path = Path(file_path)
        else:
            save_path = file_path

        directory_path = save_path.parent
        directory_path.mkdir(parents=True, exist_ok=True)

        # Fetch dictionary to save
        agent_dict = self.dict()
        if "_type" not in agent_dict:
            raise NotImplementedError(f"Agent {self} does not support saving")

        if save_path.suffix == ".json":
            with open(file_path, "w") as f:
                json.dump(agent_dict, f, indent=4)
        elif save_path.suffix == ".yaml":
            with open(file_path, "w") as f:
                yaml.dump(agent_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"{save_path} must be json or yaml")

    def tool_run_logging_kwargs(self) -> Dict:
        return {}
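

# Example (sketch): a minimal single-action agent. `FixedToolAgent` is a
# hypothetical illustration, not part of this module; it calls one assumed
# tool named "search" once and then finishes with the observation.
#
#     class FixedToolAgent(BaseSingleActionAgent):
#         @property
#         def input_keys(self) -> List[str]:
#             return ["input"]
#
#         def plan(self, intermediate_steps, callbacks=None, **kwargs):
#             if intermediate_steps:
#                 return AgentFinish({"output": intermediate_steps[-1][1]}, "done")
#             return AgentAction("search", kwargs["input"], "calling search")
#
#         async def aplan(self, intermediate_steps, callbacks=None, **kwargs):
#             return self.plan(intermediate_steps, callbacks=callbacks, **kwargs)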


class BaseMultiActionAgent(BaseModel):
    """Base Multi Action Agent class."""

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return ["output"]

    def get_allowed_tools(self) -> Optional[List[str]]:
        return None

    @abstractmethod
    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """

    @abstractmethod
    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """

    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations."""
        if early_stopping_method == "force":
            # `force` just returns a constant string
            return AgentFinish({"output": "Agent stopped due to max iterations."}, "")
        else:
            raise ValueError(
                f"Got unsupported early_stopping_method `{early_stopping_method}`"
            )

    @property
    def _agent_type(self) -> str:
        """Return Identifier of agent type."""
        raise NotImplementedError

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent."""
        _dict = super().dict()
        try:
            _dict["_type"] = str(self._agent_type)
        except NotImplementedError:
            pass
        return _dict

    def save(self, file_path: Union[Path, str]) -> None:
        """Save the agent.

        Args:
            file_path: Path to file to save the agent to.

        Example:
        .. code-block:: python

            # If working with agent executor
            agent.agent.save(file_path="path/agent.yaml")
        """
        # Convert file to Path object.
        if isinstance(file_path, str):
            save_path = Path(file_path)
        else:
            save_path = file_path

        # Fetch dictionary to save
        agent_dict = self.dict()
        if "_type" not in agent_dict:
            raise NotImplementedError(f"Agent {self} does not support saving.")

        directory_path = save_path.parent
        directory_path.mkdir(parents=True, exist_ok=True)

        if save_path.suffix == ".json":
            with open(file_path, "w") as f:
                json.dump(agent_dict, f, indent=4)
        elif save_path.suffix == ".yaml":
            with open(file_path, "w") as f:
                yaml.dump(agent_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"{save_path} must be json or yaml")

    def tool_run_logging_kwargs(self) -> Dict:
        return {}


class AgentOutputParser(BaseOutputParser[Union[AgentAction, AgentFinish]]):
    """Base class for parsing agent output into agent action/finish."""

    @abstractmethod
    def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
        """Parse text into agent action/finish."""


class MultiActionAgentOutputParser(
    BaseOutputParser[Union[List[AgentAction], AgentFinish]]
):
    """Base class for parsing agent output into agent actions/finish."""

    @abstractmethod
    def parse(self, text: str) -> Union[List[AgentAction], AgentFinish]:
        """Parse text into agent actions/finish."""


class RunnableAgent(BaseSingleActionAgent):
    """Agent powered by runnables."""

    runnable: Runnable[dict, Union[AgentAction, AgentFinish]]
    """Runnable to call to get agent action."""
    _input_keys: List[str] = []
    """Input keys."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return []

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        Returns:
            List of input keys.
        """
        return self._input_keys

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
        return output

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        output = await self.runnable.ainvoke(inputs, config={"callbacks": callbacks})
        return output


class RunnableMultiActionAgent(BaseMultiActionAgent):
    """Agent powered by runnables."""

    runnable: Runnable[dict, Union[List[AgentAction], AgentFinish]]
    """Runnable to call to get agent actions."""
    _input_keys: List[str] = []
    """Input keys."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @property
    def return_values(self) -> List[str]:
        """Return values of the agent."""
        return []

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        Returns:
            List of input keys.
        """
        return self._input_keys

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        output = self.runnable.invoke(inputs, config={"callbacks": callbacks})
        return output

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[List[AgentAction], AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Actions specifying what tool to use.
        """
        inputs = {**kwargs, **{"intermediate_steps": intermediate_steps}}
        output = await self.runnable.ainvoke(inputs, config={"callbacks": callbacks})
        return output
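

# Example (sketch): any runnable mapping a dict (which will include
# "intermediate_steps") to AgentAction/AgentFinish can be wrapped. The
# `fake_plan` function below is a hypothetical stand-in for a real chain:
#
#     from langchain_core.runnables import RunnableLambda
#
#     def fake_plan(inputs: dict) -> Union[AgentAction, AgentFinish]:
#         return AgentFinish({"output": "hello"}, "")
#
#     agent = RunnableAgent(runnable=RunnableLambda(fake_plan))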


class LLMSingleActionAgent(BaseSingleActionAgent):
    """Base class for single action agents."""

    llm_chain: LLMChain
    """LLMChain to use for agent."""
    output_parser: AgentOutputParser
    """Output parser to use for agent."""
    stop: List[str]
    """List of strings to stop on."""

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        Returns:
            List of input keys.
        """
        return list(set(self.llm_chain.input_keys) - {"intermediate_steps"})

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent."""
        _dict = super().dict()
        del _dict["output_parser"]
        return _dict

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        output = self.llm_chain.run(
            intermediate_steps=intermediate_steps,
            stop=self.stop,
            callbacks=callbacks,
            **kwargs,
        )
        return self.output_parser.parse(output)

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        output = await self.llm_chain.arun(
            intermediate_steps=intermediate_steps,
            stop=self.stop,
            callbacks=callbacks,
            **kwargs,
        )
        return self.output_parser.parse(output)

    def tool_run_logging_kwargs(self) -> Dict:
        return {
            "llm_prefix": "",
            "observation_prefix": "" if len(self.stop) == 0 else self.stop[0],
        }


class Agent(BaseSingleActionAgent):
    """Agent that calls the language model and decides the action to take.

    This is driven by an LLMChain. The prompt in the LLMChain MUST include
    a variable called "agent_scratchpad" where the agent can put its
    intermediary work.
    """

    llm_chain: LLMChain
    output_parser: AgentOutputParser
    allowed_tools: Optional[List[str]] = None

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of agent."""
        _dict = super().dict()
        del _dict["output_parser"]
        return _dict

    def get_allowed_tools(self) -> Optional[List[str]]:
        return self.allowed_tools

    @property
    def return_values(self) -> List[str]:
        return ["output"]

    def _fix_text(self, text: str) -> str:
        """Fix the text."""
        raise ValueError("fix_text not implemented for this agent.")

    @property
    def _stop(self) -> List[str]:
        return [
            f"\n{self.observation_prefix.rstrip()}",
            f"\n\t{self.observation_prefix.rstrip()}",
        ]

    def _construct_scratchpad(
        self, intermediate_steps: List[Tuple[AgentAction, str]]
    ) -> Union[str, List[BaseMessage]]:
        """Construct the scratchpad that lets the agent continue its thought process."""
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
        return thoughts
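
    # For a typical ReAct-style agent (observation_prefix "Observation: ",
    # llm_prefix "Thought:"), the scratchpad built above looks roughly like
    # the following (an illustrative sketch, not exact library output):
    #
    #     Thought: I should look this up
    #     Action: search
    #     Action Input: weather today
    #     Observation: it is sunny
    #     Thought: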

    def plan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        full_output = self.llm_chain.predict(callbacks=callbacks, **full_inputs)
        return self.output_parser.parse(full_output)

    async def aplan(
        self,
        intermediate_steps: List[Tuple[AgentAction, str]],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> Union[AgentAction, AgentFinish]:
        """Given input, decide what to do.

        Args:
            intermediate_steps: Steps the LLM has taken to date,
                along with the observations.
            callbacks: Callbacks to run.
            **kwargs: User inputs.

        Returns:
            Action specifying what tool to use.
        """
        full_inputs = self.get_full_inputs(intermediate_steps, **kwargs)
        full_output = await self.llm_chain.apredict(callbacks=callbacks, **full_inputs)
        agent_output = await self.output_parser.aparse(full_output)
        return agent_output

    def get_full_inputs(
        self, intermediate_steps: List[Tuple[AgentAction, str]], **kwargs: Any
    ) -> Dict[str, Any]:
        """Create the full inputs for the LLMChain from intermediate steps."""
        thoughts = self._construct_scratchpad(intermediate_steps)
        new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
        full_inputs = {**kwargs, **new_inputs}
        return full_inputs

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
        return list(set(self.llm_chain.input_keys) - {"agent_scratchpad"})

    @root_validator()
    def validate_prompt(cls, values: Dict) -> Dict:
        """Validate that prompt matches format."""
        prompt = values["llm_chain"].prompt
        if "agent_scratchpad" not in prompt.input_variables:
            logger.warning(
                "`agent_scratchpad` should be a variable in prompt.input_variables."
                " Did not find it, so adding it at the end."
            )
            prompt.input_variables.append("agent_scratchpad")
            if isinstance(prompt, PromptTemplate):
                prompt.template += "\n{agent_scratchpad}"
            elif isinstance(prompt, FewShotPromptTemplate):
                prompt.suffix += "\n{agent_scratchpad}"
            else:
                raise ValueError(f"Got unexpected prompt type {type(prompt)}")
        return values
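
    # Example (sketch): a prompt compatible with the validator above exposes
    # an "agent_scratchpad" input variable, e.g.:
    #
    #     prompt = PromptTemplate(
    #         input_variables=["input", "agent_scratchpad"],
    #         template="Answer the question.\n{input}\n{agent_scratchpad}",
    #     )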

    @property
    @abstractmethod
    def observation_prefix(self) -> str:
        """Prefix to append the observation with."""

    @property
    @abstractmethod
    def llm_prefix(self) -> str:
        """Prefix to append the LLM call with."""

    @classmethod
    @abstractmethod
    def create_prompt(cls, tools: Sequence[BaseTool]) -> BasePromptTemplate:
        """Create a prompt for this class."""

    @classmethod
    def _validate_tools(cls, tools: Sequence[BaseTool]) -> None:
        """Validate that appropriate tools are passed in."""
        pass

    @classmethod
    @abstractmethod
    def _get_default_output_parser(cls, **kwargs: Any) -> AgentOutputParser:
        """Get default output parser for this class."""

    @classmethod
    def from_llm_and_tools(
        cls,
        llm: BaseLanguageModel,
        tools: Sequence[BaseTool],
        callback_manager: Optional[BaseCallbackManager] = None,
        output_parser: Optional[AgentOutputParser] = None,
        **kwargs: Any,
    ) -> Agent:
        """Construct an agent from an LLM and tools."""
        cls._validate_tools(tools)
        llm_chain = LLMChain(
            llm=llm,
            prompt=cls.create_prompt(tools),
            callback_manager=callback_manager,
        )
        tool_names = [tool.name for tool in tools]
        _output_parser = output_parser or cls._get_default_output_parser()
        return cls(
            llm_chain=llm_chain,
            allowed_tools=tool_names,
            output_parser=_output_parser,
            **kwargs,
        )

    def return_stopped_response(
        self,
        early_stopping_method: str,
        intermediate_steps: List[Tuple[AgentAction, str]],
        **kwargs: Any,
    ) -> AgentFinish:
        """Return response when agent has been stopped due to max iterations."""
        if early_stopping_method == "force":
            # `force` just returns a constant string
            return AgentFinish(
                {"output": "Agent stopped due to iteration limit or time limit."}, ""
            )
        elif early_stopping_method == "generate":
            # Generate does one final forward pass
            thoughts = ""
            for action, observation in intermediate_steps:
                thoughts += action.log
                thoughts += (
                    f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
                )
            # Adding to the previous steps, we now tell the LLM to make a final pred
            thoughts += (
                "\n\nI now need to return a final answer based on the previous steps:"
            )
            new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
            full_inputs = {**kwargs, **new_inputs}
            full_output = self.llm_chain.predict(**full_inputs)
            # We try to extract a final answer
            parsed_output = self.output_parser.parse(full_output)
            if isinstance(parsed_output, AgentFinish):
                # If we can extract a final answer, return it directly
                return parsed_output
            else:
                # If we cannot extract a final answer,
                # we just return the full output
                return AgentFinish({"output": full_output}, full_output)
        else:
            raise ValueError(
                "early_stopping_method should be one of `force` or `generate`, "
                f"got {early_stopping_method}"
            )

    def tool_run_logging_kwargs(self) -> Dict:
        return {
            "llm_prefix": self.llm_prefix,
            "observation_prefix": self.observation_prefix,
        }


class ExceptionTool(BaseTool):
    """Tool that just returns the query."""

    name: str = "_Exception"
    """Name of the tool."""
    description: str = "Exception tool"
    """Description of the tool."""

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        return query

    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        return query


class AgentExecutor(Chain):
    """Agent that uses tools."""

    agent: Union[BaseSingleActionAgent, BaseMultiActionAgent]
    """The agent to run for creating a plan and determining actions
    to take at each step of the execution loop."""
    tools: Sequence[BaseTool]
    """The valid tools the agent can call."""
    return_intermediate_steps: bool = False
    """Whether to return the agent's trajectory of intermediate steps
    at the end in addition to the final output."""
    max_iterations: Optional[int] = 15
    """The maximum number of steps to take before ending the execution
    loop.

    Setting to 'None' could lead to an infinite loop."""
    max_execution_time: Optional[float] = None
    """The maximum amount of wall clock time to spend in the execution
    loop.
    """
    early_stopping_method: str = "force"
    """The method to use for early stopping if the agent never
    returns `AgentFinish`. Either 'force' or 'generate'.

    `"force"` returns a string saying that it stopped because it met a
    time or iteration limit.

    `"generate"` calls the agent's LLM Chain one final time to generate
    a final answer based on the previous steps.
    """
    handle_parsing_errors: Union[
        bool, str, Callable[[OutputParserException], str]
    ] = False
    """How to handle errors raised by the agent's output parser.
    Defaults to `False`, which raises the error.
    If `True`, the error will be sent back to the LLM as an observation.
    If a string, the string itself will be sent to the LLM as an observation.
    If a callable function, the function will be called with the exception
    as an argument, and the result of that function will be passed to the agent
    as an observation.
    """
    trim_intermediate_steps: Union[
        int, Callable[[List[Tuple[AgentAction, str]]], List[Tuple[AgentAction, str]]]
    ] = -1
    """How to trim the intermediate steps passed back to the agent each turn:
    a positive int keeps only that many most recent steps, a callable maps the
    full list to the list to use, and -1 (the default) keeps everything."""
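
    # Example (sketch): a typical construction, assuming `agent` and `tools`
    # are built elsewhere; a callable may be passed for handle_parsing_errors:
    #
    #     executor = AgentExecutor(
    #         agent=agent,
    #         tools=tools,
    #         max_iterations=5,
    #         handle_parsing_errors=lambda e: f"Could not parse: {e.llm_output}",
    #     )
    #     result = executor.run(input="What is 2 + 2?")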

    @classmethod
    def from_agent_and_tools(
        cls,
        agent: Union[BaseSingleActionAgent, BaseMultiActionAgent],
        tools: Sequence[BaseTool],
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> AgentExecutor:
        """Create from agent and tools."""
        return cls(
            agent=agent,
            tools=tools,
            callbacks=callbacks,
            **kwargs,
        )

    @root_validator()
    def validate_tools(cls, values: Dict) -> Dict:
        """Validate that tools are compatible with agent."""
        agent = values["agent"]
        tools = values["tools"]
        allowed_tools = agent.get_allowed_tools()
        if allowed_tools is not None:
            if set(allowed_tools) != set([tool.name for tool in tools]):
                raise ValueError(
                    f"Allowed tools ({allowed_tools}) different than "
                    f"provided tools ({[tool.name for tool in tools]})"
                )
        return values

    @root_validator()
    def validate_return_direct_tool(cls, values: Dict) -> Dict:
        """Validate that tools are compatible with agent."""
        agent = values["agent"]
        tools = values["tools"]
        if isinstance(agent, BaseMultiActionAgent):
            for tool in tools:
                if tool.return_direct:
                    raise ValueError(
                        "Tools that have `return_direct=True` are not allowed "
                        "in multi-action agents"
                    )
        return values

    @root_validator(pre=True)
    def validate_runnable_agent(cls, values: Dict) -> Dict:
        """Convert runnable to agent if passed in."""
        agent = values["agent"]
        if isinstance(agent, Runnable):
            try:
                output_type = agent.OutputType
            except Exception as _:
                multi_action = False
            else:
                multi_action = output_type == Union[List[AgentAction], AgentFinish]
            if multi_action:
                values["agent"] = RunnableMultiActionAgent(runnable=agent)
            else:
                values["agent"] = RunnableAgent(runnable=agent)
        return values

    def save(self, file_path: Union[Path, str]) -> None:
        """Raise error - saving not supported for Agent Executors."""
        raise ValueError(
            "Saving not supported for agent executors. "
            "If you are trying to save the agent, please use the "
            "`.save_agent(...)`"
        )

    def save_agent(self, file_path: Union[Path, str]) -> None:
        """Save the underlying agent."""
        return self.agent.save(file_path)

    def iter(
        self,
        inputs: Any,
        callbacks: Callbacks = None,
        *,
        include_run_info: bool = False,
        async_: bool = False,
    ) -> AgentExecutorIterator:
        """Enables iteration over steps taken to reach final output."""
        return AgentExecutorIterator(
            self,
            inputs,
            callbacks,
            tags=self.tags,
            include_run_info=include_run_info,
            async_=async_,
        )
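
    # Example (sketch, exact yield semantics depend on AgentExecutorIterator):
    # stepping through the loop one action at a time, given an `executor`
    # built as in the earlier sketch:
    #
    #     for step in executor.iter({"input": "question"}):
    #         print(step)  # intermediate steps, then the final output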

    @property
    def input_keys(self) -> List[str]:
        """Return the input keys.

        :meta private:
        """
        return self.agent.input_keys

    @property
    def output_keys(self) -> List[str]:
        """Return the singular output key.

        :meta private:
        """
        if self.return_intermediate_steps:
            return self.agent.return_values + ["intermediate_steps"]
        else:
            return self.agent.return_values

    def lookup_tool(self, name: str) -> BaseTool:
        """Lookup tool by name."""
        return {tool.name: tool for tool in self.tools}[name]

    def _should_continue(self, iterations: int, time_elapsed: float) -> bool:
        if self.max_iterations is not None and iterations >= self.max_iterations:
            return False
        if (
            self.max_execution_time is not None
            and time_elapsed >= self.max_execution_time
        ):
            return False
        return True

    def _return(
        self,
        output: AgentFinish,
        intermediate_steps: list,
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        if run_manager:
            run_manager.on_agent_finish(output, color="green", verbose=self.verbose)
        final_output = output.return_values
        if self.return_intermediate_steps:
            final_output["intermediate_steps"] = intermediate_steps
        return final_output

    async def _areturn(
        self,
        output: AgentFinish,
        intermediate_steps: list,
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        if run_manager:
            await run_manager.on_agent_finish(
                output, color="green", verbose=self.verbose
            )
        final_output = output.return_values
        if self.return_intermediate_steps:
            final_output["intermediate_steps"] = intermediate_steps
        return final_output

    def _take_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
        """Take a single step in the thought-action-observation loop.

        Override this to take control of how the agent makes and acts on choices.
        """
        try:
            intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)

            # Call the LLM to see what to do.
            output = self.agent.plan(
                intermediate_steps,
                callbacks=run_manager.get_child() if run_manager else None,
                **inputs,
            )
        except OutputParserException as e:
            if isinstance(self.handle_parsing_errors, bool):
                raise_error = not self.handle_parsing_errors
            else:
                raise_error = False
            if raise_error:
                raise ValueError(
                    "An output parsing error occurred. "
                    "In order to pass this error back to the agent and have it try "
                    "again, pass `handle_parsing_errors=True` to the AgentExecutor. "
                    f"This is the error: {str(e)}"
                )
            text = str(e)
            if isinstance(self.handle_parsing_errors, bool):
                if e.send_to_llm:
                    observation = str(e.observation)
                    text = str(e.llm_output)
                else:
                    observation = "Invalid or incomplete response"
            elif isinstance(self.handle_parsing_errors, str):
                observation = self.handle_parsing_errors
            elif callable(self.handle_parsing_errors):
                observation = self.handle_parsing_errors(e)
            else:
                raise ValueError("Got unexpected type of `handle_parsing_errors`")
            output = AgentAction("_Exception", observation, text)
            if run_manager:
                run_manager.on_agent_action(output, color="green")
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            observation = ExceptionTool().run(
                output.tool_input,
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
            return [(output, observation)]
        # If the tool chosen is the finishing tool, then we end and return.
        if isinstance(output, AgentFinish):
            return output
        actions: List[AgentAction]
        if isinstance(output, AgentAction):
            actions = [output]
        else:
            actions = output
        result = []
        for agent_action in actions:
            if run_manager:
                run_manager.on_agent_action(agent_action, color="green")
            # Otherwise we lookup the tool
            if agent_action.tool in name_to_tool_map:
                tool = name_to_tool_map[agent_action.tool]
                return_direct = tool.return_direct
                color = color_mapping[agent_action.tool]
                tool_run_kwargs = self.agent.tool_run_logging_kwargs()
                if return_direct:
                    tool_run_kwargs["llm_prefix"] = ""
                # We then call the tool on the tool input to get an observation
                observation = tool.run(
                    agent_action.tool_input,
                    verbose=self.verbose,
                    color=color,
                    callbacks=run_manager.get_child() if run_manager else None,
                    **tool_run_kwargs,
                )
            else:
                tool_run_kwargs = self.agent.tool_run_logging_kwargs()
                observation = InvalidTool().run(
                    {
                        "requested_tool_name": agent_action.tool,
                        "available_tool_names": list(name_to_tool_map.keys()),
                    },
                    verbose=self.verbose,
                    color=None,
                    callbacks=run_manager.get_child() if run_manager else None,
                    **tool_run_kwargs,
                )
            result.append((agent_action, observation))
        return result

    async def _atake_next_step(
        self,
        name_to_tool_map: Dict[str, BaseTool],
        color_mapping: Dict[str, str],
        inputs: Dict[str, str],
        intermediate_steps: List[Tuple[AgentAction, str]],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Union[AgentFinish, List[Tuple[AgentAction, str]]]:
        """Take a single step in the thought-action-observation loop.

        Override this to take control of how the agent makes and acts on choices.
        """
        try:
            intermediate_steps = self._prepare_intermediate_steps(intermediate_steps)

            # Call the LLM to see what to do.
            output = await self.agent.aplan(
                intermediate_steps,
                callbacks=run_manager.get_child() if run_manager else None,
                **inputs,
            )
        except OutputParserException as e:
            if isinstance(self.handle_parsing_errors, bool):
                raise_error = not self.handle_parsing_errors
            else:
                raise_error = False
            if raise_error:
                raise ValueError(
                    "An output parsing error occurred. "
                    "In order to pass this error back to the agent and have it try "
                    "again, pass `handle_parsing_errors=True` to the AgentExecutor. "
                    f"This is the error: {str(e)}"
                )
            text = str(e)
            if isinstance(self.handle_parsing_errors, bool):
                if e.send_to_llm:
                    observation = str(e.observation)
                    text = str(e.llm_output)
                else:
                    observation = "Invalid or incomplete response"
            elif isinstance(self.handle_parsing_errors, str):
                observation = self.handle_parsing_errors
            elif callable(self.handle_parsing_errors):
                observation = self.handle_parsing_errors(e)
            else:
                raise ValueError("Got unexpected type of `handle_parsing_errors`")
            output = AgentAction("_Exception", observation, text)
            tool_run_kwargs = self.agent.tool_run_logging_kwargs()
            observation = await ExceptionTool().arun(
                output.tool_input,
                verbose=self.verbose,
                color=None,
                callbacks=run_manager.get_child() if run_manager else None,
                **tool_run_kwargs,
            )
            return [(output, observation)]
        # If the tool chosen is the finishing tool, then we end and return.
        if isinstance(output, AgentFinish):
            return output
        actions: List[AgentAction]
        if isinstance(output, AgentAction):
            actions = [output]
        else:
            actions = output

        async def _aperform_agent_action(
            agent_action: AgentAction,
        ) -> Tuple[AgentAction, str]:
            if run_manager:
                await run_manager.on_agent_action(
                    agent_action, verbose=self.verbose, color="green"
                )
            # Otherwise we lookup the tool
            if agent_action.tool in name_to_tool_map:
                tool = name_to_tool_map[agent_action.tool]
                return_direct = tool.return_direct
                color = color_mapping[agent_action.tool]
                tool_run_kwargs = self.agent.tool_run_logging_kwargs()
                if return_direct:
                    tool_run_kwargs["llm_prefix"] = ""
                # We then call the tool on the tool input to get an observation
                observation = await tool.arun(
                    agent_action.tool_input,
                    verbose=self.verbose,
                    color=color,
                    callbacks=run_manager.get_child() if run_manager else None,
                    **tool_run_kwargs,
                )
            else:
                tool_run_kwargs = self.agent.tool_run_logging_kwargs()
                observation = await InvalidTool().arun(
                    {
                        "requested_tool_name": agent_action.tool,
                        "available_tool_names": list(name_to_tool_map.keys()),
                    },
                    verbose=self.verbose,
                    color=None,
                    callbacks=run_manager.get_child() if run_manager else None,
                    **tool_run_kwargs,
                )
            return agent_action, observation

        # Use asyncio.gather to run multiple tool.arun() calls concurrently
        result = await asyncio.gather(
            *[_aperform_agent_action(agent_action) for agent_action in actions]
        )

        return list(result)

    def _call(
        self,
        inputs: Dict[str, str],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        """Run text through and get agent response."""
        # Construct a mapping of tool name to tool for easy lookup
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        # We construct a mapping from each tool to a color, used for logging.
        color_mapping = get_color_mapping(
            [tool.name for tool in self.tools], excluded_colors=["green", "red"]
        )
        intermediate_steps: List[Tuple[AgentAction, str]] = []
        # Let's start tracking the number of iterations and time elapsed
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()
        # We now enter the agent loop (until it returns something).
        while self._should_continue(iterations, time_elapsed):
            next_step_output = self._take_next_step(
                name_to_tool_map,
                color_mapping,
                inputs,
                intermediate_steps,
                run_manager=run_manager,
            )
            if isinstance(next_step_output, AgentFinish):
                return self._return(
                    next_step_output, intermediate_steps, run_manager=run_manager
                )

            intermediate_steps.extend(next_step_output)
            if len(next_step_output) == 1:
                next_step_action = next_step_output[0]
                # See if tool should return directly
                tool_return = self._get_tool_return(next_step_action)
                if tool_return is not None:
                    return self._return(
                        tool_return, intermediate_steps, run_manager=run_manager
                    )
            iterations += 1
            time_elapsed = time.time() - start_time
        output = self.agent.return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )
        return self._return(output, intermediate_steps, run_manager=run_manager)

    async def _acall(
        self,
        inputs: Dict[str, str],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Dict[str, str]:
        """Run text through and get agent response."""
        # Construct a mapping of tool name to tool for easy lookup
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        # We construct a mapping from each tool to a color, used for logging.
        color_mapping = get_color_mapping(
            [tool.name for tool in self.tools], excluded_colors=["green"]
        )
        intermediate_steps: List[Tuple[AgentAction, str]] = []
        # Let's start tracking the number of iterations and time elapsed
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()
        # We now enter the agent loop (until it returns something).
        async with asyncio_timeout(self.max_execution_time):
            try:
                while self._should_continue(iterations, time_elapsed):
                    next_step_output = await self._atake_next_step(
                        name_to_tool_map,
                        color_mapping,
                        inputs,
                        intermediate_steps,
                        run_manager=run_manager,
                    )
                    if isinstance(next_step_output, AgentFinish):
                        return await self._areturn(
                            next_step_output,
                            intermediate_steps,
                            run_manager=run_manager,
                        )

                    intermediate_steps.extend(next_step_output)
                    if len(next_step_output) == 1:
                        next_step_action = next_step_output[0]
                        # See if tool should return directly
                        tool_return = self._get_tool_return(next_step_action)
                        if tool_return is not None:
                            return await self._areturn(
                                tool_return,
                                intermediate_steps,
                                run_manager=run_manager,
                            )
                    iterations += 1
                    time_elapsed = time.time() - start_time
                output = self.agent.return_stopped_response(
                    self.early_stopping_method, intermediate_steps, **inputs
                )
                return await self._areturn(
                    output, intermediate_steps, run_manager=run_manager
                )
            except TimeoutError:
                # stop early when interrupted by the async timeout
                output = self.agent.return_stopped_response(
                    self.early_stopping_method, intermediate_steps, **inputs
                )
                return await self._areturn(
                    output, intermediate_steps, run_manager=run_manager
                )

    def _get_tool_return(
        self, next_step_output: Tuple[AgentAction, str]
    ) -> Optional[AgentFinish]:
        """Check if the tool is a returning tool."""
        agent_action, observation = next_step_output
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        return_value_key = "output"
        if len(self.agent.return_values) > 0:
            return_value_key = self.agent.return_values[0]
        # Invalid tools won't be in the map, so we return None.
        if agent_action.tool in name_to_tool_map:
            if name_to_tool_map[agent_action.tool].return_direct:
                return AgentFinish(
                    {return_value_key: observation},
                    "",
                )
        return None

    def _prepare_intermediate_steps(
        self, intermediate_steps: List[Tuple[AgentAction, str]]
    ) -> List[Tuple[AgentAction, str]]:
        if (
            isinstance(self.trim_intermediate_steps, int)
            and self.trim_intermediate_steps > 0
        ):
            return intermediate_steps[-self.trim_intermediate_steps :]
        elif callable(self.trim_intermediate_steps):
            return self.trim_intermediate_steps(intermediate_steps)
        else:
            return intermediate_steps
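
    # Example (sketch): `trim_intermediate_steps` may be a callable; this
    # hypothetical trimmer keeps only the last three (action, observation)
    # pairs passed back to the agent on each turn:
    #
    #     def keep_last_three(
    #         steps: List[Tuple[AgentAction, str]]
    #     ) -> List[Tuple[AgentAction, str]]:
    #         return steps[-3:]
    #
    #     executor = AgentExecutor(
    #         agent=agent, tools=tools, trim_intermediate_steps=keep_last_three
    #     )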