from typing import Dict, Any | |
from flow_modules.Tachi67.InterpreterFlowModule import InterpreterAtomicFlow | |
class PlanFileEditorAtomicFlow(InterpreterAtomicFlow):
    """Atomic flow that writes a plan string to a file via generated Python code.

    The flow does not touch the filesystem itself: it builds a small Python
    script and stores it in ``input_data['code']``; the script is then
    executed by the machinery of the parent ``InterpreterAtomicFlow``
    (presumably through ``_process_input_data`` / ``_call`` -- both are
    defined outside this file).
    """

    def _process_interperter_inputs(self, input_data: Dict[str, Any]):
        """Populate ``input_data`` with the script that writes the plan.

        Reads ``input_data['file_location']`` and ``input_data['plan']`` and
        sets, in place:

        * ``input_data['language']`` to ``'python'``
        * ``input_data['code']`` to a script that writes the plan to
          ``file_location`` (or to ``plan.txt`` inside it when
          ``file_location`` is an existing directory) and prints a
          confirmation message.

        Args:
            input_data: Mutable mapping holding at least the keys
                ``'file_location'`` and ``'plan'``.

        Raises:
            KeyError: If ``'file_location'`` or ``'plan'`` is missing.
        """
        input_data['language'] = 'python'
        file_location = input_data['file_location']
        plan_str = input_data['plan']
        # The values are embedded with !r (repr) so that quotes, backslashes
        # and newlines inside them are escaped and cannot break out of the
        # generated source (assuming both values are plain strings).
        # The file is opened with an explicit UTF-8 encoding so that plans
        # containing non-ASCII characters are written identically on every
        # platform instead of depending on the locale's default encoding.
        input_data['code'] = f"""
import os
file_location = {file_location!r}
plan_str = {plan_str!r}
if os.path.isdir(file_location):
    file_location = os.path.join(file_location, 'plan.txt')
with open(file_location, 'w', encoding='utf-8') as file:
    file.write(plan_str)
print('plan written to ' + file_location)"""

    def run(
            self,
            input_data: Dict[str, Any]):
        """Generate the plan-writing script, execute it and return its output.

        Args:
            input_data: Mutable mapping holding at least ``'file_location'``
                and ``'plan'``; it is modified in place (``'language'`` and
                ``'code'`` are added) before being handed to the parent
                class's processing.

        Returns:
            A dict with the single key ``'plan_writer_output'`` mapped to the
            value returned by ``self._call()``.
        """
        self._process_interperter_inputs(input_data)
        # NOTE(review): _process_input_data and _call are not defined in this
        # file; presumably inherited from InterpreterAtomicFlow -- confirm.
        self._process_input_data(input_data)
        response = self._call()
        return {"plan_writer_output": response}