"""Compile and run submitted programs against unit tests under resource limits.

The module exposes :class:`ExecutionEngine`, which compiles (when needed) and
executes a job's source code for every unit test, and classifies each run with
an ``ExecOutcome``.
"""

import os
import shlex
import signal
import subprocess
from pathlib import Path
from threading import Timer, Thread

# NOTE(review): the standard-library ``unittest`` has no ``ExtendedUnittest``,
# so this must resolve to a project-local module of the same name.
from unittest import ExtendedUnittest

import gmpy2
from code_store import CodeStore
from config import Config
from exec_outcome import ExecOutcome
from helper import convert_crlf_to_lf
from job import JobData, LanguageError
from prlimit import get_prlimit_str
from resource_limit import ResourceLimits
from runtime import Runtime
from seccomp_filter import make_filter
from settings import JavaClassNotFoundError


class CompilationError(Exception):
    """Error raised to report a failed compilation.

    Args:
        command: the command that was run to compile the source.
        message: the failed process error; its ``stderr`` attribute is
            embedded in the exception text.
    """

    def __init__(self, command, message: subprocess.CalledProcessError):
        self.command = command
        self.message = message
        super().__init__(f"command: {self.command} produced: {self.message.stderr}")


def init_validate_outputs():
    """Build and return the ``validate_outputs(output1, output2)`` comparator.

    The comparator first compares the outputs line by line (ignoring
    leading/trailing whitespace of each line). If that fails it compares them
    token by token, where numeric tokens may differ by at most ``1e-12`` and
    the words yes/no/true/false are compared case-insensitively.
    """
    _token_set = {"yes", "no", "true", "false"}
    PRECISION = gmpy2.mpfr(1e-12, 129)

    def validate_outputs(output1: str, output2: str) -> bool:
        """Return True when the two program outputs are considered equal."""

        # for space sensitive problems stripped string should match
        def validate_lines(lines1, lines2):
            if len(lines1) != len(lines2):
                return False
            return all(
                line1.strip() == line2.strip()
                for line1, line2 in zip(lines1, lines2)
            )

        if validate_lines(output1.strip().split("\n"), output2.strip().split("\n")):
            return True

        # lines didn't work so token matching
        tokens1, tokens2 = output1.strip().split(), output2.strip().split()
        if len(tokens1) != len(tokens2):
            return False

        for tok1, tok2 in zip(tokens1, tokens2):
            if tok1 == tok2:
                # Identical text always matches (this also covers "nan" == "nan").
                continue
            try:
                num1, num2 = gmpy2.mpfr(tok1, 129), gmpy2.mpfr(tok2, 129)
            except ValueError:
                # At least one token is not a number: compare as words.
                if tok1.lower() in _token_set:
                    tok1 = tok1.lower()
                if tok2.lower() in _token_set:
                    tok2 = tok2.lower()
                if tok1 != tok2:
                    return False
                continue
            # Written as "not <=" instead of ">" so that a NaN difference
            # (e.g. "nan" vs "5") is a mismatch rather than a silent match;
            # the equality test keeps equal infinities matching.
            if num1 != num2 and not abs(num1 - num2) <= PRECISION:
                return False
        return True

    return validate_outputs


class MonitorThread(Thread):
    """Thread that polls ``/proc`` for a child's CPU time and peak memory.

    After the thread finishes, ``total_time`` holds the last sampled CPU time
    in seconds and ``peak_memory`` the last sampled ``VmPeak`` value (as the
    text found in ``/proc/<pid>/status``); both stay ``None`` if no sample
    could be taken.
    """

    def __init__(self, proc):
        Thread.__init__(self)
        self.total_time = None
        self.peak_memory = None
        self.proc = proc
        self.clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])

    def run(self):
        """Sample the process continuously until it has exited."""
        stat_path = f"/proc/{self.proc.pid}/stat"
        status_path = f"/proc/{self.proc.pid}/status"
        while self.proc.poll() is None:
            try:
                with open(stat_path) as pid_stat:
                    stat = pid_stat.read()
                # The command name (2nd field) is wrapped in parentheses and
                # may itself contain spaces, so split only what follows the
                # last ')'. In that remainder utime, stime, cutime and cstime
                # are at indexes 11..14.
                fields = stat[stat.rfind(")") + 1:].split()
                # adding user time and sys time, also childs utime, stime
                self.total_time = (
                    sum(float(ticks) for ticks in fields[11:15]) / self.clk_tck
                )
                with open(status_path) as pid_status:
                    vm_peak_line = next(
                        (line for line in pid_status if line.startswith("VmPeak:")),
                        None,
                    )
                if vm_peak_line is None:
                    continue
                self.peak_memory = vm_peak_line.split(":")[-1].strip()
            except (FileNotFoundError, ProcessLookupError, IndexError, ValueError):
                # The process vanished or the read was truncated mid-exit;
                # keep the last good sample instead of killing the monitor.
                pass


