Spaces:
Sleeping
Sleeping
| # Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # This code is adapted from OpenAI's release | |
| # https://github.com/openai/human-eval/blob/master/human_eval/execution.py | |
| import contextlib | |
| import faulthandler | |
| import io | |
| import multiprocessing | |
| import os | |
| import platform | |
| import signal | |
| import tempfile | |
def check_correctness(program, test_inputs, test_outputs, timeout, task_id, completion_id):
    """
    Evaluates the functional correctness of a completion by running the test
    suite provided in the problem.

    The program is run in a separate process (see ``unsafe_execute``). The
    parent waits at most ``timeout + 1`` seconds for that process and kills it
    if it is still alive afterwards.

    :param program: The program string to evaluate.
    :param test_inputs: A list of input strings to provide to the program via STDIN.
    :param test_outputs: A list of expected output strings from the program via STDOUT.
    :param timeout: Maximum execution time in seconds.
    :param task_id: ID of the task being evaluated.
    :param completion_id: Completion ID to match results later.
    :return: A dict with the keys ``task_id``, ``passed``, ``result``,
        ``completion_id`` and ``program``. ``passed`` is True only when every
        recorded result has the status ``"passed"``.
    """
    # unsafe_execute pairs inputs and outputs with zip(), so it reports on as
    # many cases as the shorter of the two lists.
    expected_count = min(len(test_inputs), len(test_outputs))

    # Using the manager as a context manager shuts its server process down on
    # exit instead of leaking one helper process per call.
    with multiprocessing.Manager() as manager:
        results = manager.list()
        process = multiprocessing.Process(
            target=unsafe_execute,
            args=(program, test_inputs, test_outputs, results, timeout),
        )
        process.start()
        # NOTE(review): unsafe_execute applies `timeout` to each test case, while
        # this deadline covers the whole run -- confirm that is intended.
        process.join(timeout=timeout + 1)

        if process.is_alive():
            process.kill()
            process.join()  # Reap the killed child so it does not linger as a zombie.
            results.append({"status": "timed out"})
        elif len(results) < expected_count:
            # The child exited without reporting every case (e.g. it crashed or
            # called os._exit). Without this record, all() over the incomplete
            # list could wrongly report success.
            results.append({"status": "failed", "error": "process exited before all test cases were run"})

        # Copy out of the proxy while the manager is still running.
        collected = list(results)

    return dict(
        task_id=task_id,
        passed=all(entry.get("status") == "passed" for entry in collected),
        result=collected,
        completion_id=completion_id,
        program=program,
    )
def unsafe_execute(program, test_inputs, test_outputs, results, timeout):
    """
    Executes the program with redirected STDIN and compares its STDOUT to the expected output.

    The program is executed once per (input, expected output) pair inside a
    temporary working directory. One dict is appended to ``results`` per pair:

    * ``{"status": "passed" | "failed", "input", "expected", "actual"}`` when
      the program ran to completion,
    * ``{"status": "timed out", "input"}`` when the time limit was hit,
    * ``{"status": "failed", "input", "error"}`` when the program raised.

    Outputs are compared after stripping leading and trailing whitespace.

    :param program: The program string to execute.
    :param test_inputs: List of inputs to provide to the program via STDIN.
    :param test_outputs: List of expected outputs to compare with STDOUT.
    :param results: A multiprocessing.Manager().list() to store the execution results.
    :param timeout: Maximum execution time in seconds, applied to each test case.
    """
    with create_tempdir():
        # These system calls are needed when cleaning up tempdir.
        import os
        import shutil

        rmtree = shutil.rmtree
        rmdir = os.rmdir
        chdir = os.chdir
        # rmtree removes files through os.unlink, so it must be restored too or
        # cleanup fails whenever the program left files in the tempdir.
        unlink = os.unlink

        # Disable functionalities that can make destructive changes to the test.
        reliability_guard()

        # Run the program for each input-output pair.
        try:
            for test_input, test_output in zip(test_inputs, test_outputs):
                # Fresh globals per case so state cannot leak between runs.
                # __name__ is set so `if __name__ == "__main__":` guards fire,
                # as they would when the program is run as a script.
                exec_globals = {"__name__": "__main__"}
                input_stream = io.StringIO(test_input)
                output_stream = io.StringIO()
                try:
                    with swallow_io(input_stream, output_stream):
                        with time_limit(timeout):
                            # WARNING: executes untrusted code; see reliability_guard.
                            exec(program, exec_globals)
                    actual_output = output_stream.getvalue().strip()
                    expected_output = test_output.strip()
                    status = "passed" if actual_output == expected_output else "failed"
                    results.append(
                        {"status": status, "input": test_input, "expected": test_output, "actual": actual_output}
                    )
                except TimeoutException:
                    results.append({"status": "timed out", "input": test_input})
                except BaseException as e:
                    # BaseException on purpose: the program may raise SystemExit
                    # or KeyboardInterrupt, which must not abort the remaining cases.
                    results.append({"status": "failed", "input": test_input, "error": str(e)})
        finally:
            # Restore system calls for cleaning up
            shutil.rmtree = rmtree
            os.rmdir = rmdir
            os.chdir = chdir
            os.unlink = unlink
