Spaces:
Sleeping
Sleeping
# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# This code is adapted from OpenAI's release | |
# https://github.com/openai/human-eval/blob/master/human_eval/execution.py | |
import contextlib | |
import faulthandler | |
import io | |
import multiprocessing | |
import os | |
import platform | |
import signal | |
import tempfile | |
def check_correctness(program, test_inputs, test_outputs, timeout, task_id, completion_id): | |
""" | |
Evaluates the functional correctness of a completion by running the test | |
suite provided in the problem. | |
:param program: The program string to evaluate. | |
:param test_inputs: A list of input strings to provide to the program via STDIN. | |
:param test_outputs: A list of expected output strings from the program via STDOUT. | |
:param timeout: Maximum execution time in seconds. | |
:param task_id: ID of the task being evaluated. | |
:param completion_id: Completion ID to match results later. | |
""" | |
manager = multiprocessing.Manager() | |
results = manager.list() | |
process = multiprocessing.Process(target=unsafe_execute, args=(program, test_inputs, test_outputs, results, timeout)) | |
process.start() | |
process.join(timeout=timeout + 1) | |
if process.is_alive(): | |
process.kill() | |
results.append({"status": "timed out"}) | |
return dict( | |
task_id=task_id, | |
passed=all(result.get("status") == "passed" for result in results), | |
result=list(results), | |
completion_id=completion_id, | |
program=program, | |
) | |
def unsafe_execute(program, test_inputs, test_outputs, results, timeout): | |
""" | |
Executes the program with redirected STDIN and compares its STDOUT to the expected output. | |
:param program: The program string to execute. | |
:param test_inputs: List of inputs to provide to the program via STDIN. | |
:param test_outputs: List of expected outputs to compare with STDOUT. | |
:param results: A multiprocessing.Manager().list() to store the execution results. | |
:param timeout: Maximum execution time in seconds. | |
""" | |
with create_tempdir(): | |
# These system calls are needed when cleaning up tempdir. | |
import os | |
import shutil | |
rmtree = shutil.rmtree | |
rmdir = os.rmdir | |
chdir = os.chdir | |
# Disable functionalities that can make destructive changes to the test. | |
reliability_guard() | |
# Run the program for each input-output pair. | |
try: | |
exec_globals = {} | |
for test_input, test_output in zip(test_inputs, test_outputs): | |
input_stream = io.StringIO(test_input) | |
output_stream = io.StringIO() | |
try: | |
with swallow_io(input_stream, output_stream): | |
with time_limit(timeout): | |
exec(program, exec_globals) | |
actual_output = output_stream.getvalue().strip() | |
expected_output = test_output.strip() | |
if actual_output == expected_output: | |
results.append({"status": "passed", "input": test_input, "expected": test_output, "actual": actual_output}) | |
else: | |
results.append({"status": "failed", "input": test_input, "expected": test_output, "actual": actual_output}) | |
except TimeoutException: | |
results.append({"status": "timed out", "input": test_input}) | |
except BaseException as e: | |
results.append({"status": "failed", "input": test_input, "error": str(e)}) | |
finally: | |
# Restore system calls for cleaning up | |
shutil.rmtree = rmtree | |
os.rmdir = rmdir | |
os.chdir = chdir | |
def time_limit(seconds): | |
def signal_handler(signum, frame): | |
raise TimeoutException("Timed out!") | |
signal.setitimer(signal.ITIMER_REAL, seconds) | |
signal.signal(signal.SIGALRM, signal_handler) | |
try: | |
yield | |
finally: | |
signal.setitimer(signal.ITIMER_REAL, 0) | |
def swallow_io(input_stream, output_stream): | |
""" | |
Redirects STDIN, STDOUT, and STDERR for isolated execution. | |
""" | |
with contextlib.redirect_stdout(output_stream): | |
with contextlib.redirect_stderr(output_stream): | |
with redirect_stdin(input_stream): | |
yield | |
def create_tempdir(): | |
with tempfile.TemporaryDirectory() as dirname: | |
with chdir(dirname): | |
yield dirname | |
class TimeoutException(Exception): | |
pass | |
class redirect_stdin(contextlib._RedirectStream): # type: ignore | |
_stream = "stdin" | |
def chdir(root): | |
if root == ".": | |
yield | |
return | |
cwd = os.getcwd() | |
os.chdir(root) | |
try: | |
yield | |
except BaseException as exc: | |
raise exc | |
finally: | |
os.chdir(cwd) | |
def reliability_guard(maximum_memory_bytes=None): | |
""" | |
This disables various destructive functions and prevents the generated code | |
from interfering with the test (e.g. fork bomb, killing other processes, | |
removing filesystem files, etc.) | |
WARNING | |
This function is NOT a security sandbox. Untrusted code, including, model- | |
generated code, should not be blindly executed outside of one. See the | |
Codex paper for more information about OpenAI's code sandbox, and proceed | |
with caution. | |
""" | |
if maximum_memory_bytes is not None: | |
import resource | |
resource.setrlimit(resource.RLIMIT_AS, (maximum_memory_bytes, maximum_memory_bytes)) | |
resource.setrlimit(resource.RLIMIT_DATA, (maximum_memory_bytes, maximum_memory_bytes)) | |
if not platform.uname().system == "Darwin": | |
resource.setrlimit(resource.RLIMIT_STACK, (maximum_memory_bytes, maximum_memory_bytes)) | |
faulthandler.disable() | |
import builtins | |
builtins.exit = None | |
builtins.quit = None | |
import os | |
os.environ["OMP_NUM_THREADS"] = "1" | |
os.kill = None | |
os.system = None | |
# os.putenv = None | |
os.remove = None | |
os.removedirs = None | |
os.rmdir = None | |
os.fchdir = None | |
os.setuid = None | |
os.fork = None | |
os.forkpty = None | |
os.killpg = None | |
os.rename = None | |
os.renames = None | |
os.truncate = None | |
os.replace = None | |
os.unlink = None | |
os.fchmod = None | |
os.fchown = None | |
os.chmod = None | |
os.chown = None | |
os.chroot = None | |
os.fchdir = None | |
os.lchflags = None | |
os.lchmod = None | |
os.lchown = None | |
# os.getcwd = None | |
os.chdir = None | |
import shutil | |
shutil.rmtree = None | |
shutil.move = None | |
shutil.chown = None | |
import subprocess | |
subprocess.Popen = None # type: ignore | |
__builtins__["help"] = None | |
import sys | |
sys.modules["ipdb"] = None | |
sys.modules["joblib"] = None | |
sys.modules["resource"] = None | |
sys.modules["psutil"] = None | |
sys.modules["tkinter"] = None | |