from typing import Dict, Any from aiflows.utils import logging log = logging.get_logger(__name__) from flow_modules.aiflows.AbstractBossFlowModule import AbstractBossFlow class CoderFlow(AbstractBossFlow): """Coder flow is one executor branch of the Jarvis flow. At a higher level, it is a flow that writes and runs code given a goal. In the Jarvis flow, the Coder flow in invoked by the controller, The Coder flow receives the goal generated by the controller, writes and runs code in an interactive fashion. The Coder flow has the similar structure as the Jarvis flow (inherited from AbstractBossFlow). *Input Interface (expected input)* - `goal` (str): The goal from the caller (source flow, i.e. JarvisFlow) *Output Interface (expected output)* - `result` (str): The result of the flow, the result will be returned to the caller (i.e. JarvisFlow). - `summary` (str): The summary of the flow, the summary will be logged into the logs of the caller flow (i.e. JarvisFlow). *Configuration Parameters*: (Also see super class: AbstractBossFlow) - `memory_files` (dict): The memory files of the flow. The memory files are the files that the flow reads and writes. Typically it should contain plan, logs, and code_library. Typical workflow of Coder: 0. JarvisFlow calls Coder with a goal. 1. MemoryReading reads plans, logs and code library. 2. Planner makes plan based on goal. 3. Extend library with the goal given by the controller. 4. Run code with code (possibly calls the newly written function) given by the controller. 5. Finish and give an answer. """ def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: """The run function of the Coder flow. :param input_data: The input data of the flow. :type input_data: Dict[str, Any] :return: The output data of the flow. :rtype: Dict[str, Any] """ # ~~~ sets the input_data in the flow_state dict ~~~ self._state_update_dict(update_data=input_data) # ~~~ set the memory file to the flow state ~~~ self._state_update_dict(update_data={"memory_files": self.memory_files}) max_rounds = self.flow_config.get("max_rounds", 1) if max_rounds is None: log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.") self._sequential_run(max_rounds=max_rounds) output = self._get_output_from_state() self.reset(full_reset=True, recursive=True, src_flow=self) return output