Spaces:
Sleeping
Sleeping
| import time | |
| import re | |
| import ipaddress | |
| from urllib.parse import urlparse, urlunparse | |
| import httpx | |
| import dns.resolver | |
| import dns.exception | |
| import dns.rcode | |
| import gradio as gr | |
# -----------------------
# SSRF protection
# -----------------------
# Networks this service must never probe. Membership is tested with
# `ip in net`, which is always False when the address and the network are of
# different IP versions, so IPv4-mapped IPv6 addresses need their own entry.
PRIVATE_NETS = [
    ipaddress.ip_network("0.0.0.0/8"),        # "this network"
    ipaddress.ip_network("10.0.0.0/8"),       # RFC 1918 private
    ipaddress.ip_network("100.64.0.0/10"),    # RFC 6598 shared (carrier-grade NAT) space
    ipaddress.ip_network("127.0.0.0/8"),      # loopback
    ipaddress.ip_network("169.254.0.0/16"),   # link-local
    ipaddress.ip_network("172.16.0.0/12"),    # RFC 1918 private
    ipaddress.ip_network("192.0.0.0/24"),     # IETF protocol assignments
    ipaddress.ip_network("192.168.0.0/16"),   # RFC 1918 private
    ipaddress.ip_network("198.18.0.0/15"),    # benchmarking
    ipaddress.ip_network("224.0.0.0/4"),      # multicast
    ipaddress.ip_network("240.0.0.0/4"),      # reserved
    ipaddress.ip_network("::/128"),           # unspecified
    ipaddress.ip_network("::1/128"),          # loopback
    ipaddress.ip_network("::ffff:0:0/96"),    # IPv4-mapped IPv6 (e.g. ::ffff:127.0.0.1)
    ipaddress.ip_network("fc00::/7"),         # unique local
    ipaddress.ip_network("fe80::/10"),        # link-local
    ipaddress.ip_network("ff00::/8"),         # multicast
]
# Syntactic hostname check: 1-253 characters overall, made of dot-separated
# labels of 1-63 letters/digits/hyphens that neither start nor end with "-".
# The adjacent raw strings are concatenated into a single pattern.
DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)"                       # overall length bound
    r"(?!-)[A-Za-z0-9-]{1,63}(?<!-)"        # first label
    r"(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))*$"  # any further ".label" parts
)
# Single shared HTTP client used by every probe: fixed User-Agent,
# 10-second timeout, and redirects followed automatically.
CLIENT = httpx.Client(
    headers={"User-Agent": "HF-Connectivity-Checker/2.0"},
    timeout=10.0,
    follow_redirects=True,
)
def is_private_ip(ip_str: str) -> bool:
    """Return True when *ip_str* must not be probed (SSRF guard).

    An address is rejected when it is not a valid IP literal (fail closed),
    when the standard library classifies it as non-public, or when it falls
    inside one of ``PRIVATE_NETS``.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped first:
    membership tests between an IPv6 address and an IPv4 network are always
    False, so ``::ffff:127.0.0.1`` would otherwise slip past the IPv4 entries.
    """
    try:
        ip = ipaddress.ip_address(ip_str)
    except (ValueError, TypeError):
        # Anything we cannot parse is treated as unsafe.
        return True

    mapped = getattr(ip, "ipv4_mapped", None)  # attribute exists on IPv6 only
    if mapped is not None:
        ip = mapped

    if (
        not ip.is_global
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        or ip.is_unspecified
    ):
        return True
    return any(ip in net for net in PRIVATE_NETS)
