File size: 8,923 Bytes
cbb225c 5fbaf2a 93c74cc cbb225c 1beb8a2 5fbaf2a cbb225c 5fbaf2a 40a0d94 e794544 1beb8a2 e794544 1beb8a2 e794544 5fbaf2a 1beb8a2 5fbaf2a 1beb8a2 7ba1689 1beb8a2 5fbaf2a 1beb8a2 e794544 1beb8a2 e794544 1beb8a2 5fbaf2a 1beb8a2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
from typing import Dict, Any
from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from .ControllerAtomicFlow import ControllerAtomicFlow
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
# NOTE(review): import-time side effect -- presumably raises the aiflows logging
# verbosity to DEBUG for the whole process whenever this module is imported;
# confirm this is intended outside of development.
logging.set_verbosity_debug()
# Module-level logger named after this module.
log = logging.get_logger(__name__)
class ControllerExecutorFlow(CompositeFlow):
    """ This class implements a ControllerExecutorFlow. It is composed of a ControllerAtomicFlow and an ExecutorFlow.
    Typically the ControllerAtomicFlow uses an LLM to decide which command to call and the ExecutorFlow
    (a branching flow) is used to execute that command.

    It contains the following subflows:

    - A Controller Atomic Flow: a flow that decides which command to issue next in order to get closer to accomplishing a given goal.
    - An Executor Flow: a branching flow that executes the command chosen by the ControllerAtomicFlow.

    An illustration of the flow is as follows:

        goal -----|-----> ControllerFlow----->|-----> (answer, status)
                  ^                           |
                  |                           |
                  |                           v
                  |<----- ExecutorFlow <------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "CtrlEx"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
      Default: "ControllerExecutor (i.e., MRKL, ReAct) interaction implementation with Flows
      that approaches the problem solving in two phases: one Flow chooses the next step and another Flow executes it.
      This is repeated until the controller Flow concludes on an answer."
    - `max_rounds` (int): The maximum number of rounds the flow can run for. `None` disables the limit.
      Default: 30.
    - `subflows_config` (Dict[str,Any]): A dictionary of the subflows configurations. Default:
        - `Controller`: The configuration of the Controller Flow. By default, it is a ControllerAtomicFlow. Default parameters:
            - `finish` (Dict[str,Any]): The configuration of the finish command. Default parameters:
                - `description` (str): The description of the command.
                  Default: "Signal that the objective has been satisfied, and returns the answer to the user."
                - `input_args` (List[str]): The input arguments of the command. Default: ["answer"]
            - All other parameters are inherited from the default configuration of ControllerAtomicFlow (see ControllerAtomicFlow)
        - `Executor`: The configuration of the Executor Flow. By default, it is a BranchingFlow. There are no default parameters;
          the flow parameter to be defined is:
            - `subflows_config` (Dict[str,Any]): A dictionary of the configuration of the subflows of the branching flow.
              These subflows are typically also the possible commands of the Controller Flow. Default: []
    - `early_exit_key` (str): The key that is used to exit the flow. Default: "EARLY_EXIT"
      (the methods of this class read and write the literal key "EARLY_EXIT").
    - `topology` (str): The topology of the flow which is "circular".
      By default, the topology is the one shown in the illustration above
      (the topology is also described in ControllerExecutorFlow.yaml).

    *Input Interface*:

    - `goal` (str): The goal of the controller. Usually asked by the user/human (e.g. "I want to know the occupation and birth date of Michael Jordan.")

    *Output Interface*:

    - `answer` (str): The answer of the flow to the query (e.g. "Michael Jordan is a basketball player and business man. He was born on February 17, 1963.")
    - `status` (str): The status of the flow. It can be "finished" or "unfinished". If the status is "unfinished", it is usually because the maximum number of rounds was reached before the model found an answer.

    :param flow_config: The configuration of the flow (see Configuration Parameters).
    :param subflows: A list of subflows. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # The very first controller call only knows the goal; later calls
        # also receive the latest observation stored in the flow state.
        self.input_interface_controller = KeyInterface(
            keys_to_select=["goal", "observation"],
        )
        self.input_interface_first_round_controller = KeyInterface(
            keys_to_select=["goal"],
        )

        # Keys copied from the flow state into the final reply.
        self.reply_interface = KeyInterface(
            keys_to_select=["answer", "status", "EARLY_EXIT"],
        )

        # Transition table: value of `last_called` -> subflow to call next.
        self.next_flow_to_call = {
            None: "Controller",
            "Controller": "Executor",
            "Executor": "Controller"
        }

    def generate_reply(self):
        """ Builds the output message from the flow state and sends it as the reply to the stored input message. """
        original_request = self.flow_state["input_message"]
        reply = self._package_output_message(
            input_message=original_request,
            response=self.reply_interface(self.flow_state),
        )
        self.reply_to_message(reply, to=self.flow_state["input_message"])

    def get_next_flow_to_call(self):
        """ Returns the name of the subflow to call next ("Controller" or "Executor"),
        or None once `max_rounds` is set and `current_round` has reached it.
        """
        round_limit = self.flow_config["max_rounds"]
        if round_limit is not None and self.flow_state["current_round"] >= round_limit:
            return None

        previously_called = self.flow_state["last_called"]
        return self.next_flow_to_call[previously_called]

    def _on_reach_max_round(self):
        """ Records in the flow state that the round limit was hit: the run is marked
        "unfinished" with an explanatory answer. Sending the reply is left to the caller.
        """
        timeout_outcome = {
            "EARLY_EXIT": False,
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished"
        }
        self._state_update_dict(timeout_outcome)

    def set_up_flow_state(self):
        """ Initializes the flow state: no subflow has been called yet and the round counter is zero. """
        super().set_up_flow_state()
        self.flow_state["last_called"] = None
        self.flow_state["current_round"] = 0

    def call_controller(self):
        """ Sends the relevant part of the flow state to the "Controller" subflow.
        The first call (nothing called before) forwards only the goal; later calls
        also forward the observation.
        """
        is_first_round = self.flow_state["last_called"] is None
        select_keys = (
            self.input_interface_first_round_controller
            if is_first_round
            else self.input_interface_controller
        )

        message = self._package_input_message(
            data=select_keys(self.flow_state),
            dst_flow="Controller",
        )
        self.subflows["Controller"].send_message_async(message, pipe_to=self.flow_config["flow_ref"])

    def call_executor(self):
        """ Acts on the command stored in the flow state.
        The "finish" command stores the final answer and replies immediately; any other
        command is sent, with its arguments, to the subflow registered under the command's name.
        """
        # "finish" is handled here and never forwarded to a subflow.
        if self.flow_state["command"] == "finish":
            final_outcome = {
                "EARLY_EXIT": True,
                "answer": self.flow_state["command_args"]["answer"],
                "status": "finished"
            }
            self._state_update_dict(final_outcome)
            self.generate_reply()
            return

        # The command name doubles as the key of the subflow that executes it.
        branch_name = self.flow_state["command"]
        message = self._package_input_message(
            data=self.flow_state["command_args"],
            dst_flow=branch_name,
        )
        self.subflows[branch_name].send_message_async(message, pipe_to=self.flow_config["flow_ref"])

    def register_data_to_state(self, input_message):
        """ Stores the content of an incoming message in the flow state.
        What is stored depends on `last_called`: the initial request (message and goal),
        an executor result (observation), or a controller decision (command and its arguments).

        :param input_message: The message that was just received
        """
        previously_called = self.flow_state["last_called"]

        # Nothing called yet: this is the initial request carrying the goal.
        if previously_called is None:
            self.flow_state["input_message"] = input_message
            self.flow_state["goal"] = input_message.data["goal"]
            return

        # The executor answered: keep its whole payload as the observation.
        if previously_called == "Executor":
            self.flow_state["observation"] = input_message.data
            return

        # Otherwise the message holds the command chosen by the controller.
        controller_decision = {
            "command": input_message.data["command"],
            "command_args": input_message.data["command_args"]
        }
        self._state_update_dict(controller_decision)

    def run(self, input_message: FlowMessage):
        """ Advances the controller/executor loop by one step: registers the incoming message,
        then calls the next subflow, or replies with an "unfinished" status if the round limit was reached.

        :param input_message: The input message
        :type input_message: FlowMessage
        """
        self.register_data_to_state(input_message)

        upcoming = self.get_next_flow_to_call()

        if upcoming == "Controller":
            self.call_controller()
        elif upcoming == "Executor":
            self.call_executor()
            # One round is counted per executor step.
            self.flow_state["current_round"] += 1
        else:
            # No subflow to call: the round limit was reached.
            self._on_reach_max_round()
            self.generate_reply()

        self.flow_state["last_called"] = upcoming
|