# AutoGPTFlowModule / AutoGPTFlow.py
import sys
from typing import Dict, Any
from aiflows.utils import logging
logging.set_verbosity_debug()
log = logging.get_logger(__name__)
from aiflows.interfaces import KeyInterface
from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerExecutorFlow


class AutoGPTFlow(ControllerExecutorFlow):
    """ This class implements a (very basic) AutoGPT flow. It is a flow that consists of multiple sub-flows that are executed circularly. It contains the following subflows:

    - A Controller Flow: A Flow that controls which subflow of the Executor Flow to execute next.
    - A Memory Flow: A Flow used to save and retrieve messages or memories which might be useful for the Controller Flow.
    - A HumanFeedback Flow: A Flow used to get feedback from the user/human.
    - An Executor Flow: A Flow that executes commands generated by the Controller Flow. Typically it's a branching flow (see BranchingFlow) and the commands specify which branch to execute next.

    An illustration of the flow is as follows:

        | -------> Memory Flow -------> Controller Flow ------->|
        ^                                                       |
        |                                                       |
        |                                                       v
        | <----- HumanFeedback Flow <------- Executor Flow <----|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default is "AutoGPTFlow".
    - `description` (str): A description of the flow. Default is "An example implementation of AutoGPT with Flows."
    - `max_rounds` (int): The maximum number of rounds the circular flow can run for. Default is 30.
    - `early_exit_key` (str): The key that is used to terminate the flow early. Default is "EARLY_EXIT".
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default:
        - `Controller` (Dict[str,Any]): The configuration of the Controller Flow. By default the controller flow is a ControllerAtomicFlow (see ControllerExecutorFlowModule). Its default values are
        defined in ControllerAtomicFlow.yaml of the ControllerExecutorFlowModule, except for the following parameters, which are overridden by the AutoGPTFlow in AutoGPTFlow.yaml:
            - `finish` (Dict[str,Any]): The configuration of the finish command (used to terminate the flow early when the controller has accomplished its goal).
                - `description` (str): The description of the command. Default is "The finish command is used to terminate the flow early when the controller has accomplished its goal."
                - `input_args` (List[str]): The list of expected keys to run the finish command. Default is ["answer"].
            - `human_message_prompt_template` (Dict[str,Any]): The prompt template used to generate the message that is shown to the user/human when the finish command is executed. Default is:
                - `template` (str): The template of the human message prompt (see AutoGPTFlow.yaml for the default template).
                - `input_variables` (List[str]): The list of variables to be included in the template. Default is ["observation", "human_feedback", "memory"].
            - `input_interface_initialized` (List[str]): The input interface that the Controller Flow expects on every call except the first one. Default is ["observation", "human_feedback", "memory"].
        - `Executor` (Dict[str,Any]): The configuration of the Executor Flow. By default the executor flow is a Branching Flow (see BranchingFlow). Its default values are the default values of the BranchingFlow. Fields to define:
            - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. The keys are the names of the subflows and the values are the configurations of the subflows. Each subflow is a branch of the branching flow.
        - `HumanFeedback` (Dict[str,Any]): The configuration of the HumanFeedback Flow. By default the human feedback flow is a HumanStandardInputFlow (see HumanStandardInputFlowModule).
        Its default values are specified in the README.md of HumanStandardInputFlowModule, except for the following parameters, which are overridden by the AutoGPTFlow in AutoGPTFlow.yaml:
            - `request_multi_line_input_flag` (bool): Flag to request multi-line input. Default is False.
            - `query_message_prompt_template` (Dict[str,Any]): The prompt template presented to the user/human to request input. Default is:
                - `template` (str): The template of the query message prompt (see AutoGPTFlow.yaml for the default template).
                - `input_variables` (List[str]): The list of variables to be included in the template. Default is ["goal","command","command_args","observation"].
            - `input_interface_initialized` (List[str]): The input interface that the HumanFeedback Flow expects on every call except the first one. Default is ["goal","command","command_args","observation"].
        - `Memory` (Dict[str,Any]): The configuration of the Memory Flow. By default the memory flow is a ChromaDBFlow (see VectorStoreFlowModule). Its default values are defined in ChromaDBFlow.yaml of the VectorStoreFlowModule, except for the following parameters, which are overridden by the AutoGPTFlow in AutoGPTFlow.yaml:
            - `n_results` (int): The number of results to retrieve from the memory. Default is 2.
    - `topology` (List[Dict[str,Any]]): The topology of the flow, which is "circular". By default, the topology is the one shown in the illustration above (the topology is also described in AutoGPTFlow.yaml).

    *Input Interface*:

    - `goal` (str): The goal of the flow.

    *Output Interface*:

    - `answer` (str): The answer of the flow.
    - `status` (str): The status of the flow. It can be "finished" or "unfinished".

    :param flow_config: The configuration of the flow. Contains the parameters described above and the parameters required by the parent class (ControllerExecutorFlow).
    :type flow_config: Dict[str,Any]
    :param subflows: A list of subflows constituting the circular flow. Required when instantiating the flow programmatically (it replaces subflows_config from flow_config).
    :type subflows: List[Flow]
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Renames the raw human input to the key used in the flow state.
        self.rename_human_output_interface = KeyInterface(
            keys_to_rename={"human_input": "human_feedback"}
        )

        # Keys of the flow state that are passed to the Controller.
        self.input_interface_controller = KeyInterface(
            keys_to_select=["goal", "observation", "human_feedback", "memory"],
        )

        # Keys of the flow state that are passed to the HumanFeedback flow.
        self.input_interface_human_feedback = KeyInterface(
            keys_to_select=["goal", "command", "command_args", "observation"],
        )

        # Turns the Memory flow's read result into a single "memory" string.
        self.memory_read_ouput_interface = KeyInterface(
            additional_transformations=[self.prepare_memory_read_output],
            keys_to_select=["memory"],
        )

        # Renames "human_input" to "human_feedback" and keeps only that key.
        self.human_feedback_ouput_interface = KeyInterface(
            keys_to_rename={"human_input": "human_feedback"},
            keys_to_select=["human_feedback"],
        )

        # Maps the last called subflow to the next one; None is the state before the first call.
        self.next_flow_to_call = {
            None: "MemoryRead",
            "MemoryRead": "Controller",
            "Controller": "Executor",
            "Executor": "HumanFeedback",
            "HumanFeedback": "MemoryWrite",
            "MemoryWrite": "MemoryRead",
        }

    def set_up_flow_state(self):
        super().set_up_flow_state()
        self.flow_state["early_exit_flag"] = False

    def prepare_memory_read_output(self, data_dict: Dict[str, Any], **kwargs):
        # Take the first list under "retrieved", drop its first element,
        # and join the remaining entries into one newline-separated string.
        retrieved_memories = data_dict["retrieved"][0][1:]
        return {"memory": "\n".join(retrieved_memories)}

    def _get_memory_key(self):
        """ This method returns the memory key that is used to retrieve memories from the ChromaDB model.
        The key is built from the current flow state (goal, last command, its arguments, its result, and the last human feedback).

        :return: The current context, or an empty string if no command has been issued yet
        :rtype: str
        """
        goal = self.flow_state.get("goal")
        last_command = self.flow_state.get("command")
        last_command_args = self.flow_state.get("command_args")
        last_observation = self.flow_state.get("observation")
        last_human_feedback = self.flow_state.get("human_feedback")

        # Before the first Controller call there is nothing to summarize.
        if last_command is None:
            return ""

        assert goal is not None, goal
        assert last_command_args is not None, last_command_args
        assert last_observation is not None, last_observation

        current_context = f"""
== Goal ==
{goal}
== Command ==
{last_command}
== Args
{last_command_args}
== Result
{last_observation}
== Human Feedback ==
{last_human_feedback}
"""
        return current_context

    def prepare_memory_read_input(self) -> Dict[str, Any]:
        query = self._get_memory_key()
        return {
            "operation": "read",
            "content": query
        }

    def prepare_memory_write_input(self) -> Dict[str, Any]:
        query = self._get_memory_key()
        return {
            "operation": "write",
            "content": str(query)
        }

    def call_memory_read(self):
        memory_read_input = self.prepare_memory_read_input()
        message = self._package_input_message(
            data=memory_read_input,
            dst_flow="Memory",
        )
        self.subflows["Memory"].send_message_async(
            message,
            pipe_to=self.flow_config["flow_ref"]
        )

    def call_memory_write(self):
        memory_write_input = self.prepare_memory_write_input()
        message = self._package_input_message(
            data=memory_write_input,
            dst_flow="Memory",
        )
        self.subflows["Memory"].send_message_async(
            message,
            pipe_to=self.flow_config["flow_ref"]
        )

    def call_human_feedback(self):
        message = self._package_input_message(
            data=self.input_interface_human_feedback(self.flow_state),
            dst_flow="HumanFeedback",
        )
        self.subflows["HumanFeedback"].send_message_async(
            message,
            pipe_to=self.flow_config["flow_ref"]
        )

    def register_data_to_state(self, input_message):
        # Making this explicit so it's easier to understand.
        # This also shows different ways of writing to the state:
        # either programmatically or using _state_update_dict together
        # with the input and output interface methods.
        last_called = self.flow_state["last_called"]

        if last_called is None:
            # First call: the incoming message carries the goal.
            self.flow_state["input_message"] = input_message
            self.flow_state["goal"] = input_message.data["goal"]

        elif last_called == "Executor":
            self.flow_state["observation"] = input_message.data

        elif last_called == "Controller":
            self._state_update_dict(
                {
                    "command": input_message.data["command"],
                    "command_args": input_message.data["command_args"]
                }
            )

            # Detect an early exit requested by the Controller.
            if self.flow_state["command"] == "finish":
                self._state_update_dict(
                    {
                        "EARLY_EXIT": True,
                        "answer": self.flow_state["command_args"]["answer"],
                        "status": "finished"
                    }
                )
                self.flow_state["early_exit_flag"] = True

        elif last_called == "MemoryRead":
            self._state_update_dict(
                self.memory_read_ouput_interface(input_message).data
            )

        elif last_called == "HumanFeedback":
            self._state_update_dict(
                self.human_feedback_ouput_interface(input_message).data
            )

            # Detect an early exit requested by the user (feedback "q").
            if self.flow_state["human_feedback"].strip().lower() == "q":
                self._state_update_dict(
                    {
                        "EARLY_EXIT": True,
                        "answer": "The user has chosen to exit before a final answer was generated.",
                        "status": "unfinished",
                    }
                )
                self.flow_state["early_exit_flag"] = True

    def run(self, input_message):
        self.register_data_to_state(input_message)

        flow_to_call = self.get_next_flow_to_call()

        if self.flow_state.get("early_exit_flag", False):
            self.generate_reply()

        elif flow_to_call == "MemoryRead":
            self.call_memory_read()

        elif flow_to_call == "Controller":
            self.call_controller()

        elif flow_to_call == "Executor":
            self.call_executor()

        elif flow_to_call == "HumanFeedback":
            self.call_human_feedback()

        elif flow_to_call == "MemoryWrite":
            self.call_memory_write()
            self.flow_state["current_round"] += 1

        else:
            self._on_reach_max_round()
            self.generate_reply()

        self.flow_state["last_called"] = self.get_next_flow_to_call()
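

# The scheduling cycle driven by `next_flow_to_call` and `run` can be sketched
# in isolation, without aiflows. This is a minimal illustration, not the
# library's implementation: `get_next_flow_to_call` is inherited from
# ControllerExecutorFlow and is not shown in this file, so the `next_flow`
# helper below (return None once `max_rounds` is reached, otherwise look up
# the transition table) is an assumption. `simulate` is likewise a
# hypothetical name, and early exits are left out.

```python
from typing import List, Optional

# Same transition table as AutoGPTFlow.next_flow_to_call.
NEXT_FLOW_TO_CALL = {
    None: "MemoryRead",
    "MemoryRead": "Controller",
    "Controller": "Executor",
    "Executor": "HumanFeedback",
    "HumanFeedback": "MemoryWrite",
    "MemoryWrite": "MemoryRead",
}


def next_flow(last_called: Optional[str], current_round: int, max_rounds: int) -> Optional[str]:
    """Hypothetical stand-in for get_next_flow_to_call (assumed behaviour)."""
    if current_round >= max_rounds:
        return None
    return NEXT_FLOW_TO_CALL[last_called]


def simulate(max_rounds: int) -> List[str]:
    """Return the order in which subflows would be called, ignoring early exits."""
    order: List[str] = []
    last_called: Optional[str] = None
    current_round = 0
    while True:
        flow = next_flow(last_called, current_round, max_rounds)
        if flow is None:
            # Corresponds to the final `else` branch of run().
            break
        order.append(flow)
        if flow == "MemoryWrite":
            # As in run(), a round is counted once the memory write is issued.
            current_round += 1
        last_called = flow
    return order


if __name__ == "__main__":
    print(simulate(max_rounds=1))
```

# Under these assumptions one round visits MemoryRead, Controller, Executor,
# HumanFeedback and MemoryWrite, in that order, before the cycle restarts.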