Spaces:
Paused
Paused
| from __future__ import annotations | |
| import os | |
| import random | |
| import re | |
| import shutil | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| from types import ModuleType | |
| from typing import Any | |
| from typing import Sequence | |
| try: | |
| import gymnasium as gym | |
| from gymnasium import spaces | |
| except ImportError as exc: | |
| raise ImportError( | |
| "gymnasium is required for hpc_gym import with pip install gymnasium" | |
| ) from exc | |
| try: | |
| import pexpect | |
| except ImportError as exc: | |
| raise ImportError( | |
| "pexpect is required for hpc_gym import with pip install pexpect" | |
| ) from exc | |
| from sysadmin_env.sandbox import Sandbox | |
| from sysadmin_env.tasks import hpc_gpu_ecc | |
| from sysadmin_env.tasks import hpc_munge | |
| from sysadmin_env.tasks import hpc_nfs_stale | |
| from sysadmin_env.tasks import hpc_ood_apache | |
| from sysadmin_env.tasks import hpc_outage | |
| from sysadmin_env.tasks import hpc_pid_stale | |
# Regex for a bracketed shell prompt: "[...]" followed by "#" or "$" and an
# optional single whitespace character.
PROMPT_PATTERN = re.compile(r"\[[^\]\r\n]+\][#$]\s?")

# Hostname placed in HOSTNAME and PS1 of the shell spawned by the environment.
PRIMARY_HOSTNAME = "hpc-login"

# Port, log file and script path used when launching and probing the OOD daemon.
OOD_PORT = 8080
OOD_LOG_PATH = "/tmp/ood.log"
OOD_DAEMON_SCRIPT = "/usr/local/bin/ood_server.py"

# Default timeouts in seconds: per-step command wait and shell bootstrap wait.
DEFAULT_STEP_TIMEOUT = 60.0
DEFAULT_SHELL_TIMEOUT = 30.0

# Scenario modules in registration order; each one exposes a TASK_ID.
_SCENARIO_MODULES: tuple[ModuleType, ...] = (
    hpc_outage,
    hpc_munge,
    hpc_pid_stale,
    hpc_gpu_ecc,
    hpc_nfs_stale,
    hpc_ood_apache,
)

# Lookup table from task id to the scenario module implementing it.
SCENARIO_REGISTRY: dict[str, ModuleType] = {
    module.TASK_ID: module for module in _SCENARIO_MODULES
}
def resolve_scenario(name_or_module: str | ModuleType) -> ModuleType:
    """Return the scenario module for a task id, or pass a module through.

    Args:
        name_or_module: Either a scenario module (returned unchanged) or a
            key of ``SCENARIO_REGISTRY``.

    Returns:
        The resolved scenario module.

    Raises:
        KeyError: If a non-module value is not a key of ``SCENARIO_REGISTRY``.
    """
    if isinstance(name_or_module, ModuleType):
        return name_or_module

    # Registry values are modules and never None, so None means "not found".
    registered = SCENARIO_REGISTRY.get(name_or_module)
    if registered is None:
        raise KeyError(
            f"unknown scenario {name_or_module} expected one of {sorted(SCENARIO_REGISTRY)}"
        )
    return registered
