|
from __future__ import annotations |
|
from copy import deepcopy |
|
from typing import Any, Dict |
|
|
|
import hydra |
|
from langchain.tools import BaseTool |
|
|
|
from aiflows.base_flows import AtomicFlow |
|
from aiflows.messages import FlowMessage |
|
|
|
|
|
class LCToolFlow(AtomicFlow): |
|
r""" A flow that runs a tool using langchain. For example, a tool could be to excute a query with the duckduckgo search engine. |
|
|
|
*Configuration Parameters*: |
|
|
|
- `name` (str): The name of the flow. Default: "search" |
|
- `description` (str): A description of the flow. This description is used to generate the help message of the flow. |
|
Default: "useful when you need to look for the answer online, especially for recent events." |
|
- `keep_raw_response` (bool): If True, the raw response of the tool is kept. Default: False |
|
- `clear_flow_namespase_on_run_end` (bool): If True, the flow namespace is cleared at the end of the run. Default: False |
|
- `backend` (Dict[str, Any]): The configuration of the backend. Default: langchain.tools.DuckDuckGoSearchRun |
|
- Other parameters are inherited from the default configuration of AtomicFlow (see AtomicFlow) |
|
|
|
*Input Interface*: |
|
|
|
- `query` (str): the query to run the tool on |
|
|
|
*Output Interface*: |
|
|
|
- `observation` (str): the observation returned by the tool |
|
|
|
:param backend: The backend of the flow. It is a tool that is run by the flow. (e.g. duckduckgo search engine) |
|
:type backend: BaseTool |
|
:param \**kwargs: Additional arguments to pass to the flow. See :class:`aiflows.base_flows.AtomicFlow` for more details. |
|
""" |
|
|
|
REQUIRED_KEYS_CONFIG = ["backend"] |
|
|
|
SUPPORTS_CACHING: bool = False |
|
|
|
backend: BaseTool |
|
|
|
def __init__(self, backend: BaseTool, **kwargs) -> None: |
|
super().__init__(**kwargs) |
|
self.backend = backend |
|
|
|
@classmethod |
|
def _set_up_backend(cls, config: Dict[str, Any]) -> BaseTool: |
|
""" This method sets up the backend of the flow. |
|
|
|
:param config: The configuration of the backend. |
|
:type config: Dict[str, Any] |
|
:return: The backend of the flow. |
|
""" |
|
if config["_target_"].startswith("."): |
|
|
|
|
|
|
|
cls_parent_module = ".".join(cls.__module__.split(".")[:-1]) |
|
config["_target_"] = cls_parent_module + config["_target_"] |
|
tool = hydra.utils.instantiate(config, _convert_="partial") |
|
|
|
return tool |
|
|
|
@classmethod |
|
def instantiate_from_config(cls, config: Dict[str, Any]) -> LCToolFlow: |
|
""" This method instantiates the flow from a configuration file |
|
|
|
:param config: The configuration of the flow. |
|
:type config: Dict[str, Any] |
|
:return: The instantiated flow. |
|
:rtype: LCToolFlow |
|
""" |
|
flow_config = deepcopy(config) |
|
|
|
kwargs = {"flow_config": flow_config} |
|
|
|
|
|
kwargs["backend"] = cls._set_up_backend(config["backend"]) |
|
|
|
|
|
return cls(**kwargs) |
|
|
|
def run(self, input_message: FlowMessage): |
|
""" This method runs the flow. It runs the backend on the input data. |
|
|
|
:param input_data: The input data of the flow. |
|
:type input_data: Dict[str, Any] |
|
:return: The output data of the flow. |
|
:rtype: Dict[str, Any] |
|
""" |
|
input_data = input_message.data |
|
observation = self.backend.run(tool_input=input_data) |
|
|
|
reply = self._package_output_message( |
|
input_message=input_message, |
|
response = {"observation": observation} |
|
) |
|
|
|
self.reply_to_message(reply=reply, to=input_message) |
|
|
|
|