from copy import deepcopy
from typing import Any, Dict

from aiflows.base_flows import SequentialFlow
from aiflows.utils import logging

logging.set_verbosity_debug()
log = logging.get_logger(__name__)


class InteractivePlanGenFlow(SequentialFlow):
    """Generate a plan in an interactive manner.

    This is a sequential flow composed of:
        1. PlanGenerator: generates a plan based on the goal.
        2. PlanFileEditor: writes the generated plan to a temp file for the
           user to see, edit and provide feedback.
        3. ParseFeedback: opens up the temp file with vscode and parses the
           feedback from the user.

    *Input Interface*:
        - `goal`
        - `plan_file_location`

    *Output Interface*:
        - `plan`
        - `feedback`
        - `temp_plan_file_location`

    *Configuration Parameters*:
        - `input_interface`: the input interface of the flow.
        - `output_interface`: the output interface of the flow.
        - `subflows_config`: the configuration of the subflows.
        - `max_rounds`: the round limit handed to the sequential run; `run`
          falls back to 1 when the key is absent, and a value of `None` is
          logged as running until the early exit condition is met.
        - `early_exit_key`: the key in the state dict that indicates whether
          the flow should exit early.
        - `topology`: the topology of the subflows.
    """

    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology"]

    def __init__(self, **kwargs):
        """Initialize the flow by forwarding everything to the parent class.

        :param kwargs: the configuration of the flow.
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration.

        The configuration is deep-copied first, so the caller's object is
        not the one handed to the subflow set-up or stored on the flow.

        :param config: the configuration of the flow.
        :type config: Dict[str, Any]
        :return: the instantiated flow.
        :rtype: InteractivePlanGenFlow
        """
        flow_config = deepcopy(config)

        # ~~~ Set up subflows, then instantiate the flow ~~~
        return cls(
            flow_config=flow_config,
            subflows=cls._set_up_subflows(flow_config),
        )

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow once on the given input.

        :param input_data: the input data of the flow.
        :type input_data: Dict[str, Any]
        :return: the output data of the flow.
        :rtype: Dict[str, Any]
        """
        # ~~~ Store the input data in the flow state ~~~
        self._state_update_dict(update_data=input_data)

        # A missing key falls back to a single round; an explicit None is
        # forwarded unchanged and only triggers the informational log below.
        round_limit = self.flow_config.get("max_rounds", 1)
        if round_limit is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=round_limit)

        return self._get_output_from_state()