@contextlib.contextmanager
def time_limit(seconds):
    """
    Context manager that raises TimeoutException if its body runs too long.

    Uses SIGALRM with a real-time interval timer; the timer is always cancelled
    when the body exits.

    :param seconds: Time limit in seconds (may be fractional).
    :raises TimeoutException: Inside the body, once the timer expires.
    """

    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    # Install the handler before arming the timer. With the opposite order, a
    # very short limit could deliver SIGALRM while the default action (process
    # termination) is still in place.
    signal.signal(signal.SIGALRM, signal_handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Cancel the pending alarm, whether or not the body raised.
        signal.setitimer(signal.ITIMER_REAL, 0)
@contextlib.contextmanager
def swallow_io(input_stream, output_stream):
    """
    Redirects STDIN, STDOUT, and STDERR for isolated execution.

    :param input_stream: Stream that replaces ``sys.stdin`` inside the block.
    :param output_stream: Stream that receives everything written to
        ``sys.stdout`` and ``sys.stderr`` inside the block.
    """
    # Without the contextmanager decorator this generator cannot be used in a
    # `with` statement, which is how callers use it.
    with contextlib.redirect_stdout(output_stream):
        with contextlib.redirect_stderr(output_stream):
            with redirect_stdin(input_stream):
                yield
@contextlib.contextmanager
def create_tempdir():
    """
    Context manager that creates a temporary directory and makes it the
    current working directory for the duration of the block.

    Yields the directory path. On exit the previous working directory is
    restored and the temporary directory is removed.
    """
    # Without the contextmanager decorator this generator cannot be used in a
    # `with` statement, which is how callers use it.
    with tempfile.TemporaryDirectory() as dirname:
        with chdir(dirname):
            yield dirname
class TimeoutException(Exception):
    """Raised when executed code exceeds its time limit."""
class redirect_stdin(contextlib._RedirectStream):  # type: ignore
    """Context manager that temporarily replaces ``sys.stdin`` with another stream."""

    # Name of the ``sys`` attribute that the base class swaps in and out.
    _stream = "stdin"
@contextlib.contextmanager
def chdir(root):
    """
    Context manager that temporarily changes the current working directory.

    :param root: Directory to switch to. ``"."`` is treated as a no-op.

    The previous working directory is restored on exit, including when the
    body raises.
    """
    if root == ".":
        yield
        return

    cwd = os.getcwd()
    os.chdir(root)
    try:
        yield
    finally:
        # Exceptions from the body propagate unchanged; only the working
        # directory needs restoring.
        os.chdir(cwd)
def reliability_guard(maximum_memory_bytes=None):
    """
    This disables various destructive functions and prevents the generated code
    from interfering with the test (e.g. fork bomb, killing other processes,
    removing filesystem files, etc.)

    The changes are process-wide: attributes of ``builtins``, ``os``,
    ``shutil`` and ``subprocess`` are replaced with ``None`` and several
    modules are blocked from import through ``sys.modules``.

    :param maximum_memory_bytes: Optional limit, in bytes, applied to the
        address space, data segment and (except on macOS) stack of the current
        process. ``None`` leaves resource limits untouched.

    WARNING
    This function is NOT a security sandbox. Untrusted code, including, model-
    generated code, should not be blindly executed outside of one. See the
    Codex paper for more information about OpenAI's code sandbox, and proceed
    with caution.
    """
    if maximum_memory_bytes is not None:
        import resource

        limits = (maximum_memory_bytes, maximum_memory_bytes)
        resource.setrlimit(resource.RLIMIT_AS, limits)
        resource.setrlimit(resource.RLIMIT_DATA, limits)
        if platform.uname().system != "Darwin":
            resource.setrlimit(resource.RLIMIT_STACK, limits)

    faulthandler.disable()

    import builtins

    builtins.exit = None
    builtins.quit = None
    # Set via the builtins module: `__builtins__` is a dict in imported modules
    # but a module in `__main__`, where item assignment raises TypeError.
    builtins.help = None

    import os

    os.environ["OMP_NUM_THREADS"] = "1"

    os.kill = None
    os.system = None
    # os.putenv = None
    os.remove = None
    os.removedirs = None
    os.rmdir = None
    os.fchdir = None
    os.setuid = None
    os.fork = None
    os.forkpty = None
    os.killpg = None
    os.rename = None
    os.renames = None
    os.truncate = None
    os.replace = None
    os.unlink = None
    os.fchmod = None
    os.fchown = None
    os.chmod = None
    os.chown = None
    os.chroot = None
    os.lchflags = None
    os.lchmod = None
    os.lchown = None
    # os.getcwd = None
    os.chdir = None

    import shutil

    shutil.rmtree = None
    shutil.move = None
    shutil.chown = None

    import subprocess

    subprocess.Popen = None  # type: ignore

    import sys

    # A None entry in sys.modules makes a later `import <name>` raise ImportError.
    sys.modules["ipdb"] = None
    sys.modules["joblib"] = None
    sys.modules["resource"] = None
    sys.modules["psutil"] = None
    sys.modules["tkinter"] = None