File size: 3,911 Bytes
46dd24e
 
 
 
 
 
 
80c0c76
a5ead65
46dd24e
 
 
a8d670e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80c0c76
a8d670e
 
46dd24e
 
 
 
 
 
 
 
 
 
 
 
a8d670e
 
 
 
 
 
46dd24e
 
 
 
 
 
 
 
 
 
 
 
a8d670e
 
 
 
 
 
 
46dd24e
 
 
 
 
 
 
 
 
 
a5ead65
a8d670e
 
 
 
 
 
 
a5ead65
46dd24e
a5ead65
11dd0a0
a5ead65
 
 
 
11dd0a0
46dd24e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from __future__ import annotations
from copy import deepcopy
from typing import Any, Dict

import hydra
from langchain.tools import BaseTool

from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage


class LCToolFlow(AtomicFlow):
    r""" A flow that runs a tool using langchain. For example, a tool could be to execute a query with the duckduckgo search engine.
    
    *Configuration Parameters*:
    
    - `name` (str): The name of the flow. Default: "search"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
    Default: "useful when you need to look for the answer online, especially for recent events."
    - `keep_raw_response` (bool): If True, the raw response of the tool is kept. Default: False
    - `clear_flow_namespace_on_run_end` (bool): If True, the flow namespace is cleared at the end of the run. Default: False
    - `backend` (Dict[str, Any]): The configuration of the backend. Default: langchain.tools.DuckDuckGoSearchRun
    - Other parameters are inherited from the default configuration of AtomicFlow (see AtomicFlow)
    
    *Input Interface*:
    
    - `query` (str): the query to run the tool on
        
    *Output Interface*:
    
    - `observation` (str): the observation returned by the tool
        
    :param backend: The backend of the flow. It is a tool that is run by the flow. (e.g. duckduckgo search engine)
    :type backend: BaseTool
    :param \**kwargs: Additional arguments to pass to the flow. See :class:`aiflows.base_flows.AtomicFlow` for more details.
    """

    # Keys that must be present in the flow configuration.
    REQUIRED_KEYS_CONFIG = ["backend"]

    # This flow declares that it does not support caching.
    SUPPORTS_CACHING: bool = False

    # The langchain tool executed by `run`.
    backend: BaseTool

    def __init__(self, backend: BaseTool, **kwargs) -> None:
        super().__init__(**kwargs)
        self.backend = backend

    @classmethod
    def _set_up_backend(cls, config: Dict[str, Any]) -> BaseTool:
        """ This method sets up the backend of the flow.

        The passed configuration is never modified: a relative ``_target_``
        is resolved on a private copy, so the caller's configuration can be
        safely reused afterwards.

        :param config: The configuration of the backend. It must contain a ``_target_`` key.
        :type config: Dict[str, Any]
        :return: The backend of the flow.
        :rtype: BaseTool
        """
        # Work on a copy so that resolving a relative target does not leak
        # back into the configuration object owned by the caller.
        backend_config = deepcopy(config)

        target = backend_config["_target_"]
        if target.startswith("."):
            # assumption: cls is associated with relative data_transformation_configs
            # for example, CF_Code and CF_Code.yaml should be in the same directory,
            # and all _target_ in CF_Code.yaml should be relative
            cls_parent_module = cls.__module__.rpartition(".")[0]
            backend_config["_target_"] = cls_parent_module + target

        return hydra.utils.instantiate(backend_config, _convert_="partial")

    @classmethod
    def instantiate_from_config(cls, config: Dict[str, Any]) -> LCToolFlow:
        """ This method instantiates the flow from a configuration file

        The flow keeps its own deep copy of ``config`` as ``flow_config``;
        the backend tool is built from ``config["backend"]``.

        :param config: The configuration of the flow. It must contain a ``backend`` entry.
        :type config: Dict[str, Any]
        :return: The instantiated flow.
        :rtype: LCToolFlow
        """
        flow_config = deepcopy(config)

        # ~~~ Set up LangChain backend ~~~
        backend = cls._set_up_backend(config["backend"])

        # ~~~ Instantiate flow ~~~
        return cls(flow_config=flow_config, backend=backend)

    def run(self, input_message: FlowMessage) -> None:
        """ This method runs the flow. It runs the backend on the data of the input message
        and sends the result back as a reply; nothing is returned to the caller.

        :param input_message: The input message of the flow. Its ``data`` is passed to the backend as the tool input.
        :type input_message: FlowMessage
        """
        input_data = input_message.data
        observation = self.backend.run(tool_input=input_data)

        # Wrap the tool output under the `observation` key of the reply.
        reply = self.package_output_message(
            input_message=input_message,
            response={"observation": observation},
        )

        self.send_message(reply, is_reply=True)