Spaces:
Sleeping
Sleeping
| """ | |
| ForensicShell Environment — server-side implementation. | |
| The environment pre-seeds a fake "breached" Linux filesystem in memory. The agent | |
| interrogates it with structured read-only actions (list_dir, read_file, grep, stat, find) | |
| and finishes the episode by submitting a ForensicReport via action_type='submit_report'. | |
| A deterministic grader scores the report against hidden ground truth and returns | |
| a reward in [0.0, 1.0] on the terminal step. | |
| """ | |
| import hashlib | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import EnvironmentMetadata, State | |
| try: | |
| from ..models import ForensicShellAction, ForensicShellObservation | |
| except ImportError: | |
| from models import ForensicShellAction, ForensicShellObservation | |
| try: | |
| from .grader import grade | |
| from .scenario_generator import generate_scenario | |
| from .scenarios import DEFAULT_TASK_ID, SCENARIOS | |
| except ImportError: | |
| from grader import grade # type: ignore | |
| from scenario_generator import generate_scenario # type: ignore | |
| from scenarios import DEFAULT_TASK_ID, SCENARIOS # type: ignore | |
# Fallback step budget, used when neither the task id nor the scenario's
# difficulty label has an entry in the tables below.
MAX_STEPS_PER_EPISODE = 30

# Step budgets keyed by difficulty label. Easier tasks shouldn't reward aimless
# exploration; harder tasks with red herrings genuinely need more budget.
STEPS_BY_DIFFICULTY = dict(easy=15, medium=25, hard=35)

# Budgets for the hand-authored tasks (kept for backward compat with Day-1
# baselines). reset() consults this table before the difficulty table.
STEPS_BY_TASK = dict(t1_login=15, t2_modified=25, t3_timeline=35)

# Exploration shaping reward: a small positive reward the first time the agent
# reads one of the scenario's "canonical forensic artifacts" (auth.log, bash
# histories, cron files, backdoor path, etc.). The running total is capped so
# the terminal grader reward always dominates the trajectory return.
SHAPING_REWARD_PER_READ = 0.02
SHAPING_REWARD_CAP = 0.10
| def _canonical_artifacts(scenario: dict) -> set: | |
| """ | |
| Pick out the set of paths in a scenario that a good investigator *should* | |
| read. For hand-authored scenarios we use the ground-truth modified_files | |
| plus a fixed set of classic forensic log paths. For generated scenarios we | |
| also include the bash history of the compromised user. | |
| """ | |
| gt = scenario.get("ground_truth", {}) or {} | |
| paths: set = set() | |
| paths.update(gt.get("modified_files", []) or []) | |
| for p in ( | |
| "/var/log/auth.log", | |
| "/var/log/auth.log.1", | |
| "/etc/passwd", | |
| "/etc/shadow", | |
| ): | |
| if p in scenario.get("filesystem", {}): | |
| paths.add(p) | |
| user = gt.get("compromised_user") | |
| if user: | |
| bh = f"/home/{user}/.bash_history" | |
| if bh in scenario.get("filesystem", {}): | |
| paths.add(bh) | |
| return paths | |
| def _as_bytes(content) -> bytes: | |
| if isinstance(content, bytes): | |
| return content | |
| return str(content).encode("utf-8", errors="replace") | |
| def _as_text(content) -> str: | |
| if isinstance(content, bytes): | |
| try: | |
| return content.decode("utf-8") | |
| except UnicodeDecodeError: | |
| return f"<binary:{len(content)} bytes>" | |
| return str(content) | |
| class ForensicShellEnvironment(Environment): | |
| """ | |
| Pre-seeded forensic investigation environment. Agent actions are read-only file | |
| operations over a synthetic filesystem kept in a Python dict. | |
| Episode flow: | |
| reset(task_id='t1_login' | 't2_modified' | 't3_timeline') -> initial obs | |
| reset(seed=..., difficulty=..., pattern=...) -> initial obs (generated scenario) | |
| step(list_dir|read_file|grep|stat|find) -> obs with output | |
| step(submit_report(ForensicReport)) -> terminal obs with reward in [0,1] | |
| An episode also ends, with reward 0.0, once a step is taken after the step | |
| budget has been used up without a submitted report. | |
| """ | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
def __init__(self):
    """
    Create an environment pointing at the default hand-authored scenario.

    The in-memory filesystem stays empty until reset() loads a scenario.
    Every attribute that step() reads is initialised here, so calling step()
    before reset() yields ordinary error observations instead of raising
    AttributeError.
    """
    self._state = State(episode_id=str(uuid4()), step_count=0)
    self._task_id: str = DEFAULT_TASK_ID
    self._scenario: dict = SCENARIOS[DEFAULT_TASK_ID]
    self._fs: Dict[str, object] = {}
    self._done: bool = False
    self._steps_used: int = 0
    # step() computes the remaining budget from _max_steps on every call.
    # It used to be assigned only inside reset(); seed it here with the same
    # precedence reset() applies (task override, difficulty label, fallback).
    self._max_steps: int = (
        STEPS_BY_TASK.get(self._task_id)
        or STEPS_BY_DIFFICULTY.get(self._scenario.get("difficulty", "medium"))
        or MAX_STEPS_PER_EPISODE
    )
    self._useful_read: set = set()  # paths already rewarded
    self._shaping_total: float = 0.0  # running sum, capped at SHAPING_REWARD_CAP
    self._canonical: set = set()  # per-episode canonical artifact set
