from typing import Dict, Any
from flow_modules.Tachi67.InterpreterFlowModule import InterpreterAtomicFlow


class PlanFileEditorAtomicFlow(InterpreterAtomicFlow):
    """Write a plan string to a file by generating and running a Python snippet.

    The flow fills ``input_data`` with a small Python script that writes the
    plan to disk, then hands it to the interpreter machinery of the base class.
    """

    def _process_interperter_inputs(self, input_data: Dict[str, Any]):
        """Populate ``input_data`` with the script that writes the plan.

        Reads ``input_data['file_location']`` and ``input_data['plan']`` and
        sets ``input_data['language']`` and ``input_data['code']`` in place.

        Args:
            input_data: Must contain the keys ``'file_location'`` and
                ``'plan'``. Mutated in place.

        Raises:
            KeyError: If ``'file_location'`` or ``'plan'`` is missing.
        """
        input_data['language'] = 'python'
        file_location = input_data['file_location']
        plan_str = input_data['plan']
        # repr() embeds both values as Python literals, so quotes, backslashes
        # and newlines in the path or the plan cannot break the generated
        # script. The template lines sit at column 0 on purpose: they are the
        # top level of the generated script.
        input_data['code'] = f"""
import os
file_location = {repr(file_location)}
plan_str = {repr(plan_str)}
if os.path.isdir(file_location):
    file_location = os.path.join(file_location, 'plan.txt')
with open(file_location, 'w') as file:
    file.write(plan_str)
print('plan written to ' + file_location)"""

    def run(
            self,
            input_data: Dict[str, Any]):
        """Generate the plan-writing script, execute it and return the result.

        Args:
            input_data: Must contain ``'file_location'`` and ``'plan'``; it is
                mutated in place by the preprocessing steps.

        Returns:
            A dict with the single key ``"plan_writer_output"`` holding
            whatever ``self._call()`` returned.
        """
        self._process_interperter_inputs(input_data)
        # NOTE(review): _process_input_data and _call are not defined in this
        # class; presumably inherited from InterpreterAtomicFlow -- confirm.
        self._process_input_data(input_data)
        response = self._call()
        return {"plan_writer_output": response}