|
|
|
|
|
|
|
|
|
import logging |
|
import re |
|
from subprocess import Popen, PIPE, TimeoutExpired |
|
from typing import List, Tuple |
|
import threading |
|
|
|
log = logging.getLogger(__name__) |
|
lock = threading.Lock() |
|
|
|
def evaluate_solution_for_problem( |
|
candidate_solution, |
|
python_stub, |
|
hidden_tests_io=None, |
|
public_tests_io=None, |
|
timeout=10, |
|
debug=False, |
|
add_extra_imports=False, |
|
): |
|
with lock: |
|
"""See the readme for the output format of this function.""" |
|
if hidden_tests_io is None: |
|
hidden_tests_io = [] |
|
if public_tests_io is None: |
|
public_tests_io = [] |
|
|
|
if candidate_solution is None: |
|
results_dict = { |
|
"compilation_status": False, |
|
"compilation_error_message": "No code was provided.", |
|
"timeout_error": False, |
|
"hidden_tests_results": [ |
|
{ |
|
"status": False, |
|
"error_message": "No code was provided.", |
|
"generated_output": None, |
|
"input": test[0], |
|
"expected_output": test[1], |
|
} |
|
for test in hidden_tests_io |
|
], |
|
"public_tests_results": [ |
|
{ |
|
"status": False, |
|
"error_message": "No code was provided.", |
|
"generated_output": None, |
|
"input": test[0], |
|
"expected_output": test[1], |
|
} |
|
for test in public_tests_io |
|
], |
|
} |
|
return results_dict |
|
|
|
hidden_tests_results = check_correctness( |
|
candidate_solution, python_stub, hidden_tests_io, timeout, debug, add_extra_imports |
|
) |
|
public_tests_results = check_correctness( |
|
candidate_solution, python_stub, public_tests_io, timeout, debug, add_extra_imports |
|
) |
|
|
|
|
|
if len(hidden_tests_io) > 0 and len(public_tests_io) > 0: |
|
assert hidden_tests_results["compilation_status"] == public_tests_results["compilation_status"] |
|
|
|
compilation_status = True |
|
error_message = None |
|
timeout_error = False |
|
|
|
if len(hidden_tests_io) > 0: |
|
compilation_status = compilation_status and hidden_tests_results["compilation_status"] |
|
error_message = hidden_tests_results["error_message"] |
|
timeout_error = timeout_error or hidden_tests_results["timeout_error"] |
|
|
|
if len(public_tests_io) > 0: |
|
compilation_status = compilation_status and public_tests_results["compilation_status"] |
|
error_message = public_tests_results["error_message"] |
|
timeout_error = timeout_error or public_tests_results["timeout_error"] |
|
|
|
results_dict = { |
|
"compilation_status": compilation_status, |
|
"compilation_error_message": error_message, |
|
"timeout_error": timeout_error, |
|
"hidden_tests_results": hidden_tests_results["results"], |
|
"public_tests_results": public_tests_results["results"], |
|
} |
|
|
|
return results_dict |
|
|
|
|
|
def check_correctness( |
|
candidate_solution: str, |
|
python_stub: str, |
|
tests: List[Tuple[List[str], str]], |
|
timeout: int = 6000, |
|
debug=True, |
|
add_extra_imports=False, |
|
): |
|
compilation_status = True |
|
compilation_error = None |
|
results = [] |
|
timeout_occurred = False |
|
|
|
for idx, test in enumerate(tests): |
|
inp, out, expl = test |
|
result = one_test( |
|
candidate_solution, python_stub, inp, out, timeout=timeout, debug=debug, add_extra_imports=add_extra_imports |
|
) |
|
error_message = result["error_message"] |
|
|
|
if error_message is not None: |
|
if "syntaxerror" in error_message.lower(): |
|
compilation_status = False |
|
compilation_error = error_message |
|
if "timeout" in error_message.lower(): |
|
timeout_occurred = True |
|
results.append(result) |
|
|
|
if timeout_occurred: |
|
break |
|
|
|
if timeout_occurred: |
|
return { |
|
"compilation_status": True, |
|
"timeout_error": True, |
|
"error_message": "Timeout error.", |
|
"results": results, |
|
} |
|
|
|
return { |
|
"compilation_status": compilation_status, |
|
"timeout_error": False, |
|
"error_message": compilation_error, |
|
"results": results, |
|
} |
|
|
|
|
|
def one_test(candidate_solution, python_stub, inp, out, timeout=10, debug=False, add_extra_imports=False): |
|
python_stub = python_stub.strip() |
|
candidate_solution = candidate_solution.strip() |
|
|
|
out = out.replace("null", "None").replace("true", "True").replace("false", "False") |
|
|
|
|
|
class_def, signature = python_stub.split(" def ") |
|
class_name = class_def.split("class ")[1].strip().rstrip(":") |
|
func_name, _ = signature.split("(") |
|
|
|
|
|
first_param = r"^\w+\s\=\s" |
|
later_params = r",\s\w+\s\=\s" |
|
|
|
inp = re.sub(first_param, "", inp) |
|
inp = re.sub(later_params, ", ", inp) |
|
|
|
|
|
before_output = "AFTER THIS COMES OUR OWN GENERATED OUTPUT !@#!@!" |
|
after_output = "AFTER THIS COMES OUR VERDICT !@#!@!" |
|
|
|
if add_extra_imports: |
|
sol = f""" |
|
from collections import * |
|
from math import * |
|
import math |
|
from functools import * |
|
from heapq import * |
|
import heapq |
|
import itertools |
|
from itertools import * |
|
import bisect |
|
from bisect import * |
|
""" |
|
else: |
|
sol = "" |
|
|
|
sol += f""" |
|
from typing import List, Tuple, Optional |
|
{candidate_solution} |
|
sfohsdfdsfjhsdkfjhsdkjfh = {class_name}() |
|
res = sfohsdfdsfjhsdkfjhsdkjfh.{func_name}({inp}) |
|
|
|
def nested_list_convert(inp): |
|
try: |
|
try: |
|
inp = list(inp) |
|
except BaseException as e: |
|
return inp |
|
out = [] |
|
for i in inp: |
|
out.append(nested_list_convert(i)) |
|
except BaseException as e: |
|
return inp |
|
return out |
|
|
|
matching = False |
|
matching = matching or res == {out} |
|
matching = matching or nested_list_convert(res) == {out} |
|
matching = matching or nested_list_convert(res) == nested_list_convert({out}) |
|
matching = matching or str({out})==str(res).replace("{{","[").replace("(","[").replace("}}","]").replace(")","]") |
|
matching = matching or str({out})==str(res).replace("{{","[").replace("(","[").replace("}}","]").replace(")","]") |
|
print("res: ", res) |
|
print("out: ", {out}) |
|
print("{before_output}") |
|
print(res) |
|
print("{after_output}") |
|
print(matching) |
|
""" |
|
|
|
cmd = "python3" |
|
|
|
proc = Popen([cmd, "-c", sol], stdin=PIPE, stdout=PIPE, stderr=PIPE) |
|
|
|
result_object = {"input": inp, "expected_output": out.strip('"')} |
|
|
|
try: |
|
stdout, stderr = proc.communicate("", timeout=timeout) |
|
except TimeoutExpired as e: |
|
if debug: |
|
log.info(f"Timeout error, timeout={timeout}") |
|
result_object.update({"status": False, "error_message": "Timeout error.", "generated_output": None}) |
|
return result_object |
|
|
|
finally: |
|
proc.kill() |
|
|
|
stdout = stdout.decode() |
|
stderr = stderr.decode().lower() |
|
|
|
if stderr == "": |
|
|
|
stderr = None |
|
else: |
|
|
|
result_object.update(**{"status": False, "error_message": stderr, "generated_output": None}) |
|
return result_object |
|
|
|
try: |
|
generated_output = stdout.split(before_output)[1] |
|
generated_output, verdict = generated_output.split(after_output) |
|
result_object.update( |
|
**{ |
|
"status": verdict.strip() == "True", |
|
"error_message": stderr, |
|
"generated_output": generated_output.strip(), |
|
} |
|
) |
|
return result_object |
|
except IndexError as e: |
|
raise Exception(f"An unexpected error has occurred while parsing the following generated output: {stdout}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|