|
from typing import Dict, Any |
|
from aiflows.utils import logging |
|
log = logging.get_logger(__name__) |
|
|
|
from flow_modules.Tachi67.AbstractBossFlowModule import AbstractBossFlow |
|
|
|
class CoderFlow(AbstractBossFlow):
    """Executor branch of the Jarvis flow that writes and runs code for a given goal.

    Within the Jarvis flow, the Coder flow is invoked by the controller. It receives
    the goal produced by the controller and then writes and runs code in an
    interactive fashion.

    Structurally the Coder flow mirrors the Jarvis flow; both inherit from
    AbstractBossFlow.

    *Input Interface (expected input)*

    - `goal` (str): The goal from the caller (source flow, i.e. JarvisFlow).

    *Output Interface (expected output)*

    - `result` (str): The result of the flow; it is returned to the caller (i.e. JarvisFlow).
    - `summary` (str): The summary of the flow; it is logged into the logs of the caller flow (i.e. JarvisFlow).

    Typical workflow of Coder:

    0. JarvisFlow calls Coder with a goal.
    1. MemoryReading reads plans, logs and code library.
    2. Planner makes a plan based on the goal.
    3. Extend the library with the goal given by the controller.
    4. Run code with code (possibly calling the newly written function) given by the controller.
    5. Finish and give an answer.
    """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the Coder flow once and return what it produced.

        The caller's input is merged into the flow state, followed by the flow's
        `memory_files`. The subflows are then run sequentially, the output is
        read back from the state, and the flow is fully reset (recursively)
        before returning.

        :param input_data: The input data of the flow.
        :type input_data: Dict[str, Any]
        :return: The output data of the flow.
        :rtype: Dict[str, Any]
        """
        # Caller input goes into the state first; the memory files are written
        # afterwards, in a separate update, exactly in this order.
        self._state_update_dict(update_data=input_data)
        memory_payload = {"memory_files": self.memory_files}
        self._state_update_dict(update_data=memory_payload)

        # A missing `max_rounds` key falls back to 1; an explicit None is passed
        # through unchanged and only triggers the informational log below.
        round_limit = self.flow_config.get("max_rounds", 1)
        if round_limit is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=round_limit)

        # Collect the output before the reset below is applied to this flow.
        flow_output = self._get_output_from_state()
        self.reset(full_reset=True, recursive=True, src_flow=self)
        return flow_output