from copy import deepcopy
from typing import Dict, Any
from aiflows.base_flows import SequentialFlow
from aiflows.utils import logging
from abc import ABC

logging.set_verbosity_debug()
log = logging.get_logger(__name__)


class AbstractBossFlow(SequentialFlow, ABC):
    """Abstract memory-planner-controller-executor flow.

    Viewed from a higher level, this is an abstract agent driven by several
    language models plus downstream tools (code interpreters, etc.). It is meant
    to work together with memory management mechanisms, an LM-powered planner
    and controller, and arbitrary executors.

    *Configuration Parameters*

    - `name` (str): Name of the flow.
    - `description` (str): Description of the flow.
    - `memory_files` (dict): Maps memory file names to the paths of those files.
      Typical memory files include plan, logs, code library.
    - `max_rounds`: Passed to the sequential run as the round limit. When it is
      `None`, the flow logs that it runs until the early exit condition is met.
    - `subflows_config`:
        - MemoryReading: reads the content of the memory files into the flow
          state for later use.
        - Planner: makes a step-by-step plan based on the current goal.
        - CtrlExMem: controller-executor agent with memory reading and memory
          writing; it executes the plan generated by the planner.
    - `early_exit_key` (str): The key in the flow state that indicates the
      early exit condition.
    - `topology` (list): The topology of the flow.

    *Input Interface (expected input)*

    - `goal` (str): The goal from the caller (source flow).

    *Output Interface (expected output)*

    - `result` (str): The result of the flow; it is returned to the caller.
    - `summary` (str): The summary of the flow; it is logged into the logs of
      the caller flow.

    :param memory_files: Maps memory file names to the paths of those files.
    :type memory_files: dict
    """

    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"]

    def __init__(
            self,
            memory_files: Dict[str, Any],
            **kwargs
    ):
        """Initialise the parent flow and remember the memory files.

        :param memory_files: Maps memory file names to the paths of those files.
        :type memory_files: dict
        :param kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self.memory_files = memory_files

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration dictionary.

        The configuration is deep-copied first, so the caller's dictionary is
        not the object handed to the new instance.

        :param config: The configuration dictionary; must contain `memory_files`.
        :type config: dict
        :return: A new instance of `cls`.
        """
        flow_config = deepcopy(config)

        # ~~~ Gather constructor arguments: config, memory files, subflows ~~~
        # Entries are evaluated top to bottom, so `memory_files` is looked up
        # before the subflows are set up.
        init_kwargs = {
            "flow_config": flow_config,
            "memory_files": flow_config["memory_files"],
            "subflows": cls._set_up_subflows(flow_config),
        }

        # ~~~ Create the flow ~~~
        return cls(**init_kwargs)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow on the given input.

        :param input_data: The input data; it is expected to contain 'goal'.
        :type input_data: dict
        :return: The output collected from the flow state after the run.
        :rtype: dict
        """
        # ~~~ Store the caller's input in the flow state ~~~
        self._state_update_dict(update_data=input_data)

        # ~~~ Expose the memory files through the flow state as well ~~~
        self._state_update_dict(update_data={"memory_files": self.memory_files})

        round_limit = self.flow_config.get("max_rounds", 1)
        runs_until_early_exit = round_limit is None
        if runs_until_early_exit:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=round_limit)

        return self._get_output_from_state()