Spaces:
Paused
Paused
| # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
| # | |
| # Support for Discovery of Designated Resolvers | |
| import socket | |
| import time | |
| from urllib.parse import urlparse | |
| import dns.asyncbackend | |
| import dns.inet | |
| import dns.name | |
| import dns.nameserver | |
| import dns.query | |
| import dns.rdtypes.svcbbase | |
| # The special name of the local resolver when using DDR | |
| _local_resolver_name = dns.name.from_text("_dns.resolver.arpa") | |
| # | |
| # Processing is split up into I/O independent and I/O dependent parts to | |
| # make supporting sync and async versions easy. | |
| # | |
| class _SVCBInfo: | |
| def __init__(self, bootstrap_address, port, hostname, nameservers): | |
| self.bootstrap_address = bootstrap_address | |
| self.port = port | |
| self.hostname = hostname | |
| self.nameservers = nameservers | |
| def ddr_check_certificate(self, cert): | |
| """Verify that the _SVCBInfo's address is in the cert's subjectAltName (SAN)""" | |
| for name, value in cert["subjectAltName"]: | |
| if name == "IP Address" and value == self.bootstrap_address: | |
| return True | |
| return False | |
| def make_tls_context(self): | |
| ssl = dns.query.ssl | |
| ctx = ssl.create_default_context() | |
| ctx.minimum_version = ssl.TLSVersion.TLSv1_2 | |
| return ctx | |
| def ddr_tls_check_sync(self, lifetime): | |
| ctx = self.make_tls_context() | |
| expiration = time.time() + lifetime | |
| with socket.create_connection( | |
| (self.bootstrap_address, self.port), lifetime | |
| ) as s: | |
| with ctx.wrap_socket(s, server_hostname=self.hostname) as ts: | |
| ts.settimeout(dns.query._remaining(expiration)) | |
| ts.do_handshake() | |
| cert = ts.getpeercert() | |
| return self.ddr_check_certificate(cert) | |
| async def ddr_tls_check_async(self, lifetime, backend=None): | |
| if backend is None: | |
| backend = dns.asyncbackend.get_default_backend() | |
| ctx = self.make_tls_context() | |
| expiration = time.time() + lifetime | |
| async with await backend.make_socket( | |
| dns.inet.af_for_address(self.bootstrap_address), | |
| socket.SOCK_STREAM, | |
| 0, | |
| None, | |
| (self.bootstrap_address, self.port), | |
| lifetime, | |
| ctx, | |
| self.hostname, | |
| ) as ts: | |
| cert = await ts.getpeercert(dns.query._remaining(expiration)) | |
| return self.ddr_check_certificate(cert) | |
| def _extract_nameservers_from_svcb(answer): | |
| bootstrap_address = answer.nameserver | |
| if not dns.inet.is_address(bootstrap_address): | |
| return [] | |
| infos = [] | |
| for rr in answer.rrset.processing_order(): | |
| nameservers = [] | |
| param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.ALPN) | |
| if param is None: | |
| continue | |
| alpns = set(param.ids) | |
| host = rr.target.to_text(omit_final_dot=True) | |
| port = None | |
| param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.PORT) | |
| if param is not None: | |
| port = param.port | |
| # For now we ignore address hints and address resolution and always use the | |
| # bootstrap address | |
| if b"h2" in alpns: | |
| param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.DOHPATH) | |
| if param is None or not param.value.endswith(b"{?dns}"): | |
| continue | |
| path = param.value[:-6].decode() | |
| if not path.startswith("/"): | |
| path = "/" + path | |
| if port is None: | |
| port = 443 | |
| url = f"https://{host}:{port}{path}" | |
| # check the URL | |
| try: | |
| urlparse(url) | |
| nameservers.append(dns.nameserver.DoHNameserver(url, bootstrap_address)) | |
| except Exception: | |
| # continue processing other ALPN types | |
| pass | |
| if b"dot" in alpns: | |
| if port is None: | |
| port = 853 | |
| nameservers.append( | |
| dns.nameserver.DoTNameserver(bootstrap_address, port, host) | |
| ) | |
| if b"doq" in alpns: | |
| if port is None: | |
| port = 853 | |
| nameservers.append( | |
| dns.nameserver.DoQNameserver(bootstrap_address, port, True, host) | |
| ) | |
| if len(nameservers) > 0: | |
| infos.append(_SVCBInfo(bootstrap_address, port, host, nameservers)) | |
| return infos | |
| def _get_nameservers_sync(answer, lifetime): | |
| """Return a list of TLS-validated resolver nameservers extracted from an SVCB | |
| answer.""" | |
| nameservers = [] | |
| infos = _extract_nameservers_from_svcb(answer) | |
| for info in infos: | |
| try: | |
| if info.ddr_tls_check_sync(lifetime): | |
| nameservers.extend(info.nameservers) | |
| except Exception: | |
| pass | |
| return nameservers | |
| async def _get_nameservers_async(answer, lifetime): | |
| """Return a list of TLS-validated resolver nameservers extracted from an SVCB | |
| answer.""" | |
| nameservers = [] | |
| infos = _extract_nameservers_from_svcb(answer) | |
| for info in infos: | |
| try: | |
| if await info.ddr_tls_check_async(lifetime): | |
| nameservers.extend(info.nameservers) | |
| except Exception: | |
| pass | |
| return nameservers | |