from typing import Dict, Any from flow_modules.aiflows.AbstractBossFlowModule import CtrlExMemFlow from aiflows.base_flows import CircularFlow class CtrlExMem_JarvisFlow(CtrlExMemFlow): """This class inherits from the CtrlExMemFlow class from AbstractBossFlowModule. See: https://huggingface.co/aiflows/AbstractBossFlowModule/blob/main/CtrlExMemFlow.py *Input Interface*: - `plan` - `memory_files` - `logs` - `goal` *Output Interface*: - `result` - `summary` *Configuration Parameters*: - `input_interface`: the input interface of the flow - `output_interface`: the output interface of the flow - `subflows_config`: the subflows configuration of the flow - `topology`: the topology of the subflows Take notice that: 1. In the controller, we only keep the previous 3 messages for memory management, that will be: a. The assistant message (controller's last command) b. Manually updated new system prompt (new logs, new plans, etc.) c. The user message (result, feedback) 2. Each time one executor from the branch is executed, the logs is updated, this means: a. The logs file of Jarvis is updated. b. After MemoryReading at the end of each run of the loop, the logs in the flow_state is updated. c. The next time the controller is called, the updated logs is injected into the system prompts. 3. In the prompts of the controller, when the controller realizes one step of the plan is done, we ask the controller to revise what was done and mark the current step as done. This means: a. The plan file is updated. b. The plan in the flow_state is updated. c. The next time the controller is called, the updated plan is injected into the system prompts. This is basically how the memory management works, to allow for more space for llm execution, and make sure the llm does not forget important information. """ def _on_reach_max_round(self): """This function is called when the maximum amount of rounds is reached before the Jarvis flow has done the job. """ self._state_update_dict({ "result": "the maximum amount of rounds was reached before the Jarvis flow has done the job", "summary": "JarvisFlow: the maximum amount of rounds was reached before the flow has done the job", "status": "unfinished" }) @CircularFlow.output_msg_payload_processor def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]: """This function is called when the JarvisFlow receives a message from one of its branches. This function processes the message and decides whether the JarvisFlow should continue or finish. :param output_payload: the output payload of the branch :type output_payload: Dict[str, Any] :param src_flow: the source flow of the message :type src_flow: str :return: the updated output payload :rtype: Dict[str, Any] """ command = output_payload["command"] if command == "finish": return { "EARLY_EXIT": True, "result": output_payload["command_args"]["summary"], "summary": "Jarvis: " + output_payload["command_args"]["summary"], "status": "finished" } elif command == "manual_finish": # ~~~ return the manual quit status ~~~ return { "EARLY_EXIT": True, "result": "JarvisFlow was terminated explicitly by the user, process is unfinished", "summary": "Jarvis: process terminated by the user explicitly, nothing generated", "status": "unfinished" } elif command == "update_plan": keys_to_fetch_from_state = ["memory_files"] fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) output_payload["command_args"]["memory_files"] = fetched_state["memory_files"] return output_payload elif command == "re_plan": keys_to_fetch_from_state = ["plan", "memory_files"] fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) output_payload["command_args"]["plan_file_location"] = fetched_state["memory_files"]["plan"] output_payload["command_args"]["plan"] = fetched_state["plan"] return output_payload else: return output_payload