| # ---- episode lifecycle --------------------------------------------------- | |
def reset(
    self,
    task_id: Optional[str] = None,
    seed: Optional[int] = None,
    difficulty: Optional[int] = None,
    pattern: Optional[str] = None,
    **kwargs,
) -> ForensicShellObservation:
    """
    Start a new episode and return its initial observation.

    Scenario selection:
      * seed given -> a procedurally generated scenario (difficulty defaults
        to 3 when omitted; pattern is passed through unchanged);
      * otherwise  -> the hand-authored scenario named by task_id, then by the
        FORENSIC_TASK_ID environment variable, then DEFAULT_TASK_ID. Unknown
        ids silently fall back to DEFAULT_TASK_ID.

    Extra keyword arguments are accepted and ignored.
    """
    if seed is None:
        requested = task_id or os.getenv("FORENSIC_TASK_ID") or DEFAULT_TASK_ID
        self._task_id = requested if requested in SCENARIOS else DEFAULT_TASK_ID
        self._scenario = SCENARIOS[self._task_id]
    else:
        seed_value = int(seed)
        level = 3 if difficulty is None else int(difficulty)
        generated = generate_scenario(seed=seed_value, difficulty=level, pattern=pattern)
        self._task_id = generated["task_id"]
        self._scenario = generated

    # Fresh per-episode bookkeeping; the filesystem is a shallow copy of the
    # scenario's mapping.
    self._fs = dict(self._scenario["filesystem"])
    self._done = False
    self._steps_used = 0
    self._useful_read = set()
    self._shaping_total = 0.0
    self._canonical = _canonical_artifacts(self._scenario)
    self._state = State(episode_id=str(uuid4()), step_count=0)

    # Step budget: per-task override first, then the difficulty label, then
    # the global fallback.
    diff_label = self._scenario.get("difficulty", "medium")
    budget = STEPS_BY_TASK.get(self._task_id)
    if not budget:
        budget = STEPS_BY_DIFFICULTY.get(diff_label)
    if not budget:
        budget = MAX_STEPS_PER_EPISODE
    self._max_steps = budget

    banner = "\n".join(
        [
            f"ForensicShell ready. Task: {self._task_id} ({diff_label}).",
            "Available actions: list_dir(path), read_file(path,max_bytes), "
            "grep(pattern,path), stat(path), find(pattern,path), submit_report(report).",
            "Start by listing /var/log or /home.",
        ]
    )
    return ForensicShellObservation(
        output=banner,
        task_id=self._task_id,
        task_description=self._scenario["description"],
        steps_remaining=self._max_steps,
        action_error=None,
        done=False,
        reward=0.0,
        metadata={"difficulty": diff_label, "max_steps": self._max_steps},
    )