class ExecutionEngine:
    """Compiles and runs jobs for every configured language."""

    def __init__(
        self,
        cfg: Config,
        limits_by_lang: dict[str, ResourceLimits],
        run_ids: tuple[int, int],
        logger,
    ) -> None:
        """Set up the code store, language runtimes and execution environment.

        Args:
            cfg: engine configuration (code store and supported languages).
            limits_by_lang: default resource limits, keyed by language name.
            run_ids: ``(gid, uid)`` used to run compilers and programs.
            logger: logger used for debug output.
        """
        self.code_store = CodeStore(cfg.code_store, run_ids)
        self.supported_languages: dict[str, Runtime] = dict()
        self.output_validator = init_validate_outputs()
        for lang, sup_cfg in cfg.supported_languages.items():
            self.supported_languages[lang] = Runtime(sup_cfg)

        self.run_uid = run_ids[1]
        self.run_gid = run_ids[0]
        self.socket_filter = make_filter(["socket"])
        self.logger = logger
        self.limits_by_lang = limits_by_lang

        self.exec_env = os.environ.copy()
        self.exec_env["GOCACHE"] = str(self.code_store._source_dir.resolve())

    def start(self):
        """Create the code store."""
        self.code_store.create()

    def stop(self):
        """Destroy the code store."""
        self.code_store.destroy()

    def _compile(self, command: str) -> subprocess.CompletedProcess:
        """Run a compile command in the source directory (60 s timeout)."""
        return subprocess.run(
            shlex.split(command),
            user=self.run_uid,
            group=self.run_gid,
            capture_output=True,
            cwd=self.code_store._source_dir,
            env=self.exec_env,
            timeout=60,
        )

    def _get_executable_after_compile(
        self,
        lang: str,
        source_file: Path,
        cmd: str | None = None,
        flags: str | None = None,
    ) -> tuple[str | Path, bool]:
        """Compile ``source_file`` if its language requires it.

        Returns:
            ``(executable, False)`` on success (the source file itself for
            non-compiled languages), or ``(error_text, True)`` when the
            compilation failed or timed out.
        """
        runtime = self.supported_languages[lang]
        if not runtime.is_compiled_language:
            return source_file, False

        compile_str, executable = runtime.compile(source_file, cmd, flags)
        try:
            cp = self._compile(compile_str)
        except subprocess.TimeoutExpired as e:
            return f"{e}", True

        if cp.returncode == 0:
            return executable, False

        return cp.stderr.decode(errors="ignore"), True

    def get_executor(
        self, job: JobData, limits: ResourceLimits
    ) -> tuple[str | Path | LanguageError | JavaClassNotFoundError, int]:
        """Prepare the command that executes ``job``.

        Returns:
            ``(command, timelimit_factor)`` on success. On failure the second
            element is ``-1`` and the first is a ``LanguageError``, a
            ``JavaClassNotFoundError`` or the compiler error text.
        """
        language = job.language
        if language is None:
            return LanguageError("Language must be selected to execute a code."), -1

        if language not in self.supported_languages:
            return LanguageError(f"Support for {language} is not implemented."), -1

        runtime = self.supported_languages[language]

        source_code = convert_crlf_to_lf(job.source_code)
        if runtime.has_sanitizer and job.use_sanitizer:
            source_code = runtime.sanitize(source_code)

        source_path = runtime.get_file_path(source_code)
        if isinstance(source_path, JavaClassNotFoundError):
            return source_path, -1
        source_path = self.code_store.write_source_code(source_code, source_path)

        executable, err = self._get_executable_after_compile(
            language, source_path, cmd=job.compile_cmd, flags=job.compile_flags
        )
        if err:
            return executable, -1

        execute_flags = job.execute_flags
        if runtime.extend_mem_for_vm and limits._as != -1:
            # Pass the address-space limit on to the language VM as a flag.
            mem_flag = f" -{runtime.extend_mem_flag_name}{limits._as} "
            execute_flags = mem_flag if execute_flags is None else execute_flags + mem_flag

        return (
            runtime.execute(executable, cmd=job.execute_cmd, flags=execute_flags),
            runtime.timelimit_factor,
        )

    def check_output_match(self, job: JobData) -> list[ExtendedUnittest]:
        """Run ``job`` on each of its unit tests and record the outcomes.

        Returns:
            The job's unit tests updated with time/memory, result text and
            ``ExecOutcome``; or a single ``COMPILATION_ERROR`` unit test when
            the program could not be prepared. Stops after the first failing
            test when ``job.stop_on_first_fail`` is set.
        """
        limits = job.limits
        if limits is None:
            limits = ResourceLimits()
            # A missing/unknown language must not raise KeyError here:
            # get_executor() below reports it as a LanguageError instead.
            lang_limits = self.limits_by_lang.get(job.language)
            if lang_limits is not None:
                limits.update(lang_limits)

        executor, timelimit_factor = self.get_executor(job, limits)
        if timelimit_factor == -1:
            result = executor
            if isinstance(executor, (LanguageError, JavaClassNotFoundError)):
                result = executor.msg
            elif not isinstance(result, str):
                result = "Some bug in ExecEval, please do report."
            return [
                ExtendedUnittest(
                    input="",
                    output=[],
                    result=result,
                    exec_outcome=ExecOutcome.COMPILATION_ERROR,
                )
            ]

        # if language uses vm then add extra 1gb smemory for the parent vm program to run
        if (
            self.supported_languages[job.language].extend_mem_for_vm
            and limits._as != -1
        ):
            limits._as += 2**30

        executor = f"{get_prlimit_str(limits)} {executor}"
        new_test_cases = job.unittests.copy()
        self.logger.debug(
            f"Execute with gid={self.run_gid}, uid={self.run_uid}: {executor}"
        )
        time_limit = limits.cpu * timelimit_factor

        for key, tc in enumerate(job.unittests):
            result, exec_outcome = None, None
            outs, errs = None, None
            syscall_filter_loaded = False

            def preexec_fn():
                # NOTE(review): subprocess runs preexec_fn in the child just
                # before exec, so this flag assignment is presumably never
                # observed by the parent process - confirm whether the
                # reset() below is needed at all.
                nonlocal syscall_filter_loaded
                if job.block_network:
                    self.socket_filter.load()
                    syscall_filter_loaded = True

            with subprocess.Popen(
                shlex.split(executor),
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=0,
                user=self.run_uid,
                group=self.run_gid,
                preexec_fn=preexec_fn,
                cwd=self.code_store._source_dir.resolve(),
                env=self.exec_env,
                start_new_session=True,
            ) as child_process:
                monitor = MonitorThread(child_process)
                monitor.start()

                def handler():
                    if child_process.poll() is None:
                        child_process.kill()

                # Backstop: kill the child one second after the time limit.
                timer = Timer(time_limit + 1, handler)
                timer.start()
                try:
                    # UTF-8 is a superset of ASCII, so ASCII inputs are sent
                    # unchanged while non-ASCII inputs no longer raise
                    # UnicodeEncodeError.
                    outs, errs = child_process.communicate(
                        tc.input.encode("utf-8"), timeout=time_limit
                    )
                    timer.cancel()
                except subprocess.TimeoutExpired:
                    exec_outcome = ExecOutcome.TIME_LIMIT_EXCEEDED
                except subprocess.CalledProcessError:
                    exec_outcome = ExecOutcome.RUNTIME_ERROR
                    if errs is not None:
                        result = errs.decode(errors="ignore").strip()
                finally:
                    timer.cancel()
                    child_process.kill()
                    child_process.communicate()
                    child_process.wait()
                    monitor.join()
                    if syscall_filter_loaded:
                        self.socket_filter.reset()

                if exec_outcome is None:
                    if child_process.returncode == 0 and outs is not None:
                        result = outs.decode(errors="ignore").strip()
                        exec_outcome = (
                            ExecOutcome.PASSED
                            if any(
                                self.output_validator(output, result)
                                for output in tc.output
                            )
                            else ExecOutcome.WRONG_ANSWER
                        )
                    elif errs is not None and len(errs) != 0:
                        exec_outcome = ExecOutcome.RUNTIME_ERROR
                        errs = errs.decode(errors="ignore")
                        errs_lower = errs.lower()
                        # Out-of-memory messages on stderr mean the memory
                        # limit was hit rather than a plain runtime error.
                        if (
                            "out of memory" in errs_lower
                            or "bad_alloc" in errs_lower
                            or "bad alloc" in errs_lower
                            or "memoryerror" in errs_lower
                        ):
                            exec_outcome = ExecOutcome.MEMORY_LIMIT_EXCEEDED
                        if child_process.returncode > 0:
                            result = errs
                        else:
                            # Negative return code: terminated by a signal.
                            result = f"Process exited with code {-child_process.returncode}, {signal.strsignal(-child_process.returncode)} stderr: {errs}"
                    else:
                        exec_outcome = ExecOutcome.MEMORY_LIMIT_EXCEEDED
                        if outs is not None:
                            result = outs.decode(errors="ignore").strip()
                        elif errs is not None:
                            result = errs.decode(errors="ignore").strip()
                        else:
                            self.logger.debug(
                                "**************** MEMORY_LIMIT_EXCEEDED assigned but no stdout or stderr"
                            )

            new_test_cases[key].update_time_mem(monitor.total_time, monitor.peak_memory)
            new_test_cases[key].update_result(result)
            new_test_cases[key].update_exec_outcome(exec_outcome)
            if job.stop_on_first_fail and exec_outcome is not ExecOutcome.PASSED:
                break

        return new_test_cases


