Tachi67 commited on
Commit
b8b19eb
·
1 Parent(s): 76cad08

Update ControllerFlow_ExtLib.py

Browse files
Files changed (1) hide show
  1. ControllerFlow_ExtLib.py +127 -65
ControllerFlow_ExtLib.py CHANGED
@@ -1,65 +1,127 @@
1
- from typing import Dict, Any
2
-
3
- from flow_modules.Tachi67.AbstractBossFlowModule import CtrlExMemFlow
4
- from aiflows.base_flows import CircularFlow
5
-
6
- class CtrlExMem_ExtLib(CtrlExMemFlow):
7
- """This class inherits from the CtrlExMemFlow class from AbstractBossFlowModule.
8
- See: https://huggingface.co/Tachi67/AbstractBossFlowModule/blob/main/CtrlExMemFlow.py
9
- *Input Interface*:
10
- - `plan`
11
- - `logs`
12
- - `memory_files`
13
- - `goal`
14
- *Output Interface*
15
- - `result` (str): The result of the flow, the result will be returned to the caller.
16
- - `summary` (str): The summary of the flow, the summary will be logged into the logs of the caller flow.
17
-
18
- """
19
- def _on_reach_max_round(self):
20
- self._state_update_dict({
21
- "result": "the maximum amount of rounds was reached before the extend library flow has done the job",
22
- "summary": "ExtendLibraryFlow: the maximum amount of rounds was reached before the flow has done the job",
23
- "status": "unfinished"
24
- })
25
-
26
- @CircularFlow.output_msg_payload_processor
27
- def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
28
- command = output_payload["command"]
29
- if command == "finish":
30
- return {
31
- "EARLY_EXIT": True,
32
- "result": output_payload["command_args"]["summary"],
33
- "summary": "ExtendLibrary: " + output_payload["command_args"]["summary"],
34
- "status": "finished"
35
- }
36
- elif command == "manual_finish":
37
- # ~~~ return the manual quit status ~~~
38
- return {
39
- "EARLY_EXIT": True,
40
- "result": "ExtendLibraryFlow was terminated explicitly by the user, process is unfinished",
41
- "summary": "ExtendLibrary: process terminated by the user explicitly, nothing generated",
42
- "status": "unfinished"
43
- }
44
- elif command == "save_code":
45
- keys_to_fetch_from_state = ["code", "memory_files"]
46
- fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
47
- output_payload["command_args"]["code"] = fetched_state["code"]
48
- output_payload["command_args"]["memory_files"] = fetched_state["memory_files"]
49
- return output_payload
50
-
51
- elif command == "update_plan":
52
- keys_to_fetch_from_state = ["memory_files"]
53
- fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
54
- output_payload["command_args"]["memory_files"] = fetched_state["memory_files"]
55
- return output_payload
56
-
57
- elif command == "re_plan":
58
- keys_to_fetch_from_state = ["plan", "memory_files"]
59
- fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
60
- output_payload["command_args"]["plan_file_location"] = fetched_state["memory_files"]["plan"]
61
- output_payload["command_args"]["plan"] = fetched_state["plan"]
62
- return output_payload
63
-
64
- else:
65
- return output_payload
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from copy import deepcopy
3
+ from typing import Any, Dict, List
4
+
5
+ from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow
6
+
7
+ from dataclasses import dataclass
8
+
9
+
10
+ @dataclass
11
+ class Command:
12
+ name: str
13
+ description: str
14
+ input_args: List[str]
15
+
16
+ class ControllerFlow_ExtLib(ChatAtomicFlow):
17
+ def __init__(
18
+ self,
19
+ commands: List[Command],
20
+ **kwargs):
21
+ super().__init__(**kwargs)
22
+ self.system_message_prompt_template = self.system_message_prompt_template.partial(
23
+ commands=self._build_commands_manual(commands),
24
+ plan_file_location="no location yet",
25
+ plan="no plan yet",
26
+ logs="no logs yet",
27
+ )
28
+ self.hint_for_model = """
29
+ Make sure your response is in the following format:
30
+ Response Format:
31
+ {
32
+ "command": "call one of the commands you have e.g. `write_code`",
33
+ "command_args": {
34
+ "arg name": "value"
35
+ }
36
+ }
37
+ """
38
+
39
+ @staticmethod
40
+ def _build_commands_manual(commands: List[Command]) -> str:
41
+ ret = ""
42
+ for i, command in enumerate(commands):
43
+ command_input_json_schema = json.dumps(
44
+ {input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args})
45
+ ret += f"{i + 1}. {command.name}: {command.description} Input arguments (given in the JSON schema): {command_input_json_schema}\n"
46
+ return ret
47
+
48
+ def _get_plan_file_location(self, input_data: Dict[str, Any]):
49
+ assert "memory_files" in input_data, "memory_files not passed to Extlib/Controller"
50
+ assert "plan" in input_data["memory_files"], "plan not in memory files"
51
+ return input_data["memory_files"]["plan"]
52
+
53
+ def _get_plan_content(self, input_data: Dict[str, Any]):
54
+ assert "plan" in input_data, "plan not passed to Extlib/Controller"
55
+ plan_content = input_data["plan"]
56
+ if len(plan_content) == 0:
57
+ plan_content = 'No plan yet'
58
+ return plan_content
59
+
60
+ def _get_logs_content(self, input_data: Dict[str, Any]):
61
+ assert "logs" in input_data, "logs not passed to Extlib/Controller"
62
+ logs_content = input_data["logs"]
63
+ if len(logs_content) == 0:
64
+ logs_content = "No logs yet"
65
+ return logs_content
66
+
67
+ @classmethod
68
+ def instantiate_from_config(cls, config):
69
+ flow_config = deepcopy(config)
70
+
71
+ kwargs = {"flow_config": flow_config}
72
+
73
+ # ~~~ Set up prompts ~~~
74
+ kwargs.update(cls._set_up_prompts(flow_config))
75
+
76
+ # ~~~Set up backend ~~~
77
+ kwargs.update(cls._set_up_backend(flow_config))
78
+
79
+ # ~~~ Set up commands ~~~
80
+ commands = flow_config["commands"]
81
+ commands = [
82
+ Command(name, command_conf["description"], command_conf["input_args"]) for name, command_conf in
83
+ commands.items()
84
+ ]
85
+ kwargs.update({"commands": commands})
86
+
87
+ # ~~~ Instantiate flow ~~~
88
+ return cls(**kwargs)
89
+
90
+ def _update_prompts_and_input(self, input_data: Dict[str, Any]):
91
+ if 'goal' in input_data:
92
+ input_data['goal'] += self.hint_for_model
93
+ if 'result' in input_data:
94
+ input_data['result'] += self.hint_for_model
95
+ plan_file_location = self._get_plan_file_location(input_data)
96
+ plan_content = self._get_plan_content(input_data)
97
+ logs_content = self._get_logs_content(input_data)
98
+ self.system_message_prompt_template = self.system_message_prompt_template.partial(
99
+ plan_file_location=plan_file_location,
100
+ plan=plan_content,
101
+ logs=logs_content
102
+ )
103
+
104
+ def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
105
+ self._update_prompts_and_input(input_data)
106
+
107
+ # ~~~when conversation is initialized, append the updated system prompts to the chat history ~~~
108
+ if self._is_conversation_initialized():
109
+ updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data)
110
+ self._state_update_add_chat_message(content=updated_system_message_content,
111
+ role=self.flow_config["system_name"])
112
+
113
+ while True:
114
+ api_output = super().run(input_data)["api_output"].strip()
115
+ try:
116
+ start = api_output.index("{")
117
+ end = api_output.rindex("}") + 1
118
+ json_str = api_output[start:end]
119
+ return json.loads(json_str)
120
+ except (json.decoder.JSONDecodeError, json.JSONDecodeError):
121
+ updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data)
122
+ self._state_update_add_chat_message(content=updated_system_message_content,
123
+ role=self.flow_config["system_name"])
124
+ new_goal = "The previous respond cannot be parsed with json.loads. Next time, do not provide any comments or code blocks. Make sure your next response is purely json parsable."
125
+ new_input_data = input_data.copy()
126
+ new_input_data['result'] = new_goal
127
+ input_data = new_input_data