File size: 2,797 Bytes
e0487cd 12790c9 e0487cd 12790c9 e0487cd 12790c9 e0487cd 12790c9 e0487cd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
from copy import deepcopy
from typing import Any, Dict
from aiflows.base_flows import SequentialFlow
from aiflows.utils import logging
# Switch the aiflows logging utilities to debug verbosity. This runs at import
# time, so it is a module-level side effect.
# NOTE(review): presumably this affects every aiflows logger, not only this
# module's -- confirm before relying on it.
logging.set_verbosity_debug()
# Module-level logger, named after this module.
log = logging.get_logger(__name__)
class InteractivePlanGenFlow(SequentialFlow):
    """Sequential flow that generates a plan for a goal and gathers user feedback on it.

    The flow is composed of the following subflows, run in order:

    1. PlanGenerator: generates a plan based on the goal.
    2. PlanFileEditor: writes the generated plan to a temp file for the user
       to see, edit and provide feedback.
    3. ParseFeedback: opens up the temp file with vscode and parses the
       feedback from the user.

    *Input Interface*:

    - `goal`
    - `plan_file_location`

    *Output Interface*:

    - `plan`
    - `feedback`
    - `temp_plan_file_location`

    *Configuration Parameters*:

    - `input_interface`: the input interface of the flow.
    - `output_interface`: the output interface of the flow.
    - `subflows_config`: the configuration of the subflows.
    - `early_exit_key`: the key in the state dict that indicates whether the
      flow should exit early.
    - `topology`: the topology of the subflows.
    - `max_rounds`: the value handed to the sequential run as its round limit.
      `run` falls back to 1 when the key is absent; an explicit ``None`` means
      the flow runs until the early exit condition is met.
    """

    # Configuration keys declared as required for this flow.
    # NOTE(review): enforcement presumably happens in the aiflows base classes;
    # nothing in this class checks the list itself.
    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology"]

    def __init__(self, **kwargs):
        """Initialize the flow by forwarding all arguments to the parent class.

        :param kwargs: the configuration of the flow.
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)

    @classmethod
    def instantiate_from_config(cls, config: Dict[str, Any]) -> "InteractivePlanGenFlow":
        """Build a flow instance, including its subflows, from a configuration.

        :param config: the configuration of the flow.
        :type config: Dict[str, Any]
        :return: the instantiated flow.
        :rtype: InteractivePlanGenFlow
        """
        # Work on a private deep copy so the flow never shares the caller's dict.
        flow_config = deepcopy(config)

        # The subflows are built from the same copy that is handed to the flow.
        subflows = cls._set_up_subflows(flow_config)

        return cls(flow_config=flow_config, subflows=subflows)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the subflows on the given input and return the flow's output.

        :param input_data: the input data of the flow.
        :type input_data: Dict[str, Any]
        :return: the output data of the flow.
        :rtype: Dict[str, Any]
        """
        # Seed the flow state with the input data before any subflow runs.
        self._state_update_dict(update_data=input_data)

        # A missing key defaults to one round; an explicit None is passed
        # through unchanged and only triggers the informational log below.
        round_limit = self.flow_config.get("max_rounds", 1)
        if round_limit is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=round_limit)
        return self._get_output_from_state()
|