def parse_target(target: str):
    """Classify user input and extract the host to check.

    Returns a ``(kind, raw, host)`` tuple where ``kind`` is one of
    ``url | domain | ip | unknown | empty``:

    * ``empty``   - nothing but whitespace (or ``None``) was given.
    * ``url``     - input starts with ``http://`` or ``https://`` (compared
      case-insensitively); ``host`` is the parsed hostname, or ``""`` if
      the URL has none.
    * ``ip``      - input is an IPv4/IPv6 literal.
    * ``domain``  - input matches ``DOMAIN_RE`` after dropping trailing dots.
    * ``unknown`` - anything else, including URLs that cannot be parsed.
    """
    t = (target or "").strip()
    if not t:
        return ("empty", "", "")

    # URL schemes are case-insensitive, so "HTTPS://host" is still a URL.
    if t.lower().startswith(("http://", "https://")):
        try:
            hostname = urlparse(t).hostname
        except ValueError:
            # urlparse rejects malformed authorities such as "http://[::1".
            return ("unknown", t, "")
        return ("url", t, hostname or "")

    try:
        ipaddress.ip_address(t)
    except ValueError:
        pass
    else:
        return ("ip", t, t)

    d = t.rstrip(".")
    if DOMAIN_RE.match(d):
        return ("domain", d, d)
    return ("unknown", t, "")
def dns_check(host: str):
    """Resolve *host* and classify the outcome.

    Queries A and AAAA records (plus a best-effort CNAME lookup) with a
    3-second resolver lifetime and returns a dict with the keys ``host``,
    ``status``, ``A``, ``AAAA``, ``CNAME`` and ``detail``. ``A_ms`` /
    ``AAAA_ms`` are added for the lookups that succeeded.

    ``status`` is ``INVALID`` for an empty host, ``OK`` when any record was
    found, and otherwise the most informative failure in this priority:
    ``NXDOMAIN`` > ``TIMEOUT`` > ``SERVFAIL/NONAMESERVERS`` > ``NOANSWER`` >
    ``ERROR``. ``detail`` carries the error text of the lookup that produced
    the reported status.
    """
    out = {
        "host": host,
        "status": "UNKNOWN",
        "A": [],
        "AAAA": [],
        "CNAME": [],
        "detail": "",
    }
    if not host:
        out["status"] = "INVALID"
        out["detail"] = "Empty host"
        return out

    resolver = dns.resolver.Resolver()
    resolver.lifetime = 3.0

    def _resolve(rtype: str):
        """Return (status, values, elapsed_ms, error_text) for one record type."""
        try:
            # Monotonic clock: immune to wall-clock adjustments.
            start = time.perf_counter()
            ans = resolver.resolve(host, rtype)
            ms = int((time.perf_counter() - start) * 1000)
            return ("OK", [x.to_text() for x in ans], ms, "")
        except dns.resolver.NXDOMAIN as e:
            return ("NXDOMAIN", [], 0, str(e))
        except dns.resolver.NoAnswer as e:
            return ("NOANSWER", [], 0, str(e))
        except dns.resolver.NoNameservers as e:
            return ("NONAMESERVERS", [], 0, str(e))
        except dns.exception.Timeout as e:
            return ("TIMEOUT", [], 0, str(e))
        except Exception as e:
            return ("ERROR", [], 0, str(e))

    a_stat, a_vals, a_ms, a_err = _resolve("A")
    aaaa_stat, aaaa_vals, aaaa_ms, aaaa_err = _resolve("AAAA")
    out["A"] = a_vals
    out["AAAA"] = aaaa_vals
    # Key off the lookup status, not the value, so a 0 ms answer is still reported.
    if a_stat == "OK":
        out["A_ms"] = a_ms
    if aaaa_stat == "OK":
        out["AAAA_ms"] = aaaa_ms

    # CNAME is informational only; any failure here is deliberately ignored.
    try:
        ans = resolver.resolve(host, "CNAME")
        out["CNAME"] = [x.target.to_text().rstrip(".") for x in ans]
    except Exception:
        pass

    if a_vals or aaaa_vals or out["CNAME"]:
        out["status"] = "OK"
        out["detail"] = "Resolved"
        return out

    # Both lookups failed: report the most informative failure, together
    # with the error text of the lookup that actually produced it.
    attempts = ((a_stat, a_err), (aaaa_stat, aaaa_err))
    priority = (
        ("NXDOMAIN", "NXDOMAIN"),
        ("TIMEOUT", "TIMEOUT"),
        ("NONAMESERVERS", "SERVFAIL/NONAMESERVERS"),
        ("NOANSWER", "NOANSWER"),
    )
    for code, label in priority:
        details = [err for stat, err in attempts if stat == code]
        if details:
            out["status"] = label
            out["detail"] = next((d for d in details if d), "") or a_err or aaaa_err
            return out

    out["status"] = "ERROR"
    out["detail"] = a_err or aaaa_err
    return out
