Spaces:
Paused
Paused
| from __future__ import annotations | |
| import json | |
| import re | |
| import stat | |
| from pathlib import Path | |
| from sysadmin_env.models import DiagnosticTrigger | |
| from sysadmin_env.models import DifficultyTier | |
| from sysadmin_env.models import TaskMetadata | |
| from sysadmin_env.models import TaskScenarioDefinition | |
| from sysadmin_env.models import TaskScenarioState | |
| from sysadmin_env.tasks import hpc_outage | |
# Identifier of this scenario.
TASK_ID = "hpc_munge"
# Health value grade() uses to signal a fully recovered compute node.
COMPLETION_HEALTH = 1.0
# Path layout is reused from the hpc_outage scenario.
SHARED_STATE_PATH = hpc_outage.SHARED_STATE_PATH
NODES_ROOT = hpc_outage.NODES_ROOT
COMPUTE_ROOT = hpc_outage.COMPUTE_ROOT
# Location of the munge key below COMPUTE_ROOT; callers join MUNGE_KEY_PATH
# onto the scenario root.
MUNGE_KEY_RELATIVE = Path("etc/munge/munge.key")
MUNGE_KEY_PATH = COMPUTE_ROOT / MUNGE_KEY_RELATIVE
# Permission bits grade() requires on the key file.
EXPECTED_KEY_MODE = 0o400
# Exact key contents seeded by prepare_filesystem() and required by grade().
EXPECTED_KEY_BYTES = b"MUNGE_KEY_" + b"A" * 54 + b"\n"
# Cluster state document written to SHARED_STATE_PATH when the scenario is
# prepared (and re-created by synchronize() if the file goes missing).
# grade() reads nodes["compute-01"]["state"] and the "munge@compute-01" /
# "slurmd@compute-01" service entries from it.
INITIAL_STATE: dict = {
    "cluster": "rocky-hpc",
    "cores_total": hpc_outage.CLUSTER_CORES_TOTAL,
    "cores_per_node": hpc_outage.CLUSTER_CORES_PER_NODE,
    "partitions": {
        "compute": {"nodes": ["compute-01"], "default": True},
    },
    "nodes": {
        "login": {
            "state": "up",
            "reason": "",
            "cores": hpc_outage.CLUSTER_CORES_PER_NODE,
        },
        # The faulted node: starts drained with an authentication failure.
        "compute-01": {
            "state": "drain",
            "reason": "munge authentication failed",
            "cores": hpc_outage.CLUSTER_CORES_PER_NODE,
        },
    },
    "services": {
        "slurmd@login": "active",
        "slurmd@compute-01": "failed",
        "slurmctld@login": "active",
        "munge@compute-01": "failed",
        "munge@login": "active",
    },
    # A single job left pending (state "PD") on the compute partition.
    "jobs": [
        {
            "id": 8421,
            "name": "cfd_simulation",
            "user": "engineer",
            "state": "PD",
            "partition": "compute",
            "nodes": "(AuthFail)",
            "time": "0:00",
        },
    ],
}
def build_definition(base_filesystem_path: str) -> TaskScenarioDefinition:
    """Build the scenario definition for the ``hpc_munge`` task.

    Args:
        base_filesystem_path: Forwarded unchanged into the task metadata.

    Returns:
        A definition carrying hard-tier metadata (90 steps, time limit 600.0),
        with network isolation disabled, nested sandboxes allowed, and the
        triggers produced by ``diagnostic_triggers()``.
    """
    scenario_metadata = TaskMetadata(
        task_id=TASK_ID,
        difficulty=DifficultyTier.hard,
        description="slurm compute node draining due to munge key permission fault and broken route",
        max_steps=90,
        time_limit=600.0,
        base_filesystem_path=base_filesystem_path,
    )
    definition = TaskScenarioDefinition(
        metadata=scenario_metadata,
        requires_network_isolation=False,
        allows_nested_sandbox=True,
        diagnostic_triggers=diagnostic_triggers(),
    )
    return definition
