from typing import Dict, Any

from flow_modules.aiflows.AbstractBossFlowModule import CtrlExMemFlow
from aiflows.base_flows import CircularFlow


class CtrlExMem_CoderFlow(CtrlExMemFlow):
    """Coder-specific subclass of CtrlExMemFlow (from AbstractBossFlowModule).

    Base class reference:
    https://huggingface.co/aiflows/AbstractBossFlowModule/blob/main/CtrlExMemFlow.py

    *Input Interface*:

    - `plan`
    - `logs`
    - `code_library`: the signatures and docstrings of the functions in the code library.
    - `memory_files`
    - `goal`

    *Output Interface*:

    - `result` (str): the result of the flow; it is returned to the caller.
    - `summary` (str): the summary of the flow; it is logged into the logs of the caller flow.

    *Configuration Parameters*:

    - See the configuration parameters of the CtrlExMemFlow class from AbstractBoss
      (https://huggingface.co/aiflows/AbstractBossFlowModule/blob/main/CtrlExMemFlow.py).
    - `input_interface`: the input interface of the flow
    - `output_interface`: the output interface of the flow
    - `subflows_config`: the configuration of the subflows
    """

    def _on_reach_max_round(self):
        """Store an "unfinished" result, summary and status in the flow state.

        Intended as the hook for the case where the maximum number of rounds
        is reached before the coder finished its job.
        """
        unfinished_outcome = {
            "result": "the maximum amount of rounds was reached before the coder flow has done the job",
            "summary": "CoderFlow: the maximum amount of rounds was reached before the flow has done the job",
            "status": "unfinished",
        }
        self._state_update_dict(unfinished_outcome)

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Inspect a subflow's output and either end the flow or pass the payload on.

        The decision is driven by ``output_payload["command"]``:

        - ``"finish"``: return an early-exit payload built from the command's summary.
        - ``"manual_finish"``: return an early-exit payload marking the run as
          terminated by the user.
        - ``"update_plan"`` / ``"run_code"``: copy ``memory_files`` from the flow
          state into the command arguments.
        - ``"re_plan"``: copy the plan file location and the current plan from
          the flow state into the command arguments.
        - anything else: the payload is returned untouched.

        :param output_payload: the output payload of the subflow
        :type output_payload: Dict[str, Any]
        :param src_flow: the source flow of the message (not used here)
        :return: the payload to hand back to the flow
        :rtype: Dict[str, Any]
        """
        command = output_payload["command"]

        # Terminal commands: build a fresh early-exit payload and stop here.
        if command == "finish":
            summary = output_payload["command_args"]["summary"]
            return {
                "EARLY_EXIT": True,
                "result": summary,
                "summary": "Coder: " + summary,
                "status": "finished",
            }

        if command == "manual_finish":
            # ~~~ return the manual quit status ~~~
            return {
                "EARLY_EXIT": True,
                "result": "CoderFlow was terminated explicitly by the user, process is unfinished",
                "summary": "Coder: process terminated by the user explicitly, nothing generated",
                "status": "unfinished",
            }

        # Non-terminal commands: enrich the command arguments in place with
        # whatever the command needs from the flow state, then pass it on.
        if command in ("update_plan", "run_code"):
            state = self._fetch_state_attributes_by_keys(keys=["memory_files"])
            output_payload["command_args"]["memory_files"] = state["memory_files"]
        elif command == "re_plan":
            state = self._fetch_state_attributes_by_keys(keys=["plan", "memory_files"])
            command_args = output_payload["command_args"]
            command_args["plan_file_location"] = state["memory_files"]["plan"]
            command_args["plan"] = state["plan"]

        return output_payload