WritePlanFlowModule / WritePlanFlow.py
Tachi67's picture
Upload 7 files
0660a86
raw
history blame
658 Bytes
from typing import Dict, Any
from flows.base_flows import SequentialFlow
from flows.utils import logging
from .PlanGeneratorAtomicFlow import PlanGeneratorAtomicFlow
logging.set_verbosity_debug()
log = logging.get_logger(__name__)
class WritePlanFlow(SequentialFlow):
@SequentialFlow.output_msg_payload_processor
def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: PlanGeneratorAtomicFlow) -> Dict[str, Any]:
if "plan" not in output_payload:
return {
"EARLY_EXIT": True,
"plan_writer_output": output_payload
}
else:
return output_payload