# Source: disk-panic-openenv / server/environment.py (uploaded by yashppawar)
# Commit 30c52ad (verified): Restructure to flat template layout; add pyproject + uv.lock; pass openenv validate
"""DiskPanicEnvironment — server-side OpenEnv Environment implementation."""
from __future__ import annotations
import os
from typing import Any, Optional
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import DiskPanicAction, DiskPanicObservation
except ImportError:
from models import DiskPanicAction, DiskPanicObservation
from .graders import GRADERS
from .scenarios import SCENARIOS, TASK_ORDER
from .vfs import VFS, execute
# Per-episode step budget: step() reports done=True once this many steps
# have been taken, even if the grader score is still below the success bar.
MAX_STEPS = 15
class DiskPanicEnvironment(Environment):
    """Simulated SRE incident in which an agent repairs a broken server.

    Every episode builds a fresh virtual filesystem from one of the
    ``SCENARIOS`` builders. Agent commands are run against it with
    ``execute`` and each step is scored by the matching ``GRADERS`` entry.
    """

    # Capability flag declared for the hosting framework; its exact meaning
    # is defined by openenv rather than by this module.
    SUPPORTS_CONCURRENT_SESSIONS = True

    # Briefing text shown to the agent at reset, keyed by task id.
    _TASK_BRIEFINGS = {
        "easy": (
            "Root filesystem is at >95% usage — find and remove the bloated log "
            "under /var/log. DO NOT touch /var/log/audit/."
        ),
        "medium": (
            "Disk is full AND app.service has failed. Free space, restart "
            "app.service, and preserve /var/log/audit/."
        ),
        "hard": (
            "A runaway process keeps growing /var/log/app/runaway.log by ~100M "
            "every tick. Free space, restart app.service, preserve audit logs, "
            "AND drop a logrotate config at /etc/logrotate.d/app (must contain "
            "the words 'rotate' and 'size') to cap the growth."
        ),
    }

    def __init__(self) -> None:
        """Create an environment with an empty VFS and the first task selected."""
        super().__init__()
        self._task_index = 0
        self._vfs: VFS = VFS()
        self._targets: dict = {}
        self._task_id: str = TASK_ORDER[0]
        self._step_count: int = 0
        self._state = State(episode_id=None, step_count=0)
        self._last_reward: float = 0.0

    # -- lifecycle ---------------------------------------------------------

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs: Any,
    ) -> DiskPanicObservation:
        """Start a new episode and return its opening observation.

        Args:
            seed: Accepted for interface compatibility; not used here.
            episode_id: Identifier stored in the episode state. When falsy,
                ``"ep-<task>"`` is used instead.
            task_id: Task to run. Ignored unless it is a key of ``SCENARIOS``.
            **kwargs: Accepted and ignored.

        Returns:
            An observation with ``done=False``, ``reward=None`` and a banner
            containing the task briefing and the current disk usage.
        """
        chosen = self._select_task(task_id)

        # Build a fresh world for the chosen scenario and clear the counters.
        self._vfs, self._targets = SCENARIOS[chosen]()
        self._task_id = chosen
        self._step_count = 0
        self._last_reward = 0.0
        self._state = State(episode_id=episode_id or f"ep-{chosen}", step_count=0)

        header = f"=== DiskPanic task: {chosen} ===\n"
        briefing = f"{self._describe_task()}\n"
        usage = f"Current disk usage: {self._vfs.usage_pct():.1f}%"
        return self._build_observation(
            done=False,
            reward=None,
            stdout=header + briefing + usage,
            last_error=None,
        )

    def step(
        self,
        action: DiskPanicAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> DiskPanicObservation:
        """Run one agent command, advance the world, and grade the result.

        Args:
            action: Action whose ``command`` is executed against the VFS.
            timeout_s: Accepted for interface compatibility; not used here.
            **kwargs: Accepted and ignored.

        Returns:
            An observation carrying the command output, the current grade as
            ``reward`` (rounded to four decimals), and ``done=True`` once the
            grade reaches 0.98 or ``MAX_STEPS`` steps have been taken.
        """
        self._step_count += 1
        self._state.step_count = self._step_count

        # Apply the command first, then let the world advance one tick, so
        # the grade below reflects the state after both.
        outcome = execute(self._vfs, action.command)
        self._vfs.tick()

        score = GRADERS[self._task_id](self._vfs, self._targets)
        self._last_reward = score

        solved = score >= 0.98
        out_of_steps = self._step_count >= MAX_STEPS

        # The reward is the absolute grade rather than a delta, so the final
        # step's reward is the episode's final score.
        return self._build_observation(
            done=solved or out_of_steps,
            reward=round(float(score), 4),
            stdout=outcome.stdout or "",
            last_error=outcome.error,
        )

    @property
    def state(self) -> State:
        """Return the current episode ``State`` object."""
        return self._state

    def close(self) -> None:
        """Drop the current world by swapping in an empty VFS and targets."""
        self._vfs = VFS()
        self._targets = {}

    # -- helpers -----------------------------------------------------------

    def _select_task(self, task_id: Optional[str]) -> str:
        """Pick the task for the next episode.

        Precedence: a valid ``task_id`` argument, then a valid
        ``DISK_PANIC_TASK`` environment variable, then round-robin over
        ``TASK_ORDER``. The round-robin cursor advances only when that
        fallback is the one actually used.
        """
        if task_id and task_id in SCENARIOS:
            return task_id

        env_task = os.getenv("DISK_PANIC_TASK")
        if env_task and env_task in SCENARIOS:
            return env_task

        scheduled = TASK_ORDER[self._task_index % len(TASK_ORDER)]
        self._task_index += 1
        return scheduled

    def _build_observation(
        self,
        *,
        done: Any,
        reward: Optional[float],
        stdout: str,
        last_error: Any,
    ) -> DiskPanicObservation:
        """Assemble an observation from the given fields plus current state.

        The df output, the ``app`` service status, the task id and the step
        counter are read from the environment's current state.
        """
        return DiskPanicObservation(
            done=done,
            reward=reward,
            stdout=stdout,
            df_output=self._vfs.df_output(),
            service_status=self._vfs.services.get("app", "unknown"),
            task_id=self._task_id,
            step=self._step_count,
            last_error=last_error,
        )

    def _describe_task(self) -> str:
        """Return the briefing for the active task, or ``"unknown task"``."""
        return self._TASK_BRIEFINGS.get(self._task_id, "unknown task")