def build_probe_urls(kind: str, raw: str, host: str, path: str):
    """Build the (at most two) unique URLs to probe, in order.

    * ``kind == "url"``: the user's URL as given, then ``https://host + path``
      (no port, no query) as a fallback.
    * otherwise (domain / IP): ``https://host + path`` then ``http://host + path``.

    ``path`` defaults to ``/`` and gets a leading slash if missing. IPv6
    literal hosts are wrapped in brackets, as URL syntax requires; without
    them ``https://2001:db8::1/`` is not a valid URL.
    """
    path = (path or "/").strip()
    if not path.startswith("/"):
        path = "/" + path

    def _authority(name: str) -> str:
        # A colon can only appear in an IPv6 literal here; hosts never carry a port.
        if ":" in name and not name.startswith("["):
            return f"[{name}]"
        return name

    if kind == "url":
        host_only = _authority(urlparse(raw).hostname or host)
        candidates = [raw, f"https://{host_only}{path}"]
    else:
        authority = _authority(host)
        candidates = [f"https://{authority}{path}", f"http://{authority}{path}"]

    # dict.fromkeys removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(candidates))[:2]  # max 2 probes
def http_probe(url: str):
    """GET *url* and summarise the response (GET works better for APIs than HEAD).

    Returns a dict that always has ``url`` and ``ok``. On success it also
    carries ``status_code``, ``final_url``, ``latency_ms``, ``content_type``,
    ``server``, ``cf_ray`` and a 250-character body ``snippet``. On failure
    it carries an ``error`` string instead.

    Redirects are followed one hop at a time instead of by the HTTP client,
    so every redirect target can be checked with ``is_private_ip`` first.
    Otherwise a public host could bounce the probe to an internal address
    and bypass the SSRF check done on the original host.

    NOTE(review): the target is resolved here and again by the HTTP client
    when it connects, so a DNS-rebinding window remains; closing it would
    need connection-level IP pinning.
    """
    import socket  # local import: only needed for redirect-target validation

    max_redirects = 20  # same ceiling httpx applies by default

    def _targets_private_address(hostname: str) -> bool:
        """True when *hostname* is missing or resolves to a blocked address."""
        if not hostname:
            return True
        try:
            addrinfo = socket.getaddrinfo(hostname, None)
        except OSError:
            # Unresolvable: there is nothing to connect to, so let the HTTP
            # client surface the resolution failure as a normal error.
            return False
        return any(is_private_ip(entry[4][0]) for entry in addrinfo)

    info = {"url": url, "ok": False}
    try:
        start = time.perf_counter()
        r = CLIENT.get(
            url,
            headers={"Range": "bytes=0-2048"},
            follow_redirects=False,
        )
        hops = 0
        # With follow_redirects=False, httpx exposes the would-be next hop
        # as response.next_request (None when the response is not a redirect).
        while r.next_request is not None:
            if hops >= max_redirects:
                info["error"] = "http_error: Exceeded maximum allowed redirects."
                return info
            next_request = r.next_request
            if _targets_private_address(next_request.url.host):
                info["error"] = (
                    "blocked_redirect: redirect target resolves to "
                    "private/reserved IP (SSRF protection)"
                )
                return info
            r = CLIENT.send(next_request, follow_redirects=False)
            hops += 1
        ms = int((time.perf_counter() - start) * 1000)

        ctype = r.headers.get("content-type", "")
        try:
            snippet = r.text[:250]
        except Exception:
            # Body could not be decoded as text; report an empty snippet.
            snippet = ""
        info.update({
            "ok": True,
            "status_code": r.status_code,
            "final_url": str(r.url),
            "latency_ms": ms,
            "content_type": ctype,
            "server": r.headers.get("server", ""),
            "cf_ray": r.headers.get("cf-ray", ""),
            "snippet": snippet,
        })
        return info
    except httpx.ConnectTimeout:
        info["error"] = "connect_timeout"
    except httpx.ReadTimeout:
        info["error"] = "read_timeout"
    except httpx.ConnectError as e:
        info["error"] = f"connect_error: {e}"
    except httpx.HTTPError as e:
        info["error"] = f"http_error: {e}"
    except Exception as e:
        info["error"] = f"unknown_error: {e}"
    return info
