import os
import time
from typing import Dict, Any

from flow_modules.Tachi67.InterpreterFlowModule import InterpreterAtomicFlow


class CodeTestingAtomicFlow(InterpreterAtomicFlow):
    """Run the contents of a temporary code file once that file has been modified.

    ``run`` blocks until the modification time of the temporary file differs
    from the timestamp supplied by the caller, loads the file content into
    ``input_data`` as Python code, invokes the inherited ``_call`` and finally
    removes the temporary file.
    """

    def _wait_for_file_update(self, file_location, last_check_time, check_interval=1, timeout=None):
        """Block until the modification time of ``file_location`` changes.

        Args:
            file_location: Path of the file to poll.
            last_check_time: Reference modification time; polling stops as soon
                as ``os.path.getmtime(file_location)`` differs from it.
            check_interval: Seconds to sleep before each poll.
            timeout: Optional maximum number of seconds to wait. ``None``
                (the default) waits indefinitely.

        Raises:
            TimeoutError: If ``timeout`` is given and elapses without the file
                being modified.
            OSError: Propagated from ``os.path.getmtime`` (e.g. the file does
                not exist).
        """
        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            time.sleep(check_interval)
            if os.path.getmtime(file_location) != last_check_time:
                return
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"{file_location} was not updated within {timeout} seconds"
                )

    def _prepare_code_and_lang(self, input_data: Dict[str, Any]) -> None:
        """Load the temporary file's content into ``input_data`` in place.

        Sets ``input_data["language"]`` to ``"python"`` and
        ``input_data["code"]`` to the full text of the file located at
        ``input_data["temp_code_file_location"]``.
        """
        file_location = input_data["temp_code_file_location"]
        input_data["language"] = "python"
        with open(file_location, "r") as file:
            input_data["code"] = file.read()

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Verify that ``input_data`` carries every key ``run`` relies on.

        Raises:
            AssertionError: If ``temp_code_file_location`` or
                ``temp_code_file_written_timestamp`` is missing.
        """
        # Raised explicitly instead of via ``assert`` so the validation is not
        # stripped when Python runs with -O; the exception type is unchanged.
        for key in ("temp_code_file_location", "temp_code_file_written_timestamp"):
            if key not in input_data:
                raise AssertionError(f"{key} not passed to CodeTestingAtomicFlow")

    def _delete_file(self, file_location) -> None:
        """Remove ``file_location``; a file that is already gone is not an error."""
        # Attempt the removal directly rather than checking existence first,
        # which would race with anything deleting the file in between.
        try:
            os.remove(file_location)
        except FileNotFoundError:
            pass

    def run(
            self,
            input_data: Dict[str, Any]):
        """Wait for the temporary code file to change, run it, then delete it.

        Args:
            input_data: Must contain ``temp_code_file_location`` (path of the
                temporary file) and ``temp_code_file_written_timestamp`` (the
                modification time to compare against). It is mutated: the keys
                ``language`` and ``code`` are added before ``_call`` is invoked.

        Returns:
            A dict ``{"test_results": response}`` where ``response`` is
            whatever the inherited ``_call`` returns.

        Raises:
            AssertionError: If a required key is missing from ``input_data``.
        """
        self._check_input(input_data)
        file_loc = input_data["temp_code_file_location"]
        last_timestamp = input_data["temp_code_file_written_timestamp"]
        self._wait_for_file_update(file_loc, last_timestamp)
        self._prepare_code_and_lang(input_data)
        # The code is already loaded into input_data at this point, so the
        # temporary file can be removed even when the call itself fails.
        try:
            response = self._call()
        finally:
            self._delete_file(file_loc)
        return {"test_results": response}