|
from typing import Dict, Any |
|
|
|
from flows.base_flows import CircularFlow |
|
from flows.utils import logging |
|
|
|
from .ControllerAtomicFlow import ControllerAtomicFlow |
|
|
|
# Switch the flows logging helper to debug verbosity.
# NOTE(review): this runs as an import-time side effect; it presumably affects
# more than this module's logger -- confirm that is intended.
logging.set_verbosity_debug()

# Module-level logger named after this module.
log = logging.get_logger(__name__)
|
|
|
|
|
class ControllerExecutorFlow(CircularFlow):
    """Circular flow that stops once the controller issues a ``finish`` command.

    The class adds two pieces on top of ``CircularFlow``: a fallback result
    written to the flow state when the round limit is reached, and an output
    payload processor that turns a ``finish`` command into a final answer.
    """

    def _on_reach_max_round(self):
        """Store an ``unfinished`` result in the flow state.

        Presumably invoked by ``CircularFlow`` when the maximum number of
        rounds is hit without an early exit -- confirm against the base class.
        """
        fallback_result = {
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished",
        }
        self._state_update_dict(fallback_result)

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
        """Translate a ``finish`` command into a final result, else pass through.

        Args:
            output_payload: Payload produced by the source flow. Must contain
                a ``"command"`` key; for the ``"finish"`` command it must also
                contain ``"command_args"`` with an ``"answer"`` entry.
            src_flow: Flow that produced the payload (not used here).

        Returns:
            For ``"finish"``: a new dict with ``"EARLY_EXIT"`` set to ``True``,
            the controller's answer, and status ``"finished"``. For any other
            command: ``output_payload`` itself, unchanged.

        Raises:
            KeyError: If a required key is missing from ``output_payload``.
        """
        # Any command other than "finish" is forwarded untouched.
        if output_payload["command"] != "finish":
            return output_payload

        final_answer = output_payload["command_args"]["answer"]
        # "EARLY_EXIT" presumably tells the enclosing circular flow to stop
        # iterating -- confirm against CircularFlow.
        return {
            "EARLY_EXIT": True,
            "answer": final_answer,
            "status": "finished",
        }
|
|