|
from copy import deepcopy |
|
from typing import Any, Dict |
|
|
|
from aiflows.base_flows import SequentialFlow |
|
from aiflows.utils import logging |
|
|
|
# Runs at import time. Going by its name, this raises the verbosity of the
# aiflows logging utilities to debug level (NOTE(review): confirm the scope of
# the effect against aiflows.utils.logging).
logging.set_verbosity_debug()

# Module-level logger, named after this module; used by InteractivePlanGenFlow.run.
log = logging.get_logger(__name__)
|
|
|
class InteractivePlanGenFlow(SequentialFlow):
    """Generate a plan in an interactive manner, with the user in the loop.

    This is a sequential flow composed of three subflows:

    1. PlanGenerator: generates a plan based on the goal.
    2. PlanFileEditor: writes the generated plan to a temp file so the user can
       see it, edit it and provide feedback.
    3. ParseFeedback: opens the temp file with vscode and parses the feedback
       from the user.

    *Input Interface*:

    - `goal`
    - `plan_file_location`

    *Output Interface*:

    - `plan`
    - `feedback`
    - `temp_plan_file_location`

    *Configuration Parameters*:

    - `input_interface`: the input interface of the flow.
    - `output_interface`: the output interface of the flow.
    - `subflows_config`: the configuration of the subflows.
    - `max_rounds`: value forwarded to the sequential run; `run` falls back to
      1 when the key is absent, and a value of `None` is logged as running
      until the early exit condition is met.
    - `early_exit_key`: the key in the state dict that indicates whether the
      flow should exit early.
    - `topology`: the topology of the subflows.
    """

    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology"]

    def __init__(self, **kwargs):
        """
        Initialize the flow by delegating everything to the parent class.

        :param kwargs: the configuration of the flow.
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)

    @classmethod
    def instantiate_from_config(cls, config):
        """
        Build a flow instance from a configuration.

        The configuration is deep-copied first, so the caller's object is not
        the one handed to the subflow set-up or stored on the instance.

        :param config: the configuration of the flow.
        :type config: Dict[str, Any]
        :return: the instantiated flow.
        :rtype: InteractivePlanGenFlow
        """
        config_copy = deepcopy(config)
        # Subflows are created from the copied configuration before the flow
        # itself is constructed.
        subflows = cls._set_up_subflows(config_copy)
        return cls(flow_config=config_copy, subflows=subflows)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the flow on the given input.

        :param input_data: the input data of the flow.
        :type input_data: Dict[str, Any]
        :return: the output data of the flow.
        :rtype: Dict[str, Any]
        """
        # Merge the incoming data into the flow state before running.
        self._state_update_dict(update_data=input_data)

        # A missing `max_rounds` key means a single round; an explicit `None`
        # is passed through unchanged and only announced in the log.
        rounds = self.flow_config.get("max_rounds", 1)
        if rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=rounds)

        return self._get_output_from_state()
|
|