import logging
import os
from typing import Dict, Any

from flow_modules.Tachi67.ContentWriterFlowModule import ContentWriterFlow
from aiflows.base_flows import CircularFlow

# Module-level logger used by `PlanWriterFlow.run`; the original code referenced
# `log` without defining it, which raised NameError when `max_rounds` was None.
log = logging.getLogger(__name__)


class PlanWriterFlow(ContentWriterFlow):
    """This flow inherits from ContentWriterFlow.
    In the subflow of the executor, we specify the InteractivePlanGenFlow
    (https://huggingface.co/Tachi67/InteractivePlanGenFlowModule)

    *Input Interface*:
    - `goal`

    *Output Interface*:
    - `plan`
    - `result`
    - `summary`
    - `status`
    """

    @staticmethod
    def _remove_file_if_present(path) -> None:
        """Delete the file at `path`, doing nothing if it does not exist."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Same outcome as the `os.path.exists` guard, without the
            # check-then-delete race.
            pass

    def _on_reach_max_round(self):
        """Record in the flow state that no plan was produced within the round limit."""
        self._state_update_dict({
            "plan": "The maximum amount of rounds was reached before the model generated the plan.",
            "status": "unfinished",
        })

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Post-process a subflow's output payload according to its `command`.

        - `finish`: writes the plan held in the flow state to the plan file
          listed under `memory_files["plan"]`, deletes the temporary plan file,
          and returns a payload with `EARLY_EXIT` set, the plan, a summary and
          status `finished`.
        - `manual_finish`: deletes the temporary plan file and returns a payload
          with `EARLY_EXIT` set and status `unfinished`.
        - `write_plan`: adds the plan file location to `command_args` and
          returns the payload.
        - anything else: returns the payload unchanged.

        :param output_payload: payload produced by the subflow; must contain
            `command` (and `command_args` for `finish` / `write_plan`).
        :param src_flow: the flow that produced the payload (unused here).
        :return: the processed payload.
        """
        command = output_payload["command"]

        if command == "finish":
            # ~~~ fetch temp file location, plan content, memory file (of upper level flow e.g. ExtLib) from flow state
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_plan_file_location", "plan", "memory_files"]
            )
            temp_plan_file_location = fetched_state["temp_plan_file_location"]
            plan_content = fetched_state["plan"]
            plan_file_location = fetched_state["memory_files"]["plan"]

            # ~~~ write plan content to plan file ~~~
            # Done before removing the temp file so that a failed write does
            # not leave us with neither copy on disk.
            with open(plan_file_location, 'w') as file:
                file.write(plan_content)

            # ~~~ delete the temp plan file ~~~
            self._remove_file_if_present(temp_plan_file_location)

            # ~~~ return the plan content ~~~
            return {
                "EARLY_EXIT": True,
                "plan": plan_content,
                "summary": "ExtendLibrary/PlanWriter: " + output_payload["command_args"]["summary"],
                "status": "finished",
            }

        if command == "manual_finish":
            # ~~~ delete the temp plan file ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["temp_plan_file_location"])
            self._remove_file_if_present(fetched_state["temp_plan_file_location"])

            # ~~~ return the manual quit status ~~~
            return {
                "EARLY_EXIT": True,
                "plan": "no plan was generated",
                "summary": "ExtendLibrary/PlanWriter: PlanWriter was terminated explicitly by the user, process is unfinished",
                "status": "unfinished",
            }

        if command == "write_plan":
            # Tell the downstream writer where the plan file lives.
            fetched_state = self._fetch_state_attributes_by_keys(keys=["memory_files"])
            plan_file_location = fetched_state["memory_files"]["plan"]
            output_payload["command_args"]["plan_file_location"] = plan_file_location
            return output_payload

        return output_payload

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow on `input_data` and return the output read from the flow state.

        The input is stored in the flow state, the subflows are run
        sequentially for at most `max_rounds` rounds (taken from the flow
        config, default 1; `None` means no limit), and the flow is fully reset
        before the output is returned.

        :param input_data: the input payload for this flow.
        :return: the output collected from the flow state.
        """
        # ~~~ sets the input_data in the flow_state dict ~~~
        self._state_update_dict(update_data=input_data)

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            log.info(
                "Running %s without `max_rounds` until the early exit condition is met.",
                self.flow_config['name'],
            )

        self._sequential_run(max_rounds=max_rounds)

        # Read the output before resetting, since the reset clears the state.
        output = self._get_output_from_state()

        self.reset(full_reset=True, recursive=True, src_flow=self)

        return output