File size: 2,797 Bytes
e0487cd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12790c9
 
 
 
 
 
 
 
e0487cd
 
 
 
 
 
 
12790c9
 
 
 
 
e0487cd
 
 
 
12790c9
 
 
 
 
 
 
e0487cd
 
 
 
 
 
 
 
 
 
 
12790c9
 
 
 
 
 
 
e0487cd
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from copy import deepcopy
from typing import Any, Dict

from aiflows.base_flows import SequentialFlow
from aiflows.utils import logging

logging.set_verbosity_debug()
log = logging.get_logger(__name__)

class InteractivePlanGenFlow(SequentialFlow):
    """This flow writes code in an interactive manner. It is a sequential flow composed of:
    1. PlanGenerator: generates plan based on the goal.
    2. PlanFileEditor: writes the generated plan to a temp file for the user to see, edit and provide feedback.
    3. ParseFeedback: opens up the temp file with vscode and parses the feedback from the user.
    
    *Input Interface*:
    - `goal`
    - `plan_file_location`
    
    *Output Interface*:
    - `plan`
    - `feedback`
    - `temp_plan_file_location`

    *Configuration Parameters*:
    - `input_interface`: the input interface of the flow.
    - `output_interface`: the output interface of the flow.
    - `subflows_config`: the configuration of the subflows.
    - `early_exit_key`: the key in the state dict that indicates whether the flow should exit early.
    - `topology`: the topology of the subflows.

    """
    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology"]

    def __init__(
            self,
            **kwargs
    ):
        """
        This function initializes the flow.
        :param kwargs: the configuration of the flow.
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)

    @classmethod
    def instantiate_from_config(cls, config):
        """
        This function instantiates the flow from a configuration.
        :param config: the configuration of the flow.
        :type config: Dict[str, Any]
        :return: the instantiated flow.
        :rtype: InteractivePlanGenFlow
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Set up subflows ~~~
        kwargs.update({"subflows": cls._set_up_subflows(flow_config)})

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        This function runs the flow.
        :param input_data: the input data of the flow.
        :type input_data: Dict[str, Any]
        :return: the output data of the flow.
        :rtype: Dict[str, Any]
        """
        # ~~~ sets the input_data in the flow_state dict ~~~
        self._state_update_dict(update_data=input_data)

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=max_rounds)

        output = self._get_output_from_state()

        return output