HPCOpenenv / sysadmin_env /tasks /hpc_pid_stale.py
huggingmenfordays's picture
deploy: ccyloopss/HPCOpenenv — with OPENENV_API_KEY auth guard
bc35a94
from __future__ import annotations
import json
import re
from pathlib import Path
from sysadmin_env.models import DiagnosticTrigger
from sysadmin_env.models import DifficultyTier
from sysadmin_env.models import TaskMetadata
from sysadmin_env.models import TaskScenarioDefinition
from sysadmin_env.models import TaskScenarioState
from sysadmin_env.tasks import hpc_outage
TASK_ID = "hpc_pid_stale"
COMPLETION_HEALTH = 1.0
SHARED_STATE_PATH = hpc_outage.SHARED_STATE_PATH
COMPUTE_ROOT = hpc_outage.COMPUTE_ROOT
STALE_PID_RELATIVE = Path("var/run/slurmd.pid")
STALE_PID_PATH = COMPUTE_ROOT / STALE_PID_RELATIVE
STALE_PID_CONTENTS = "31337\n"
INITIAL_STATE: dict = {
"cluster": "rocky-hpc",
"cores_total": hpc_outage.CLUSTER_CORES_TOTAL,
"cores_per_node": hpc_outage.CLUSTER_CORES_PER_NODE,
"partitions": {
"compute": {"nodes": ["compute-01"], "default": True},
},
"nodes": {
"login": {
"state": "up",
"reason": "",
"cores": hpc_outage.CLUSTER_CORES_PER_NODE,
},
"compute-01": {
"state": "drain",
"reason": "slurmd failed stale pid file at /var/run/slurmd.pid",
"cores": hpc_outage.CLUSTER_CORES_PER_NODE,
},
},
"services": {
"slurmd@login": "active",
"slurmd@compute-01": "failed",
"slurmctld@login": "active",
},
"jobs": [
{
"id": 9117,
"name": "lattice_qcd",
"user": "physicist",
"state": "PD",
"partition": "compute",
"nodes": "(NodeDown)",
"time": "0:00",
},
],
}
def build_definition(base_filesystem_path: str) -> TaskScenarioDefinition:
metadata = TaskMetadata(
task_id=TASK_ID,
difficulty=DifficultyTier.hard,
description="slurmd refuses to restart after reboot because a stale pid file is still on disk",
max_steps=90,
time_limit=600.0,
base_filesystem_path=base_filesystem_path,
)
return TaskScenarioDefinition(
metadata=metadata,
requires_network_isolation=False,
allows_nested_sandbox=True,
diagnostic_triggers=diagnostic_triggers(),
)
def diagnostic_triggers() -> list[DiagnosticTrigger]:
return [
DiagnosticTrigger(
fact_id="cluster_queue_inspected",
command_patterns=[r"\bsinfo\b", r"\bsqueue\b"],
reward=0.06,
),
DiagnosticTrigger(
fact_id="compute_node_entered",
command_patterns=[r"\bssh\s+compute-01\b"],
reward=0.07,
),
DiagnosticTrigger(
fact_id="slurmd_service_checked",
command_patterns=[r"systemctl\s+status\s+slurmd", r"systemctl\s+is-failed\s+slurmd"],
reward=0.05,
),
DiagnosticTrigger(
fact_id="pid_file_inspected",
command_patterns=[r"cat\s+.+slurmd\.pid", r"ls\s+.+run/slurmd\.pid", r"stat\s+.+slurmd\.pid"],
reward=0.05,
),
DiagnosticTrigger(
fact_id="ood_portal_probed",
command_patterns=[r"curl\s+.+localhost:8080", r"curl\s+.+127\.0\.0\.1:8080"],
reward=0.05,
),
]
def prepare_filesystem(root: str | Path) -> None:
root_path = Path(root)
hpc_outage.prepare_filesystem(root_path)
route_path = root_path / hpc_outage.COMPUTE_ROUTE_PATH
route_path.parent.mkdir(parents=True, exist_ok=True)
route_path.write_text(hpc_outage.FIXED_ROUTE)
pid_path = root_path / STALE_PID_PATH
pid_path.parent.mkdir(parents=True, exist_ok=True)
pid_path.write_text(STALE_PID_CONTENTS)
_write_state(root_path / SHARED_STATE_PATH, INITIAL_STATE)
def inject_fault(root: str | Path) -> None:
prepare_filesystem(root)
def observe_command(root: str | Path, command: str, _result) -> None:
_ = Path(root)
_ = command
def synchronize(root: str | Path) -> None:
root_path = Path(root)
if not (root_path / SHARED_STATE_PATH).exists():
_write_state(root_path / SHARED_STATE_PATH, INITIAL_STATE)
def grade(root: str | Path) -> TaskScenarioState:
root_path = Path(root)
pid_path = root_path / STALE_PID_PATH
state_doc = _read_state(root_path / SHARED_STATE_PATH)
pid_removed = not pid_path.exists()
slurmd_service = state_doc.get("services", {}).get("slurmd@compute-01", "")
slurmd_active = slurmd_service == "active"
node_state = state_doc.get("nodes", {}).get("compute-01", {}).get("state", "")
node_idle = node_state == "idle"
health = 0.0
if pid_removed:
health += 0.3
if slurmd_active:
health += 0.3
if pid_removed and slurmd_active and node_idle:
health = COMPLETION_HEALTH
done = pid_removed and slurmd_active and node_idle
return TaskScenarioState(
health=health,
done=done,
details={
"stale_pid_file_removed": pid_removed,
"slurmd_service_active": slurmd_active,
"compute_node_idle": node_idle,
"expected_pid_path": str(STALE_PID_RELATIVE),
},
)
def command_reveals_fact(command: str, trigger: DiagnosticTrigger) -> bool:
return any(re.search(pattern, command, flags=re.IGNORECASE) for pattern in trigger.command_patterns)
def _write_state(path: Path, doc: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(doc, indent=2, sort_keys=True) + "\n")
def _read_state(path: Path) -> dict:
if not path.exists():
return {}
try:
return json.loads(path.read_text() or "{}")
except json.JSONDecodeError:
return {}