def overall_status(dns_result, http_results):
    """Condense the DNS and HTTP findings into one human-readable verdict.

    A hard DNS failure wins outright. Otherwise the first successful HTTP
    probe decides the label based on its status code. With no successful
    probe, the collected error strings decide between a timeout, a
    missing-address problem and a generic HTTP failure.
    """
    dns_state = dns_result.get("status", "UNKNOWN")
    if dns_state in ("NXDOMAIN", "TIMEOUT", "SERVFAIL/NONAMESERVERS", "ERROR"):
        return f"DNS_{dns_state} (HF can't resolve reliably)"

    # The first probe that got any HTTP response decides the verdict.
    first_ok = next((res for res in http_results if res.get("ok")), None)
    if first_ok is not None:
        code = first_ok.get("status_code", 0)
        if code in (401, 403):
            verdict = f"REACHABLE_BUT_PROTECTED ({code})"
        elif code == 451:
            verdict = "REACHABLE_BUT_RESTRICTED (451)"
        elif 200 <= code < 300:
            verdict = f"API_ACCESSIBLE ({code})"
        elif 300 <= code < 400:
            verdict = f"REACHABLE_REDIRECT ({code})"
        elif code == 404:
            verdict = "REACHABLE_BUT_NOT_FOUND (404) (domain ok, path missing)"
        else:
            verdict = f"REACHABLE_OTHER ({code})"
        return verdict

    # DNS was fine but nothing answered: likely a network block or a dead origin.
    messages = [res["error"] for res in http_results if res.get("error")]
    errs = " | ".join(messages)
    if "timeout" in errs:
        return "HTTP_TIMEOUT (possible block / route issue / origin down)"
    if "No address associated" in errs:
        return "DNS_ISSUE (no address)"
    return f"HTTP_FAIL ({errs or 'unknown'})"
def check_one(target: str, path: str):
    """Run the full DNS + HTTP check for one domain / IP / URL.

    Returns ``{"error": ...}`` when the input is empty or invalid, or when
    the host is (or resolves to) a private/reserved address. Otherwise it
    returns a dict with ``input``, ``probe_path``, ``host``, ``dns``,
    ``http`` (one entry per probed URL), ``status`` and ``note``.
    """
    kind, raw, host = parse_target(target)
    if kind == "empty":
        return {"error": "Enter a domain / IP / URL"}
    if kind == "unknown" or not host:
        return {"error": "Invalid input"}

    # Only the literal-IP test belongs in the try; nothing else should be
    # able to fall through to the DNS branch by accident.
    try:
        literal = ipaddress.ip_address(host)
    except ValueError:
        literal = None

    if literal is not None:
        # IP input: no DNS lookup needed, but still refuse private/reserved targets.
        if is_private_ip(host):
            return {"error": "Blocked: private/reserved IP not allowed (SSRF protection)."}
        dns_result = {"host": host, "status": "OK", "A": [], "AAAA": [], "CNAME": [], "detail": "IP input"}
        # File the literal under the record type matching its address family.
        dns_result["AAAA" if literal.version == 6 else "A"] = [host]
    else:
        dns_result = dns_check(host)
        resolved = (dns_result.get("A") or []) + (dns_result.get("AAAA") or [])
        if any(is_private_ip(ip) for ip in resolved):
            return {"error": "Blocked: resolves to private/reserved IP (SSRF protection)."}

    # HTTP/API probes (build_probe_urls yields one or two URLs)
    urls = build_probe_urls(kind, raw, host, path)
    http_results = [http_probe(u) for u in urls]
    status = overall_status(dns_result, http_results)
    return {
        "input": (target or "").strip(),
        "probe_path": (path or "/").strip(),
        "host": host,
        "dns": dns_result,
        "http": http_results,
        "status": status,
        "note": "Checked from Hugging Face Space network (egress).",
    }