if __name__ == "__main__":
    # Manual smoke test: run the bundled sample program of each language.
    import logging

    class Test:
        file: str
        lang: str

        def __init__(self, file, lang):
            self.file = file
            self.lang = lang

    tests = [
        Test("execution_engine/test_codes/test.c", "GNU C"),
        Test("execution_engine/test_codes/test.cpp", "GNU C++17"),
        Test("execution_engine/test_codes/test.go", "Go"),
        Test("execution_engine/test_codes/test.js", "Node js"),
        Test("execution_engine/test_codes/test.php", "PHP"),
        Test("execution_engine/test_codes/test.py", "PyPy 3"),
        Test("execution_engine/test_codes/test.py", "Python 3"),
        Test("execution_engine/test_codes/test.rb", "Ruby"),
        Test("execution_engine/test_codes/test.rs", "Rust"),
        Test("execution_engine/test_codes/test.java", "Java 7"),
        Test("execution_engine/test_codes/test.kt", "Kotlin"),
    ]
    unittests = [
        ExtendedUnittest("1 1", ["2"]),
        ExtendedUnittest("1 3", ["4"]),
        ExtendedUnittest("-1 2", ["1"]),
        ExtendedUnittest("122 2", ["124"]),
    ]

    from config import load_config
    from job import JobData
    from resource_limit import ResourceLimits

    cfg = load_config(Path("execution_engine/config.yaml"))
    # The constructor requires all four arguments. Run as the current
    # user/group; no per-language defaults are needed because every job
    # below carries its own limits.
    ce = ExecutionEngine(
        cfg,
        limits_by_lang={},
        run_ids=(os.getgid(), os.getuid()),
        logger=logging.getLogger(__name__),
    )
    # start()/stop() create and destroy the code store around the run.
    ce.start()
    try:
        for t in tests:
            with open(t.file) as f:
                s = f.read()
            updated_unittests = ce.check_output_match(
                JobData(
                    language=t.lang,
                    source_code=s,
                    unittests=unittests,
                    limits=ResourceLimits(),
                )
            )
            print(f"{t.lang} got: \n", updated_unittests)
    finally:
        ce.stop()