| # ---- action dispatch ----------------------------------------------------- | |
def step(self, action: ForensicShellAction) -> ForensicShellObservation:  # type: ignore[override]
    """
    Execute one agent action and return the resulting observation.

    Every call increments both step counters, including calls made after the
    episode has ended. A finished episode, or one whose step budget has been
    exceeded, yields a terminal observation with reward 0.0. Otherwise the
    action is dispatched on action.action_type. read_file and grep may earn a
    shaping reward when they succeed; submit_report ends the episode. Any
    exception raised while handling the action is reported as an
    ``internal_error`` observation rather than propagated.
    """
    self._state.step_count += 1
    self._steps_used += 1
    remaining = max(0, self._max_steps - self._steps_used)

    # Grace path: the episode is already over.
    if self._done:
        return self._obs(
            output="Episode already ended. Call reset() to start a new one.",
            steps_remaining=0,
            error="episode_done",
            done=True,
            reward=0.0,
        )

    # Hard cap on steps.
    if self._steps_used > self._max_steps:
        self._done = True
        return self._obs(
            output="Step budget exhausted without a submitted report.",
            steps_remaining=0,
            error="step_budget_exhausted",
            done=True,
            reward=0.0,
        )

    def reply(out, err, reward=0.0):
        # Shared shape of every non-terminal observation.
        return self._obs(
            output=out,
            steps_remaining=remaining,
            error=err,
            done=False,
            reward=reward,
        )

    kind = action.action_type
    try:
        if kind == "submit_report":
            return self._do_submit_report(action, remaining)
        if kind == "list_dir":
            return reply(*self._do_list_dir(action.path or "/"))
        if kind == "stat":
            return reply(*self._do_stat(action.path or ""))
        if kind == "find":
            return reply(*self._do_find(action.pattern or "*", action.path or "/"))
        if kind in ("read_file", "grep"):
            target = action.path or ""
            if kind == "read_file":
                out, err = self._do_read_file(target, action.max_bytes or 2048)
            else:
                out, err = self._do_grep(action.pattern or "", target)
            # Only successful reads are eligible for the shaping reward.
            bonus = 0.0 if err is not None else self._award_shaping(target)
            return reply(out, err, bonus)
        return reply("", f"unknown action_type: {kind}")
    except Exception as exc:  # pragma: no cover - defensive
        return reply("", f"internal_error: {type(exc).__name__}: {exc}")
| def _do_find(self, pattern: str, path: str) -> Tuple[str, Optional[str]]: | |
| """Recursive search: find files matching a glob pattern under a directory.""" | |
| from fnmatch import fnmatch | |
| path = path.rstrip("/") or "/" | |
| prefix = "/" if path == "/" else path + "/" | |
| if path == "/": | |
| prefix = "/" | |
| matches: List[str] = [] | |
| for fp in sorted(self._fs.keys()): | |
| if fp == path or fp.startswith(prefix): | |
| basename = fp.rsplit("/", 1)[-1] if "/" in fp else fp | |
| if fnmatch(basename, pattern): | |
| matches.append(fp) | |
| if len(matches) >= 50: | |
| break | |
| if not matches: | |
| return f"(no files matching {pattern!r} under {path})", None | |
| return "\n".join(matches), None | |
| # ---- shaping reward ----------------------------------------------------- | |
def _award_shaping(self, path: str) -> float:
    """
    Compute the exploration bonus for touching *path*.

    A bonus of up to SHAPING_REWARD_PER_READ is paid only the first time a
    canonical forensic artifact is touched, and only while the episode's
    cumulative shaping is still below SHAPING_REWARD_CAP. The final grant is
    trimmed so the running total never exceeds the cap. Returns 0.0 in every
    other case.
    """
    is_new_artifact = (
        bool(path)
        and path in self._canonical
        and path not in self._useful_read
    )
    # Small epsilon so float accumulation cannot sneak past the cap.
    cap_reached = self._shaping_total + 1e-9 >= SHAPING_REWARD_CAP
    if not is_new_artifact or cap_reached:
        return 0.0

    headroom = SHAPING_REWARD_CAP - self._shaping_total
    grant = min(SHAPING_REWARD_PER_READ, headroom)
    self._shaping_total += grant
    self._useful_read.add(path)
    return float(grant)