def diagnostic_triggers() -> list[DiagnosticTrigger]:
    """Return the diagnostic triggers for this scenario, in a fixed order.

    Each trigger pairs a fact id with the command regexes that reveal it and
    the reward attached to it. Fresh pattern lists are built on every call.
    """
    # (fact_id, command_patterns, reward)
    catalogue = (
        ("cluster_queue_inspected", [r"\bsinfo\b", r"\bsqueue\b"], 0.06),
        ("compute_node_entered", [r"\bssh\s+compute-01\b"], 0.07),
        (
            "munge_key_inspected",
            [r"ls\s+-l\s+.+munge", r"stat\s+.+munge\.key", r"cat\s+.+munge\.key"],
            0.05,
        ),
        (
            "munge_service_checked",
            [r"systemctl\s+status\s+munge", r"systemctl\s+is-failed\s+munge"],
            0.05,
        ),
        (
            "ood_portal_probed",
            [r"curl\s+.+localhost:8080", r"curl\s+.+127\.0\.0\.1:8080"],
            0.05,
        ),
    )
    return [
        DiagnosticTrigger(fact_id=fact, command_patterns=patterns, reward=value)
        for fact, patterns, value in catalogue
    ]
def prepare_filesystem(root: str | Path) -> None:
    """Seed the scenario filesystem under ``root`` with the munge key fault.

    Runs the hpc_outage filesystem preparation, overwrites the shared state
    file with ``INITIAL_STATE``, and writes the munge key with the expected
    contents but with mode 0o644 instead of ``EXPECTED_KEY_MODE``.

    Safe to call again on a tree where the key was already repaired: an
    existing key is made owner-writable first, because a key locked down to
    0o400 cannot be reopened for writing by an unprivileged owner.

    Args:
        root: Root directory of the scenario filesystem.
    """
    root_path = Path(root)
    hpc_outage.prepare_filesystem(root_path)
    _write_state(root_path / SHARED_STATE_PATH, INITIAL_STATE)

    key_path = root_path / MUNGE_KEY_PATH
    key_path.parent.mkdir(parents=True, exist_ok=True)
    if key_path.exists():
        # Without this, write_bytes() raises PermissionError on a 0o400 key.
        key_path.chmod(0o600)
    key_path.write_bytes(EXPECTED_KEY_BYTES)
    # The injected fault: the key is left group/world readable.
    key_path.chmod(0o644)
def inject_fault(root: str | Path) -> None:
    """Inject the scenario fault under ``root``.

    Delegates entirely to ``prepare_filesystem``, which writes the initial
    state document and the munge key with mode 0o644.
    """
    prepare_filesystem(root)
def observe_command(root: str | Path, command: str, _result) -> None:
    """Command-observation hook; this scenario records nothing.

    ``root`` is still coerced to a ``Path`` and then discarded, together with
    ``command``; ``_result`` is never read.
    """
    _ignored = (Path(root), command)
    return None
def synchronize(root: str | Path) -> None:
    """Recreate the shared state file from ``INITIAL_STATE`` if it is absent.

    An existing state file is left untouched.
    """
    state_file = Path(root) / SHARED_STATE_PATH
    if state_file.exists():
        return
    _write_state(state_file, INITIAL_STATE)
def _state_section(doc: object, key: str) -> dict:
    """Return ``doc[key]`` when both are dicts, otherwise an empty dict."""
    section = doc.get(key) if isinstance(doc, dict) else None
    return section if isinstance(section, dict) else {}


