# maris-ai-master / core-python / maris_core / code / execution_eval.py
# MarisUK's picture
# Maris AI model sync
# f440f03 verified
"""Execution-based code evaluation helpers for coder benchmarks."""
from __future__ import annotations
import math
import os
import re
import shutil
import sqlite3
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
try:
import resource
except ImportError: # pragma: no cover - non-POSIX fallback
resource = None # type: ignore[assignment]
# Matches one fenced code block: the opening fence is followed by an info
# string (any characters except newline and backtick) and a newline; the body
# is the shortest text up to the next triple backtick. DOTALL lets the body
# span multiple lines.
_CODE_BLOCK_RE = re.compile(r"```(?P<lang>[^\n`]*)\n(?P<code>.*?)```", re.DOTALL)
# Defaults used by CodeExecutionSpec.memory_limit_mb and
# CodeExecutionSpec.max_output_chars respectively.
DEFAULT_EXECUTION_MEMORY_LIMIT_MB = 512
DEFAULT_EXECUTION_MAX_OUTPUT_CHARS = 12_000
@dataclass(frozen=True, slots=True)
class CodeExecutionSpec:
    """Immutable description of how one model response should be evaluated.

    Only ``language`` is required; every other field has a default.
    """

    # Language name; evaluate_code_response strips and lower-cases it before
    # choosing a runner.
    language: str
    # Optional harness text combined with the extracted code before running.
    test_code: str = ""
    # Wall-clock timeout handed to each subprocess call; the CPU rlimit is
    # derived from it as well.
    timeout_seconds: float = 8.0
    # When true the code is only syntax-checked / compiled, not executed.
    compile_only: bool = False
    # Address-space cap for child processes, in MiB (a floor of 64 is applied
    # where the limit is installed).
    memory_limit_mb: int = DEFAULT_EXECUTION_MEMORY_LIMIT_MB
    # Captured stdout/stderr longer than this many characters is truncated.
    max_output_chars: int = DEFAULT_EXECUTION_MAX_OUTPUT_CHARS
@dataclass(frozen=True, slots=True)
class CodeExecutionResult:
language: str
available: bool
passed: bool
summary: str
exit_code: int | None = None
stdout: str = ""
stderr: str = ""
def extract_code_block(text: str, language: str | None = None) -> str:
    """Return the code of the most relevant fenced block in ``text``.

    Selection order:

    1. No fenced block at all: the whole text, stripped.
    2. A block whose info string equals ``language`` (case-insensitive).
    3. A block whose info string *starts* with ``language`` or one of its
       aliases, so ``js`` finds a ``javascript`` fence and
       ``python title="x"`` is recognised as ``python``.
    4. Otherwise the first fenced block.

    Args:
        text: Raw model response, possibly containing Markdown code fences.
        language: Preferred fence language; ``None`` or blank means "first
            block wins".

    Returns:
        The selected code with surrounding whitespace removed.
    """
    matches = list(_CODE_BLOCK_RE.finditer(text))
    if not matches:
        return text.strip()

    wanted = (language or "").strip().lower()
    if wanted:
        # Pass 1: exact info-string match always has priority.
        for match in matches:
            if match.group("lang").strip().lower() == wanted:
                return match.group("code").strip()

        # Pass 2: compare only the first word of the info string and accept
        # the same alias pairs that evaluate_code_response dispatches on.
        alias_groups = (
            {"javascript", "js"},
            {"typescript", "ts"},
            {"bash", "sh"},
        )
        accepted = next((group for group in alias_groups if wanted in group), {wanted})
        for match in matches:
            info_words = match.group("lang").split()
            if info_words and info_words[0].lower() in accepted:
                return match.group("code").strip()

    return matches[0].group("code").strip()
