#TODO: generalize updateplanatomicflow with the one in extendlibrary from typing import Dict, Any from aiflows.base_flows.atomic import AtomicFlow class UpdatePlanAtomicFlow(AtomicFlow): """This flow updates the plan file with the updated plan. The controller should pass the updated plan to this flow. This design (controller reflect on the existing plan--update plan) is intended to let the controller more aware of the plan it has. However one setback is that this process in then not deterministic. *Input Interface* - `updated_plan`: the updated plan in string format *Output Interface* - `result`: the result of the update plan operation """ def _check_input(self, input_data: Dict[str, Any]): assert "memory_files" in input_data, "memory_files not passed to UpdatePlanAtomicFlow.yaml" assert "plan" in input_data["memory_files"], "plan not in memory_files" def _call(self, input_data: Dict[str, Any]): try: plan_file_location = input_data["memory_files"]["plan"] plan_to_write = input_data["updated_plan"] with open(plan_file_location, 'w') as file: file.write(plan_to_write + "\n") return { "result": "updated plan saved to the plan file and has overriden the previous plan", "summary": f"Coder/UpdatePlanFlow: updated plan saved to {plan_file_location}" } except Exception as e: return { "result": f"Error occurred: {str(e)}", "summary": f"Coder/UpdatePlanFlow: error occurred while writing updated plan: {str(e)}" } def run( self, input_data: Dict[str, Any] ): self._check_input(input_data) return self._call(input_data)