def grade(root: str | Path) -> TaskScenarioState:
    """Score the scenario from the munge key and the shared state document.

    Three conditions are checked:

    * the munge key has mode ``EXPECTED_KEY_MODE`` and the expected contents;
    * ``services["munge@compute-01"]`` is ``"active"``;
    * ``nodes["compute-01"]["state"]`` is ``"idle"`` and
      ``services["slurmd@compute-01"]`` is ``"active"``.

    The first two are worth 0.3 health each. ``COMPLETION_HEALTH`` is awarded
    only when all three hold, so health can never read as complete while
    ``done`` is false (for example when only the state file was edited).

    Args:
        root: Root directory of the scenario filesystem.

    Returns:
        The scenario state with ``health``, ``done`` and per-check details.
    """
    root_path = Path(root)
    key_path = root_path / MUNGE_KEY_PATH
    key_locked_down = _key_mode_matches(key_path)
    key_contents_intact = _key_contents_match(key_path)
    munge_key_fixed = key_locked_down and key_contents_intact

    # The state file lives inside the sandbox and may hold any JSON shape;
    # read each section defensively instead of chaining .get() calls.
    state_doc = _read_state(root_path / SHARED_STATE_PATH)
    services = _state_section(state_doc, "services")
    compute_node = _state_section(_state_section(state_doc, "nodes"), "compute-01")

    node_state = compute_node.get("state", "")
    munge_service = services.get("munge@compute-01", "")
    slurmd_service = services.get("slurmd@compute-01", "")

    auth_restored = munge_service == "active"
    node_idle = node_state == "idle" and slurmd_service == "active"
    done = munge_key_fixed and auth_restored and node_idle

    health = 0.0
    if munge_key_fixed:
        health += 0.3
    if auth_restored:
        health += 0.3
    if done:
        # Full health requires every check, not just an idle node entry.
        health = COMPLETION_HEALTH

    return TaskScenarioState(
        health=health,
        done=done,
        details={
            "munge_key_mode_correct": key_locked_down,
            "munge_key_contents_correct": key_contents_intact,
            "munge_service_active": auth_restored,
            "compute_node_idle": node_idle,
            "expected_mode_octal": oct(EXPECTED_KEY_MODE),
        },
    )
def command_reveals_fact(command: str, trigger: DiagnosticTrigger) -> bool:
    """Report whether ``command`` matches any of the trigger's patterns.

    Patterns are applied with ``re.search`` and matched case-insensitively.
    A trigger with no patterns never matches.
    """
    for candidate in trigger.command_patterns:
        if re.search(candidate, command, flags=re.IGNORECASE) is not None:
            return True
    return False
def _key_mode_matches(path: Path) -> bool:
    """Return True when ``path`` exists with mode ``EXPECTED_KEY_MODE``.

    Only the permission bits extracted by ``stat.S_IMODE`` are compared.
    """
    if path.exists():
        permission_bits = stat.S_IMODE(path.stat().st_mode)
        return permission_bits == EXPECTED_KEY_MODE
    return False
def _key_contents_match(path: Path) -> bool:
    """Return True when ``path`` is a regular file holding ``EXPECTED_KEY_BYTES``.

    The key lives inside the sandbox, so it may have been replaced by a
    directory or made unreadable. Those cases count as a mismatch rather than
    raising out of the grader.
    """
    if not path.is_file():
        return False
    try:
        key_bytes = path.read_bytes()
    except OSError:
        # Unreadable (e.g. mode 0o000) or removed between the check and read.
        return False
    return key_bytes == EXPECTED_KEY_BYTES
def _write_state(path: Path, doc: dict) -> None:
    """Write ``doc`` to ``path`` as JSON, creating parent directories.

    The output is indented by two spaces, has sorted keys, and ends with a
    trailing newline.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(doc, indent=2, sort_keys=True)
    path.write_text(serialized + "\n")
def _read_state(path: Path) -> dict:
    """Load the JSON state document at ``path``, best effort.

    Returns an empty dict when the file is missing or unreadable, is empty,
    cannot be decoded or parsed, or holds valid JSON whose top level is not an
    object. Callers can therefore always use ``dict`` methods on the result.
    """
    try:
        raw = path.read_text()
    except (OSError, UnicodeDecodeError):
        # Missing, unreadable, a directory, or not decodable as text.
        return {}
    try:
        loaded = json.loads(raw or "{}")
    except json.JSONDecodeError:
        return {}
    # json.loads also accepts lists, strings, numbers and null.
    return loaded if isinstance(loaded, dict) else {}