|
import sys |
|
from typing import Dict, Any |
|
|
|
from flows.base_flows import CircularFlow |
|
from flows.utils import logging |
|
|
|
logging.set_verbosity_debug() |
|
|
|
log = logging.get_logger(__name__) |
|
|
|
from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerAtomicFlow |
|
from flow_modules.aiflows.VectorStoreFlowModule import ChromaDBFlow |
|
|
|
class AutoGPTFlow(CircularFlow): |
|
""" This class implements a (very basic) AutoGPT flow. It is a flow that consists of multiple sub-flows that are executed circularly. It Contains the following subflows: |
|
|
|
- A Controller Flow: A Flow that controls which subflow of the Executor Flow to execute next. |
|
- A Memory Flow: A Flow used to save and retrieve messages or memories which might be useful for the Controller Flow. |
|
- A HumanFeedback Flow: A flow use to get feedback from the user/human. |
|
- A Executor Flow: A Flow that executes commands generated by the Controller Flow. Typically it's a branching flow (see BranchingFlow) and the commands are which branch to execute next. |
|
|
|
An illustration of the flow is as follows: |
|
|
|
| -------> Memory Flow -------> Controller Flow ------->| |
|
^ | |
|
| | |
|
| v |
|
| <----- HumanFeedback Flow <------- Executor Flow <----| |
|
|
|
*Configuration Parameters*: |
|
|
|
- `name` (str): The name of the flow. Default is "AutoGPTFlow". |
|
- `description` (str): A description of the flow. Default is "An example implementation of AutoGPT with Flows." |
|
- `max_rounds` (int): The maximum number of rounds the circular flow can run for. Default is 30. |
|
- `early_exit_key` (str): The key that is used to terminate the flow early. Default is "EARLY_EXIT". |
|
- `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default: |
|
- `Controller` (Dict[str,Any]): The configuration of the Controller Flow. By default the controller flow is a ControllerAtomicFlow (see ControllerExecutorFlowModule). It's default values are |
|
defined in ControllerAtomicFlow.yaml of the ControllerExecutorFlowModule. Except for the following parameters who are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml: |
|
- `finish` (Dict[str,Any]): The configuration of the finish command (used to terminate the flow early when the controller has accomplished its goal). |
|
- `description` (str): The description of the command. Default is "The finish command is used to terminate the flow early when the controller has accomplished its goal." |
|
- `input_args` (List[str]): The list of expected keys to run the finish command. Default is ["answer"]. |
|
- `human_message_prompt_template`(Dict[str,Any]): The prompt template used to generate the message that is shown to the user/human when the finish command is executed. Default is: |
|
- `template` (str): The template of the humand message prompt (see AutoGPTFlow.yaml for default template) |
|
- `input_variables` (List[str]): The list of variables to be included in the template. Default is ["observation", "human_feedback", "memory"]. |
|
- `ìnput_interface_initialized` (List[str]): The input interface that Controller Flow expects except for the first time in the flow. Default is ["observation", "human_feedback", "memory"]. |
|
- `Executor` (Dict[str,Any]): The configuration of the Executor Flow. By default the executor flow is a Branching Flow (see BranchingFlow). It's default values are the default values of the BranchingFlow. Fields to define: |
|
- `subflows_config` (Dict[str,Any]): A Dictionary of subflows configurations.The keys are the names of the subflows and the values are the configurations of the subflows. Each subflow is a branch of the branching flow. |
|
- `HumanFeedback` (Dict[str,Any]): The configuration of the HumanFeedback Flow. By default the human feedback flow is a HumanStandardInputFlow (see HumanStandardInputFlowModule ). |
|
It's default values are specified in the REAMDE.md of HumanStandardInputFlowModule. Except for the following parameters who are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml: |
|
- `request_multi_line_input_flag` (bool): Flag to request multi-line input. Default is False. |
|
- `query_message_prompt_template` (Dict[str,Any]): The prompt template presented to the user/human to request input. Default is: |
|
- `template` (str): The template of the query message prompt (see AutoGPTFlow.yaml for default template) |
|
- `input_variables` (List[str]): The list of variables to be included in the template. Default is ["goal","command","command_args",observation"] |
|
- input_interface_initialized (List[str]): The input interface that HumanFeeback Flow expects except for the first time in the flow. Default is ["goal","command","command_args",observation"] |
|
- `Memory` (Dict[str,Any]): The configuration of the Memory Flow. By default the memory flow is a ChromaDBFlow (see VectorStoreFlowModule). It's default values are defined in ChromaDBFlow.yaml of the VectorStoreFlowModule. Except for the following parameters who are overwritten by the AutoGPTFlow in AutoGPTFlow.yaml: |
|
- `n_results`: The number of results to retrieve from the memory. Default is 2. |
|
- `topology` (List[Dict[str,Any]]): The topology of the flow which is "circular". By default, the topology is the one shown in the illustration above (the topology is also described in AutoGPTFlow.yaml). |
|
|
|
|
|
*Input Interface*: |
|
|
|
- `goal` (str): The goal of the flow. |
|
|
|
*Output Interface*: |
|
|
|
- `answer` (str): The answer of the flow. |
|
- `status` (str): The status of the flow. It can be "finished" or "unfinished". |
|
|
|
:param flow_config: The configuration of the flow. Contains the parameters described above and the parameters required by the parent class (CircularFlow). |
|
:type flow_config: Dict[str,Any] |
|
:param subflows: A list of subflows constituating the circular flow. Required when instantiating the subflow programmatically (it replaces subflows_config from flow_config). |
|
:type subflows: List[Flow] |
|
""" |
|
def _on_reach_max_round(self): |
|
""" This method is called when the flow reaches the max_rounds.""" |
|
self._state_update_dict({ |
|
"answer": "The maximum amount of rounds was reached before the model found an answer.", |
|
"status": "unfinished" |
|
}) |
|
|
|
@staticmethod |
|
def _get_memory_key(flow_state): |
|
""" This method returns the memory key that is used to retrieve memories from the ChromaDB model. |
|
|
|
:param flow_state: The state of the flow |
|
:type flow_state: Dict[str, Any] |
|
:return: The current context |
|
:rtype: str |
|
""" |
|
goal = flow_state.get("goal") |
|
last_command = flow_state.get("command") |
|
last_command_args = flow_state.get("command_args") |
|
last_observation = flow_state.get("observation") |
|
last_human_feedback = flow_state.get("human_feedback") |
|
|
|
if last_command is None: |
|
return "" |
|
|
|
assert goal is not None, goal |
|
assert last_command_args is not None, last_command_args |
|
assert last_observation is not None, last_observation |
|
|
|
current_context = \ |
|
f""" |
|
== Goal == |
|
{goal} |
|
|
|
== Command == |
|
{last_command} |
|
== Args |
|
{last_command_args} |
|
== Result |
|
{last_observation} |
|
|
|
== Human Feedback == |
|
{last_human_feedback} |
|
""" |
|
|
|
return current_context |
|
|
|
@CircularFlow.input_msg_payload_builder |
|
def prepare_memory_read_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]: |
|
""" This method prepares the input for the Memory Flow. It is called before the Memory Flow is called. |
|
A (very) basic example implementation of how the memory retrieval could be constructed. |
|
|
|
:param flow_state: The state of the flow |
|
:type flow_state: Dict[str, Any] |
|
:param dst_flow: The destination flow |
|
:type dst_flow: Flow |
|
:return: The input message for the Memory Flow |
|
:rtype: Dict[str, Any] |
|
""" |
|
query = self._get_memory_key(flow_state) |
|
|
|
return { |
|
"operation": "read", |
|
"content": query |
|
} |
|
|
|
@CircularFlow.output_msg_payload_processor |
|
def prepare_memory_read_output(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow): |
|
""" This method processes the output of the Memory Flow. It is called after the Memory Flow is called. |
|
|
|
:param output_payload: The output payload of the Memory Flow |
|
:type output_payload: Dict[str, Any] |
|
:param src_flow: The source flow |
|
:type src_flow: Flow |
|
:return: The processed output payload |
|
:rtype: Dict[str, Any] |
|
""" |
|
retrieved_memories = output_payload["retrieved"][0][1:] |
|
return {"memory": "\n".join(retrieved_memories)} |
|
|
|
@CircularFlow.input_msg_payload_builder |
|
def prepare_memory_write_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]: |
|
""" This method prepares the input for the Memory Flow. It is called before the Memory Flow is called. |
|
A (very) basic example implementation of how the memory population could be constructed. |
|
|
|
:param flow_state: The state of the flow |
|
:type flow_state: Dict[str, Any] |
|
:param dst_flow: The destination flow |
|
:type dst_flow: Flow |
|
:return: The input message to write the Memory Flow |
|
:rtype: Dict[str, Any] |
|
""""" |
|
query = self._get_memory_key(flow_state) |
|
|
|
return { |
|
"operation": "write", |
|
"content": str(query) |
|
} |
|
|
|
@CircularFlow.output_msg_payload_processor |
|
def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[ |
|
str, Any]: |
|
""" This method detects whether the Controller flow has generated a "finish" command or not to terminate the flow. . It is called after the Controller Flow is called. |
|
|
|
:param output_payload: The output payload of the Controller Flow |
|
:type output_payload: Dict[str, Any] |
|
:param src_flow: The source flow |
|
:type src_flow: Flow |
|
:return: The processed output payload |
|
:rtype: Dict[str, Any] |
|
""" |
|
command = output_payload["command"] |
|
if command == "finish": |
|
return { |
|
"EARLY_EXIT": True, |
|
"answer": output_payload["command_args"]["answer"], |
|
"status": "finished" |
|
} |
|
else: |
|
return output_payload |
|
|
|
@CircularFlow.output_msg_payload_processor |
|
def detect_finish_in_human_input(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[ |
|
str, Any]: |
|
""" This method detects whether the HumanFeedback (the human/user) flow has generated a "finish" command or not to terminate the flow. It is called after the HumanFeedback Flow is called. |
|
|
|
:param output_payload: The output payload of the HumanFeedback Flow |
|
:type output_payload: Dict[str, Any] |
|
:param src_flow: The source flow |
|
:type src_flow: Flow |
|
:return: The processed output payload |
|
:rtype: Dict[str, Any] |
|
""" |
|
|
|
human_feedback = output_payload["human_input"] |
|
if human_feedback.strip().lower() == "q": |
|
return { |
|
"EARLY_EXIT": True, |
|
"answer": "The user has chosen to exit before a final answer was generated.", |
|
"status": "unfinished" |
|
} |
|
|
|
return {"human_feedback": human_feedback} |
|
|