Spaces:
Running
Running
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
# Copyright (C) 2003-2017 Nominum, Inc. | |
# | |
# Permission to use, copy, modify, and distribute this software and its | |
# documentation for any purpose with or without fee is hereby granted, | |
# provided that the above copyright notice and this permission notice | |
# appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
"""DNS stub resolver.""" | |
import contextlib | |
import random | |
import socket | |
import sys | |
import threading | |
import time | |
import warnings | |
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union | |
from urllib.parse import urlparse | |
import dns._ddr | |
import dns.edns | |
import dns.exception | |
import dns.flags | |
import dns.inet | |
import dns.ipv4 | |
import dns.ipv6 | |
import dns.message | |
import dns.name | |
import dns.nameserver | |
import dns.query | |
import dns.rcode | |
import dns.rdataclass | |
import dns.rdatatype | |
import dns.rdtypes.svcbbase | |
import dns.reversename | |
import dns.tsig | |
if sys.platform == "win32": | |
import dns.win32util | |
class NXDOMAIN(dns.exception.DNSException): | |
"""The DNS query name does not exist.""" | |
supp_kwargs = {"qnames", "responses"} | |
fmt = None # we have our own __str__ implementation | |
# pylint: disable=arguments-differ | |
# We do this as otherwise mypy complains about unexpected keyword argument | |
# idna_exception | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
def _check_kwargs(self, qnames, responses=None): | |
if not isinstance(qnames, (list, tuple, set)): | |
raise AttributeError("qnames must be a list, tuple or set") | |
if len(qnames) == 0: | |
raise AttributeError("qnames must contain at least one element") | |
if responses is None: | |
responses = {} | |
elif not isinstance(responses, dict): | |
raise AttributeError("responses must be a dict(qname=response)") | |
kwargs = dict(qnames=qnames, responses=responses) | |
return kwargs | |
def __str__(self) -> str: | |
if "qnames" not in self.kwargs: | |
return super().__str__() | |
qnames = self.kwargs["qnames"] | |
if len(qnames) > 1: | |
msg = "None of DNS query names exist" | |
else: | |
msg = "The DNS query name does not exist" | |
qnames = ", ".join(map(str, qnames)) | |
return "{}: {}".format(msg, qnames) | |
def canonical_name(self): | |
"""Return the unresolved canonical name.""" | |
if "qnames" not in self.kwargs: | |
raise TypeError("parametrized exception required") | |
for qname in self.kwargs["qnames"]: | |
response = self.kwargs["responses"][qname] | |
try: | |
cname = response.canonical_name() | |
if cname != qname: | |
return cname | |
except Exception: | |
# We can just eat this exception as it means there was | |
# something wrong with the response. | |
pass | |
return self.kwargs["qnames"][0] | |
def __add__(self, e_nx): | |
"""Augment by results from another NXDOMAIN exception.""" | |
qnames0 = list(self.kwargs.get("qnames", [])) | |
responses0 = dict(self.kwargs.get("responses", {})) | |
responses1 = e_nx.kwargs.get("responses", {}) | |
for qname1 in e_nx.kwargs.get("qnames", []): | |
if qname1 not in qnames0: | |
qnames0.append(qname1) | |
if qname1 in responses1: | |
responses0[qname1] = responses1[qname1] | |
return NXDOMAIN(qnames=qnames0, responses=responses0) | |
def qnames(self): | |
"""All of the names that were tried. | |
Returns a list of ``dns.name.Name``. | |
""" | |
return self.kwargs["qnames"] | |
def responses(self): | |
"""A map from queried names to their NXDOMAIN responses. | |
Returns a dict mapping a ``dns.name.Name`` to a | |
``dns.message.Message``. | |
""" | |
return self.kwargs["responses"] | |
def response(self, qname): | |
"""The response for query *qname*. | |
Returns a ``dns.message.Message``. | |
""" | |
return self.kwargs["responses"][qname] | |
class YXDOMAIN(dns.exception.DNSException): | |
"""The DNS query name is too long after DNAME substitution.""" | |
ErrorTuple = Tuple[ | |
Optional[str], | |
bool, | |
int, | |
Union[Exception, str], | |
Optional[dns.message.Message], | |
] | |
def _errors_to_text(errors: List[ErrorTuple]) -> List[str]: | |
"""Turn a resolution errors trace into a list of text.""" | |
texts = [] | |
for err in errors: | |
texts.append("Server {} answered {}".format(err[0], err[3])) | |
return texts | |
class LifetimeTimeout(dns.exception.Timeout): | |
"""The resolution lifetime expired.""" | |
msg = "The resolution lifetime expired." | |
fmt = "%s after {timeout:.3f} seconds: {errors}" % msg[:-1] | |
supp_kwargs = {"timeout", "errors"} | |
# We do this as otherwise mypy complains about unexpected keyword argument | |
# idna_exception | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
def _fmt_kwargs(self, **kwargs): | |
srv_msgs = _errors_to_text(kwargs["errors"]) | |
return super()._fmt_kwargs( | |
timeout=kwargs["timeout"], errors="; ".join(srv_msgs) | |
) | |
# We added more detail to resolution timeouts, but they are still | |
# subclasses of dns.exception.Timeout for backwards compatibility. We also | |
# keep dns.resolver.Timeout defined for backwards compatibility. | |
Timeout = LifetimeTimeout | |
class NoAnswer(dns.exception.DNSException): | |
"""The DNS response does not contain an answer to the question.""" | |
fmt = "The DNS response does not contain an answer to the question: {query}" | |
supp_kwargs = {"response"} | |
# We do this as otherwise mypy complains about unexpected keyword argument | |
# idna_exception | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
def _fmt_kwargs(self, **kwargs): | |
return super()._fmt_kwargs(query=kwargs["response"].question) | |
def response(self): | |
return self.kwargs["response"] | |
class NoNameservers(dns.exception.DNSException): | |
"""All nameservers failed to answer the query. | |
errors: list of servers and respective errors | |
The type of errors is | |
[(server IP address, any object convertible to string)]. | |
Non-empty errors list will add explanatory message () | |
""" | |
msg = "All nameservers failed to answer the query." | |
fmt = "%s {query}: {errors}" % msg[:-1] | |
supp_kwargs = {"request", "errors"} | |
# We do this as otherwise mypy complains about unexpected keyword argument | |
# idna_exception | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
def _fmt_kwargs(self, **kwargs): | |
srv_msgs = _errors_to_text(kwargs["errors"]) | |
return super()._fmt_kwargs( | |
query=kwargs["request"].question, errors="; ".join(srv_msgs) | |
) | |
class NotAbsolute(dns.exception.DNSException): | |
"""An absolute domain name is required but a relative name was provided.""" | |
class NoRootSOA(dns.exception.DNSException): | |
"""There is no SOA RR at the DNS root name. This should never happen!""" | |
class NoMetaqueries(dns.exception.DNSException): | |
"""DNS metaqueries are not allowed.""" | |
class NoResolverConfiguration(dns.exception.DNSException): | |
"""Resolver configuration could not be read or specified no nameservers.""" | |
class Answer: | |
"""DNS stub resolver answer. | |
Instances of this class bundle up the result of a successful DNS | |
resolution. | |
For convenience, the answer object implements much of the sequence | |
protocol, forwarding to its ``rrset`` attribute. E.g. | |
``for a in answer`` is equivalent to ``for a in answer.rrset``. | |
``answer[i]`` is equivalent to ``answer.rrset[i]``, and | |
``answer[i:j]`` is equivalent to ``answer.rrset[i:j]``. | |
Note that CNAMEs or DNAMEs in the response may mean that answer | |
RRset's name might not be the query name. | |
""" | |
def __init__( | |
self, | |
qname: dns.name.Name, | |
rdtype: dns.rdatatype.RdataType, | |
rdclass: dns.rdataclass.RdataClass, | |
response: dns.message.QueryMessage, | |
nameserver: Optional[str] = None, | |
port: Optional[int] = None, | |
) -> None: | |
self.qname = qname | |
self.rdtype = rdtype | |
self.rdclass = rdclass | |
self.response = response | |
self.nameserver = nameserver | |
self.port = port | |
self.chaining_result = response.resolve_chaining() | |
# Copy some attributes out of chaining_result for backwards | |
# compatibility and convenience. | |
self.canonical_name = self.chaining_result.canonical_name | |
self.rrset = self.chaining_result.answer | |
self.expiration = time.time() + self.chaining_result.minimum_ttl | |
def __getattr__(self, attr): # pragma: no cover | |
if attr == "name": | |
return self.rrset.name | |
elif attr == "ttl": | |
return self.rrset.ttl | |
elif attr == "covers": | |
return self.rrset.covers | |
elif attr == "rdclass": | |
return self.rrset.rdclass | |
elif attr == "rdtype": | |
return self.rrset.rdtype | |
else: | |
raise AttributeError(attr) | |
def __len__(self) -> int: | |
return self.rrset and len(self.rrset) or 0 | |
def __iter__(self): | |
return self.rrset and iter(self.rrset) or iter(tuple()) | |
def __getitem__(self, i): | |
if self.rrset is None: | |
raise IndexError | |
return self.rrset[i] | |
def __delitem__(self, i): | |
if self.rrset is None: | |
raise IndexError | |
del self.rrset[i] | |
class Answers(dict): | |
"""A dict of DNS stub resolver answers, indexed by type.""" | |
class HostAnswers(Answers): | |
"""A dict of DNS stub resolver answers to a host name lookup, indexed by | |
type. | |
""" | |
def make( | |
cls, | |
v6: Optional[Answer] = None, | |
v4: Optional[Answer] = None, | |
add_empty: bool = True, | |
) -> "HostAnswers": | |
answers = HostAnswers() | |
if v6 is not None and (add_empty or v6.rrset): | |
answers[dns.rdatatype.AAAA] = v6 | |
if v4 is not None and (add_empty or v4.rrset): | |
answers[dns.rdatatype.A] = v4 | |
return answers | |
# Returns pairs of (address, family) from this result, potentiallys | |
# filtering by address family. | |
def addresses_and_families( | |
self, family: int = socket.AF_UNSPEC | |
) -> Iterator[Tuple[str, int]]: | |
if family == socket.AF_UNSPEC: | |
yield from self.addresses_and_families(socket.AF_INET6) | |
yield from self.addresses_and_families(socket.AF_INET) | |
return | |
elif family == socket.AF_INET6: | |
answer = self.get(dns.rdatatype.AAAA) | |
elif family == socket.AF_INET: | |
answer = self.get(dns.rdatatype.A) | |
else: | |
raise NotImplementedError(f"unknown address family {family}") | |
if answer: | |
for rdata in answer: | |
yield (rdata.address, family) | |
# Returns addresses from this result, potentially filtering by | |
# address family. | |
def addresses(self, family: int = socket.AF_UNSPEC) -> Iterator[str]: | |
return (pair[0] for pair in self.addresses_and_families(family)) | |
# Returns the canonical name from this result. | |
def canonical_name(self) -> dns.name.Name: | |
answer = self.get(dns.rdatatype.AAAA, self.get(dns.rdatatype.A)) | |
return answer.canonical_name | |
class CacheStatistics: | |
"""Cache Statistics""" | |
def __init__(self, hits: int = 0, misses: int = 0) -> None: | |
self.hits = hits | |
self.misses = misses | |
def reset(self) -> None: | |
self.hits = 0 | |
self.misses = 0 | |
def clone(self) -> "CacheStatistics": | |
return CacheStatistics(self.hits, self.misses) | |
class CacheBase: | |
def __init__(self) -> None: | |
self.lock = threading.Lock() | |
self.statistics = CacheStatistics() | |
def reset_statistics(self) -> None: | |
"""Reset all statistics to zero.""" | |
with self.lock: | |
self.statistics.reset() | |
def hits(self) -> int: | |
"""How many hits has the cache had?""" | |
with self.lock: | |
return self.statistics.hits | |
def misses(self) -> int: | |
"""How many misses has the cache had?""" | |
with self.lock: | |
return self.statistics.misses | |
def get_statistics_snapshot(self) -> CacheStatistics: | |
"""Return a consistent snapshot of all the statistics. | |
If running with multiple threads, it's better to take a | |
snapshot than to call statistics methods such as hits() and | |
misses() individually. | |
""" | |
with self.lock: | |
return self.statistics.clone() | |
CacheKey = Tuple[dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass] | |
class Cache(CacheBase): | |
"""Simple thread-safe DNS answer cache.""" | |
def __init__(self, cleaning_interval: float = 300.0) -> None: | |
"""*cleaning_interval*, a ``float`` is the number of seconds between | |
periodic cleanings. | |
""" | |
super().__init__() | |
self.data: Dict[CacheKey, Answer] = {} | |
self.cleaning_interval = cleaning_interval | |
self.next_cleaning: float = time.time() + self.cleaning_interval | |
def _maybe_clean(self) -> None: | |
"""Clean the cache if it's time to do so.""" | |
now = time.time() | |
if self.next_cleaning <= now: | |
keys_to_delete = [] | |
for k, v in self.data.items(): | |
if v.expiration <= now: | |
keys_to_delete.append(k) | |
for k in keys_to_delete: | |
del self.data[k] | |
now = time.time() | |
self.next_cleaning = now + self.cleaning_interval | |
def get(self, key: CacheKey) -> Optional[Answer]: | |
"""Get the answer associated with *key*. | |
Returns None if no answer is cached for the key. | |
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` | |
tuple whose values are the query name, rdtype, and rdclass respectively. | |
Returns a ``dns.resolver.Answer`` or ``None``. | |
""" | |
with self.lock: | |
self._maybe_clean() | |
v = self.data.get(key) | |
if v is None or v.expiration <= time.time(): | |
self.statistics.misses += 1 | |
return None | |
self.statistics.hits += 1 | |
return v | |
def put(self, key: CacheKey, value: Answer) -> None: | |
"""Associate key and value in the cache. | |
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` | |
tuple whose values are the query name, rdtype, and rdclass respectively. | |
*value*, a ``dns.resolver.Answer``, the answer. | |
""" | |
with self.lock: | |
self._maybe_clean() | |
self.data[key] = value | |
def flush(self, key: Optional[CacheKey] = None) -> None: | |
"""Flush the cache. | |
If *key* is not ``None``, only that item is flushed. Otherwise the entire cache | |
is flushed. | |
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` | |
tuple whose values are the query name, rdtype, and rdclass respectively. | |
""" | |
with self.lock: | |
if key is not None: | |
if key in self.data: | |
del self.data[key] | |
else: | |
self.data = {} | |
self.next_cleaning = time.time() + self.cleaning_interval | |
class LRUCacheNode: | |
"""LRUCache node.""" | |
def __init__(self, key, value): | |
self.key = key | |
self.value = value | |
self.hits = 0 | |
self.prev = self | |
self.next = self | |
def link_after(self, node: "LRUCacheNode") -> None: | |
self.prev = node | |
self.next = node.next | |
node.next.prev = self | |
node.next = self | |
def unlink(self) -> None: | |
self.next.prev = self.prev | |
self.prev.next = self.next | |
class LRUCache(CacheBase): | |
"""Thread-safe, bounded, least-recently-used DNS answer cache. | |
This cache is better than the simple cache (above) if you're | |
running a web crawler or other process that does a lot of | |
resolutions. The LRUCache has a maximum number of nodes, and when | |
it is full, the least-recently used node is removed to make space | |
for a new one. | |
""" | |
def __init__(self, max_size: int = 100000) -> None: | |
"""*max_size*, an ``int``, is the maximum number of nodes to cache; | |
it must be greater than 0. | |
""" | |
super().__init__() | |
self.data: Dict[CacheKey, LRUCacheNode] = {} | |
self.set_max_size(max_size) | |
self.sentinel: LRUCacheNode = LRUCacheNode(None, None) | |
self.sentinel.prev = self.sentinel | |
self.sentinel.next = self.sentinel | |
def set_max_size(self, max_size: int) -> None: | |
if max_size < 1: | |
max_size = 1 | |
self.max_size = max_size | |
def get(self, key: CacheKey) -> Optional[Answer]: | |
"""Get the answer associated with *key*. | |
Returns None if no answer is cached for the key. | |
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` | |
tuple whose values are the query name, rdtype, and rdclass respectively. | |
Returns a ``dns.resolver.Answer`` or ``None``. | |
""" | |
with self.lock: | |
node = self.data.get(key) | |
if node is None: | |
self.statistics.misses += 1 | |
return None | |
# Unlink because we're either going to move the node to the front | |
# of the LRU list or we're going to free it. | |
node.unlink() | |
if node.value.expiration <= time.time(): | |
del self.data[node.key] | |
self.statistics.misses += 1 | |
return None | |
node.link_after(self.sentinel) | |
self.statistics.hits += 1 | |
node.hits += 1 | |
return node.value | |
def get_hits_for_key(self, key: CacheKey) -> int: | |
"""Return the number of cache hits associated with the specified key.""" | |
with self.lock: | |
node = self.data.get(key) | |
if node is None or node.value.expiration <= time.time(): | |
return 0 | |
else: | |
return node.hits | |
def put(self, key: CacheKey, value: Answer) -> None: | |
"""Associate key and value in the cache. | |
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` | |
tuple whose values are the query name, rdtype, and rdclass respectively. | |
*value*, a ``dns.resolver.Answer``, the answer. | |
""" | |
with self.lock: | |
node = self.data.get(key) | |
if node is not None: | |
node.unlink() | |
del self.data[node.key] | |
while len(self.data) >= self.max_size: | |
gnode = self.sentinel.prev | |
gnode.unlink() | |
del self.data[gnode.key] | |
node = LRUCacheNode(key, value) | |
node.link_after(self.sentinel) | |
self.data[key] = node | |
def flush(self, key: Optional[CacheKey] = None) -> None: | |
"""Flush the cache. | |
If *key* is not ``None``, only that item is flushed. Otherwise the entire cache | |
is flushed. | |
*key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` | |
tuple whose values are the query name, rdtype, and rdclass respectively. | |
""" | |
with self.lock: | |
if key is not None: | |
node = self.data.get(key) | |
if node is not None: | |
node.unlink() | |
del self.data[node.key] | |
else: | |
gnode = self.sentinel.next | |
while gnode != self.sentinel: | |
next = gnode.next | |
gnode.unlink() | |
gnode = next | |
self.data = {} | |
class _Resolution: | |
"""Helper class for dns.resolver.Resolver.resolve(). | |
All of the "business logic" of resolution is encapsulated in this | |
class, allowing us to have multiple resolve() implementations | |
using different I/O schemes without copying all of the | |
complicated logic. | |
This class is a "friend" to dns.resolver.Resolver and manipulates | |
resolver data structures directly. | |
""" | |
def __init__( | |
self, | |
resolver: "BaseResolver", | |
qname: Union[dns.name.Name, str], | |
rdtype: Union[dns.rdatatype.RdataType, str], | |
rdclass: Union[dns.rdataclass.RdataClass, str], | |
tcp: bool, | |
raise_on_no_answer: bool, | |
search: Optional[bool], | |
) -> None: | |
if isinstance(qname, str): | |
qname = dns.name.from_text(qname, None) | |
rdtype = dns.rdatatype.RdataType.make(rdtype) | |
if dns.rdatatype.is_metatype(rdtype): | |
raise NoMetaqueries | |
rdclass = dns.rdataclass.RdataClass.make(rdclass) | |
if dns.rdataclass.is_metaclass(rdclass): | |
raise NoMetaqueries | |
self.resolver = resolver | |
self.qnames_to_try = resolver._get_qnames_to_try(qname, search) | |
self.qnames = self.qnames_to_try[:] | |
self.rdtype = rdtype | |
self.rdclass = rdclass | |
self.tcp = tcp | |
self.raise_on_no_answer = raise_on_no_answer | |
self.nxdomain_responses: Dict[dns.name.Name, dns.message.QueryMessage] = {} | |
# Initialize other things to help analysis tools | |
self.qname = dns.name.empty | |
self.nameservers: List[dns.nameserver.Nameserver] = [] | |
self.current_nameservers: List[dns.nameserver.Nameserver] = [] | |
self.errors: List[ErrorTuple] = [] | |
self.nameserver: Optional[dns.nameserver.Nameserver] = None | |
self.tcp_attempt = False | |
self.retry_with_tcp = False | |
self.request: Optional[dns.message.QueryMessage] = None | |
self.backoff = 0.0 | |
def next_request( | |
self, | |
) -> Tuple[Optional[dns.message.QueryMessage], Optional[Answer]]: | |
"""Get the next request to send, and check the cache. | |
Returns a (request, answer) tuple. At most one of request or | |
answer will not be None. | |
""" | |
# We return a tuple instead of Union[Message,Answer] as it lets | |
# the caller avoid isinstance(). | |
while len(self.qnames) > 0: | |
self.qname = self.qnames.pop(0) | |
# Do we know the answer? | |
if self.resolver.cache: | |
answer = self.resolver.cache.get( | |
(self.qname, self.rdtype, self.rdclass) | |
) | |
if answer is not None: | |
if answer.rrset is None and self.raise_on_no_answer: | |
raise NoAnswer(response=answer.response) | |
else: | |
return (None, answer) | |
answer = self.resolver.cache.get( | |
(self.qname, dns.rdatatype.ANY, self.rdclass) | |
) | |
if answer is not None and answer.response.rcode() == dns.rcode.NXDOMAIN: | |
# cached NXDOMAIN; record it and continue to next | |
# name. | |
self.nxdomain_responses[self.qname] = answer.response | |
continue | |
# Build the request | |
request = dns.message.make_query(self.qname, self.rdtype, self.rdclass) | |
if self.resolver.keyname is not None: | |
request.use_tsig( | |
self.resolver.keyring, | |
self.resolver.keyname, | |
algorithm=self.resolver.keyalgorithm, | |
) | |
request.use_edns( | |
self.resolver.edns, | |
self.resolver.ednsflags, | |
self.resolver.payload, | |
options=self.resolver.ednsoptions, | |
) | |
if self.resolver.flags is not None: | |
request.flags = self.resolver.flags | |
self.nameservers = self.resolver._enrich_nameservers( | |
self.resolver._nameservers, | |
self.resolver.nameserver_ports, | |
self.resolver.port, | |
) | |
if self.resolver.rotate: | |
random.shuffle(self.nameservers) | |
self.current_nameservers = self.nameservers[:] | |
self.errors = [] | |
self.nameserver = None | |
self.tcp_attempt = False | |
self.retry_with_tcp = False | |
self.request = request | |
self.backoff = 0.10 | |
return (request, None) | |
# | |
# We've tried everything and only gotten NXDOMAINs. (We know | |
# it's only NXDOMAINs as anything else would have returned | |
# before now.) | |
# | |
raise NXDOMAIN(qnames=self.qnames_to_try, responses=self.nxdomain_responses) | |
def next_nameserver(self) -> Tuple[dns.nameserver.Nameserver, bool, float]: | |
if self.retry_with_tcp: | |
assert self.nameserver is not None | |
assert not self.nameserver.is_always_max_size() | |
self.tcp_attempt = True | |
self.retry_with_tcp = False | |
return (self.nameserver, True, 0) | |
backoff = 0.0 | |
if not self.current_nameservers: | |
if len(self.nameservers) == 0: | |
# Out of things to try! | |
raise NoNameservers(request=self.request, errors=self.errors) | |
self.current_nameservers = self.nameservers[:] | |
backoff = self.backoff | |
self.backoff = min(self.backoff * 2, 2) | |
self.nameserver = self.current_nameservers.pop(0) | |
self.tcp_attempt = self.tcp or self.nameserver.is_always_max_size() | |
return (self.nameserver, self.tcp_attempt, backoff) | |
def query_result( | |
self, response: Optional[dns.message.Message], ex: Optional[Exception] | |
) -> Tuple[Optional[Answer], bool]: | |
# | |
# returns an (answer: Answer, end_loop: bool) tuple. | |
# | |
assert self.nameserver is not None | |
if ex: | |
# Exception during I/O or from_wire() | |
assert response is None | |
self.errors.append( | |
( | |
str(self.nameserver), | |
self.tcp_attempt, | |
self.nameserver.answer_port(), | |
ex, | |
response, | |
) | |
) | |
if ( | |
isinstance(ex, dns.exception.FormError) | |
or isinstance(ex, EOFError) | |
or isinstance(ex, OSError) | |
or isinstance(ex, NotImplementedError) | |
): | |
# This nameserver is no good, take it out of the mix. | |
self.nameservers.remove(self.nameserver) | |
elif isinstance(ex, dns.message.Truncated): | |
if self.tcp_attempt: | |
# Truncation with TCP is no good! | |
self.nameservers.remove(self.nameserver) | |
else: | |
self.retry_with_tcp = True | |
return (None, False) | |
# We got an answer! | |
assert response is not None | |
assert isinstance(response, dns.message.QueryMessage) | |
rcode = response.rcode() | |
if rcode == dns.rcode.NOERROR: | |
try: | |
answer = Answer( | |
self.qname, | |
self.rdtype, | |
self.rdclass, | |
response, | |
self.nameserver.answer_nameserver(), | |
self.nameserver.answer_port(), | |
) | |
except Exception as e: | |
self.errors.append( | |
( | |
str(self.nameserver), | |
self.tcp_attempt, | |
self.nameserver.answer_port(), | |
e, | |
response, | |
) | |
) | |
# The nameserver is no good, take it out of the mix. | |
self.nameservers.remove(self.nameserver) | |
return (None, False) | |
if self.resolver.cache: | |
self.resolver.cache.put((self.qname, self.rdtype, self.rdclass), answer) | |
if answer.rrset is None and self.raise_on_no_answer: | |
raise NoAnswer(response=answer.response) | |
return (answer, True) | |
elif rcode == dns.rcode.NXDOMAIN: | |
# Further validate the response by making an Answer, even | |
# if we aren't going to cache it. | |
try: | |
answer = Answer( | |
self.qname, dns.rdatatype.ANY, dns.rdataclass.IN, response | |
) | |
except Exception as e: | |
self.errors.append( | |
( | |
str(self.nameserver), | |
self.tcp_attempt, | |
self.nameserver.answer_port(), | |
e, | |
response, | |
) | |
) | |
# The nameserver is no good, take it out of the mix. | |
self.nameservers.remove(self.nameserver) | |
return (None, False) | |
self.nxdomain_responses[self.qname] = response | |
if self.resolver.cache: | |
self.resolver.cache.put( | |
(self.qname, dns.rdatatype.ANY, self.rdclass), answer | |
) | |
# Make next_nameserver() return None, so caller breaks its | |
# inner loop and calls next_request(). | |
return (None, True) | |
elif rcode == dns.rcode.YXDOMAIN: | |
yex = YXDOMAIN() | |
self.errors.append( | |
( | |
str(self.nameserver), | |
self.tcp_attempt, | |
self.nameserver.answer_port(), | |
yex, | |
response, | |
) | |
) | |
raise yex | |
else: | |
# | |
# We got a response, but we're not happy with the | |
# rcode in it. | |
# | |
if rcode != dns.rcode.SERVFAIL or not self.resolver.retry_servfail: | |
self.nameservers.remove(self.nameserver) | |
self.errors.append( | |
( | |
str(self.nameserver), | |
self.tcp_attempt, | |
self.nameserver.answer_port(), | |
dns.rcode.to_text(rcode), | |
response, | |
) | |
) | |
return (None, False) | |
class BaseResolver: | |
"""DNS stub resolver.""" | |
# We initialize in reset() | |
# | |
# pylint: disable=attribute-defined-outside-init | |
domain: dns.name.Name | |
nameserver_ports: Dict[str, int] | |
port: int | |
search: List[dns.name.Name] | |
use_search_by_default: bool | |
timeout: float | |
lifetime: float | |
keyring: Optional[Any] | |
keyname: Optional[Union[dns.name.Name, str]] | |
keyalgorithm: Union[dns.name.Name, str] | |
edns: int | |
ednsflags: int | |
ednsoptions: Optional[List[dns.edns.Option]] | |
payload: int | |
cache: Any | |
flags: Optional[int] | |
retry_servfail: bool | |
rotate: bool | |
ndots: Optional[int] | |
_nameservers: Sequence[Union[str, dns.nameserver.Nameserver]] | |
def __init__( | |
self, filename: str = "/etc/resolv.conf", configure: bool = True | |
) -> None: | |
"""*filename*, a ``str`` or file object, specifying a file | |
in standard /etc/resolv.conf format. This parameter is meaningful | |
only when *configure* is true and the platform is POSIX. | |
*configure*, a ``bool``. If True (the default), the resolver | |
instance is configured in the normal fashion for the operating | |
system the resolver is running on. (I.e. by reading a | |
/etc/resolv.conf file on POSIX systems and from the registry | |
on Windows systems.) | |
""" | |
self.reset() | |
if configure: | |
if sys.platform == "win32": | |
self.read_registry() | |
elif filename: | |
self.read_resolv_conf(filename) | |
def reset(self) -> None: | |
"""Reset all resolver configuration to the defaults.""" | |
self.domain = dns.name.Name(dns.name.from_text(socket.gethostname())[1:]) | |
if len(self.domain) == 0: | |
self.domain = dns.name.root | |
self._nameservers = [] | |
self.nameserver_ports = {} | |
self.port = 53 | |
self.search = [] | |
self.use_search_by_default = False | |
self.timeout = 2.0 | |
self.lifetime = 5.0 | |
self.keyring = None | |
self.keyname = None | |
self.keyalgorithm = dns.tsig.default_algorithm | |
self.edns = -1 | |
self.ednsflags = 0 | |
self.ednsoptions = None | |
self.payload = 0 | |
self.cache = None | |
self.flags = None | |
self.retry_servfail = False | |
self.rotate = False | |
self.ndots = None | |
def read_resolv_conf(self, f: Any) -> None: | |
"""Process *f* as a file in the /etc/resolv.conf format. If f is | |
a ``str``, it is used as the name of the file to open; otherwise it | |
is treated as the file itself. | |
Interprets the following items: | |
- nameserver - name server IP address | |
- domain - local domain name | |
- search - search list for host-name lookup | |
- options - supported options are rotate, timeout, edns0, and ndots | |
""" | |
nameservers = [] | |
if isinstance(f, str): | |
try: | |
cm: contextlib.AbstractContextManager = open(f) | |
except OSError: | |
# /etc/resolv.conf doesn't exist, can't be read, etc. | |
raise NoResolverConfiguration(f"cannot open {f}") | |
else: | |
cm = contextlib.nullcontext(f) | |
with cm as f: | |
for l in f: | |
if len(l) == 0 or l[0] == "#" or l[0] == ";": | |
continue | |
tokens = l.split() | |
# Any line containing less than 2 tokens is malformed | |
if len(tokens) < 2: | |
continue | |
if tokens[0] == "nameserver": | |
nameservers.append(tokens[1]) | |
elif tokens[0] == "domain": | |
self.domain = dns.name.from_text(tokens[1]) | |
# domain and search are exclusive | |
self.search = [] | |
elif tokens[0] == "search": | |
# the last search wins | |
self.search = [] | |
for suffix in tokens[1:]: | |
self.search.append(dns.name.from_text(suffix)) | |
# We don't set domain as it is not used if | |
# len(self.search) > 0 | |
elif tokens[0] == "options": | |
for opt in tokens[1:]: | |
if opt == "rotate": | |
self.rotate = True | |
elif opt == "edns0": | |
self.use_edns() | |
elif "timeout" in opt: | |
try: | |
self.timeout = int(opt.split(":")[1]) | |
except (ValueError, IndexError): | |
pass | |
elif "ndots" in opt: | |
try: | |
self.ndots = int(opt.split(":")[1]) | |
except (ValueError, IndexError): | |
pass | |
if len(nameservers) == 0: | |
raise NoResolverConfiguration("no nameservers") | |
# Assigning directly instead of appending means we invoke the | |
# setter logic, with additonal checking and enrichment. | |
self.nameservers = nameservers | |
def read_registry(self) -> None: | |
"""Extract resolver configuration from the Windows registry.""" | |
try: | |
info = dns.win32util.get_dns_info() # type: ignore | |
if info.domain is not None: | |
self.domain = info.domain | |
self.nameservers = info.nameservers | |
self.search = info.search | |
except AttributeError: | |
raise NotImplementedError | |
def _compute_timeout( | |
self, | |
start: float, | |
lifetime: Optional[float] = None, | |
errors: Optional[List[ErrorTuple]] = None, | |
) -> float: | |
lifetime = self.lifetime if lifetime is None else lifetime | |
now = time.time() | |
duration = now - start | |
if errors is None: | |
errors = [] | |
if duration < 0: | |
if duration < -1: | |
# Time going backwards is bad. Just give up. | |
raise LifetimeTimeout(timeout=duration, errors=errors) | |
else: | |
# Time went backwards, but only a little. This can | |
# happen, e.g. under vmware with older linux kernels. | |
# Pretend it didn't happen. | |
duration = 0 | |
if duration >= lifetime: | |
raise LifetimeTimeout(timeout=duration, errors=errors) | |
return min(lifetime - duration, self.timeout) | |
def _get_qnames_to_try( | |
self, qname: dns.name.Name, search: Optional[bool] | |
) -> List[dns.name.Name]: | |
# This is a separate method so we can unit test the search | |
# rules without requiring the Internet. | |
if search is None: | |
search = self.use_search_by_default | |
qnames_to_try = [] | |
if qname.is_absolute(): | |
qnames_to_try.append(qname) | |
else: | |
abs_qname = qname.concatenate(dns.name.root) | |
if search: | |
if len(self.search) > 0: | |
# There is a search list, so use it exclusively | |
search_list = self.search[:] | |
elif self.domain != dns.name.root and self.domain is not None: | |
# We have some notion of a domain that isn't the root, so | |
# use it as the search list. | |
search_list = [self.domain] | |
else: | |
search_list = [] | |
# Figure out the effective ndots (default is 1) | |
if self.ndots is None: | |
ndots = 1 | |
else: | |
ndots = self.ndots | |
for suffix in search_list: | |
qnames_to_try.append(qname + suffix) | |
if len(qname) > ndots: | |
# The name has at least ndots dots, so we should try an | |
# absolute query first. | |
qnames_to_try.insert(0, abs_qname) | |
else: | |
# The name has less than ndots dots, so we should search | |
# first, then try the absolute name. | |
qnames_to_try.append(abs_qname) | |
else: | |
qnames_to_try.append(abs_qname) | |
return qnames_to_try | |
def use_tsig( | |
self, | |
keyring: Any, | |
keyname: Optional[Union[dns.name.Name, str]] = None, | |
algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, | |
) -> None: | |
"""Add a TSIG signature to each query. | |
The parameters are passed to ``dns.message.Message.use_tsig()``; | |
see its documentation for details. | |
""" | |
self.keyring = keyring | |
self.keyname = keyname | |
self.keyalgorithm = algorithm | |
def use_edns( | |
self, | |
edns: Optional[Union[int, bool]] = 0, | |
ednsflags: int = 0, | |
payload: int = dns.message.DEFAULT_EDNS_PAYLOAD, | |
options: Optional[List[dns.edns.Option]] = None, | |
) -> None: | |
"""Configure EDNS behavior. | |
*edns*, an ``int``, is the EDNS level to use. Specifying | |
``None``, ``False``, or ``-1`` means "do not use EDNS", and in this case | |
the other parameters are ignored. Specifying ``True`` is | |
equivalent to specifying 0, i.e. "use EDNS0". | |
*ednsflags*, an ``int``, the EDNS flag values. | |
*payload*, an ``int``, is the EDNS sender's payload field, which is the | |
maximum size of UDP datagram the sender can handle. I.e. how big | |
a response to this message can be. | |
*options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS | |
options. | |
""" | |
if edns is None or edns is False: | |
edns = -1 | |
elif edns is True: | |
edns = 0 | |
self.edns = edns | |
self.ednsflags = ednsflags | |
self.payload = payload | |
self.ednsoptions = options | |
def set_flags(self, flags: int) -> None: | |
"""Overrides the default flags with your own. | |
*flags*, an ``int``, the message flags to use. | |
""" | |
self.flags = flags | |
def _enrich_nameservers( | |
cls, | |
nameservers: Sequence[Union[str, dns.nameserver.Nameserver]], | |
nameserver_ports: Dict[str, int], | |
default_port: int, | |
) -> List[dns.nameserver.Nameserver]: | |
enriched_nameservers = [] | |
if isinstance(nameservers, list): | |
for nameserver in nameservers: | |
enriched_nameserver: dns.nameserver.Nameserver | |
if isinstance(nameserver, dns.nameserver.Nameserver): | |
enriched_nameserver = nameserver | |
elif dns.inet.is_address(nameserver): | |
port = nameserver_ports.get(nameserver, default_port) | |
enriched_nameserver = dns.nameserver.Do53Nameserver( | |
nameserver, port | |
) | |
else: | |
try: | |
if urlparse(nameserver).scheme != "https": | |
raise NotImplementedError | |
except Exception: | |
raise ValueError( | |
f"nameserver {nameserver} is not a " | |
"dns.nameserver.Nameserver instance or text form, " | |
"IP address, nor a valid https URL" | |
) | |
enriched_nameserver = dns.nameserver.DoHNameserver(nameserver) | |
enriched_nameservers.append(enriched_nameserver) | |
else: | |
raise ValueError( | |
"nameservers must be a list or tuple (not a {})".format( | |
type(nameservers) | |
) | |
) | |
return enriched_nameservers | |
def nameservers( | |
self, | |
) -> Sequence[Union[str, dns.nameserver.Nameserver]]: | |
return self._nameservers | |
def nameservers( | |
self, nameservers: Sequence[Union[str, dns.nameserver.Nameserver]] | |
) -> None: | |
""" | |
*nameservers*, a ``list`` of nameservers, where a nameserver is either | |
a string interpretable as a nameserver, or a ``dns.nameserver.Nameserver`` | |
instance. | |
Raises ``ValueError`` if *nameservers* is not a list of nameservers. | |
""" | |
# We just call _enrich_nameservers() for checking | |
self._enrich_nameservers(nameservers, self.nameserver_ports, self.port) | |
self._nameservers = nameservers | |
class Resolver(BaseResolver): | |
"""DNS stub resolver.""" | |
def resolve( | |
self, | |
qname: Union[dns.name.Name, str], | |
rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
tcp: bool = False, | |
source: Optional[str] = None, | |
raise_on_no_answer: bool = True, | |
source_port: int = 0, | |
lifetime: Optional[float] = None, | |
search: Optional[bool] = None, | |
) -> Answer: # pylint: disable=arguments-differ | |
"""Query nameservers to find the answer to the question. | |
The *qname*, *rdtype*, and *rdclass* parameters may be objects | |
of the appropriate type, or strings that can be converted into objects | |
of the appropriate type. | |
*qname*, a ``dns.name.Name`` or ``str``, the query name. | |
*rdtype*, an ``int`` or ``str``, the query type. | |
*rdclass*, an ``int`` or ``str``, the query class. | |
*tcp*, a ``bool``. If ``True``, use TCP to make the query. | |
*source*, a ``str`` or ``None``. If not ``None``, bind to this IP | |
address when making queries. | |
*raise_on_no_answer*, a ``bool``. If ``True``, raise | |
``dns.resolver.NoAnswer`` if there's no answer to the question. | |
*source_port*, an ``int``, the port from which to send the message. | |
*lifetime*, a ``float``, how many seconds a query should run | |
before timing out. | |
*search*, a ``bool`` or ``None``, determines whether the | |
search list configured in the system's resolver configuration | |
are used for relative names, and whether the resolver's domain | |
may be added to relative names. The default is ``None``, | |
which causes the value of the resolver's | |
``use_search_by_default`` attribute to be used. | |
Raises ``dns.resolver.LifetimeTimeout`` if no answers could be found | |
in the specified lifetime. | |
Raises ``dns.resolver.NXDOMAIN`` if the query name does not exist. | |
Raises ``dns.resolver.YXDOMAIN`` if the query name is too long after | |
DNAME substitution. | |
Raises ``dns.resolver.NoAnswer`` if *raise_on_no_answer* is | |
``True`` and the query name exists but has no RRset of the | |
desired type and class. | |
Raises ``dns.resolver.NoNameservers`` if no non-broken | |
nameservers are available to answer the question. | |
Returns a ``dns.resolver.Answer`` instance. | |
""" | |
resolution = _Resolution( | |
self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search | |
) | |
start = time.time() | |
while True: | |
(request, answer) = resolution.next_request() | |
# Note we need to say "if answer is not None" and not just | |
# "if answer" because answer implements __len__, and python | |
# will call that. We want to return if we have an answer | |
# object, including in cases where its length is 0. | |
if answer is not None: | |
# cache hit! | |
return answer | |
assert request is not None # needed for type checking | |
done = False | |
while not done: | |
(nameserver, tcp, backoff) = resolution.next_nameserver() | |
if backoff: | |
time.sleep(backoff) | |
timeout = self._compute_timeout(start, lifetime, resolution.errors) | |
try: | |
response = nameserver.query( | |
request, | |
timeout=timeout, | |
source=source, | |
source_port=source_port, | |
max_size=tcp, | |
) | |
except Exception as ex: | |
(_, done) = resolution.query_result(None, ex) | |
continue | |
(answer, done) = resolution.query_result(response, None) | |
# Note we need to say "if answer is not None" and not just | |
# "if answer" because answer implements __len__, and python | |
# will call that. We want to return if we have an answer | |
# object, including in cases where its length is 0. | |
if answer is not None: | |
return answer | |
def query( | |
self, | |
qname: Union[dns.name.Name, str], | |
rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
tcp: bool = False, | |
source: Optional[str] = None, | |
raise_on_no_answer: bool = True, | |
source_port: int = 0, | |
lifetime: Optional[float] = None, | |
) -> Answer: # pragma: no cover | |
"""Query nameservers to find the answer to the question. | |
This method calls resolve() with ``search=True``, and is | |
provided for backwards compatibility with prior versions of | |
dnspython. See the documentation for the resolve() method for | |
further details. | |
""" | |
warnings.warn( | |
"please use dns.resolver.Resolver.resolve() instead", | |
DeprecationWarning, | |
stacklevel=2, | |
) | |
return self.resolve( | |
qname, | |
rdtype, | |
rdclass, | |
tcp, | |
source, | |
raise_on_no_answer, | |
source_port, | |
lifetime, | |
True, | |
) | |
def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Any) -> Answer: | |
"""Use a resolver to run a reverse query for PTR records. | |
This utilizes the resolve() method to perform a PTR lookup on the | |
specified IP address. | |
*ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get | |
the PTR record for. | |
All other arguments that can be passed to the resolve() function | |
except for rdtype and rdclass are also supported by this | |
function. | |
""" | |
# We make a modified kwargs for type checking happiness, as otherwise | |
# we get a legit warning about possibly having rdtype and rdclass | |
# in the kwargs more than once. | |
modified_kwargs: Dict[str, Any] = {} | |
modified_kwargs.update(kwargs) | |
modified_kwargs["rdtype"] = dns.rdatatype.PTR | |
modified_kwargs["rdclass"] = dns.rdataclass.IN | |
return self.resolve( | |
dns.reversename.from_address(ipaddr), *args, **modified_kwargs | |
) | |
def resolve_name( | |
self, | |
name: Union[dns.name.Name, str], | |
family: int = socket.AF_UNSPEC, | |
**kwargs: Any, | |
) -> HostAnswers: | |
"""Use a resolver to query for address records. | |
This utilizes the resolve() method to perform A and/or AAAA lookups on | |
the specified name. | |
*qname*, a ``dns.name.Name`` or ``str``, the name to resolve. | |
*family*, an ``int``, the address family. If socket.AF_UNSPEC | |
(the default), both A and AAAA records will be retrieved. | |
All other arguments that can be passed to the resolve() function | |
except for rdtype and rdclass are also supported by this | |
function. | |
""" | |
# We make a modified kwargs for type checking happiness, as otherwise | |
# we get a legit warning about possibly having rdtype and rdclass | |
# in the kwargs more than once. | |
modified_kwargs: Dict[str, Any] = {} | |
modified_kwargs.update(kwargs) | |
modified_kwargs.pop("rdtype", None) | |
modified_kwargs["rdclass"] = dns.rdataclass.IN | |
if family == socket.AF_INET: | |
v4 = self.resolve(name, dns.rdatatype.A, **modified_kwargs) | |
return HostAnswers.make(v4=v4) | |
elif family == socket.AF_INET6: | |
v6 = self.resolve(name, dns.rdatatype.AAAA, **modified_kwargs) | |
return HostAnswers.make(v6=v6) | |
elif family != socket.AF_UNSPEC: | |
raise NotImplementedError(f"unknown address family {family}") | |
raise_on_no_answer = modified_kwargs.pop("raise_on_no_answer", True) | |
lifetime = modified_kwargs.pop("lifetime", None) | |
start = time.time() | |
v6 = self.resolve( | |
name, | |
dns.rdatatype.AAAA, | |
raise_on_no_answer=False, | |
lifetime=self._compute_timeout(start, lifetime), | |
**modified_kwargs, | |
) | |
# Note that setting name ensures we query the same name | |
# for A as we did for AAAA. (This is just in case search lists | |
# are active by default in the resolver configuration and | |
# we might be talking to a server that says NXDOMAIN when it | |
# wants to say NOERROR no data. | |
name = v6.qname | |
v4 = self.resolve( | |
name, | |
dns.rdatatype.A, | |
raise_on_no_answer=False, | |
lifetime=self._compute_timeout(start, lifetime), | |
**modified_kwargs, | |
) | |
answers = HostAnswers.make(v6=v6, v4=v4, add_empty=not raise_on_no_answer) | |
if not answers: | |
raise NoAnswer(response=v6.response) | |
return answers | |
# pylint: disable=redefined-outer-name | |
def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name: | |
"""Determine the canonical name of *name*. | |
The canonical name is the name the resolver uses for queries | |
after all CNAME and DNAME renamings have been applied. | |
*name*, a ``dns.name.Name`` or ``str``, the query name. | |
This method can raise any exception that ``resolve()`` can | |
raise, other than ``dns.resolver.NoAnswer`` and | |
``dns.resolver.NXDOMAIN``. | |
Returns a ``dns.name.Name``. | |
""" | |
try: | |
answer = self.resolve(name, raise_on_no_answer=False) | |
canonical_name = answer.canonical_name | |
except dns.resolver.NXDOMAIN as e: | |
canonical_name = e.canonical_name | |
return canonical_name | |
# pylint: enable=redefined-outer-name | |
def try_ddr(self, lifetime: float = 5.0) -> None: | |
"""Try to update the resolver's nameservers using Discovery of Designated | |
Resolvers (DDR). If successful, the resolver will subsequently use | |
DNS-over-HTTPS or DNS-over-TLS for future queries. | |
*lifetime*, a float, is the maximum time to spend attempting DDR. The default | |
is 5 seconds. | |
If the SVCB query is successful and results in a non-empty list of nameservers, | |
then the resolver's nameservers are set to the returned servers in priority | |
order. | |
The current implementation does not use any address hints from the SVCB record, | |
nor does it resolve addresses for the SCVB target name, rather it assumes that | |
the bootstrap nameserver will always be one of the addresses and uses it. | |
A future revision to the code may offer fuller support. The code verifies that | |
the bootstrap nameserver is in the Subject Alternative Name field of the | |
TLS certficate. | |
""" | |
try: | |
expiration = time.time() + lifetime | |
answer = self.resolve( | |
dns._ddr._local_resolver_name, "SVCB", lifetime=lifetime | |
) | |
timeout = dns.query._remaining(expiration) | |
nameservers = dns._ddr._get_nameservers_sync(answer, timeout) | |
if len(nameservers) > 0: | |
self.nameservers = nameservers | |
except Exception: | |
pass | |
#: The default resolver. | |
default_resolver: Optional[Resolver] = None | |
def get_default_resolver() -> Resolver: | |
"""Get the default resolver, initializing it if necessary.""" | |
if default_resolver is None: | |
reset_default_resolver() | |
assert default_resolver is not None | |
return default_resolver | |
def reset_default_resolver() -> None: | |
"""Re-initialize default resolver. | |
Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX | |
systems) will be re-read immediately. | |
""" | |
global default_resolver | |
default_resolver = Resolver() | |
def resolve( | |
qname: Union[dns.name.Name, str], | |
rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
tcp: bool = False, | |
source: Optional[str] = None, | |
raise_on_no_answer: bool = True, | |
source_port: int = 0, | |
lifetime: Optional[float] = None, | |
search: Optional[bool] = None, | |
) -> Answer: # pragma: no cover | |
"""Query nameservers to find the answer to the question. | |
This is a convenience function that uses the default resolver | |
object to make the query. | |
See ``dns.resolver.Resolver.resolve`` for more information on the | |
parameters. | |
""" | |
return get_default_resolver().resolve( | |
qname, | |
rdtype, | |
rdclass, | |
tcp, | |
source, | |
raise_on_no_answer, | |
source_port, | |
lifetime, | |
search, | |
) | |
def query( | |
qname: Union[dns.name.Name, str], | |
rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
tcp: bool = False, | |
source: Optional[str] = None, | |
raise_on_no_answer: bool = True, | |
source_port: int = 0, | |
lifetime: Optional[float] = None, | |
) -> Answer: # pragma: no cover | |
"""Query nameservers to find the answer to the question. | |
This method calls resolve() with ``search=True``, and is | |
provided for backwards compatibility with prior versions of | |
dnspython. See the documentation for the resolve() method for | |
further details. | |
""" | |
warnings.warn( | |
"please use dns.resolver.resolve() instead", DeprecationWarning, stacklevel=2 | |
) | |
return resolve( | |
qname, | |
rdtype, | |
rdclass, | |
tcp, | |
source, | |
raise_on_no_answer, | |
source_port, | |
lifetime, | |
True, | |
) | |
def resolve_address(ipaddr: str, *args: Any, **kwargs: Any) -> Answer: | |
"""Use a resolver to run a reverse query for PTR records. | |
See ``dns.resolver.Resolver.resolve_address`` for more information on the | |
parameters. | |
""" | |
return get_default_resolver().resolve_address(ipaddr, *args, **kwargs) | |
def resolve_name( | |
name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any | |
) -> HostAnswers: | |
"""Use a resolver to query for address records. | |
See ``dns.resolver.Resolver.resolve_name`` for more information on the | |
parameters. | |
""" | |
return get_default_resolver().resolve_name(name, family, **kwargs) | |
def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name: | |
"""Determine the canonical name of *name*. | |
See ``dns.resolver.Resolver.canonical_name`` for more information on the | |
parameters and possible exceptions. | |
""" | |
return get_default_resolver().canonical_name(name) | |
def try_ddr(lifetime: float = 5.0) -> None: | |
"""Try to update the default resolver's nameservers using Discovery of Designated | |
Resolvers (DDR). If successful, the resolver will subsequently use | |
DNS-over-HTTPS or DNS-over-TLS for future queries. | |
See :py:func:`dns.resolver.Resolver.try_ddr` for more information. | |
""" | |
return get_default_resolver().try_ddr(lifetime) | |
def zone_for_name( | |
name: Union[dns.name.Name, str], | |
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, | |
tcp: bool = False, | |
resolver: Optional[Resolver] = None, | |
lifetime: Optional[float] = None, | |
) -> dns.name.Name: | |
"""Find the name of the zone which contains the specified name. | |
*name*, an absolute ``dns.name.Name`` or ``str``, the query name. | |
*rdclass*, an ``int``, the query class. | |
*tcp*, a ``bool``. If ``True``, use TCP to make the query. | |
*resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use. | |
If ``None``, the default, then the default resolver is used. | |
*lifetime*, a ``float``, the total time to allow for the queries needed | |
to determine the zone. If ``None``, the default, then only the individual | |
query limits of the resolver apply. | |
Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS | |
root. (This is only likely to happen if you're using non-default | |
root servers in your network and they are misconfigured.) | |
Raises ``dns.resolver.LifetimeTimeout`` if the answer could not be | |
found in the allotted lifetime. | |
Returns a ``dns.name.Name``. | |
""" | |
if isinstance(name, str): | |
name = dns.name.from_text(name, dns.name.root) | |
if resolver is None: | |
resolver = get_default_resolver() | |
if not name.is_absolute(): | |
raise NotAbsolute(name) | |
start = time.time() | |
expiration: Optional[float] | |
if lifetime is not None: | |
expiration = start + lifetime | |
else: | |
expiration = None | |
while 1: | |
try: | |
rlifetime: Optional[float] | |
if expiration is not None: | |
rlifetime = expiration - time.time() | |
if rlifetime <= 0: | |
rlifetime = 0 | |
else: | |
rlifetime = None | |
answer = resolver.resolve( | |
name, dns.rdatatype.SOA, rdclass, tcp, lifetime=rlifetime | |
) | |
assert answer.rrset is not None | |
if answer.rrset.name == name: | |
return name | |
# otherwise we were CNAMEd or DNAMEd and need to look higher | |
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e: | |
if isinstance(e, dns.resolver.NXDOMAIN): | |
response = e.responses().get(name) | |
else: | |
response = e.response() # pylint: disable=no-value-for-parameter | |
if response: | |
for rrs in response.authority: | |
if rrs.rdtype == dns.rdatatype.SOA and rrs.rdclass == rdclass: | |
(nr, _, _) = rrs.name.fullcompare(name) | |
if nr == dns.name.NAMERELN_SUPERDOMAIN: | |
# We're doing a proper superdomain check as | |
# if the name were equal we ought to have gotten | |
# it in the answer section! We are ignoring the | |
# possibility that the authority is insane and | |
# is including multiple SOA RRs for different | |
# authorities. | |
return rrs.name | |
# we couldn't extract anything useful from the response (e.g. it's | |
# a type 3 NXDOMAIN) | |
try: | |
name = name.parent() | |
except dns.name.NoParent: | |
raise NoRootSOA | |
def make_resolver_at( | |
where: Union[dns.name.Name, str], | |
port: int = 53, | |
family: int = socket.AF_UNSPEC, | |
resolver: Optional[Resolver] = None, | |
) -> Resolver: | |
"""Make a stub resolver using the specified destination as the full resolver. | |
*where*, a ``dns.name.Name`` or ``str`` the domain name or IP address of the | |
full resolver. | |
*port*, an ``int``, the port to use. If not specified, the default is 53. | |
*family*, an ``int``, the address family to use. This parameter is used if | |
*where* is not an address. The default is ``socket.AF_UNSPEC`` in which case | |
the first address returned by ``resolve_name()`` will be used, otherwise the | |
first address of the specified family will be used. | |
*resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use for | |
resolution of hostnames. If not specified, the default resolver will be used. | |
Returns a ``dns.resolver.Resolver`` or raises an exception. | |
""" | |
if resolver is None: | |
resolver = get_default_resolver() | |
nameservers: List[Union[str, dns.nameserver.Nameserver]] = [] | |
if isinstance(where, str) and dns.inet.is_address(where): | |
nameservers.append(dns.nameserver.Do53Nameserver(where, port)) | |
else: | |
for address in resolver.resolve_name(where, family).addresses(): | |
nameservers.append(dns.nameserver.Do53Nameserver(address, port)) | |
res = dns.resolver.Resolver(configure=False) | |
res.nameservers = nameservers | |
return res | |
def resolve_at( | |
where: Union[dns.name.Name, str], | |
qname: Union[dns.name.Name, str], | |
rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, | |
rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, | |
tcp: bool = False, | |
source: Optional[str] = None, | |
raise_on_no_answer: bool = True, | |
source_port: int = 0, | |
lifetime: Optional[float] = None, | |
search: Optional[bool] = None, | |
port: int = 53, | |
family: int = socket.AF_UNSPEC, | |
resolver: Optional[Resolver] = None, | |
) -> Answer: | |
"""Query nameservers to find the answer to the question. | |
This is a convenience function that calls ``dns.resolver.make_resolver_at()`` to | |
make a resolver, and then uses it to resolve the query. | |
See ``dns.resolver.Resolver.resolve`` for more information on the resolution | |
parameters, and ``dns.resolver.make_resolver_at`` for information about the resolver | |
parameters *where*, *port*, *family*, and *resolver*. | |
If making more than one query, it is more efficient to call | |
``dns.resolver.make_resolver_at()`` and then use that resolver for the queries | |
instead of calling ``resolve_at()`` multiple times. | |
""" | |
return make_resolver_at(where, port, family, resolver).resolve( | |
qname, | |
rdtype, | |
rdclass, | |
tcp, | |
source, | |
raise_on_no_answer, | |
source_port, | |
lifetime, | |
search, | |
) | |
# | |
# Support for overriding the system resolver for all python code in the | |
# running process. | |
# | |
_protocols_for_socktype = { | |
socket.SOCK_DGRAM: [socket.SOL_UDP], | |
socket.SOCK_STREAM: [socket.SOL_TCP], | |
} | |
_resolver = None | |
_original_getaddrinfo = socket.getaddrinfo | |
_original_getnameinfo = socket.getnameinfo | |
_original_getfqdn = socket.getfqdn | |
_original_gethostbyname = socket.gethostbyname | |
_original_gethostbyname_ex = socket.gethostbyname_ex | |
_original_gethostbyaddr = socket.gethostbyaddr | |
def _getaddrinfo( | |
host=None, service=None, family=socket.AF_UNSPEC, socktype=0, proto=0, flags=0 | |
): | |
if flags & socket.AI_NUMERICHOST != 0: | |
# Short circuit directly into the system's getaddrinfo(). We're | |
# not adding any value in this case, and this avoids infinite loops | |
# because dns.query.* needs to call getaddrinfo() for IPv6 scoping | |
# reasons. We will also do this short circuit below if we | |
# discover that the host is an address literal. | |
return _original_getaddrinfo(host, service, family, socktype, proto, flags) | |
if flags & (socket.AI_ADDRCONFIG | socket.AI_V4MAPPED) != 0: | |
# Not implemented. We raise a gaierror as opposed to a | |
# NotImplementedError as it helps callers handle errors more | |
# appropriately. [Issue #316] | |
# | |
# We raise EAI_FAIL as opposed to EAI_SYSTEM because there is | |
# no EAI_SYSTEM on Windows [Issue #416]. We didn't go for | |
# EAI_BADFLAGS as the flags aren't bad, we just don't | |
# implement them. | |
raise socket.gaierror( | |
socket.EAI_FAIL, "Non-recoverable failure in name resolution" | |
) | |
if host is None and service is None: | |
raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") | |
addrs = [] | |
canonical_name = None # pylint: disable=redefined-outer-name | |
# Is host None or an address literal? If so, use the system's | |
# getaddrinfo(). | |
if host is None: | |
return _original_getaddrinfo(host, service, family, socktype, proto, flags) | |
try: | |
# We don't care about the result of af_for_address(), we're just | |
# calling it so it raises an exception if host is not an IPv4 or | |
# IPv6 address. | |
dns.inet.af_for_address(host) | |
return _original_getaddrinfo(host, service, family, socktype, proto, flags) | |
except Exception: | |
pass | |
# Something needs resolution! | |
try: | |
answers = _resolver.resolve_name(host, family) | |
addrs = answers.addresses_and_families() | |
canonical_name = answers.canonical_name().to_text(True) | |
except dns.resolver.NXDOMAIN: | |
raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") | |
except Exception: | |
# We raise EAI_AGAIN here as the failure may be temporary | |
# (e.g. a timeout) and EAI_SYSTEM isn't defined on Windows. | |
# [Issue #416] | |
raise socket.gaierror(socket.EAI_AGAIN, "Temporary failure in name resolution") | |
port = None | |
try: | |
# Is it a port literal? | |
if service is None: | |
port = 0 | |
else: | |
port = int(service) | |
except Exception: | |
if flags & socket.AI_NUMERICSERV == 0: | |
try: | |
port = socket.getservbyname(service) | |
except Exception: | |
pass | |
if port is None: | |
raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") | |
tuples = [] | |
if socktype == 0: | |
socktypes = [socket.SOCK_DGRAM, socket.SOCK_STREAM] | |
else: | |
socktypes = [socktype] | |
if flags & socket.AI_CANONNAME != 0: | |
cname = canonical_name | |
else: | |
cname = "" | |
for addr, af in addrs: | |
for socktype in socktypes: | |
for proto in _protocols_for_socktype[socktype]: | |
addr_tuple = dns.inet.low_level_address_tuple((addr, port), af) | |
tuples.append((af, socktype, proto, cname, addr_tuple)) | |
if len(tuples) == 0: | |
raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") | |
return tuples | |
def _getnameinfo(sockaddr, flags=0): | |
host = sockaddr[0] | |
port = sockaddr[1] | |
if len(sockaddr) == 4: | |
scope = sockaddr[3] | |
family = socket.AF_INET6 | |
else: | |
scope = None | |
family = socket.AF_INET | |
tuples = _getaddrinfo(host, port, family, socket.SOCK_STREAM, socket.SOL_TCP, 0) | |
if len(tuples) > 1: | |
raise socket.error("sockaddr resolved to multiple addresses") | |
addr = tuples[0][4][0] | |
if flags & socket.NI_DGRAM: | |
pname = "udp" | |
else: | |
pname = "tcp" | |
qname = dns.reversename.from_address(addr) | |
if flags & socket.NI_NUMERICHOST == 0: | |
try: | |
answer = _resolver.resolve(qname, "PTR") | |
hostname = answer.rrset[0].target.to_text(True) | |
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): | |
if flags & socket.NI_NAMEREQD: | |
raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") | |
hostname = addr | |
if scope is not None: | |
hostname += "%" + str(scope) | |
else: | |
hostname = addr | |
if scope is not None: | |
hostname += "%" + str(scope) | |
if flags & socket.NI_NUMERICSERV: | |
service = str(port) | |
else: | |
service = socket.getservbyport(port, pname) | |
return (hostname, service) | |
def _getfqdn(name=None): | |
if name is None: | |
name = socket.gethostname() | |
try: | |
(name, _, _) = _gethostbyaddr(name) | |
# Python's version checks aliases too, but our gethostbyname | |
# ignores them, so we do so here as well. | |
except Exception: | |
pass | |
return name | |
def _gethostbyname(name): | |
return _gethostbyname_ex(name)[2][0] | |
def _gethostbyname_ex(name): | |
aliases = [] | |
addresses = [] | |
tuples = _getaddrinfo( | |
name, 0, socket.AF_INET, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME | |
) | |
canonical = tuples[0][3] | |
for item in tuples: | |
addresses.append(item[4][0]) | |
# XXX we just ignore aliases | |
return (canonical, aliases, addresses) | |
def _gethostbyaddr(ip): | |
try: | |
dns.ipv6.inet_aton(ip) | |
sockaddr = (ip, 80, 0, 0) | |
family = socket.AF_INET6 | |
except Exception: | |
try: | |
dns.ipv4.inet_aton(ip) | |
except Exception: | |
raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") | |
sockaddr = (ip, 80) | |
family = socket.AF_INET | |
(name, _) = _getnameinfo(sockaddr, socket.NI_NAMEREQD) | |
aliases = [] | |
addresses = [] | |
tuples = _getaddrinfo( | |
name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME | |
) | |
canonical = tuples[0][3] | |
# We only want to include an address from the tuples if it's the | |
# same as the one we asked about. We do this comparison in binary | |
# to avoid any differences in text representations. | |
bin_ip = dns.inet.inet_pton(family, ip) | |
for item in tuples: | |
addr = item[4][0] | |
bin_addr = dns.inet.inet_pton(family, addr) | |
if bin_ip == bin_addr: | |
addresses.append(addr) | |
# XXX we just ignore aliases | |
return (canonical, aliases, addresses) | |
def override_system_resolver(resolver: Optional[Resolver] = None) -> None: | |
"""Override the system resolver routines in the socket module with | |
versions which use dnspython's resolver. | |
This can be useful in testing situations where you want to control | |
the resolution behavior of python code without having to change | |
the system's resolver settings (e.g. /etc/resolv.conf). | |
The resolver to use may be specified; if it's not, the default | |
resolver will be used. | |
resolver, a ``dns.resolver.Resolver`` or ``None``, the resolver to use. | |
""" | |
if resolver is None: | |
resolver = get_default_resolver() | |
global _resolver | |
_resolver = resolver | |
socket.getaddrinfo = _getaddrinfo | |
socket.getnameinfo = _getnameinfo | |
socket.getfqdn = _getfqdn | |
socket.gethostbyname = _gethostbyname | |
socket.gethostbyname_ex = _gethostbyname_ex | |
socket.gethostbyaddr = _gethostbyaddr | |
def restore_system_resolver() -> None: | |
"""Undo the effects of prior override_system_resolver().""" | |
global _resolver | |
_resolver = None | |
socket.getaddrinfo = _original_getaddrinfo | |
socket.getnameinfo = _original_getnameinfo | |
socket.getfqdn = _original_getfqdn | |
socket.gethostbyname = _original_gethostbyname | |
socket.gethostbyname_ex = _original_gethostbyname_ex | |
socket.gethostbyaddr = _original_gethostbyaddr | |