class EnterpriseHPCEnv(gym.Env):
    """Gymnasium environment driving an interactive bash shell in a sandbox.

    Each action is a command line sent to a ``pexpect``-controlled bash
    session spawned through ``Sandbox``. The observation is the text captured
    before the next shell prompt, and the reward is the change in the
    scenario grader's ``health`` value since the previous step.
    """

    metadata = {"render_modes": []}

    def __init__(
        self,
        task_root: str | None = None,
        *,
        scenario: str | ModuleType = hpc_outage.TASK_ID,
        scenario_pool: Sequence[str | ModuleType] | None = None,
        overlay_base_dir: str | None = None,
        shell_timeout: float = DEFAULT_SHELL_TIMEOUT,
        step_timeout: float = DEFAULT_STEP_TIMEOUT,
    ) -> None:
        """Configure the environment; no sandbox or shell is created yet.

        Args:
            task_root: Directory passed to the scenario's
                ``prepare_filesystem`` and to ``Sandbox``. When ``None`` a
                fresh temporary directory is created on every ``reset``.
            scenario: Scenario (task id or module) used when
                ``scenario_pool`` is not given.
            scenario_pool: Scenarios to choose from on ``reset``; takes
                precedence over ``scenario`` when provided.
            overlay_base_dir: Forwarded to ``Sandbox`` unchanged.
            shell_timeout: Seconds to wait for a prompt during shell
                bootstrap, daemon launch and login.
            step_timeout: Seconds to wait for a prompt after each action;
                also forwarded to ``Sandbox`` as its ``timeout``.

        Raises:
            KeyError: If a scenario name is not registered.
            ValueError: If ``scenario_pool`` is provided but empty.
        """
        super().__init__()
        self.action_space = spaces.Text(max_length=4096)
        self.observation_space = spaces.Text(max_length=65536)
        self._configured_task_root = task_root
        self._overlay_base_dir = overlay_base_dir
        self._shell_timeout = shell_timeout
        self._step_timeout = step_timeout
        self._scenario_pool: list[ModuleType]
        if scenario_pool is not None:
            self._scenario_pool = [resolve_scenario(item) for item in scenario_pool]
        else:
            self._scenario_pool = [resolve_scenario(scenario)]
        if not self._scenario_pool:
            # Fail with a clear message instead of an IndexError on [0] below.
            raise ValueError("scenario_pool must contain at least one scenario")
        self._scenario: ModuleType = self._scenario_pool[0]
        self._sandbox: Sandbox | None = None
        # Task id the current sandbox was created for; decides reuse in reset.
        self._sandbox_scenario_id: str | None = None
        self._shell: pexpect.spawn | None = None
        self._tmp_task_dir: str | None = None
        self._step_count = 0
        self._max_steps = 0
        self._last_reward = 0.0
        self._ood_started = False
        self._rng = random.Random()
        # Grader health seen on the previous step; the reward is the delta.
        self._prev_health = 0.0

    # NOTE(review): no decorator is visible on the two accessors below; if
    # callers read them as attributes, an @property may have been lost.
    def sandbox(self) -> Sandbox | None:
        """Return the current sandbox, or ``None`` if none exists."""
        return self._sandbox

    def scenario(self) -> ModuleType:
        """Return the currently selected scenario module."""
        return self._scenario

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[str, dict[str, Any]]:
        """Start a new episode and return the initial observation and info.

        Selects a scenario, closes any previous shell, creates a sandbox (or
        resets the existing one when the scenario is unchanged), then spawns
        a shell, launches the OOD daemon and runs ``ssh login``.

        Args:
            seed: Seeds both the gymnasium base class and the scenario RNG.
            options: Optional dict; a ``"scenario"`` key forces the scenario.

        Returns:
            ``(observation, info)`` where ``info`` carries ``task_id``,
            ``max_steps``, ``ood_port`` and ``prompt_pattern``.
        """
        super().reset(seed=seed)
        if seed is not None:
            self._rng.seed(seed)
        self._select_scenario(options)
        self._close_shell()

        # A sandbox built for another scenario cannot be reused.
        scenario_changed = self._sandbox_scenario_id != self._scenario.TASK_ID
        if self._sandbox is not None and scenario_changed:
            try:
                self._sandbox.destroy()
            except Exception as exc:
                print(f"hpc_gym sandbox destroy failed {type(exc).__name__.lower()} {exc}")
            self._sandbox = None

        task_root = self._ensure_task_root()
        if self._sandbox is None:
            print(f"hpc_gym create sandbox scenario {self._scenario.TASK_ID} task_root {task_root}")
            self._sandbox = Sandbox(
                task_root,
                timeout=self._step_timeout,
                isolate_network=False,
                overlay_base_dir=self._overlay_base_dir,
                allow_nested_sandbox=True,
            )
            self._sandbox.create()
            self._sandbox_scenario_id = self._scenario.TASK_ID
        else:
            start = time.perf_counter()
            latency_ms = self._sandbox.reset()
            print(
                f"hpc_gym overlay reset scenario {self._scenario.TASK_ID} "
                f"{latency_ms:.2f}ms wall {((time.perf_counter()-start)*1000):.2f}ms"
            )

        if self._sandbox.state_root is not None:
            self._scenario.synchronize(self._sandbox.state_root)
        definition = self._scenario.build_definition(str(self._sandbox.state_root or ""))
        self._max_steps = definition.metadata.max_steps
        self._step_count = 0
        self._last_reward = 0.0
        self._prev_health = 0.0
        self._ood_started = False

        self._spawn_shell()
        self._bootstrap_primary_prompt()
        self._launch_ood_daemon()
        self._enter_login_node()

        observation = (
            f"login node ready scenario {self._scenario.TASK_ID} ood :"
            f"{OOD_PORT} max_steps {self._max_steps}"
        )
        info = {
            "task_id": self._scenario.TASK_ID,
            "max_steps": self._max_steps,
            "ood_port": OOD_PORT,
            "prompt_pattern": PROMPT_PATTERN.pattern,
        }
        return observation, info

    def step(
        self, action: str
    ) -> tuple[str, float, bool, bool, dict[str, Any]]:
        """Send one command to the shell and grade the resulting state.

        Args:
            action: Command line to run; non-string values are converted
                with ``str()``.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The
            reward is the grader health minus the health of the previous
            step. ``terminated`` mirrors the grader's ``done`` flag and
            ``truncated`` is set when the step budget is used up without
            termination.

        Raises:
            RuntimeError: If called before ``reset``.
        """
        if self._shell is None or self._sandbox is None:
            raise RuntimeError("EnterpriseHPCEnv step called before reset")
        command = action if isinstance(action, str) else str(action)
        self._step_count += 1
        self._shell.sendline(command)
        output = self._await_prompt(self._step_timeout)

        grade = self._scenario.grade(self._sandbox.state_root or Path("."))
        health_delta = grade.health - self._prev_health
        self._prev_health = grade.health
        reward = health_delta
        self._last_reward = reward

        terminated = grade.done
        truncated = not terminated and self._step_count >= self._max_steps
        http_code = self._probe_ood_code()
        info: dict[str, Any] = {
            "task_id": self._scenario.TASK_ID,
            "step": self._step_count,
            "max_steps": self._max_steps,
            "reward_source": "grader",
            "command": command,
            "grader_health": grade.health,
            "grader_details": grade.details,
            "ood_http_code": http_code,
        }
        return output, reward, terminated, truncated, info

    def render(self) -> None:
        """Do nothing; no render modes are declared in ``metadata``."""
        return None

    def close(self) -> None:
        """Close the shell, destroy the sandbox and remove any temp task dir."""
        self._close_shell()
        if self._sandbox is not None:
            try:
                self._sandbox.destroy()
            except Exception as exc:
                # Best effort: report and continue so the remaining cleanup runs.
                print(f"hpc_gym sandbox destroy failed {type(exc).__name__.lower()} {exc}")
            self._sandbox = None
            self._sandbox_scenario_id = None
        if self._tmp_task_dir is not None:
            shutil.rmtree(self._tmp_task_dir, ignore_errors=True)
            self._tmp_task_dir = None

    def __enter__(self) -> "EnterpriseHPCEnv":
        """Return the environment itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Close the environment; returns ``False`` so exceptions propagate."""
        self.close()
        return False

    def _select_scenario(self, options: dict[str, Any] | None) -> None:
        """Pick the scenario for the next episode.

        An explicit ``options["scenario"]`` wins; otherwise a single-entry
        pool is used directly and a larger pool is sampled with the RNG.
        """
        if options and "scenario" in options:
            self._scenario = resolve_scenario(options["scenario"])
            return
        if len(self._scenario_pool) == 1:
            self._scenario = self._scenario_pool[0]
            return
        self._scenario = self._rng.choice(self._scenario_pool)

    def _ensure_task_root(self) -> Path:
        """Return the task root after the scenario has prepared it.

        Uses the configured root when one was given, otherwise replaces the
        previous temporary directory with a fresh one.
        """
        if self._configured_task_root is not None:
            root = Path(self._configured_task_root)
            if self._tmp_task_dir is None:
                root.mkdir(parents=True, exist_ok=True)
        else:
            # NOTE(review): this removes the previous temp root even when
            # reset() goes on to reuse a sandbox that was created from it;
            # confirm Sandbox.reset() does not depend on the old path.
            if self._tmp_task_dir is not None:
                shutil.rmtree(self._tmp_task_dir, ignore_errors=True)
            self._tmp_task_dir = tempfile.mkdtemp(prefix="hpc_task_")
            root = Path(self._tmp_task_dir)
        self._scenario.prepare_filesystem(root)
        return root

    def _spawn_shell(self) -> None:
        """Spawn an interactive bash through the sandbox runtime command.

        Raises:
            RuntimeError: If no sandbox exists yet.
        """
        if self._sandbox is None:
            raise RuntimeError("sandbox must be created before shell spawn")
        runtime_cmd = self._sandbox._build_runtime_command(
            "exec /bin/bash --noprofile --norc -i"
        )
        env = {
            "PATH": "/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin",
            "HOME": "/root",
            "TERM": "xterm",
            "HOSTNAME": PRIMARY_HOSTNAME,
            "PS1": f"[root@{PRIMARY_HOSTNAME} \\W]\\$ ",
            "LANG": "C.UTF-8",
        }
        print(f"hpc_gym spawning pexpect runtime {runtime_cmd[0]}")
        self._shell = pexpect.spawn(
            runtime_cmd[0],
            runtime_cmd[1:],
            timeout=self._shell_timeout,
            encoding="utf-8",
            codec_errors="replace",
            env=env,
        )
        self._shell.setecho(False)

    def _bootstrap_primary_prompt(self) -> None:
        """Set a known PS1 in the fresh shell and wait for the first prompt.

        Raises:
            RuntimeError: If the shell has not been spawned.
        """
        if self._shell is None:
            raise RuntimeError("shell not spawned")
        # disable bracketed-paste mode (\e[?2004l) so terminal escape sequences
        # do not pollute command output and confuse the prompt regex
        self._shell.sendline(
            "printf '\\e[?2004l'; "
            f"export PS1='[root@{PRIMARY_HOSTNAME} \\W]\\$ '; "
            "export PROMPT_COMMAND=''; stty -echo 2>/dev/null; true"
        )
        self._await_prompt(self._shell_timeout)

    @staticmethod
    def _find_python3_in_sandbox() -> str:
        """Return the first candidate Python interpreter path that exists.

        The check runs against the host filesystem; the original author's
        note says those paths are available inside the sandbox through a
        read-only bind (not verifiable from this file).

        Declared as a static method because it takes no ``self`` but is
        called as ``self._find_python3_in_sandbox()``.
        """
        candidates = [
            "/usr/bin/python3",
            "/usr/bin/python3.11",
            "/usr/bin/python3.12",
            "/usr/bin/python3.9",
            "/usr/bin/python",
        ]
        for candidate in candidates:
            if Path(candidate).exists():
                return candidate
        # Fallback may fail; _launch_ood_daemon tolerates that via its grace
        # window and proceeds anyway.
        return "python3"

    def _launch_ood_daemon(self) -> None:
        """Start the OOD daemon in the background and poll until it answers.

        Polls up to 20 times, 0.1 s apart, and accepts HTTP 200 or 502 as
        "ready". If the daemon never answers, a message is printed and the
        episode continues.

        Raises:
            RuntimeError: If the shell or sandbox is missing.
        """
        if self._shell is None or self._sandbox is None:
            raise RuntimeError("shell or sandbox missing for ood launch")
        python3 = self._find_python3_in_sandbox()
        self._shell.sendline(
            f"nohup {python3} {OOD_DAEMON_SCRIPT} >{OOD_LOG_PATH} 2>&1 & disown; true"
        )
        self._await_prompt(self._shell_timeout)
        for attempt in range(20):
            code = self._probe_ood_code()
            if code in {"200", "502"}:
                self._ood_started = True
                print(f"hpc_gym ood ready http_code {code} attempts {attempt + 1}")
                return
            time.sleep(0.1)
        print("hpc_gym ood did not respond within grace window proceeding anyway")

    def _enter_login_node(self) -> None:
        """Run ``ssh login`` in the shell and wait for the next prompt.

        Raises:
            RuntimeError: If the shell has not been spawned.
        """
        if self._shell is None:
            raise RuntimeError("shell not spawned")
        self._shell.sendline("ssh login")
        self._await_prompt(self._shell_timeout)

    def _await_prompt(self, timeout: float) -> str:
        """Wait for the next prompt and return the cleaned output before it.

        On timeout, Ctrl-C is sent and a prompt is awaited for up to five
        more seconds; a failed recovery is only reported. On EOF the
        captured text is returned as-is.

        Args:
            timeout: Seconds to wait for ``PROMPT_PATTERN``.

        Returns:
            Captured text with ANSI sequences removed and leading CR/LF
            characters stripped.

        Raises:
            RuntimeError: If the shell has not been spawned.
        """
        if self._shell is None:
            raise RuntimeError("shell not spawned")
        try:
            self._shell.expect(PROMPT_PATTERN, timeout=timeout)
            before = self._shell.before or ""
        except pexpect.exceptions.TIMEOUT:
            # Keep the partial output captured before the recovery attempt.
            before = self._shell.before or ""
            print("hpc_gym prompt timeout sending ctrl-c to recover")
            try:
                self._shell.sendcontrol("c")
                self._shell.expect(PROMPT_PATTERN, timeout=5)
            except Exception as exc:
                print(f"hpc_gym recovery failed {type(exc).__name__.lower()} {exc}")
        except pexpect.exceptions.EOF:
            before = self._shell.before or ""
            print("hpc_gym shell eof observed")
        return _strip_ansi(before).lstrip("\r\n")

    def _probe_ood_code(self) -> str:
        """Return the HTTP status code of the OOD endpoint as a string.

        The probe runs ``curl`` through ``Sandbox.execute``. Returns an
        empty string when there is no sandbox or the probe prints nothing.
        """
        if self._sandbox is None:
            return ""
        probe = self._sandbox.execute(
            f"curl -s -o /dev/null -w '%{{http_code}}' http://127.0.0.1:{OOD_PORT}/",
            timeout=10.0,
        )
        return (probe.stdout or "").strip()

    def _close_shell(self) -> None:
        """Terminate the shell if one exists; failures are only reported."""
        if self._shell is None:
            return
        try:
            if self._shell.isalive():
                self._shell.sendline("exit 0")
                try:
                    self._shell.expect(pexpect.exceptions.EOF, timeout=2)
                except Exception:
                    # Deliberate best effort: the forced close below runs
                    # whether or not the shell exited cleanly.
                    pass
            self._shell.close(force=True)
        except Exception as exc:
            print(f"hpc_gym shell close failed {type(exc).__name__.lower()} {exc}")
        self._shell = None
