|
import sys |
|
from typing import Dict, Any |
|
|
|
from aiflows.utils import logging |
|
|
|
logging.set_verbosity_debug() |
|
|
|
log = logging.get_logger(__name__) |
|
from aiflows.interfaces import KeyInterface |
|
from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerExecutorFlow |
|
|
|
class AutoGPTFlow(ControllerExecutorFlow): |
|
""" This class implements a (very basic) AutoGPT flow. It is a flow that consists of multiple sub-flows that are executed circularly. It Contains the following subflows: |
|
|
|
- A Controller Flow: A Flow that controls which subflow of the Executor Flow to execute next. |
|
- A Memory Flow: A Flow used to save and retrieve messages or memories which might be useful for the Controller Flow. |
|
- A HumanFeedback Flow: A flow use to get feedback from the user/human. |
|
- A Executor Flow: A Flow that executes commands generated by the Controller Flow. Typically it's a branching flow (see BranchingFlow) and the commands are which branch to execute next. |
|
|
|
An illustration of the flow is as follows: |
|
|
|
| -------> Memory Flow -------> Controller Flow ------->| |
|
^ | |
|
| | |
|
| v |
|
| <----- HumanFeedback Flow <------- Executor Flow <----| |
|
|
|
*Configuration Parameters*: |
|
|
|
- `name` (str): The name of the flow. Default is "AutoGPTFlow". |
|
- `description` (str): A description of the flow. Default is "An example implementation of AutoGPT with Flows." |
|
- `max_rounds` (int): The maximum number of rounds the circular flow can run for. Default is 30. |
|
- `early_exit_key` (str): The key that is used to terminate the flow early. Default is "EARLY_EXIT". |
|
- `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default: |
|
- `Controller` (Dict[str,Any]): The configuration of the Controller Flow. By default the controller flow is a ControllerAtomicFlow (see ControllerExecutorFlowModule). It's default values are |
|
defined in ControllerAtomicFlow.yaml of the ControllerExecutorFlowModule. Except for the following parameters who are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml: |
|
- `finish` (Dict[str,Any]): The configuration of the finish command (used to terminate the flow early when the controller has accomplished its goal). |
|
- `description` (str): The description of the command. Default is "The finish command is used to terminate the flow early when the controller has accomplished its goal." |
|
- `input_args` (List[str]): The list of expected keys to run the finish command. Default is ["answer"]. |
|
- `human_message_prompt_template`(Dict[str,Any]): The prompt template used to generate the message that is shown to the user/human when the finish command is executed. Default is: |
|
- `template` (str): The template of the humand message prompt (see AutoGPTFlow.yaml for default template) |
|
- `input_variables` (List[str]): The list of variables to be included in the template. Default is ["observation", "human_feedback", "memory"]. |
|
- `ìnput_interface_initialized` (List[str]): The input interface that Controller Flow expects except for the first time in the flow. Default is ["observation", "human_feedback", "memory"]. |
|
- `Executor` (Dict[str,Any]): The configuration of the Executor Flow. By default the executor flow is a Branching Flow (see BranchingFlow). It's default values are the default values of the BranchingFlow. Fields to define: |
|
- `subflows_config` (Dict[str,Any]): A Dictionary of subflows configurations.The keys are the names of the subflows and the values are the configurations of the subflows. Each subflow is a branch of the branching flow. |
|
- `HumanFeedback` (Dict[str,Any]): The configuration of the HumanFeedback Flow. By default the human feedback flow is a HumanStandardInputFlow (see HumanStandardInputFlowModule ). |
|
It's default values are specified in the REAMDE.md of HumanStandardInputFlowModule. Except for the following parameters who are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml: |
|
- `request_multi_line_input_flag` (bool): Flag to request multi-line input. Default is False. |
|
- `query_message_prompt_template` (Dict[str,Any]): The prompt template presented to the user/human to request input. Default is: |
|
- `template` (str): The template of the query message prompt (see AutoGPTFlow.yaml for default template) |
|
- `input_variables` (List[str]): The list of variables to be included in the template. Default is ["goal","command","command_args",observation"] |
|
- input_interface_initialized (List[str]): The input interface that HumanFeeback Flow expects except for the first time in the flow. Default is ["goal","command","command_args",observation"] |
|
- `Memory` (Dict[str,Any]): The configuration of the Memory Flow. By default the memory flow is a ChromaDBFlow (see VectorStoreFlowModule). It's default values are defined in ChromaDBFlow.yaml of the VectorStoreFlowModule. Except for the following parameters who are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml: |
|
- `n_results`: The number of results to retrieve from the memory. Default is 2. |
|
- `topology` (List[Dict[str,Any]]): The topology of the flow which is "circular". By default, the topology is the one shown in the illustration above (the topology is also described in AutoGPTFlow.yaml). |
|
|
|
|
|
*Input Interface*: |
|
|
|
- `goal` (str): The goal of the flow. |
|
|
|
*Output Interface*: |
|
|
|
- `answer` (str): The answer of the flow. |
|
- `status` (str): The status of the flow. It can be "finished" or "unfinished". |
|
|
|
:param flow_config: The configuration of the flow. Contains the parameters described above and the parameters required by the parent class (CircularFlow). |
|
:type flow_config: Dict[str,Any] |
|
:param subflows: A list of subflows constituating the circular flow. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config). |
|
:type subflows: List[Flow] |
|
""" |
|
|
|
|
|
def __init__(self, **kwargs): |
|
super().__init__( **kwargs) |
|
self.rename_human_output_interface = KeyInterface( |
|
keys_to_rename={"human_input": "human_feedback"} |
|
) |
|
|
|
self.input_interface_controller = KeyInterface( |
|
keys_to_select = ["goal","observation","human_feedback", "memory"], |
|
) |
|
|
|
self.input_interface_human_feedback = KeyInterface( |
|
keys_to_select = ["goal","command","command_args","observation"], |
|
) |
|
|
|
self.memory_read_ouput_interface = KeyInterface( |
|
additional_transformations = [self.prepare_memory_read_output], |
|
keys_to_select = ["memory"], |
|
) |
|
|
|
self.human_feedback_ouput_interface = KeyInterface( |
|
keys_to_rename={"human_input": "human_feedback"}, |
|
keys_to_select = ["human_feedback"], |
|
) |
|
|
|
self.next_flow_to_call = { |
|
None: "MemoryRead", |
|
"MemoryRead": "Controller", |
|
"Controller": "Executor", |
|
"Executor": "HumanFeedback", |
|
"HumanFeedback": "MemoryWrite", |
|
"MemoryWrite": "MemoryRead", |
|
} |
|
|
|
def set_up_flow_state(self): |
|
super().set_up_flow_state() |
|
self.flow_state["early_exit_flag"] = False |
|
|
|
def prepare_memory_read_output(self,data_dict: Dict[str, Any],**kwargs): |
|
|
|
retrieved_memories = data_dict["retrieved"][0][1:] |
|
return {"memory": "\n".join(retrieved_memories)} |
|
|
|
def _get_memory_key(self): |
|
""" This method returns the memory key that is used to retrieve memories from the ChromaDB model. |
|
|
|
:param flow_state: The state of the flow |
|
:type flow_state: Dict[str, Any] |
|
:return: The current context |
|
:rtype: str |
|
""" |
|
goal = self.flow_state.get("goal") |
|
last_command = self.flow_state.get("command") |
|
last_command_args = self.flow_state.get("command_args") |
|
last_observation = self.flow_state.get("observation") |
|
last_human_feedback = self.flow_state.get("human_feedback") |
|
|
|
if last_command is None: |
|
return "" |
|
|
|
assert goal is not None, goal |
|
assert last_command_args is not None, last_command_args |
|
assert last_observation is not None, last_observation |
|
|
|
current_context = \ |
|
f""" |
|
== Goal == |
|
{goal} |
|
|
|
== Command == |
|
{last_command} |
|
== Args |
|
{last_command_args} |
|
== Result |
|
{last_observation} |
|
|
|
== Human Feedback == |
|
{last_human_feedback} |
|
""" |
|
|
|
return current_context |
|
|
|
def prepare_memory_read_input(self) -> Dict[str, Any]: |
|
|
|
query = self._get_memory_key() |
|
|
|
return { |
|
"operation": "read", |
|
"content": query |
|
} |
|
|
|
def prepare_memory_write_input(self) -> Dict[str, Any]: |
|
|
|
query = self._get_memory_key() |
|
|
|
return { |
|
"operation": "write", |
|
"content": str(query) |
|
} |
|
|
|
|
|
def call_memory_read(self): |
|
memory_read_input = self.prepare_memory_read_input() |
|
|
|
message = self._package_input_message( |
|
data = memory_read_input, |
|
dst_flow = "Memory", |
|
) |
|
|
|
self.subflows["Memory"].send_message_async( |
|
message, |
|
pipe_to = self.flow_config["flow_ref"] |
|
) |
|
|
|
def call_memory_write(self): |
|
memory_write_input = self.prepare_memory_write_input() |
|
|
|
message = self._package_input_message( |
|
data = memory_write_input, |
|
dst_flow = "Memory", |
|
) |
|
|
|
self.subflows["Memory"].send_message_async( |
|
message, |
|
pipe_to = self.flow_config["flow_ref"] |
|
) |
|
|
|
def call_human_feedback(self): |
|
|
|
message = self._package_input_message( |
|
data = self.input_interface_human_feedback(self.flow_state), |
|
dst_flow = "HumanFeedback", |
|
) |
|
|
|
self.subflows["HumanFeedback"].send_message_async( |
|
message, |
|
pipe_to = self.flow_config["flow_ref"] |
|
) |
|
|
|
def register_data_to_state(self, input_message): |
|
|
|
|
|
|
|
|
|
|
|
|
|
last_called = self.flow_state["last_called"] |
|
|
|
if last_called is None: |
|
self.flow_state["input_message"] = input_message |
|
self.flow_state["goal"] = input_message.data["goal"] |
|
|
|
elif last_called == "Executor": |
|
self.flow_state["observation"] = input_message.data |
|
|
|
elif last_called == "Controller": |
|
self._state_update_dict( |
|
{ |
|
"command": input_message.data["command"], |
|
"command_args": input_message.data["command_args"] |
|
} |
|
) |
|
|
|
|
|
if self.flow_state["command"] == "finish": |
|
|
|
self._state_update_dict( |
|
{ |
|
"EARLY_EXIT": True, |
|
"answer": self.flow_state["command_args"]["answer"], |
|
"status": "finished" |
|
} |
|
) |
|
self.flow_state["early_exit_flag"] = True |
|
|
|
|
|
elif last_called == "MemoryRead": |
|
self._state_update_dict( |
|
self.memory_read_ouput_interface(input_message).data |
|
) |
|
|
|
elif last_called == "HumanFeedback": |
|
self._state_update_dict( |
|
self.human_feedback_ouput_interface(input_message).data |
|
) |
|
|
|
|
|
if self.flow_state["human_feedback"].strip().lower() == "q": |
|
|
|
self._state_update_dict( |
|
{ |
|
"EARLY_EXIT": True, |
|
"answer": "The user has chosen to exit before a final answer was generated.", |
|
"status": "unfinished", |
|
} |
|
) |
|
|
|
self.flow_state["early_exit_flag"] = True |
|
|
|
|
|
|
|
|
|
|
|
def run(self,input_message): |
|
|
|
self.register_data_to_state(input_message) |
|
|
|
flow_to_call = self.get_next_flow_to_call() |
|
|
|
if self.flow_state.get("early_exit_flag",False): |
|
self.generate_reply() |
|
|
|
elif flow_to_call == "MemoryRead": |
|
self.call_memory_read() |
|
|
|
elif flow_to_call == "Controller": |
|
self.call_controller() |
|
|
|
elif flow_to_call == "Executor": |
|
self.call_executor() |
|
|
|
elif flow_to_call == "HumanFeedback": |
|
self.call_human_feedback() |
|
|
|
elif flow_to_call == "MemoryWrite": |
|
self.call_memory_write() |
|
self.flow_state["current_round"] += 1 |
|
|
|
else: |
|
self._on_reach_max_round() |
|
self.generate_reply() |
|
|
|
self.flow_state["last_called"] = self.get_next_flow_to_call() |
|
|
|
|