|
import os |
|
import time |
|
import subprocess |
|
from typing import Dict, Any |
|
from flow_modules.aiflows.InterpreterFlowModule import InterpreterAtomicFlow |
|
|
|
|
|
class CodeTestingAtomicFlow(InterpreterAtomicFlow):
    """Let the user edit a code file, then execute it and report the outcome.

    This class inherits from InterpreterAtomicFlow and is used to test code.

    *Input Interface*:

    - `temp_code_file_location`: Location of the file containing the code to be tested.

    *Output Interface*:

    - `feedback`: Feedback from the test (i.e. test results).
    """

    def _open_file_and_wait_for_file_update(self, file_location, check_interval=1):
        """Open ``file_location`` with the ``code`` command and block until it exits.

        Args:
            file_location: Path of the file handed to ``code --wait``.
            check_interval: Seconds to sleep between two checks of the
                launched process.

        Raises:
            OSError: If the ``code`` executable cannot be started.
        """
        # NOTE(review): `code --wait` is presumably the VS Code CLI, which
        # returns once the opened file is closed -- confirm on target machines.
        process = subprocess.Popen(["code", "--wait", file_location])
        # poll() returns None for as long as the child is still running.
        while process.poll() is None:
            time.sleep(check_interval)

    def _prepare_code_and_lang(self, input_data: Dict[str, Any]) -> None:
        """Load the code file into ``input_data`` (mutated in place).

        Sets ``input_data["language"]`` to ``"python"`` and
        ``input_data["code"]`` to the content of the file found at
        ``input_data["temp_code_file_location"]``.
        """
        file_location = input_data["temp_code_file_location"]
        input_data["language"] = "python"
        # Python source files are UTF-8 by default; decode explicitly so the
        # result does not depend on the platform's locale encoding.
        with open(file_location, "r", encoding="utf-8") as file:
            input_data["code"] = file.read()

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Ensure the mandatory ``temp_code_file_location`` key is present.

        Raises:
            AssertionError: If the key is missing from ``input_data``.
        """
        # Raised explicitly (instead of using an `assert` statement) so the
        # check is not stripped when Python runs with -O; the exception type
        # and message are kept for backward compatibility.
        if "temp_code_file_location" not in input_data:
            raise AssertionError(
                "temp_code_file_location not passed to CodeTestingAtomicFlow"
            )

    def _delete_file(self, file_location) -> None:
        """Remove ``file_location``; a file that is already gone is not an error."""
        # EAFP: avoids the race between an existence check and the removal.
        try:
            os.remove(file_location)
        except FileNotFoundError:
            pass

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Open the code file for editing, execute it and return the feedback.

        Args:
            input_data: Must contain ``temp_code_file_location``. The keys
                ``language`` and ``code`` are added to it in place.

        Returns:
            ``{"feedback": <response>}``, where a blank response is replaced
            by ``"test passed"``.

        Raises:
            AssertionError: If ``temp_code_file_location`` is missing.
        """
        self._check_input(input_data)
        file_loc = input_data["temp_code_file_location"]
        self._open_file_and_wait_for_file_update(file_loc)
        try:
            self._prepare_code_and_lang(input_data)
            # `_process_input_data` and `_call` are not defined in this class;
            # presumably inherited from InterpreterAtomicFlow -- verify there.
            self._process_input_data(input_data)
            response = self._call()
        finally:
            # Always clean up the temporary file, even when reading or
            # executing the code fails.
            self._delete_file(file_loc)
        # A blank response is reported as a passing test.
        if response.strip() == "":
            response = "test passed"
        return {"feedback": response}
|
|