| # ---- action primitives --------------------------------------------------- | |
| def _do_list_dir(self, path: str) -> Tuple[str, Optional[str]]: | |
| path = path.rstrip("/") or "/" | |
| prefix = "/" if path == "/" else path + "/" | |
| entries = set() | |
| for fp in self._fs.keys(): | |
| if not fp.startswith(prefix): | |
| continue | |
| rest = fp[len(prefix):] | |
| if not rest: | |
| continue | |
| head = rest.split("/", 1)[0] | |
| entries.add(head) | |
| if not entries: | |
| return "", f"no such directory or empty: {path}" | |
| listing = "\n".join(sorted(entries)) | |
| return f"{path}:\n{listing}", None | |
def _do_read_file(self, path: str, max_bytes: int) -> Tuple[str, Optional[str]]:
    """
    Return the text of a file, truncated to at most *max_bytes* UTF-8 bytes.

    Args:
        path: exact key of the file in the synthetic filesystem.
        max_bytes: byte budget for the returned content. ``None``, zero or a
            negative value means "no limit".

    Returns:
        ``(text, None)`` on success, with a "... [truncated at N bytes]"
        trailer appended when the content was cut; ``("", error)`` when the
        path does not exist.
    """
    if path not in self._fs:
        return "", f"no such file: {path}"
    text = _as_text(self._fs[path])

    # The limit is measured in encoded bytes, as the parameter name and the
    # trailer promise, not in characters. Non-positive limits are ignored;
    # a negative value used to slice characters off the end of the text.
    if max_bytes and max_bytes > 0:
        encoded = text.encode("utf-8", errors="replace")
        if len(encoded) > max_bytes:
            # errors="ignore" drops a multi-byte character split by the cut.
            clipped = encoded[:max_bytes].decode("utf-8", errors="ignore")
            text = clipped + f"\n... [truncated at {max_bytes} bytes]"
    return text, None
def _do_grep(self, pattern: str, path: str) -> Tuple[str, Optional[str]]:
    """
    Plain substring search over a single file.

    Lines containing *pattern* are returned as ``"<line number>: <line>"``
    (1-based), at most the first 100 of them. An empty pattern or an unknown
    path produces ``("", error)``; a search with no hits returns a
    "(no matches ...)" message with no error.
    """
    if not pattern:
        return "", "empty pattern"
    if path not in self._fs:
        return "", f"no such file: {path}"

    lines = _as_text(self._fs[path]).splitlines()
    numbered = [
        f"{lineno}: {line}"
        for lineno, line in enumerate(lines, start=1)
        if pattern in line
    ]
    if not numbered:
        return f"(no matches for {pattern!r} in {path})", None
    # Keep only the first 100 hits.
    return "\n".join(numbered[:100]), None
def _do_stat(self, path: str) -> Tuple[str, Optional[str]]:
    """
    Report the path, byte size and SHA-256 digest of a file.

    Returns ``("path=...\\nsize=...\\nsha256=...", None)`` for a known file,
    or ``("", error)`` when the path is not in the synthetic filesystem.
    """
    try:
        content = self._fs[path]
    except KeyError:
        return "", f"no such file: {path}"

    payload = _as_bytes(content)
    digest = hashlib.sha256(payload).hexdigest()
    report = "\n".join(
        [
            f"path={path}",
            f"size={len(payload)}",
            f"sha256={digest}",
        ]
    )
    return report, None