def bulk_check(base_domain: str, subdomains_text: str, path: str):
    """Check a user-supplied list of subdomains and return table rows.

    Each non-empty line of *subdomains_text* is one target. A line that
    contains a dot is used as-is; otherwise ``.<base_domain>`` is appended.
    Only the first 200 lines are processed. Returns ``[]`` when the base
    domain is empty.

    Each row is ``[target, overall_status, dns_status, A_records, http_code]``.
    ``http_code`` comes from the first probe that got a response, which is
    the same probe the overall status is derived from, and is empty when no
    probe succeeded.
    """
    base = (base_domain or "").strip().rstrip(".")
    if not base:
        return []

    names = [ln.strip() for ln in (subdomains_text or "").splitlines() if ln.strip()]
    targets = [name if "." in name else f"{name}.{base}" for name in names[:200]]

    rows = []
    for t in targets:
        result = check_one(t, path)
        # Error results carry neither "dns" nor "http"; fall back to empties.
        dns_info = result.get("dns") or {}
        probes = result.get("http") or []
        answered = next((p for p in probes if p.get("ok")), None)
        code = answered.get("status_code", "") if answered else ""
        rows.append([
            t,
            result.get("status") or result.get("error", "error"),
            dns_info.get("status", ""),
            ",".join(dns_info.get("A", [])),
            str(code),
        ])
    return rows
# Gradio UI: two tabs wired to check_one (single target) and bulk_check (list).
with gr.Blocks(title="HF Domain IP Checker") as demo:
    gr.Markdown(
        "## HF Domain/IP + API Accessibility Checker\n"
        "✅ DNS resolve + ✅ API reachable check **from this Hugging Face Space**.\n"
        "- Subdomains are checked only from your provided list.\n"
        "- Private/reserved IPs blocked (SSRF protection)."
    )
    # Single target: the result dict from check_one is shown as JSON.
    with gr.Tab("Single Check"):
        inp = gr.Textbox(label="Domain / IP / URL", placeholder="example.com OR https://example.com/api OR 1.2.3.4")
        path = gr.Textbox(label="Probe path (optional)", value="/", placeholder="/ OR /health OR /api")
        btn = gr.Button("Check")
        out = gr.JSON(label="Result")
        btn.click(check_one, inputs=[inp, path], outputs=out)
    # Bulk: one row per listed subdomain; the headers below match the row
    # layout produced by bulk_check.
    with gr.Tab("Bulk (Your list only)"):
        base = gr.Textbox(label="Base domain", placeholder="example.com")
        subs = gr.Textbox(label="Subdomains (one per line)", lines=10, placeholder="www\napi\ncdn\nor full: api.example.com")
        path2 = gr.Textbox(label="Probe path for all", value="/", placeholder="/health (recommended for API)")
        btn2 = gr.Button("Bulk Check")
        table = gr.Dataframe(
            headers=["target", "overall_status", "dns_status", "A_records", "http_code"],
            datatype=["str", "str", "str", "str", "str"],
            row_count=5,
            label="Results",
        )
        btn2.click(bulk_check, inputs=[base, subs, path2], outputs=table)
# Start the web server on all interfaces, port 7860. This runs at module
# level, so importing this file also launches the app.
demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)