File size: 4,893 Bytes
e67043b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import time
import types
from typing import Any, Dict, List, Tuple, Union
from langchain.agents import AgentExecutor
from langchain.input import get_color_mapping
from langchain.schema import AgentAction, AgentFinish
# from translator import Translator


# class AgentExecutorWithTranslation(AgentExecutor):
#     translator: Translator = Translator()

#     def prep_outputs(
#         self,
#         inputs: Dict[str, str],
#         outputs: Dict[str, str],
#         return_only_outputs: bool = False,
#     ) -> Dict[str, str]:
#         try:
#             outputs = super().prep_outputs(inputs, outputs, return_only_outputs)
#         except ValueError as e:
#             return outputs
#         else:
#             if "input" in outputs:
#                 outputs = self.translator(outputs)
#             return outputs


class Executor():
    def _call(self, inputs: Dict[str, str]) -> Dict[str, Any]:
        """Run text through and get agent response."""
        # Construct a mapping of tool name to tool for easy lookup
        name_to_tool_map = {tool.name: tool for tool in self.tools}
        # We construct a mapping from each tool to a color, used for logging.
        color_mapping = get_color_mapping(
            [tool.name for tool in self.tools], excluded_colors=["green"]
        )
        intermediate_steps: List[Tuple[AgentAction, str]] = []
        # Let's start tracking the iterations the agent has gone through
        iterations = 0
        time_elapsed = 0.0
        start_time = time.time()
        # We now enter the agent loop (until it returns something).
        while self._should_continue(iterations, time_elapsed):
            next_step_output = self._take_next_step(
                name_to_tool_map, color_mapping, inputs, intermediate_steps
            )
            if isinstance(next_step_output, AgentFinish):
                yield self._return(next_step_output, intermediate_steps)
                return

            for i, output in enumerate(next_step_output):
                agent_action = output[0]
                tool_logo = None
                for tool in self.tools:
                    if tool.name == agent_action.tool:
                        tool_logo = tool.tool_logo_md
                if isinstance(output[1], types.GeneratorType):
                    logo = f"{tool_logo}" if tool_logo is not None else ""
                    yield (
                        AgentAction("", agent_action.tool_input, agent_action.log),
                        f"Further use other tool {logo} to answer the question.",
                    )
                    for out in output[1]:
                        yield out
                    next_step_output[i] = (agent_action, out)
                else:
                    for tool in self.tools:
                        if tool.name == agent_action.tool:
                            yield (
                                AgentAction(
                                    tool_logo, agent_action.tool_input, agent_action.log
                                ),
                                output[1],
                            )

            intermediate_steps.extend(next_step_output)
            if len(next_step_output) == 1:
                next_step_action = next_step_output[0]
                # See if tool should return directly
                tool_return = self._get_tool_return(next_step_action)
                if tool_return is not None:
                    yield self._return(tool_return, intermediate_steps)
                    return
            iterations += 1
            time_elapsed = time.time() - start_time
        output = self.agent.return_stopped_response(
            self.early_stopping_method, intermediate_steps, **inputs
        )
        yield self._return(output, intermediate_steps)
        return

    def __call__(
        self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False
    ) -> Dict[str, Any]:
        """Run the logic of this chain and add to output if desired.

        Args:
            inputs: Dictionary of inputs, or single input if chain expects
                only one param.
            return_only_outputs: boolean for whether to return only outputs in the
                response. If True, only new keys generated by this chain will be
                returned. If False, both input keys and new keys generated by this
                chain will be returned. Defaults to False.

        """
        inputs = self.prep_inputs(inputs)

        try:
            for output in self._call(inputs):
                if type(output) is dict:
                    output = self.prep_outputs(inputs, output, return_only_outputs)
                yield output
        except (KeyboardInterrupt, Exception) as e:
            raise e
        return self.prep_outputs(inputs, output, return_only_outputs)
        return output