def _do_submit_report(
    self, action: ForensicShellAction, steps_remaining: int
) -> ForensicShellObservation:
    """
    Grade the submitted report and close the episode.

    An action without a report yields a non-terminal ``missing_report``
    error observation and leaves the episode open. Otherwise the report is
    dumped to a JSON-compatible dict, scored by grade() against the
    scenario's ground truth, and returned in a terminal observation whose
    reward is that score.
    """
    report = action.report
    if report is None:
        return self._obs(
            output="submit_report requires a 'report' field.",
            steps_remaining=steps_remaining,
            error="missing_report",
            done=False,
            reward=0.0,
        )

    payload = report.model_dump(mode="json")
    score = grade(self._task_id, payload, self._scenario["ground_truth"])
    self._done = True
    return self._obs(
        output=(
            f"Report received for task {self._task_id}. "
            f"Reward: {score:.3f}. Episode complete."
        ),
        steps_remaining=0,
        error=None,
        done=True,
        reward=score,
        extra_metadata={"submitted_report": payload, "task_id": self._task_id},
    )
| # ---- obs helper ---------------------------------------------------------- | |
def _obs(
    self,
    output: str,
    steps_remaining: int,
    error: Optional[str],
    done: bool,
    reward: float,
    extra_metadata: Optional[Dict] = None,
) -> ForensicShellObservation:
    """
    Build an observation for the current episode.

    The metadata always carries the task id and the current step count;
    entries in *extra_metadata*, when given, are merged on top and win on
    key clashes.
    """
    meta: Dict = {
        "task_id": self._task_id,
        "step": self._state.step_count,
        **(extra_metadata or {}),
    }
    return ForensicShellObservation(
        output=output,
        task_id=self._task_id,
        task_description=self._scenario.get("description", ""),
        steps_remaining=steps_remaining,
        action_error=error,
        done=done,
        reward=reward,
        metadata=meta,
    )
| # ---- state + metadata ---------------------------------------------------- | |
| # Return the State object for the current episode (created with an episode_id | |
| # and a step_count; step() increments step_count on every call). | |
| # NOTE(review): no @property decorator is visible on this definition. Confirm | |
| # whether the OpenEnv Environment interface expects `state` to be a property | |
| # rather than a plain method - TODO confirm against the base class. | |
| def state(self) -> State: | |
| return self._state | |
def get_metadata(self) -> EnvironmentMetadata:
    """
    Override the OpenEnv default to populate the /metadata endpoint with a real
    name, description, embedded README, version, author, and docs URL, instead
    of the boilerplate auto-derived from the class name.

    The README is looked up two directories above this file. If it is
    missing, unreadable, or not valid UTF-8, ``readme_content`` is None.
    """
    readme_path = Path(__file__).resolve().parent.parent / "README.md"
    readme_content: Optional[str]
    try:
        # Read directly instead of checking exists() first: a missing file
        # raises FileNotFoundError (an OSError), so the pre-check was both
        # redundant and race-prone. UnicodeDecodeError is a ValueError, not
        # an OSError, so it is caught explicitly to keep this best-effort.
        readme_content = readme_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        readme_content = None

    return EnvironmentMetadata(
        name="ForensicShell",
        description=(
            "Digital-forensics investigation environment for OpenEnv RL. The "
            "agent reads logs, hashes backdoors, and reconstructs attacker "
            "kill-chains across 5 attack patterns and 5 difficulty tiers. "
            "Procedural scenarios via deterministic seeds; deterministic "
            "graders return rewards in [0, 1] with partial credit (Jaccard, "
            "F1, Kendall-tau)."
        ),
        readme_content=readme_content,
        version="0.2.0",
        author="yashppawar",
        documentation_url="https://huggingface.co/spaces/yashppawar/forensic-shell",
    )