File size: 3,100 Bytes
bd7cdc8
 
 
77e3c49
bd7cdc8
 
76cad08
 
 
 
 
 
 
 
 
 
 
bd7cdc8
 
8161031
bd7cdc8
 
 
 
 
 
 
 
 
 
8161031
bd7cdc8
 
 
 
 
 
 
8161031
bd7cdc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76cad08
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from typing import Dict, Any

from flow_modules.Tachi67.AbstractBossFlowModule import CtrlExMemFlow
from aiflows.base_flows import CircularFlow

class CtrlExMem_ExtLib(CtrlExMemFlow):
    """Controller/executor/memory flow specialised for the "extend library" job.

    Subclass of CtrlExMemFlow from AbstractBossFlowModule.
    See: https://huggingface.co/Tachi67/AbstractBossFlowModule/blob/main/CtrlExMemFlow.py

    *Input Interface*:
    - `plan`
    - `logs`
    - `memory_files`
    - `goal`

    *Output Interface*:
    - `result` (str): The result of the flow; it is returned to the caller.
    - `summary` (str): The summary of the flow; it is logged into the logs of the caller flow.
    """

    def _on_reach_max_round(self):
        """Write an "unfinished" result/summary/status into the flow state.

        Judging by its name, this hook runs when the round limit is hit
        (the caller is not visible in this file).
        """
        exhausted_state = {
            "result": "the maximum amount of rounds was reached before the extend library flow has done the job",
            "summary": "ExtendLibraryFlow: the maximum amount of rounds was reached before the flow has done the job",
            "status": "unfinished"
        }
        self._state_update_dict(exhausted_state)

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Post-process a payload according to its `command` field.

        - `finish` / `manual_finish`: return a fresh dict carrying
          `EARLY_EXIT: True` together with `result`, `summary` and `status`
          (presumably this tells the circular flow to stop -- confirm against
          the aiflows CircularFlow implementation).
        - `save_code`, `update_plan`, `re_plan`: copy selected values from the
          flow state into `output_payload["command_args"]` (the payload is
          mutated in place) and return the payload.
        - any other command: return the payload untouched.

        Args:
            output_payload: Payload to inspect; must contain `command`, and
                `command_args` for every command handled above except
                `manual_finish`.
            src_flow: Flow that produced the payload (unused here).

        Returns:
            Either the early-exit dict or the (possibly enriched) payload.
        """
        action = output_payload["command"]

        if action == "finish":
            reported_summary = output_payload["command_args"]["summary"]
            return {
                "EARLY_EXIT": True,
                "result": reported_summary,
                "summary": "ExtendLibrary: " + reported_summary,
                "status": "finished"
            }

        if action == "manual_finish":
            # The user asked to quit: report the run as unfinished.
            return {
                "EARLY_EXIT": True,
                "result": "ExtendLibraryFlow was terminated explicitly by the user, process is unfinished",
                "summary": "ExtendLibrary: process terminated by the user explicitly, nothing generated",
                "status": "unfinished"
            }

        if action in ("save_code", "update_plan"):
            # Both commands forward state entries under the same key names.
            if action == "save_code":
                wanted_keys = ["code", "memory_files"]
            else:
                wanted_keys = ["memory_files"]
            state_snapshot = self._fetch_state_attributes_by_keys(keys=wanted_keys)
            for state_key in wanted_keys:
                output_payload["command_args"][state_key] = state_snapshot[state_key]
            return output_payload

        if action == "re_plan":
            state_snapshot = self._fetch_state_attributes_by_keys(keys=["plan", "memory_files"])
            # The plan file location is stored under the "plan" entry of memory_files.
            plan_location = state_snapshot["memory_files"]["plan"]
            command_args = output_payload["command_args"]
            command_args["plan_file_location"] = plan_location
            command_args["plan"] = state_snapshot["plan"]
            return output_payload

        # Unrecognised commands pass through unchanged.
        return output_payload