| _ANSI_RE = re.compile(r"\x1b\[[0-9;?]*[A-Za-z]") | |
| def _strip_ansi(text: str) -> str: | |
| return _ANSI_RE.sub("", text) | |
def register_env() -> None:
    """Register ``EnterpriseHPC-v0`` with gymnasium.

    The episode step limit is taken from the ``hpc_outage`` scenario
    definition. A ``gym.error.Error`` raised during registration is printed
    and otherwise ignored.
    """
    try:
        registration = {
            "id": "EnterpriseHPC-v0",
            "entry_point": "hpc_gym:EnterpriseHPCEnv",
            "max_episode_steps": hpc_outage.build_definition("").metadata.max_steps,
        }
        gym.register(**registration)
    except gym.error.Error as exc:
        print(f"hpc_gym register skipped {type(exc).__name__.lower()} {exc}")
def main() -> None:
    """Run a smoke test: reset with seed 0, execute ``sinfo``, print results.

    The environment is built over every registered scenario and is closed
    on exit, including when an exception is raised.
    """
    # The class's __exit__ calls close() and does not suppress exceptions.
    with EnterpriseHPCEnv(scenario_pool=list(SCENARIO_REGISTRY)) as env:
        first_obs, reset_info = env.reset(seed=0)
        print(f"reset observation {first_obs[:120]}")
        print(f"reset info {reset_info}")

        step_result = env.step("sinfo")
        obs, reward, terminated, truncated, info = step_result
        print(f"step reward {reward} terminated {terminated} truncated {truncated}")
        print(f"step info {info}")
        print(f"step observation\n{obs}")
if __name__ == "__main__":
    # Export the default port only if the caller has not set OOD_PORT already.
    if "OOD_PORT" not in os.environ:
        os.environ["OOD_PORT"] = str(OOD_PORT)
    main()