def evaluate_code_response(response_text: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Extract the code from a model response and evaluate it as ``spec`` asks.

    The language name is normalised (stripped, lower-cased) and used both to
    pick the fenced block and to choose a runner. TypeScript, Rust and SQL
    have dedicated runners; Python, JavaScript and Bash are written to a
    single file and handed to their interpreter.

    Returns:
        A failed result when no code could be extracted, an "unavailable"
        result when the language or its interpreter is missing, otherwise
        whatever the selected runner reports.
    """
    language = spec.language.strip().lower()
    code = extract_code_block(response_text, language=language)
    if not code:
        return CodeExecutionResult(
            language=language,
            available=True,
            passed=False,
            summary="Atbildē nav atrasts izpildāms koda bloks.",
        )

    # Languages with a dedicated runner.
    if language in {"typescript", "ts"}:
        return _run_typescript_eval(code, spec)
    if language == "rust":
        return _run_rust_eval(code, spec)
    if language == "sql":
        return _run_sql_eval(code, spec)

    # Single-interpreter languages differ only in these few settings.
    if language == "python":
        interpreter = shutil.which("python3") or shutil.which("python")
        missing_reason = "python nav pieejams."
        file_name, comment_prefix = "main.py", "#"
        run_args = ["-I", "-B", "-s", file_name]
        check_args = ["-I", "-B", "-s", "-m", "py_compile", file_name]
    elif language in {"javascript", "js"}:
        interpreter = shutil.which("node")
        missing_reason = "node nav pieejams."
        file_name, comment_prefix = "main.js", "//"
        run_args = [file_name]
        check_args = ["--check", file_name]
    elif language in {"bash", "sh"}:
        interpreter = shutil.which("bash")
        missing_reason = "bash nav pieejams."
        file_name, comment_prefix = "main.sh", "#"
        run_args = [file_name]
        check_args = ["-n", file_name]
    else:
        return _unsupported_language_result(
            language, "Valoda execution evals režīmā vēl nav atbalstīta."
        )

    if interpreter is None:
        return _unsupported_language_result(language, missing_reason)

    arguments = check_args if spec.compile_only else run_args
    return _run_script_eval(
        language=language,
        command=[interpreter, *arguments],
        file_name=file_name,
        source=_build_source(code, spec.test_code, comment_prefix),
        spec=spec,
    )
def _build_source(code: str, test_code: str, comment_prefix: str) -> str:
    """Combine candidate code and an optional test harness into one file.

    Both parts are stripped. When a harness is present it follows the code
    after a blank line and a ``<comment_prefix> execution harness`` marker
    line. The result always ends with exactly one newline.
    """
    body = code.strip()
    harness = test_code.strip()
    if harness:
        marker = f"{comment_prefix} execution harness"
        return "\n".join([body, "", marker, harness, ""])
    return f"{body}\n"
def _run_script_eval(
    *,
    language: str,
    command: list[str],
    file_name: str,
    source: str,
    spec: CodeExecutionSpec,
) -> CodeExecutionResult:
    """Write ``source`` to a scratch directory and run ``command`` there.

    Args:
        language: Language label copied into the result.
        command: Full argv; it is expected to reference ``file_name``
            relative to the scratch directory.
        file_name: Name of the script file to create.
        source: Complete script text, written as UTF-8.
        spec: Supplies the timeout and resource limits for the run.

    Returns:
        The failure reported by ``_run_command``, or a passing result when
        the command succeeded.
    """
    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as scratch:
        scratch_dir = Path(scratch)
        (scratch_dir / file_name).write_text(source, encoding="utf-8")
        failure = _run_command(command, cwd=scratch_dir, spec=spec, language=language)

    if failure is not None:
        return failure
    return CodeExecutionResult(
        language=language,
        available=True,
        passed=True,
        summary=f"{language} kods izpildījās veiksmīgi.",
        exit_code=0,
    )
def _run_typescript_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Compile TypeScript with ``tsc`` and, unless compile-only, run it with node.

    ``tsc`` is always required; ``node`` is required only when the code is
    going to be executed. The candidate code plus harness is written to
    ``main.ts`` and the compiler is expected to emit ``main.js`` next to it.

    Returns:
        An "unavailable" result when a required tool is missing, the first
        failing step's result, or a passing result.
    """
    language = "typescript"

    compiler = shutil.which("tsc")
    if compiler is None:
        return _unsupported_language_result(language, "tsc nav pieejams.")
    runtime = shutil.which("node")
    if runtime is None and not spec.compile_only:
        return _unsupported_language_result(language, "node nav pieejams TypeScript izpildei.")

    compile_command = [
        compiler,
        "--pretty",
        "false",
        "--target",
        "ES2020",
        "--module",
        "commonjs",
        "main.ts",
    ]
    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as scratch:
        scratch_dir = Path(scratch)
        (scratch_dir / "main.ts").write_text(
            _build_source(code, spec.test_code, "//"), encoding="utf-8"
        )

        compile_failure = _run_command(
            compile_command, cwd=scratch_dir, spec=spec, language=language
        )
        if compile_failure is not None:
            return compile_failure

        if spec.compile_only:
            summary = "TypeScript kods veiksmīgi sakompilējās."
        else:
            run_failure = _run_command(
                [runtime, "main.js"], cwd=scratch_dir, spec=spec, language=language
            )
            if run_failure is not None:
                return run_failure
            summary = "TypeScript kods veiksmīgi sakompilējās un izpildījās."

    return CodeExecutionResult(
        language=language,
        available=True,
        passed=True,
        summary=summary,
        exit_code=0,
    )
def _run_rust_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Compile Rust with ``rustc`` and, unless compile-only, run the binary.

    The candidate code plus harness is written to ``main.rs`` in a scratch
    directory and compiled to a ``main`` executable in the same directory.

    Returns:
        An "unavailable" result when ``rustc`` is missing, the first failing
        step's result, or a passing result.
    """
    language = "rust"

    compiler = shutil.which("rustc")
    if compiler is None:
        return _unsupported_language_result(language, "rustc nav pieejams.")

    with tempfile.TemporaryDirectory(prefix="maris-code-eval-") as scratch:
        scratch_dir = Path(scratch)
        executable = scratch_dir / "main"
        (scratch_dir / "main.rs").write_text(
            _build_source(code, spec.test_code, "//"), encoding="utf-8"
        )

        compile_failure = _run_command(
            [compiler, "main.rs", "-o", str(executable)],
            cwd=scratch_dir,
            spec=spec,
            language=language,
        )
        if compile_failure is not None:
            return compile_failure

        if spec.compile_only:
            summary = "Rust kods veiksmīgi sakompilējās."
        else:
            run_failure = _run_command(
                [str(executable)], cwd=scratch_dir, spec=spec, language=language
            )
            if run_failure is not None:
                return run_failure
            summary = "Rust kods veiksmīgi sakompilējās un izpildījās."

    return CodeExecutionResult(
        language=language,
        available=True,
        passed=True,
        summary=summary,
        exit_code=0,
    )
def _run_sql_eval(code: str, spec: CodeExecutionSpec) -> CodeExecutionResult:
    """Run the candidate SQL in-process against a fresh in-memory SQLite DB.

    The script is assembled by ``_build_sql_script`` and executed with
    foreign-key enforcement switched on. Because the script runs inside this
    process rather than a subprocess, ``spec.timeout_seconds`` is enforced
    with an SQLite progress handler that interrupts the running statement
    once the deadline has passed.

    Returns:
        A passing result when the whole script executes, otherwise a failed
        result whose ``stderr`` carries the SQLite error message. A timeout
        uses the same summary text as subprocess timeouts.
    """
    import time  # local import: nothing else in this module needs it

    deadline = time.monotonic() + spec.timeout_seconds
    timed_out = False

    def _interrupt_if_overdue() -> int:
        # A non-zero return value makes SQLite abort the current statement.
        nonlocal timed_out
        if time.monotonic() < deadline:
            return 0
        timed_out = True
        return 1

    try:
        connection = sqlite3.connect(":memory:")
        try:
            # Invoke the deadline check every 1000 SQLite VM instructions.
            connection.set_progress_handler(_interrupt_if_overdue, 1_000)
            connection.execute("PRAGMA foreign_keys = ON")
            script = _build_sql_script(code, spec.test_code, compile_only=spec.compile_only)
            connection.executescript(script)
        finally:
            connection.close()
    except sqlite3.Error as exc:
        return CodeExecutionResult(
            language="sql",
            available=True,
            passed=False,
            summary=(
                "Execution eval pārsniedza laika limitu."
                if timed_out
                else "SQL execution eval neizdevās."
            ),
            stderr=str(exc),
        )
    return CodeExecutionResult(
        language="sql",
        available=True,
        passed=True,
        summary="SQL skripts veiksmīgi validējās un izpildījās.",
        exit_code=0,
    )
def _build_sql_script(code: str, test_code: str, *, compile_only: bool) -> str:
    """Assemble the SQL text to execute from candidate code and harness.

    The candidate is stripped and loses any trailing semicolons. Then:

    * a harness containing ``{{CODE}}`` is used as a template and returned
      with every placeholder replaced by the candidate (``compile_only`` is
      not consulted in this case);
    * otherwise the harness, if any, comes first on its own line(s) and the
      candidate follows, terminated by ``;`` and a newline;
    * with ``compile_only`` the candidate text is prefixed once with
      ``EXPLAIN QUERY PLAN``.
    """
    statement = code.strip().rstrip(";")
    setup = test_code.strip()

    if "{{CODE}}" in setup:
        return setup.replace("{{CODE}}", statement)

    prefix = "EXPLAIN QUERY PLAN " if compile_only else ""
    lines = [setup] if setup else []
    lines.append(f"{prefix}{statement};")
    return "\n".join(lines) + "\n"
def _captured_text(value: str | bytes | None) -> str:
    """Normalise captured subprocess output to ``str``.

    ``subprocess.TimeoutExpired`` may carry ``bytes`` (or ``None``) even when
    the process was started in text mode, so decode defensively.
    """
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value


def _run_command(
    command: list[str],
    *,
    cwd: Path,
    spec: CodeExecutionSpec,
    language: str,
) -> CodeExecutionResult | None:
    """Run ``command`` in ``cwd`` under the spec's timeout and resource limits.

    The child gets no stdin, the environment built by ``_build_isolated_env``
    and, where supported, the pre-exec hook from ``_build_subprocess_preexec``.

    Args:
        command: Full argv of the process to start.
        cwd: Working directory; also used as the child's home/temp directory.
        spec: Supplies ``timeout_seconds``, the limits and ``max_output_chars``.
        language: Language label copied into a failure result.

    Returns:
        ``None`` when the command exits with status 0. Otherwise a failed
        ``CodeExecutionResult`` with truncated stdout/stderr; on timeout the
        result has no exit code.
    """
    try:
        completed = subprocess.run(  # noqa: S603
            command,
            cwd=str(cwd),
            check=False,
            capture_output=True,
            text=True,
            # Decode explicitly and leniently: the child environment asks
            # Python for UTF-8 output, and generated programs may print bytes
            # that are not valid text, which must not crash the evaluator.
            encoding="utf-8",
            errors="replace",
            timeout=spec.timeout_seconds,
            stdin=subprocess.DEVNULL,
            env=_build_isolated_env(cwd),
            preexec_fn=_build_subprocess_preexec(spec),
        )
    except subprocess.TimeoutExpired as exc:
        # Partial output attached to the exception can be bytes or None.
        return CodeExecutionResult(
            language=language,
            available=True,
            passed=False,
            summary="Execution eval pārsniedza laika limitu.",
            stdout=_truncate_output(_captured_text(exc.stdout), spec.max_output_chars),
            stderr=_truncate_output(_captured_text(exc.stderr), spec.max_output_chars),
        )
    if completed.returncode == 0:
        return None
    return CodeExecutionResult(
        language=language,
        available=True,
        passed=False,
        summary="Execution eval neizdevās.",
        exit_code=completed.returncode,
        stdout=_truncate_output(completed.stdout, spec.max_output_chars),
        stderr=_truncate_output(completed.stderr, spec.max_output_chars),
    )
def _build_isolated_env(workspace: Path) -> dict[str, str]:
    """Build the minimal environment handed to evaluated child processes.

    Home and temp-directory variables all point at ``workspace``, a few
    fixed Python/Node/CI settings are added, and only a short allow-list of
    variables (``PATH`` plus Windows system paths) is copied from the current
    environment, and only when set to a non-empty value.
    """
    sandbox_dir = str(workspace)
    env: dict[str, str] = dict.fromkeys(("HOME", "TMPDIR", "TEMP", "TMP"), sandbox_dir)
    env.update(
        {
            "PYTHONNOUSERSITE": "1",
            "PYTHONDONTWRITEBYTECODE": "1",
            "PYTHONIOENCODING": "utf-8",
            "NODE_DISABLE_COLORS": "1",
            "CI": "1",
        }
    )

    inherited_names = ("PATH", "SYSTEMROOT", "SystemRoot", "WINDIR", "ComSpec")
    env.update(
        {name: current for name in inherited_names if (current := os.environ.get(name))}
    )
    return env
def _build_subprocess_preexec(spec: CodeExecutionSpec):
    """Return a pre-exec hook that sandboxes the child, or ``None``.

    ``None`` is returned when the platform is not POSIX or the ``resource``
    module could not be imported. Otherwise the returned callable starts a
    new session and sets soft and hard resource limits to the same value:
    CPU seconds (timeout rounded up plus one, at least 2), address space
    (``spec.memory_limit_mb`` MiB, at least 64), no core dumps, 8 MiB file
    size, 64 open files and, where the platform defines it, 32 processes.
    """
    if os.name != "posix" or resource is None:
        return None

    address_space_cap = max(spec.memory_limit_mb, 64) * 1024 * 1024
    cpu_seconds_cap = max(2, math.ceil(spec.timeout_seconds) + 1)
    file_size_cap = 8 * 1024 * 1024

    def _apply_limits() -> None:
        os.setsid()
        caps = [
            (resource.RLIMIT_CPU, cpu_seconds_cap),
            (resource.RLIMIT_AS, address_space_cap),
            (resource.RLIMIT_CORE, 0),
            (resource.RLIMIT_FSIZE, file_size_cap),
            (resource.RLIMIT_NOFILE, 64),
        ]
        # NOTE(review): on Linux RLIMIT_NPROC is accounted per real user ID,
        # not per process tree — confirm 32 is workable for the account the
        # evaluator runs under.
        nproc_kind = getattr(resource, "RLIMIT_NPROC", None)
        if nproc_kind is not None:
            caps.append((nproc_kind, 32))
        for kind, cap in caps:
            resource.setrlimit(kind, (cap, cap))

    return _apply_limits
def _truncate_output(value: str, max_chars: int) -> str:
    """Cap ``value`` at ``max_chars`` characters.

    Text within the limit is returned unchanged; longer text is cut to the
    first ``max_chars`` characters and a ``...[truncated]`` marker line is
    appended.
    """
    too_long = len(value) > max_chars
    return f"{value[:max_chars]}\n...[truncated]" if too_long else value
def _unsupported_language_result(language: str, reason: str) -> CodeExecutionResult:
    """Build the result for a language that cannot be evaluated here.

    The result is marked neither available nor passed, and ``reason`` is
    used as its summary.
    """
    not_evaluated = {"available": False, "passed": False}
    return CodeExecutionResult(language=language, summary=reason, **not_evaluated)