|
|
|
|
|
|
|
|
|
import socket |
|
import time |
|
from urllib.parse import urlparse |
|
|
|
import dns.asyncbackend |
|
import dns.inet |
|
import dns.name |
|
import dns.nameserver |
|
import dns.query |
|
import dns.rdtypes.svcbbase |
|
|
|
|
|
_local_resolver_name = dns.name.from_text("_dns.resolver.arpa") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _SVCBInfo: |
|
def __init__(self, bootstrap_address, port, hostname, nameservers): |
|
self.bootstrap_address = bootstrap_address |
|
self.port = port |
|
self.hostname = hostname |
|
self.nameservers = nameservers |
|
|
|
def ddr_check_certificate(self, cert): |
|
"""Verify that the _SVCBInfo's address is in the cert's subjectAltName (SAN)""" |
|
for name, value in cert["subjectAltName"]: |
|
if name == "IP Address" and value == self.bootstrap_address: |
|
return True |
|
return False |
|
|
|
def make_tls_context(self): |
|
ssl = dns.query.ssl |
|
ctx = ssl.create_default_context() |
|
ctx.minimum_version = ssl.TLSVersion.TLSv1_2 |
|
return ctx |
|
|
|
def ddr_tls_check_sync(self, lifetime): |
|
ctx = self.make_tls_context() |
|
expiration = time.time() + lifetime |
|
with socket.create_connection( |
|
(self.bootstrap_address, self.port), lifetime |
|
) as s: |
|
with ctx.wrap_socket(s, server_hostname=self.hostname) as ts: |
|
ts.settimeout(dns.query._remaining(expiration)) |
|
ts.do_handshake() |
|
cert = ts.getpeercert() |
|
return self.ddr_check_certificate(cert) |
|
|
|
async def ddr_tls_check_async(self, lifetime, backend=None): |
|
if backend is None: |
|
backend = dns.asyncbackend.get_default_backend() |
|
ctx = self.make_tls_context() |
|
expiration = time.time() + lifetime |
|
async with await backend.make_socket( |
|
dns.inet.af_for_address(self.bootstrap_address), |
|
socket.SOCK_STREAM, |
|
0, |
|
None, |
|
(self.bootstrap_address, self.port), |
|
lifetime, |
|
ctx, |
|
self.hostname, |
|
) as ts: |
|
cert = await ts.getpeercert(dns.query._remaining(expiration)) |
|
return self.ddr_check_certificate(cert) |
|
|
|
|
|
def _extract_nameservers_from_svcb(answer): |
|
bootstrap_address = answer.nameserver |
|
if not dns.inet.is_address(bootstrap_address): |
|
return [] |
|
infos = [] |
|
for rr in answer.rrset.processing_order(): |
|
nameservers = [] |
|
param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.ALPN) |
|
if param is None: |
|
continue |
|
alpns = set(param.ids) |
|
host = rr.target.to_text(omit_final_dot=True) |
|
port = None |
|
param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.PORT) |
|
if param is not None: |
|
port = param.port |
|
|
|
|
|
if b"h2" in alpns: |
|
param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.DOHPATH) |
|
if param is None or not param.value.endswith(b"{?dns}"): |
|
continue |
|
path = param.value[:-6].decode() |
|
if not path.startswith("/"): |
|
path = "/" + path |
|
if port is None: |
|
port = 443 |
|
url = f"https://{host}:{port}{path}" |
|
|
|
try: |
|
urlparse(url) |
|
nameservers.append(dns.nameserver.DoHNameserver(url, bootstrap_address)) |
|
except Exception: |
|
|
|
pass |
|
if b"dot" in alpns: |
|
if port is None: |
|
port = 853 |
|
nameservers.append( |
|
dns.nameserver.DoTNameserver(bootstrap_address, port, host) |
|
) |
|
if b"doq" in alpns: |
|
if port is None: |
|
port = 853 |
|
nameservers.append( |
|
dns.nameserver.DoQNameserver(bootstrap_address, port, True, host) |
|
) |
|
if len(nameservers) > 0: |
|
infos.append(_SVCBInfo(bootstrap_address, port, host, nameservers)) |
|
return infos |
|
|
|
|
|
def _get_nameservers_sync(answer, lifetime): |
|
"""Return a list of TLS-validated resolver nameservers extracted from an SVCB |
|
answer.""" |
|
nameservers = [] |
|
infos = _extract_nameservers_from_svcb(answer) |
|
for info in infos: |
|
try: |
|
if info.ddr_tls_check_sync(lifetime): |
|
nameservers.extend(info.nameservers) |
|
except Exception: |
|
pass |
|
return nameservers |
|
|
|
|
|
async def _get_nameservers_async(answer, lifetime): |
|
"""Return a list of TLS-validated resolver nameservers extracted from an SVCB |
|
answer.""" |
|
nameservers = [] |
|
infos = _extract_nameservers_from_svcb(answer) |
|
for info in infos: |
|
try: |
|
if await info.ddr_tls_check_async(lifetime): |
|
nameservers.extend(info.nameservers) |
|
except Exception: |
|
pass |
|
return nameservers |
|
|