from typing import Dict, Any | |
from flows.utils import logging | |
log = logging.get_logger(__name__) | |
from flow_modules.Tachi67.AbstractBossFlowModule import AbstractBossFlow | |
class ExtendLibraryFlow(AbstractBossFlow): | |
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: | |
# ~~~ sets the input_data in the flow_state dict ~~~ | |
self._state_update_dict(update_data=input_data) | |
# ~~~ set the memory file to the flow state ~~~ | |
self._state_update_dict(update_data={"memory_files": self.memory_files}) | |
max_rounds = self.flow_config.get("max_rounds", 1) | |
if max_rounds is None: | |
log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.") | |
self._sequential_run(max_rounds=max_rounds) | |
output = self._get_output_from_state() | |
self.reset(full_reset=True, recursive=True, src_flow=self) | |
return output |