from copy import deepcopy
from typing import Any, Dict

from aiflows.base_flows import SequentialFlow
from aiflows.utils import logging

# Module import switches the aiflows logger to debug verbosity.
logging.set_verbosity_debug()
log = logging.get_logger(__name__)


class TestCodeFlow(SequentialFlow):
    """Sequential flow that carries a ``memory_files`` mapping into its state.

    The ``memory_files`` entry of the flow config is kept on the instance and
    written into the flow state at the start of every :meth:`run`, before the
    subflows are executed through ``_sequential_run``.
    """

    # Keys expected in the flow config (presumably validated by the base
    # class -- the check itself is not visible in this file).
    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"]

    def __init__(
        self,
        memory_files: Dict[str, Any],
        **kwargs
    ):
        """Initialise the parent flow and remember ``memory_files``.

        :param memory_files: mapping stored on the instance and later pushed
            into the flow state by :meth:`run`.
        :param kwargs: forwarded unchanged to ``SequentialFlow.__init__``.
        """
        super().__init__(**kwargs)
        self.memory_files = memory_files

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration mapping.

        The incoming ``config`` is deep-copied first, so the caller's object
        is not the one handed to the new flow.

        :param config: flow configuration; must contain ``"memory_files"``.
        :return: a new instance of ``cls``.
        :raises KeyError: if ``"memory_files"`` is absent from ``config``.
        """
        cfg = deepcopy(config)

        # ~~~ Memory files are read first, so a missing key fails early ~~~
        memory = cfg["memory_files"]

        # ~~~ Subflows are built from the copied config ~~~
        children = cls._set_up_subflows(cfg)

        # ~~~ Instantiate flow ~~~
        return cls(flow_config=cfg, memory_files=memory, subflows=children)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the subflows sequentially and return the resulting output.

        :param input_data: data written into the flow state before running.
        :return: whatever ``_get_output_from_state`` yields after the run.
        """
        # ~~~ store the input data in the flow state ~~~
        self._state_update_dict(update_data=input_data)

        # ~~~ then expose the memory files through the flow state as well ~~~
        self._state_update_dict(update_data={"memory_files": self.memory_files})

        # Falls back to 1 only when the key is absent; an explicit ``None``
        # is kept and passed on as-is.
        rounds = self.flow_config.get("max_rounds", 1)
        if rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=rounds)

        return self._get_output_from_state()