| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | """DNS stub resolver.""" |
| |
|
| | import contextlib |
| | import random |
| | import socket |
| | import sys |
| | import threading |
| | import time |
| | import warnings |
| | from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union |
| | from urllib.parse import urlparse |
| |
|
| | import dns._ddr |
| | import dns.edns |
| | import dns.exception |
| | import dns.flags |
| | import dns.inet |
| | import dns.ipv4 |
| | import dns.ipv6 |
| | import dns.message |
| | import dns.name |
| | import dns.nameserver |
| | import dns.query |
| | import dns.rcode |
| | import dns.rdataclass |
| | import dns.rdatatype |
| | import dns.rdtypes.svcbbase |
| | import dns.reversename |
| | import dns.tsig |
| |
|
| | if sys.platform == "win32": |
| | import dns.win32util |
| |
|
| |
|
class NXDOMAIN(dns.exception.DNSException):
    """The DNS query name does not exist."""

    # NOTE(review): the docstring above is intentionally left verbatim; the
    # DNSException base class may use a class docstring as default message
    # text -- confirm against dns.exception before rewording it.

    supp_kwargs = {"qnames", "responses"}
    fmt = None

    # NOTE(review): this pass-through constructor adds no behaviour of its
    # own; presumably it exists so type checkers accept the keyword
    # arguments -- confirm before removing.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _check_kwargs(self, qnames, responses=None):
        """Validate the keyword arguments and return them as a dict.

        *qnames* must be a non-empty list, tuple or set.  *responses* must be
        a dict, or ``None`` (which is normalized to an empty dict).

        Raises ``AttributeError`` if either argument is malformed.
        """
        if not isinstance(qnames, (list, tuple, set)):
            raise AttributeError("qnames must be a list, tuple or set")
        if len(qnames) == 0:
            raise AttributeError("qnames must contain at least one element")
        if responses is None:
            responses = {}
        elif not isinstance(responses, dict):
            raise AttributeError("responses must be a dict(qname=response)")
        return dict(qnames=qnames, responses=responses)

    def __str__(self) -> str:
        """Return a message listing every name that was tried."""
        if "qnames" not in self.kwargs:
            # Non-parametrized instance: defer to the base class text.
            return super().__str__()
        qnames = self.kwargs["qnames"]
        if len(qnames) > 1:
            msg = "None of DNS query names exist"
        else:
            msg = "The DNS query name does not exist"
        names = ", ".join(map(str, qnames))
        return "{}: {}".format(msg, names)

    @property
    def canonical_name(self):
        """Return the unresolved canonical name.

        The first stored response whose canonical name differs from the name
        it was stored under wins.  Otherwise the first tried name is
        returned.

        Raises ``TypeError`` if the exception was built without *qnames*.
        """
        if "qnames" not in self.kwargs:
            raise TypeError("parametrized exception required")
        qnames = self.kwargs["qnames"]
        responses = self.kwargs.get("responses", {})
        for qname in qnames:
            # Not every tried name necessarily has a stored response
            # (*responses* defaults to an empty dict), so a missing entry is
            # skipped instead of raising KeyError.
            response = responses.get(qname)
            if response is None:
                continue
            try:
                cname = response.canonical_name()
                if cname != qname:
                    return cname
            except Exception:
                # Best effort: a response that cannot yield a canonical name
                # is simply ignored.
                pass
        # *qnames* may be a set, which cannot be indexed; take the first
        # element in iteration order (element 0 for a list or tuple).
        return next(iter(qnames))

    def __add__(self, e_nx):
        """Augment by results from another NXDOMAIN exception.

        Returns a new ``NXDOMAIN`` holding this exception's names followed
        by any names only present in *e_nx*, with the matching responses
        copied over.  Neither operand is modified.
        """
        qnames0 = list(self.kwargs.get("qnames", []))
        responses0 = dict(self.kwargs.get("responses", {}))
        responses1 = e_nx.kwargs.get("responses", {})
        for qname1 in e_nx.kwargs.get("qnames", []):
            if qname1 not in qnames0:
                qnames0.append(qname1)
            if qname1 in responses1:
                responses0[qname1] = responses1[qname1]
        return NXDOMAIN(qnames=qnames0, responses=responses0)

    def qnames(self):
        """All of the names that were tried.

        Returns a list of ``dns.name.Name``.
        """
        return self.kwargs["qnames"]

    def responses(self):
        """A map from queried names to their NXDOMAIN responses.

        Returns a dict mapping a ``dns.name.Name`` to a
        ``dns.message.Message``.
        """
        return self.kwargs["responses"]

    def response(self, qname):
        """The response for query *qname*.

        Returns a ``dns.message.Message``.  Raises ``KeyError`` if no
        response is stored for *qname*.
        """
        return self.kwargs["responses"][qname]
| |
|
| |
|
class YXDOMAIN(dns.exception.DNSException):
    """The DNS query name is too long after DNAME substitution."""

    # Raised by _Resolution.query_result() when a server answers with rcode
    # YXDOMAIN.
    # NOTE(review): the docstring is left verbatim because the DNSException
    # base class may use it as the message text -- confirm before rewording.
| |
|
| |
|
# One entry of a resolution error trace.  Going by how
# _Resolution.query_result() builds these tuples, the fields are, in order:
#   0. str() of the nameserver that was tried
#   1. whether the attempt used TCP
#   2. the value of the nameserver's answer_port()
#   3. the exception that occurred, or the text form of an unexpected rcode
#   4. the response message, when one was received
ErrorTuple = Tuple[
    Optional[str],
    bool,
    int,
    Union[Exception, str],
    Optional[dns.message.Message],
]
| |
|
| |
|
def _errors_to_text(errors: List[ErrorTuple]) -> List[str]:
    """Render each entry of an error trace as one line of text.

    Only the first field (the server) and the fourth field (the exception or
    rcode text) of every entry are used.
    """
    return [
        "Server {} answered {}".format(entry[0], entry[3]) for entry in errors
    ]
| |
|
| |
|
class LifetimeTimeout(dns.exception.Timeout):
    """The resolution lifetime expired."""

    # NOTE(review): docstring kept verbatim; the exception base class may
    # treat it as message text -- confirm before rewording.

    msg = "The resolution lifetime expired."
    # Same sentence as ``msg`` without its trailing period, followed by the
    # elapsed time and the per-server error trace.
    fmt = "%s after {timeout:.3f} seconds: {errors}" % msg[:-1]
    supp_kwargs = {"timeout", "errors"}

    # NOTE(review): pass-through constructor with no behaviour of its own;
    # presumably present for the benefit of type checkers -- confirm.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _fmt_kwargs(self, **kwargs):
        """Build the values substituted into ``fmt``.

        The ``errors`` trace is flattened into a single "; "-separated
        string; ``timeout`` is passed through unchanged.
        """
        trace = "; ".join(_errors_to_text(kwargs["errors"]))
        return super()._fmt_kwargs(timeout=kwargs["timeout"], errors=trace)
| |
|
| |
|
| | |
| | |
| | |
# ``Timeout`` in this module is an alias for LifetimeTimeout, which itself
# derives from dns.exception.Timeout.
# NOTE(review): presumably retained for backwards compatibility -- confirm.
Timeout = LifetimeTimeout
| |
|
| |
|
class NoAnswer(dns.exception.DNSException):
    """The DNS response does not contain an answer to the question."""

    # NOTE(review): docstring kept verbatim; the exception base class may
    # treat it as message text -- confirm before rewording.

    fmt = "The DNS response does not contain an answer to the question: {query}"
    supp_kwargs = {"response"}

    # NOTE(review): pass-through constructor with no behaviour of its own;
    # presumably present for the benefit of type checkers -- confirm.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _fmt_kwargs(self, **kwargs):
        """Expose the response's ``question`` as ``query`` for ``fmt``."""
        question = kwargs["response"].question
        return super()._fmt_kwargs(query=question)

    def response(self):
        """Return the response that was passed when the exception was built."""
        return self.kwargs["response"]
| |
|
| |
|
class NoNameservers(dns.exception.DNSException):
    """All nameservers failed to answer the query.

    errors: list of servers and respective errors
    The type of errors is
    [(server IP address, any object convertible to string)].
    Non-empty errors list will add explanatory message ()
    """

    # NOTE(review): the docstring is left verbatim (the exception base class
    # may treat it as message text).  The entries actually recorded by
    # _Resolution.query_result() are five-field ErrorTuple values, of which
    # _errors_to_text() uses fields 0 and 3.

    msg = "All nameservers failed to answer the query."
    # ``msg`` without its trailing period, then the question and the trace.
    fmt = "%s {query}: {errors}" % msg[:-1]
    supp_kwargs = {"request", "errors"}

    # NOTE(review): pass-through constructor with no behaviour of its own;
    # presumably present for the benefit of type checkers -- confirm.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def _fmt_kwargs(self, **kwargs):
        """Build the values substituted into ``fmt``.

        ``query`` is the ``question`` of the request; ``errors`` is the error
        trace flattened into one "; "-separated string.
        """
        trace = "; ".join(_errors_to_text(kwargs["errors"]))
        question = kwargs["request"].question
        return super()._fmt_kwargs(query=question, errors=trace)
| |
|
| |
|
class NotAbsolute(dns.exception.DNSException):
    """An absolute domain name is required but a relative name was provided."""

    # Not raised in the part of the module visible here; presumably used by
    # code further down -- TODO confirm.
    # NOTE(review): the docstring is left verbatim because the DNSException
    # base class may use it as the message text -- confirm before rewording.
| |
|
| |
|
class NoRootSOA(dns.exception.DNSException):
    """There is no SOA RR at the DNS root name. This should never happen!"""

    # Not raised in the part of the module visible here; presumably used by
    # code further down -- TODO confirm.
    # NOTE(review): the docstring is left verbatim because the DNSException
    # base class may use it as the message text -- confirm before rewording.
| |
|
| |
|
class NoMetaqueries(dns.exception.DNSException):
    """DNS metaqueries are not allowed."""

    # Raised by _Resolution.__init__() when the requested rdtype is a
    # metatype or the requested rdclass is a metaclass.
    # NOTE(review): the docstring is left verbatim because the DNSException
    # base class may use it as the message text -- confirm before rewording.
| |
|
| |
|
class NoResolverConfiguration(dns.exception.DNSException):
    """Resolver configuration could not be read or specified no nameservers."""

    # Raised by BaseResolver.read_resolv_conf() when the file cannot be
    # opened or contains no ``nameserver`` entries.
    # NOTE(review): the docstring is left verbatim because the DNSException
    # base class may use it as the message text -- confirm before rewording.
| |
|
| |
|
class Answer:
    """DNS stub resolver answer.

    Instances of this class bundle up the result of a successful DNS
    resolution.

    For convenience, the answer object implements much of the sequence
    protocol, forwarding to its ``rrset`` attribute.  E.g.
    ``for a in answer`` is equivalent to ``for a in answer.rrset``.
    ``answer[i]`` is equivalent to ``answer.rrset[i]``, and
    ``answer[i:j]`` is equivalent to ``answer.rrset[i:j]``.

    Note that CNAMEs or DNAMEs in the response may mean that answer
    RRset's name might not be the query name.
    """

    # Attribute names that __getattr__ forwards to ``self.rrset``.
    _RRSET_ATTRIBUTES = ("name", "ttl", "covers", "rdclass", "rdtype")

    def __init__(
        self,
        qname: dns.name.Name,
        rdtype: dns.rdatatype.RdataType,
        rdclass: dns.rdataclass.RdataClass,
        response: dns.message.QueryMessage,
        nameserver: Optional[str] = None,
        port: Optional[int] = None,
    ) -> None:
        """Store the query parameters and derive the answer from *response*.

        ``response.resolve_chaining()`` is called immediately, so any
        exception it raises propagates out of the constructor.
        """
        self.qname = qname
        self.rdtype = rdtype
        self.rdclass = rdclass
        self.response = response
        self.nameserver = nameserver
        self.port = port
        self.chaining_result = response.resolve_chaining()
        # Mirror selected chaining_result fields as plain attributes.
        self.canonical_name = self.chaining_result.canonical_name
        self.rrset = self.chaining_result.answer
        # Absolute time.time() value at which this answer stops being valid.
        self.expiration = time.time() + self.chaining_result.minimum_ttl

    def __getattr__(self, attr):
        """Forward a handful of attribute names to ``self.rrset``.

        Python only calls ``__getattr__`` when normal lookup fails, so the
        ``rdclass`` and ``rdtype`` entries matter only for instances that
        lack the attributes assigned in ``__init__``.

        Raises ``AttributeError`` for any other name.
        """
        if attr in self._RRSET_ATTRIBUTES:
            return getattr(self.rrset, attr)
        raise AttributeError(attr)

    def __len__(self) -> int:
        """Return the number of items in ``rrset`` (0 when it is unset or empty)."""
        return len(self.rrset) if self.rrset else 0

    def __iter__(self):
        """Iterate over ``rrset``; yields nothing when it is unset or empty."""
        return iter(self.rrset) if self.rrset else iter(())

    def __getitem__(self, i):
        """Return ``rrset[i]``; raises ``IndexError`` if there is no rrset."""
        if self.rrset is None:
            raise IndexError
        return self.rrset[i]

    def __delitem__(self, i):
        """Delete ``rrset[i]``; raises ``IndexError`` if there is no rrset."""
        if self.rrset is None:
            raise IndexError
        del self.rrset[i]
| |
|
| |
|
class Answers(dict):
    """Mapping of DNS stub resolver answers, keyed by type.

    A plain ``dict`` subclass that adds no behaviour of its own.
    """
| |
|
| |
|
class HostAnswers(Answers):
    """A dict of DNS stub resolver answers to a host name lookup, indexed by
    type.
    """

    @classmethod
    def make(
        cls,
        v6: Optional[Answer] = None,
        v4: Optional[Answer] = None,
        add_empty: bool = True,
    ) -> "HostAnswers":
        """Build an instance from optional AAAA (*v6*) and A (*v4*) answers.

        An answer is stored under its rdatatype when it is not ``None`` and
        either *add_empty* is true or its ``rrset`` is truthy.

        The instance is created through *cls*, so calling ``make`` on a
        subclass returns an instance of that subclass.
        """
        answers = cls()
        if v6 is not None and (add_empty or v6.rrset):
            answers[dns.rdatatype.AAAA] = v6
        if v4 is not None and (add_empty or v4.rrset):
            answers[dns.rdatatype.A] = v4
        return answers

    def addresses_and_families(
        self, family: int = socket.AF_UNSPEC
    ) -> Iterator[Tuple[str, int]]:
        """Yield ``(address, family)`` pairs for the stored answers.

        With ``socket.AF_UNSPEC`` all AAAA addresses are yielded first, then
        all A addresses.  With ``AF_INET6`` or ``AF_INET`` only that family
        is yielded.

        Raises ``NotImplementedError`` for any other *family*; as this is a
        generator, the error surfaces when iteration starts.
        """
        if family == socket.AF_UNSPEC:
            yield from self.addresses_and_families(socket.AF_INET6)
            yield from self.addresses_and_families(socket.AF_INET)
            return
        elif family == socket.AF_INET6:
            answer = self.get(dns.rdatatype.AAAA)
        elif family == socket.AF_INET:
            answer = self.get(dns.rdatatype.A)
        else:
            raise NotImplementedError(f"unknown address family {family}")
        # A missing or empty answer contributes nothing.
        if answer:
            for rdata in answer:
                yield (rdata.address, family)

    def addresses(self, family: int = socket.AF_UNSPEC) -> Iterator[str]:
        """Yield only the address part of ``addresses_and_families(family)``."""
        return (pair[0] for pair in self.addresses_and_families(family))

    def canonical_name(self) -> dns.name.Name:
        """Return the canonical name of the AAAA answer, else of the A answer.

        Raises ``AttributeError`` when neither answer is stored, because the
        lookup then yields ``None``.
        """
        answer = self.get(dns.rdatatype.AAAA, self.get(dns.rdatatype.A))
        return answer.canonical_name
| |
|
| |
|
class CacheStatistics:
    """Hit and miss counters for a cache."""

    def __init__(self, hits: int = 0, misses: int = 0) -> None:
        """Start the counters at *hits* and *misses*."""
        self.hits, self.misses = hits, misses

    def reset(self) -> None:
        """Set both counters back to zero."""
        self.hits = self.misses = 0

    def clone(self) -> "CacheStatistics":
        """Return a new, independent object holding the current counts."""
        return CacheStatistics(hits=self.hits, misses=self.misses)
| |
|
| |
|
class CacheBase:
    """Shared state for the caches: a lock and a statistics object.

    Every accessor below takes ``self.lock`` while it touches
    ``self.statistics``.
    """

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.statistics = CacheStatistics()

    def reset_statistics(self) -> None:
        """Reset all statistics to zero."""
        with self.lock:
            self.statistics.reset()

    def hits(self) -> int:
        """How many hits has the cache had?"""
        with self.lock:
            count = self.statistics.hits
        return count

    def misses(self) -> int:
        """How many misses has the cache had?"""
        with self.lock:
            count = self.statistics.misses
        return count

    def get_statistics_snapshot(self) -> CacheStatistics:
        """Return a consistent snapshot of all the statistics.

        If running with multiple threads, it's better to take a
        snapshot than to call statistics methods such as hits() and
        misses() individually.
        """
        with self.lock:
            snapshot = self.statistics.clone()
        return snapshot
| |
|
| |
|
# Key type used by the caches below: (query name, rdtype, rdclass).
CacheKey = Tuple[dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass]
| |
|
| |
|
class Cache(CacheBase):
    """Simple thread-safe DNS answer cache."""

    def __init__(self, cleaning_interval: float = 300.0) -> None:
        """*cleaning_interval*, a ``float`` is the number of seconds between
        periodic cleanings.
        """
        super().__init__()
        self.data: Dict[CacheKey, Answer] = {}
        self.cleaning_interval = cleaning_interval
        self.next_cleaning: float = time.time() + self.cleaning_interval

    def _maybe_clean(self) -> None:
        """Drop expired entries once the cleaning time has been reached.

        Does not take the lock itself; the callers in this class invoke it
        while holding ``self.lock``.
        """
        now = time.time()
        if self.next_cleaning <= now:
            # Collect first, delete afterwards: the dict must not change size
            # while it is being iterated.
            expired = [
                key for key, answer in self.data.items() if answer.expiration <= now
            ]
            for key in expired:
                del self.data[key]
            # Schedule the next pass relative to the time the sweep finished.
            self.next_cleaning = time.time() + self.cleaning_interval

    def get(self, key: CacheKey) -> Optional[Answer]:
        """Get the answer associated with *key*.

        Returns None if no answer is cached for the key, or if the cached
        answer has expired.  Hit and miss counters are updated accordingly.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        Returns a ``dns.resolver.Answer`` or ``None``.
        """
        with self.lock:
            self._maybe_clean()
            cached = self.data.get(key)
            if cached is not None and cached.expiration > time.time():
                self.statistics.hits += 1
                return cached
            self.statistics.misses += 1
            return None

    def put(self, key: CacheKey, value: Answer) -> None:
        """Associate key and value in the cache.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        *value*, a ``dns.resolver.Answer``, the answer.
        """
        with self.lock:
            self._maybe_clean()
            self.data[key] = value

    def flush(self, key: Optional[CacheKey] = None) -> None:
        """Flush the cache.

        If *key* is not ``None``, only that item is flushed.  Otherwise the entire cache
        is flushed and the next cleaning time is rescheduled.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.
        """
        with self.lock:
            if key is None:
                self.data = {}
                self.next_cleaning = time.time() + self.cleaning_interval
            else:
                # Removing a key that is not cached is a no-op.
                self.data.pop(key, None)
| |
|
| |
|
class LRUCacheNode:
    """Node of a circular doubly linked list, holding one cache entry."""

    def __init__(self, key, value):
        """Create an unlinked node whose ``prev`` and ``next`` are itself."""
        self.key = key
        self.value = value
        self.hits = 0
        self.prev = self.next = self

    def link_after(self, node: "LRUCacheNode") -> None:
        """Insert this node into the list immediately after *node*."""
        successor = node.next
        self.prev, self.next = node, successor
        successor.prev = self
        node.next = self

    def unlink(self) -> None:
        """Splice this node out by joining its two neighbours.

        The node's own ``prev`` and ``next`` are left as they were.
        """
        before, after = self.prev, self.next
        after.prev = before
        before.next = after
| |
|
| |
|
class LRUCache(CacheBase):
    """Thread-safe, bounded, least-recently-used DNS answer cache.

    This cache is better than the simple cache (above) if you're
    running a web crawler or other process that does a lot of
    resolutions.  The LRUCache has a maximum number of nodes, and when
    it is full, the least-recently used node is removed to make space
    for a new one.
    """

    def __init__(self, max_size: int = 100000) -> None:
        """*max_size*, an ``int``, is the maximum number of nodes to cache;
        it must be greater than 0.
        """
        super().__init__()
        self.data: Dict[CacheKey, LRUCacheNode] = {}
        self.set_max_size(max_size)
        # The sentinel anchors the circular list: sentinel.next is the most
        # recently used node and sentinel.prev the least recently used one.
        self.sentinel: LRUCacheNode = LRUCacheNode(None, None)
        self.sentinel.prev = self.sentinel.next = self.sentinel

    def set_max_size(self, max_size: int) -> None:
        """Set the maximum number of nodes; values below 1 are raised to 1.

        Existing entries are not evicted by this call.
        """
        self.max_size = max(max_size, 1)

    def get(self, key: CacheKey) -> Optional[Answer]:
        """Get the answer associated with *key*.

        Returns None if no answer is cached for the key, or if the cached
        answer has expired (in which case it is also removed).

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        Returns a ``dns.resolver.Answer`` or ``None``.
        """
        with self.lock:
            entry = self.data.get(key)
            if entry is None:
                self.statistics.misses += 1
                return None
            # Take the node out of the list; it is put back at the front only
            # when it turns out to be a valid hit.
            entry.unlink()
            if entry.value.expiration <= time.time():
                del self.data[entry.key]
                self.statistics.misses += 1
                return None
            entry.link_after(self.sentinel)
            self.statistics.hits += 1
            entry.hits += 1
            return entry.value

    def get_hits_for_key(self, key: CacheKey) -> int:
        """Return the number of cache hits associated with the specified key.

        Returns 0 for a key that is not cached or whose answer has expired.
        """
        with self.lock:
            entry = self.data.get(key)
            if entry is not None and entry.value.expiration > time.time():
                return entry.hits
            return 0

    def put(self, key: CacheKey, value: Answer) -> None:
        """Associate key and value in the cache.

        Any existing entry for *key* is replaced, and least-recently-used
        entries are evicted until there is room for the new one.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.

        *value*, a ``dns.resolver.Answer``, the answer.
        """
        with self.lock:
            existing = self.data.get(key)
            if existing is not None:
                existing.unlink()
                del self.data[existing.key]
            while len(self.data) >= self.max_size:
                oldest = self.sentinel.prev
                oldest.unlink()
                del self.data[oldest.key]
            fresh = LRUCacheNode(key, value)
            fresh.link_after(self.sentinel)
            self.data[key] = fresh

    def flush(self, key: Optional[CacheKey] = None) -> None:
        """Flush the cache.

        If *key* is not ``None``, only that item is flushed.  Otherwise the entire cache
        is flushed.

        *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)``
        tuple whose values are the query name, rdtype, and rdclass respectively.
        """
        with self.lock:
            if key is None:
                # Unlink every node, then start over with an empty index.
                current = self.sentinel.next
                while current is not self.sentinel:
                    following = current.next
                    current.unlink()
                    current = following
                self.data = {}
            else:
                entry = self.data.get(key)
                if entry is not None:
                    entry.unlink()
                    del self.data[entry.key]
| |
|
| |
|
class _Resolution:
    """Helper class for dns.resolver.Resolver.resolve().

    All of the "business logic" of resolution is encapsulated in this
    class, allowing us to have multiple resolve() implementations
    using different I/O schemes without copying all of the
    complicated logic.

    This class is a "friend" to dns.resolver.Resolver and manipulates
    resolver data structures directly.
    """

    def __init__(
        self,
        resolver: "BaseResolver",
        qname: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str],
        rdclass: Union[dns.rdataclass.RdataClass, str],
        tcp: bool,
        raise_on_no_answer: bool,
        search: Optional[bool],
    ) -> None:
        """Normalize the query parameters and compute the names to try.

        Raises ``NoMetaqueries`` if *rdtype* is a metatype or *rdclass* is a
        metaclass.
        """
        if isinstance(qname, str):
            qname = dns.name.from_text(qname, None)
        rdtype = dns.rdatatype.RdataType.make(rdtype)
        if dns.rdatatype.is_metatype(rdtype):
            raise NoMetaqueries
        rdclass = dns.rdataclass.RdataClass.make(rdclass)
        if dns.rdataclass.is_metaclass(rdclass):
            raise NoMetaqueries
        self.resolver = resolver
        self.qnames_to_try = resolver._get_qnames_to_try(qname, search)
        # Working copy consumed by next_request(); qnames_to_try stays intact
        # so it can be reported if every name ends in NXDOMAIN.
        self.qnames = self.qnames_to_try[:]
        self.rdtype = rdtype
        self.rdclass = rdclass
        self.tcp = tcp
        self.raise_on_no_answer = raise_on_no_answer
        self.nxdomain_responses: Dict[dns.name.Name, dns.message.QueryMessage] = {}
        # Per-request state; next_request() assigns the real values.
        self.qname = dns.name.empty
        self.nameservers: List[dns.nameserver.Nameserver] = []
        self.current_nameservers: List[dns.nameserver.Nameserver] = []
        self.errors: List[ErrorTuple] = []
        self.nameserver: Optional[dns.nameserver.Nameserver] = None
        self.tcp_attempt = False
        self.retry_with_tcp = False
        self.request: Optional[dns.message.QueryMessage] = None
        self.backoff = 0.0

    def next_request(
        self,
    ) -> Tuple[Optional[dns.message.QueryMessage], Optional[Answer]]:
        """Get the next request to send, and check the cache.

        Returns a (request, answer) tuple.  At most one of request or
        answer will not be None.

        Raises ``NoAnswer`` for a cached answer without an rrset when
        ``raise_on_no_answer`` is set, and ``NXDOMAIN`` once every candidate
        name has been consumed.
        """
        while self.qnames:
            self.qname = self.qnames.pop(0)

            # Consult the cache first, if the resolver has one.
            if self.resolver.cache:
                answer = self.resolver.cache.get(
                    (self.qname, self.rdtype, self.rdclass)
                )
                if answer is not None:
                    if answer.rrset is None and self.raise_on_no_answer:
                        raise NoAnswer(response=answer.response)
                    else:
                        return (None, answer)
                # NXDOMAIN results are cached under rdtype ANY (see
                # query_result()).
                answer = self.resolver.cache.get(
                    (self.qname, dns.rdatatype.ANY, self.rdclass)
                )
                if answer is not None and answer.response.rcode() == dns.rcode.NXDOMAIN:
                    # Cached NXDOMAIN: remember it and move to the next name.
                    self.nxdomain_responses[self.qname] = answer.response
                    continue

            # Build the query message from the resolver's settings.
            request = dns.message.make_query(self.qname, self.rdtype, self.rdclass)
            if self.resolver.keyname is not None:
                request.use_tsig(
                    self.resolver.keyring,
                    self.resolver.keyname,
                    algorithm=self.resolver.keyalgorithm,
                )
            request.use_edns(
                self.resolver.edns,
                self.resolver.ednsflags,
                self.resolver.payload,
                options=self.resolver.ednsoptions,
            )
            if self.resolver.flags is not None:
                request.flags = self.resolver.flags

            # Reset the per-request nameserver and retry state.
            self.nameservers = self.resolver._enrich_nameservers(
                self.resolver._nameservers,
                self.resolver.nameserver_ports,
                self.resolver.port,
            )
            if self.resolver.rotate:
                random.shuffle(self.nameservers)
            self.current_nameservers = self.nameservers[:]
            self.errors = []
            self.nameserver = None
            self.tcp_attempt = False
            self.retry_with_tcp = False
            self.request = request
            self.backoff = 0.10

            return (request, None)

        # Every candidate name was consumed without returning a request or
        # an answer, i.e. each one ended in an NXDOMAIN.
        raise NXDOMAIN(qnames=self.qnames_to_try, responses=self.nxdomain_responses)

    def next_nameserver(self) -> Tuple[dns.nameserver.Nameserver, bool, float]:
        """Pick the nameserver for the next attempt.

        Returns a ``(nameserver, use_tcp, backoff)`` tuple, where *backoff*
        is the delay reported to the caller; it is non-zero only when the
        nameserver list has just been restarted.

        Raises ``NoNameservers`` when no usable nameserver remains.
        """
        if self.retry_with_tcp:
            # The previous UDP reply was truncated: same server again, over
            # TCP, without delay.
            assert self.nameserver is not None
            assert not self.nameserver.is_always_max_size()
            self.tcp_attempt = True
            self.retry_with_tcp = False
            return (self.nameserver, True, 0)

        backoff = 0.0
        if not self.current_nameservers:
            if not self.nameservers:
                # Every nameserver has been removed as unusable.
                raise NoNameservers(request=self.request, errors=self.errors)
            # Start another round over the remaining servers, doubling the
            # backoff up to a cap of 2.
            self.current_nameservers = self.nameservers[:]
            backoff = self.backoff
            self.backoff = min(self.backoff * 2, 2)

        self.nameserver = self.current_nameservers.pop(0)
        self.tcp_attempt = self.tcp or self.nameserver.is_always_max_size()
        return (self.nameserver, self.tcp_attempt, backoff)

    def _record_error(
        self,
        error: Union[Exception, str],
        response: Optional[dns.message.Message],
    ) -> None:
        """Append an entry for the current nameserver to the error trace."""
        assert self.nameserver is not None
        self.errors.append(
            (
                str(self.nameserver),
                self.tcp_attempt,
                self.nameserver.answer_port(),
                error,
                response,
            )
        )

    def query_result(
        self, response: Optional[dns.message.Message], ex: Optional[Exception]
    ) -> Tuple[Optional[Answer], bool]:
        """Process the outcome of one query attempt.

        Exactly one of *response* and *ex* is expected to be set.

        Returns an ``(answer, done)`` tuple.  *done* is true when the caller
        should stop trying nameservers for the current name.

        Raises ``NoAnswer`` for a NOERROR response without an rrset when
        ``raise_on_no_answer`` is set, and ``YXDOMAIN`` for a YXDOMAIN rcode.
        """
        assert self.nameserver is not None
        if ex:
            # The attempt failed with an exception rather than a response.
            assert response is None
            self._record_error(ex, response)
            if isinstance(
                ex,
                (dns.exception.FormError, EOFError, OSError, NotImplementedError),
            ):
                # This nameserver is unusable; do not try it again.
                self.nameservers.remove(self.nameserver)
            elif isinstance(ex, dns.message.Truncated):
                if self.tcp_attempt:
                    # Truncation over TCP: give up on this nameserver.
                    self.nameservers.remove(self.nameserver)
                else:
                    self.retry_with_tcp = True
            return (None, False)

        # A response was received.
        assert response is not None
        assert isinstance(response, dns.message.QueryMessage)
        rcode = response.rcode()
        if rcode == dns.rcode.NOERROR:
            try:
                answer = Answer(
                    self.qname,
                    self.rdtype,
                    self.rdclass,
                    response,
                    self.nameserver.answer_nameserver(),
                    self.nameserver.answer_port(),
                )
            except Exception as e:
                # The response could not be turned into an Answer; drop the
                # nameserver that sent it.
                self._record_error(e, response)
                self.nameservers.remove(self.nameserver)
                return (None, False)
            if self.resolver.cache:
                self.resolver.cache.put((self.qname, self.rdtype, self.rdclass), answer)
            if answer.rrset is None and self.raise_on_no_answer:
                raise NoAnswer(response=answer.response)
            return (answer, True)
        elif rcode == dns.rcode.NXDOMAIN:
            # Build an Answer even when nothing will be cached, so that a
            # malformed response is still detected here.
            try:
                answer = Answer(
                    self.qname, dns.rdatatype.ANY, dns.rdataclass.IN, response
                )
            except Exception as e:
                self._record_error(e, response)
                self.nameservers.remove(self.nameserver)
                return (None, False)
            self.nxdomain_responses[self.qname] = response
            if self.resolver.cache:
                self.resolver.cache.put(
                    (self.qname, dns.rdatatype.ANY, self.rdclass), answer
                )
            # No answer, but this name is finished: the caller should move on
            # to next_request().
            return (None, True)
        elif rcode == dns.rcode.YXDOMAIN:
            yex = YXDOMAIN()
            self._record_error(yex, response)
            raise yex
        else:
            # Any other rcode.  The nameserver is dropped unless the rcode is
            # SERVFAIL and the resolver is configured to retry those.
            if rcode != dns.rcode.SERVFAIL or not self.resolver.retry_servfail:
                self.nameservers.remove(self.nameserver)
            self._record_error(dns.rcode.to_text(rcode), response)
            return (None, False)
| |
|
| |
|
| | class BaseResolver: |
| | """DNS stub resolver.""" |
| |
|
| | |
| | |
| | |
| |
|
| | domain: dns.name.Name |
| | nameserver_ports: Dict[str, int] |
| | port: int |
| | search: List[dns.name.Name] |
| | use_search_by_default: bool |
| | timeout: float |
| | lifetime: float |
| | keyring: Optional[Any] |
| | keyname: Optional[Union[dns.name.Name, str]] |
| | keyalgorithm: Union[dns.name.Name, str] |
| | edns: int |
| | ednsflags: int |
| | ednsoptions: Optional[List[dns.edns.Option]] |
| | payload: int |
| | cache: Any |
| | flags: Optional[int] |
| | retry_servfail: bool |
| | rotate: bool |
| | ndots: Optional[int] |
| | _nameservers: Sequence[Union[str, dns.nameserver.Nameserver]] |
| |
|
def __init__(
    self, filename: str = "/etc/resolv.conf", configure: bool = True
) -> None:
    """*filename*, a ``str`` or file object, specifying a file
    in standard /etc/resolv.conf format.  This parameter is meaningful
    only when *configure* is true and the platform is POSIX.

    *configure*, a ``bool``.  If True (the default), the resolver
    instance is configured in the normal fashion for the operating
    system the resolver is running on.  (I.e. by reading a
    /etc/resolv.conf file on POSIX systems and from the registry
    on Windows systems.)
    """
    # Always start from the defaults.
    self.reset()
    if not configure:
        return
    if sys.platform == "win32":
        self.read_registry()
    elif filename:
        # A falsy *filename* skips file-based configuration entirely.
        self.read_resolv_conf(filename)
| |
|
def reset(self) -> None:
    """Reset all resolver configuration to the defaults."""
    # Default domain: the local host name with its first element sliced off
    # (presumably the host label -- confirm), or the root name if nothing
    # is left.
    host_name = dns.name.from_text(socket.gethostname())
    self.domain = dns.name.Name(host_name[1:])
    if len(self.domain) == 0:
        self.domain = dns.name.root

    # Nameservers and search behaviour.
    self._nameservers = []
    self.nameserver_ports = {}
    self.port = 53
    self.search = []
    self.use_search_by_default = False

    # Time limits, in seconds.
    self.timeout = 2.0
    self.lifetime = 5.0

    # TSIG: no key configured.
    self.keyring = None
    self.keyname = None
    self.keyalgorithm = dns.tsig.default_algorithm

    # EDNS: -1 means "do not use EDNS" (see use_edns()).
    self.edns = -1
    self.ednsflags = 0
    self.ednsoptions = None
    self.payload = 0

    # Everything else.
    self.cache = None
    self.flags = None
    self.retry_servfail = False
    self.rotate = False
    self.ndots = None
| |
|
def read_resolv_conf(self, f: Any) -> None:
    """Process *f* as a file in the /etc/resolv.conf format.  If f is
    a ``str``, it is used as the name of the file to open; otherwise it
    is treated as the file itself.

    Interprets the following items:

    - nameserver - name server IP address

    - domain - local domain name

    - search - search list for host-name lookup

    - options - supported options are rotate, timeout, edns0, and ndots

    Raises ``NoResolverConfiguration`` if the named file cannot be opened
    (the underlying ``OSError`` is attached as the cause) or if the file
    lists no nameservers.
    """
    nameservers = []
    if isinstance(f, str):
        try:
            cm: contextlib.AbstractContextManager = open(f)
        except OSError as err:
            # Chain explicitly so the original OS error is reported as the
            # direct cause of the configuration failure.
            raise NoResolverConfiguration(f"cannot open {f}") from err
    else:
        # A caller-supplied file object is used as is and is not closed here.
        cm = contextlib.nullcontext(f)
    with cm as conf:
        for line in conf:
            # Skip empty lines and comment lines.
            if len(line) == 0 or line[0] in ("#", ";"):
                continue
            tokens = line.split()

            # Any directive needs a keyword and at least one value.
            if len(tokens) < 2:
                continue

            if tokens[0] == "nameserver":
                nameservers.append(tokens[1])
            elif tokens[0] == "domain":
                self.domain = dns.name.from_text(tokens[1])
                # A "domain" line discards any search list read so far.
                self.search = []
            elif tokens[0] == "search":
                # A "search" line replaces any search list read so far.
                self.search = []
                for suffix in tokens[1:]:
                    self.search.append(dns.name.from_text(suffix))
            elif tokens[0] == "options":
                for opt in tokens[1:]:
                    if opt == "rotate":
                        self.rotate = True
                    elif opt == "edns0":
                        self.use_edns()
                    elif "timeout" in opt:
                        # Expected form is "timeout:N"; malformed values are
                        # ignored.
                        try:
                            self.timeout = int(opt.split(":")[1])
                        except (ValueError, IndexError):
                            pass
                    elif "ndots" in opt:
                        # Expected form is "ndots:N"; malformed values are
                        # ignored.
                        try:
                            self.ndots = int(opt.split(":")[1])
                        except (ValueError, IndexError):
                            pass
    if len(nameservers) == 0:
        raise NoResolverConfiguration("no nameservers")
    # NOTE(review): assigned through ``self.nameservers`` rather than
    # ``self._nameservers``; presumably a property setter defined elsewhere
    # in the class validates the list -- confirm.
    self.nameservers = nameservers
| |
|
def read_registry(self) -> None:
    """Extract resolver configuration from the Windows registry.

    Sets ``domain`` (only when one is reported), ``nameservers`` and
    ``search`` from the information returned by
    ``dns.win32util.get_dns_info()``.

    Raises ``NotImplementedError`` if an ``AttributeError`` occurs while
    doing so.
    """
    # dns.win32util is imported at module level only when sys.platform is
    # "win32", so on other platforms the lookup below would normally fail
    # with AttributeError.  The handler spans the whole block, so any
    # AttributeError raised inside it is translated the same way.
    try:
        info = dns.win32util.get_dns_info()
        if info.domain is not None:
            self.domain = info.domain
        self.nameservers = info.nameservers
        self.search = info.search
    except AttributeError:
        raise NotImplementedError
| |
|
def _compute_timeout(
    self,
    start: float,
    lifetime: Optional[float] = None,
    errors: Optional[List[ErrorTuple]] = None,
) -> float:
    """Return the timeout to use for the next attempt.

    *start* is the ``time.time()`` value at which resolution began.
    *lifetime* defaults to ``self.lifetime``.  *errors* is the error trace
    attached to the exception if the lifetime has run out.

    Returns the smaller of the remaining lifetime and ``self.timeout``.

    Raises ``LifetimeTimeout`` when the lifetime is used up, or when the
    clock appears to have gone backwards by more than one second.
    """
    if lifetime is None:
        lifetime = self.lifetime
    if errors is None:
        errors = []
    elapsed = time.time() - start
    if elapsed < -1:
        # The clock moved backwards by more than a second: give up.
        raise LifetimeTimeout(timeout=elapsed, errors=errors)
    if elapsed < 0:
        # A backwards step of at most a second is treated as no time passed.
        elapsed = 0
    if elapsed >= lifetime:
        raise LifetimeTimeout(timeout=elapsed, errors=errors)
    return min(lifetime - elapsed, self.timeout)
| |
|
def _get_qnames_to_try(
    self, qname: dns.name.Name, search: Optional[bool]
) -> List[dns.name.Name]:
    """Return the names to query for *qname*, in the order to try them.

    *search* selects whether the search list is applied; ``None`` means use
    ``self.use_search_by_default``.

    An absolute *qname* is returned alone.  A relative *qname* without
    searching is returned made absolute.  With searching, *qname* is
    combined with each search suffix (or with ``self.domain`` when no search
    list is set), and the absolute form goes first if *qname* has more
    labels than ``ndots`` (default 1), otherwise last.
    """
    if search is None:
        search = self.use_search_by_default
    if qname.is_absolute():
        return [qname]

    abs_qname = qname.concatenate(dns.name.root)
    if not search:
        return [abs_qname]

    if len(self.search) > 0:
        # An explicit search list takes precedence over the domain.
        suffixes = self.search[:]
    elif self.domain != dns.name.root and self.domain is not None:
        # No search list: fall back to the domain, unless it is the root.
        suffixes = [self.domain]
    else:
        suffixes = []

    ndots = 1 if self.ndots is None else self.ndots
    candidates = [qname + suffix for suffix in suffixes]
    if len(qname) > ndots:
        # Enough labels: try the name as given before the searched forms.
        candidates.insert(0, abs_qname)
    else:
        # Too few labels: search first, then try the name as given.
        candidates.append(abs_qname)
    return candidates
| |
|
def use_tsig(
    self,
    keyring: Any,
    keyname: Optional[Union[dns.name.Name, str]] = None,
    algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
) -> None:
    """Arrange for each query to carry a TSIG signature.

    *keyring*, *keyname* and *algorithm* are stored on the resolver as
    ``keyring``, ``keyname`` and ``keyalgorithm``.  They are the
    parameters of ``dns.message.Message.use_tsig()``; see its
    documentation for details.
    """
    self.keyring, self.keyname, self.keyalgorithm = keyring, keyname, algorithm
| |
|
def use_edns(
    self,
    edns: Optional[Union[int, bool]] = 0,
    ednsflags: int = 0,
    payload: int = dns.message.DEFAULT_EDNS_PAYLOAD,
    options: Optional[List[dns.edns.Option]] = None,
) -> None:
    """Set the EDNS parameters used by the resolver.

    *edns*, an ``int``, the EDNS level.  ``None``, ``False`` and ``-1``
    all mean "do not use EDNS", in which case the remaining parameters
    are ignored.  ``True`` is the same as ``0``, i.e. "use EDNS0".

    *ednsflags*, an ``int``, the EDNS flag values.

    *payload*, an ``int``, the EDNS sender's payload field: the largest
    UDP datagram the sender can handle, i.e. how big a response to this
    message may be.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the
    EDNS options.
    """
    # Identity checks keep the ints 0 and 1 distinct from the booleans
    # False and True.
    if edns is True:
        level = 0
    elif edns is None or edns is False:
        level = -1
    else:
        level = edns
    self.edns = level
    self.ednsflags = ednsflags
    self.payload = payload
    self.ednsoptions = options
| |
|
def set_flags(self, flags: int) -> None:
    """Replace the default message flags with the given value.

    *flags*, an ``int``, the message flags to use; stored as the
    resolver's ``flags`` attribute.
    """
    self.flags = flags
| |
|
@classmethod
def _enrich_nameservers(
    cls,
    nameservers: Sequence[Union[str, dns.nameserver.Nameserver]],
    nameserver_ports: Dict[str, int],
    default_port: int,
) -> List[dns.nameserver.Nameserver]:
    """Convert a nameserver specification into ``Nameserver`` objects.

    *nameservers*, a ``list`` or ``tuple`` whose items are
    ``dns.nameserver.Nameserver`` instances, IP address strings, or
    ``https`` URL strings.

    *nameserver_ports*, a ``dict`` mapping an IP address string to the
    port to use for that address.

    *default_port*, an ``int``, the port for addresses that have no entry
    in *nameserver_ports*.

    Raises ``ValueError`` if *nameservers* is not a list or tuple, or if
    an item is not a ``Nameserver``, an IP address, or an https URL.

    Returns a new ``list`` of ``dns.nameserver.Nameserver`` objects in
    the same order as the input.
    """
    # Tuples are accepted as well as lists, as the error message below
    # has always promised.
    if not isinstance(nameservers, (list, tuple)):
        raise ValueError(
            "nameservers must be a list or tuple (not a {})".format(
                type(nameservers)
            )
        )
    enriched_nameservers: List[dns.nameserver.Nameserver] = []
    for nameserver in nameservers:
        if isinstance(nameserver, dns.nameserver.Nameserver):
            enriched_nameservers.append(nameserver)
        elif dns.inet.is_address(nameserver):
            port = nameserver_ports.get(nameserver, default_port)
            enriched_nameservers.append(
                dns.nameserver.Do53Nameserver(nameserver, port)
            )
        else:
            # Anything else must be an https URL.  urlparse() can itself
            # fail on non-string input; that is reported the same way.
            try:
                is_https_url = urlparse(nameserver).scheme == "https"
            except Exception:
                is_https_url = False
            if not is_https_url:
                # Raised outside the except clause so the traceback is
                # not cluttered with an unrelated chained exception.
                raise ValueError(
                    f"nameserver {nameserver} is not a "
                    "dns.nameserver.Nameserver instance or text form, "
                    "IP address, nor a valid https URL"
                )
            enriched_nameservers.append(dns.nameserver.DoHNameserver(nameserver))
    return enriched_nameservers
| |
|
@property
def nameservers(
    self,
) -> Sequence[Union[str, dns.nameserver.Nameserver]]:
    """The nameservers, exactly as they were last assigned."""
    return self._nameservers

@nameservers.setter
def nameservers(
    self, nameservers: Sequence[Union[str, dns.nameserver.Nameserver]]
) -> None:
    """Validate and store a new set of nameservers.

    *nameservers*, a sequence of nameservers, where a nameserver is
    either a string interpretable as a nameserver, or a
    ``dns.nameserver.Nameserver`` instance.

    Raises ``ValueError`` if ``_enrich_nameservers()`` rejects the value.
    """
    # The conversion is run only for its validation side effect; the
    # value is stored in the form the caller supplied.
    self._enrich_nameservers(nameservers, self.nameserver_ports, self.port)
    self._nameservers = nameservers
| |
|
| |
|
class Resolver(BaseResolver):
    """DNS stub resolver."""

    def resolve(
        self,
        qname: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
        rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
        tcp: bool = False,
        source: Optional[str] = None,
        raise_on_no_answer: bool = True,
        source_port: int = 0,
        lifetime: Optional[float] = None,
        search: Optional[bool] = None,
    ) -> Answer:
        """Query nameservers to find the answer to the question.

        *qname*, *rdtype* and *rdclass* may be objects of the appropriate
        type, or strings that can be converted into such objects.

        *qname*, a ``dns.name.Name`` or ``str``, the query name.

        *rdtype*, an ``int`` or ``str``, the query type.

        *rdclass*, an ``int`` or ``str``, the query class.

        *tcp*, a ``bool``.  If ``True``, use TCP to make the query.

        *source*, a ``str`` or ``None``.  If not ``None``, bind to this IP
        address when making queries.

        *raise_on_no_answer*, a ``bool``.  If ``True``, raise
        ``dns.resolver.NoAnswer`` if there's no answer to the question.

        *source_port*, an ``int``, the port from which to send the message.

        *lifetime*, a ``float``, how many seconds a query should run
        before timing out.

        *search*, a ``bool`` or ``None``, determines whether the search
        list configured in the system's resolver configuration is used
        for relative names, and whether the resolver's domain may be
        added to relative names.  The default is ``None``, which causes
        the value of the resolver's ``use_search_by_default`` attribute
        to be used.

        Raises ``dns.resolver.LifetimeTimeout`` if no answers could be
        found in the specified lifetime.

        Raises ``dns.resolver.NXDOMAIN`` if the query name does not exist.

        Raises ``dns.resolver.YXDOMAIN`` if the query name is too long
        after DNAME substitution.

        Raises ``dns.resolver.NoAnswer`` if *raise_on_no_answer* is
        ``True`` and the query name exists but has no RRset of the
        desired type and class.

        Raises ``dns.resolver.NoNameservers`` if no non-broken
        nameservers are available to answer the question.

        Returns a ``dns.resolver.Answer`` instance.
        """
        resolution = _Resolution(
            self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search
        )
        started = time.time()
        while True:
            request, answer = resolution.next_request()
            # next_request() may hand back an answer directly, in which
            # case there is nothing to send.
            if answer is not None:
                return answer
            assert request is not None
            finished = False
            while not finished:
                nameserver, use_tcp, backoff = resolution.next_nameserver()
                if backoff:
                    time.sleep(backoff)
                timeout = self._compute_timeout(started, lifetime, resolution.errors)
                try:
                    response = nameserver.query(
                        request,
                        timeout=timeout,
                        source=source,
                        source_port=source_port,
                        max_size=use_tcp,
                    )
                except Exception as ex:
                    # Report the failure; the resolution object decides
                    # whether this request is finished.
                    _, finished = resolution.query_result(None, ex)
                else:
                    answer, finished = resolution.query_result(response, None)
                    if answer is not None:
                        return answer

    def query(
        self,
        qname: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
        rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
        tcp: bool = False,
        source: Optional[str] = None,
        raise_on_no_answer: bool = True,
        source_port: int = 0,
        lifetime: Optional[float] = None,
    ) -> Answer:
        """Query nameservers to find the answer to the question.

        Deprecated: this method emits a ``DeprecationWarning`` and then
        calls resolve() with ``search=True``.  It is provided for
        backwards compatibility with prior versions of dnspython.  See
        the documentation for the resolve() method for further details.
        """
        warnings.warn(
            "please use dns.resolver.Resolver.resolve() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        use_search = True
        return self.resolve(
            qname,
            rdtype,
            rdclass,
            tcp,
            source,
            raise_on_no_answer,
            source_port,
            lifetime,
            use_search,
        )

    def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Any) -> Answer:
        """Use a resolver to run a reverse query for PTR records.

        This uses the resolve() method to perform a PTR lookup on the
        specified IP address.

        *ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get
        the PTR record for.

        All other arguments that can be passed to the resolve() function
        except for rdtype and rdclass are also supported by this
        function.
        """
        # Any caller-supplied rdtype/rdclass keyword is overridden: a
        # reverse lookup is always PTR in class IN.
        lookup_kwargs: Dict[str, Any] = dict(kwargs)
        lookup_kwargs["rdtype"] = dns.rdatatype.PTR
        lookup_kwargs["rdclass"] = dns.rdataclass.IN
        reverse_name = dns.reversename.from_address(ipaddr)
        return self.resolve(reverse_name, *args, **lookup_kwargs)

    def resolve_name(
        self,
        name: Union[dns.name.Name, str],
        family: int = socket.AF_UNSPEC,
        **kwargs: Any,
    ) -> HostAnswers:
        """Use a resolver to query for address records.

        This uses the resolve() method to perform A and/or AAAA lookups
        on the specified name.

        *name*, a ``dns.name.Name`` or ``str``, the name to resolve.

        *family*, an ``int``, the address family.  If socket.AF_UNSPEC
        (the default), both A and AAAA records will be retrieved.

        All other arguments that can be passed to the resolve() function
        except for rdtype and rdclass are also supported by this
        function.

        Raises ``NotImplementedError`` if *family* is not ``AF_INET``,
        ``AF_INET6`` or ``AF_UNSPEC``.
        """
        # rdtype is chosen here and rdclass is always IN, whatever the
        # caller passed.
        lookup_kwargs: Dict[str, Any] = dict(kwargs)
        lookup_kwargs.pop("rdtype", None)
        lookup_kwargs["rdclass"] = dns.rdataclass.IN

        if family == socket.AF_INET:
            v4 = self.resolve(name, dns.rdatatype.A, **lookup_kwargs)
            return HostAnswers.make(v4=v4)
        if family == socket.AF_INET6:
            v6 = self.resolve(name, dns.rdatatype.AAAA, **lookup_kwargs)
            return HostAnswers.make(v6=v6)
        if family != socket.AF_UNSPEC:
            raise NotImplementedError(f"unknown address family {family}")

        # AF_UNSPEC: run both lookups against one shared time budget, and
        # decide about "no answer" only after both have completed.
        raise_on_no_answer = lookup_kwargs.pop("raise_on_no_answer", True)
        lifetime = lookup_kwargs.pop("lifetime", None)
        start = time.time()
        v6 = self.resolve(
            name,
            dns.rdatatype.AAAA,
            raise_on_no_answer=False,
            lifetime=self._compute_timeout(start, lifetime),
            **lookup_kwargs,
        )
        # The A lookup uses the name the AAAA answer reports as queried,
        # so both lookups refer to the same name.
        v4 = self.resolve(
            v6.qname,
            dns.rdatatype.A,
            raise_on_no_answer=False,
            lifetime=self._compute_timeout(start, lifetime),
            **lookup_kwargs,
        )
        answers = HostAnswers.make(v6=v6, v4=v4, add_empty=not raise_on_no_answer)
        if not answers:
            raise NoAnswer(response=v6.response)
        return answers

    def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
        """Determine the canonical name of *name*.

        The canonical name is the name the resolver uses for queries
        after all CNAME and DNAME renamings have been applied.

        *name*, a ``dns.name.Name`` or ``str``, the query name.

        This method can raise any exception that ``resolve()`` can
        raise, other than ``dns.resolver.NoAnswer`` and
        ``dns.resolver.NXDOMAIN``.

        Returns a ``dns.name.Name``.
        """
        try:
            return self.resolve(name, raise_on_no_answer=False).canonical_name
        except dns.resolver.NXDOMAIN as nxdomain:
            # The NXDOMAIN exception carries a canonical name of its own.
            return nxdomain.canonical_name

    def try_ddr(self, lifetime: float = 5.0) -> None:
        """Try to update the resolver's nameservers using Discovery of
        Designated Resolvers (DDR).  If successful, the resolver will
        subsequently use DNS-over-HTTPS or DNS-over-TLS for future queries.

        *lifetime*, a float, is the maximum time to spend attempting DDR.
        The default is 5 seconds.

        If the SVCB query is successful and results in a non-empty list of
        nameservers, then the resolver's nameservers are set to the
        returned servers in priority order.

        The current implementation does not use any address hints from the
        SVCB record, nor does it resolve addresses for the SVCB target
        name, rather it assumes that the bootstrap nameserver will always
        be one of the addresses and uses it.  A future revision to the code
        may offer fuller support.  The code verifies that the bootstrap
        nameserver is in the Subject Alternative Name field of the TLS
        certificate.

        Any exception raised along the way is suppressed and leaves the
        nameservers unchanged.
        """
        try:
            deadline = time.time() + lifetime
            svcb_answer = self.resolve(
                dns._ddr._local_resolver_name, "SVCB", lifetime=lifetime
            )
            remaining = dns.query._remaining(deadline)
            discovered = dns._ddr._get_nameservers_sync(svcb_answer, remaining)
            if len(discovered) > 0:
                self.nameservers = discovered
        except Exception:
            # DDR is strictly best-effort.
            pass
| |
|
| |
|
| | |
# The resolver used by the module-level convenience functions.  It stays
# None until get_default_resolver() or reset_default_resolver() runs.
default_resolver: Optional[Resolver] = None


def get_default_resolver() -> Resolver:
    """Return the default resolver, creating it first if necessary."""
    if default_resolver is None:
        reset_default_resolver()
    resolver = default_resolver
    assert resolver is not None
    return resolver


def reset_default_resolver() -> None:
    """Replace the default resolver with a freshly constructed one.

    Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX
    systems) will be re-read immediately.
    """
    global default_resolver
    default_resolver = Resolver()
| |
|
| |
|
def resolve(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
    search: Optional[bool] = None,
) -> Answer:
    """Query nameservers to find the answer to the question.

    Convenience wrapper that forwards every argument, unchanged, to the
    ``resolve()`` method of the default resolver.

    See ``dns.resolver.Resolver.resolve`` for more information on the
    parameters.
    """
    resolver = get_default_resolver()
    return resolver.resolve(
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
    )
| |
|
| |
|
def query(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
) -> Answer:
    """Query nameservers to find the answer to the question.

    Deprecated: this function emits a ``DeprecationWarning`` and then
    calls resolve() with ``search=True``.  It is provided for backwards
    compatibility with prior versions of dnspython.  See the
    documentation for the resolve() method for further details.
    """
    warnings.warn(
        "please use dns.resolver.resolve() instead", DeprecationWarning, stacklevel=2
    )
    use_search = True
    return resolve(
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        use_search,
    )
| |
|
| |
|
def resolve_address(ipaddr: str, *args: Any, **kwargs: Any) -> Answer:
    """Use the default resolver to run a reverse query for PTR records.

    All arguments are forwarded unchanged.  See
    ``dns.resolver.Resolver.resolve_address`` for more information on the
    parameters.
    """
    resolver = get_default_resolver()
    return resolver.resolve_address(ipaddr, *args, **kwargs)
| |
|
| |
|
def resolve_name(
    name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any
) -> HostAnswers:
    """Use the default resolver to query for address records.

    All arguments are forwarded unchanged.  See
    ``dns.resolver.Resolver.resolve_name`` for more information on the
    parameters.
    """
    resolver = get_default_resolver()
    return resolver.resolve_name(name, family, **kwargs)
| |
|
| |
|
def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name:
    """Determine the canonical name of *name* using the default resolver.

    See ``dns.resolver.Resolver.canonical_name`` for more information on
    the parameters and possible exceptions.
    """
    resolver = get_default_resolver()
    return resolver.canonical_name(name)
| |
|
| |
|
def try_ddr(lifetime: float = 5.0) -> None:
    """Try to update the default resolver's nameservers using Discovery of
    Designated Resolvers (DDR).  If successful, the resolver will
    subsequently use DNS-over-HTTPS or DNS-over-TLS for future queries.

    *lifetime*, a float, is forwarded to the default resolver's
    ``try_ddr()`` method.

    See :py:func:`dns.resolver.Resolver.try_ddr` for more information.
    """
    resolver = get_default_resolver()
    return resolver.try_ddr(lifetime)
| |
|
| |
|
def zone_for_name(
    name: Union[dns.name.Name, str],
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
    tcp: bool = False,
    resolver: Optional[Resolver] = None,
    lifetime: Optional[float] = None,
) -> dns.name.Name:
    """Find the name of the zone which contains the specified name.

    *name*, an absolute ``dns.name.Name`` or ``str``, the query name.

    *rdclass*, an ``int``, the query class.

    *tcp*, a ``bool``.  If ``True``, use TCP to make the query.

    *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to
    use.  If ``None``, the default, then the default resolver is used.

    *lifetime*, a ``float``, the total time to allow for the queries
    needed to determine the zone.  If ``None``, the default, then only the
    individual query limits of the resolver apply.

    Raises ``dns.resolver.NotAbsolute`` if *name* is not absolute.

    Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS
    root.  (This is only likely to happen if you're using non-default
    root servers in your network and they are misconfigured.)

    Raises ``dns.resolver.LifetimeTimeout`` if the answer could not be
    found in the allotted lifetime.

    Returns a ``dns.name.Name``.
    """
    if isinstance(name, str):
        name = dns.name.from_text(name, dns.name.root)
    if resolver is None:
        resolver = get_default_resolver()
    if not name.is_absolute():
        raise NotAbsolute(name)
    start = time.time()
    expiration: Optional[float] = None if lifetime is None else start + lifetime
    while True:
        # Give each query only what is left of the overall budget,
        # clamped at zero once the deadline has passed.
        remaining: Optional[float]
        if expiration is None:
            remaining = None
        else:
            remaining = max(expiration - time.time(), 0)
        try:
            answer = resolver.resolve(
                name, dns.rdatatype.SOA, rdclass, tcp, lifetime=remaining
            )
            assert answer.rrset is not None
            if answer.rrset.name == name:
                return name
            # Otherwise the SOA answer belongs to a different owner name;
            # move on to the parent.
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
            # Look in the authority section of the negative response for
            # an SOA owned by a superdomain of the name being tried.
            response = (
                e.responses().get(name)
                if isinstance(e, dns.resolver.NXDOMAIN)
                else e.response()
            )
            if response:
                for rrset in response.authority:
                    if rrset.rdtype == dns.rdatatype.SOA and rrset.rdclass == rdclass:
                        relation = rrset.name.fullcompare(name)[0]
                        if relation == dns.name.NAMERELN_SUPERDOMAIN:
                            return rrset.name
        # Nothing found at this name; retry one label closer to the root.
        try:
            name = name.parent()
        except dns.name.NoParent:
            raise NoRootSOA
| |
|
| |
|
def make_resolver_at(
    where: Union[dns.name.Name, str],
    port: int = 53,
    family: int = socket.AF_UNSPEC,
    resolver: Optional[Resolver] = None,
) -> Resolver:
    """Make a stub resolver using the specified destination as the full resolver.

    *where*, a ``dns.name.Name`` or ``str``, the domain name or IP address
    of the full resolver.

    *port*, an ``int``, the port to use.  If not specified, the default is
    53.

    *family*, an ``int``, the address family to use.  This parameter is
    used if *where* is not an address.  The default is
    ``socket.AF_UNSPEC`` in which case the first address returned by
    ``resolve_name()`` will be used, otherwise the first address of the
    specified family will be used.

    *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to
    use for resolution of hostnames.  If not specified, the default
    resolver will be used.

    Returns a ``dns.resolver.Resolver`` or raises an exception.
    """
    if resolver is None:
        resolver = get_default_resolver()
    nameservers: List[Union[str, dns.nameserver.Nameserver]]
    if isinstance(where, str) and dns.inet.is_address(where):
        # A literal address needs no lookup.
        nameservers = [dns.nameserver.Do53Nameserver(where, port)]
    else:
        # Otherwise resolve the hostname and use each returned address.
        nameservers = [
            dns.nameserver.Do53Nameserver(address, port)
            for address in resolver.resolve_name(where, family).addresses()
        ]
    new_resolver = dns.resolver.Resolver(configure=False)
    new_resolver.nameservers = nameservers
    return new_resolver
| |
|
| |
|
def resolve_at(
    where: Union[dns.name.Name, str],
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
    search: Optional[bool] = None,
    port: int = 53,
    family: int = socket.AF_UNSPEC,
    resolver: Optional[Resolver] = None,
) -> Answer:
    """Query nameservers to find the answer to the question.

    This is a convenience function that calls
    ``dns.resolver.make_resolver_at()`` to make a resolver, and then uses
    it to resolve the query.

    See ``dns.resolver.Resolver.resolve`` for more information on the
    resolution parameters, and ``dns.resolver.make_resolver_at`` for
    information about the resolver parameters *where*, *port*, *family*,
    and *resolver*.

    If making more than one query, it is more efficient to call
    ``dns.resolver.make_resolver_at()`` and then use that resolver for the
    queries instead of calling ``resolve_at()`` multiple times.
    """
    target_resolver = make_resolver_at(where, port, family, resolver)
    return target_resolver.resolve(
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
    )
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
# The protocols reported by _getaddrinfo() for each socket type.
_protocols_for_socktype = {
    socket.SOCK_DGRAM: [socket.SOL_UDP],
    socket.SOCK_STREAM: [socket.SOL_TCP],
}

# The resolver used by the override functions; None while no override is
# installed.
_resolver = None

# The socket module's own implementations, captured at import time so
# they can be delegated to and later restored.
_original_getaddrinfo = socket.getaddrinfo
_original_getnameinfo = socket.getnameinfo
_original_getfqdn = socket.getfqdn
_original_gethostbyname = socket.gethostbyname
_original_gethostbyname_ex = socket.gethostbyname_ex
_original_gethostbyaddr = socket.gethostbyaddr
| |
|
| |
|
def _getaddrinfo(
    host=None, service=None, family=socket.AF_UNSPEC, socktype=0, proto=0, flags=0
):
    """Replacement for ``socket.getaddrinfo()`` backed by the override resolver.

    The parameters mirror ``socket.getaddrinfo()``.  Requests with
    ``AI_NUMERICHOST``, with no *host*, or with a *host* that is a
    literal IP address are delegated to the system implementation.
    Everything else is resolved with ``_resolver.resolve_name()``.  The
    *proto* argument is not used to filter the DNS-based results.

    Raises ``socket.gaierror`` with ``EAI_FAIL`` if ``AI_ADDRCONFIG`` or
    ``AI_V4MAPPED`` is requested, ``EAI_NONAME`` if the name or service is
    unknown or no result can be produced, and ``EAI_AGAIN`` for any other
    resolution failure.

    Returns a list of ``(family, socktype, proto, canonname, sockaddr)``
    tuples.
    """
    if flags & socket.AI_NUMERICHOST != 0:
        # No name resolution requested; the system can handle it.
        return _original_getaddrinfo(host, service, family, socktype, proto, flags)
    if flags & (socket.AI_ADDRCONFIG | socket.AI_V4MAPPED) != 0:
        # These flags are not supported by this implementation.
        raise socket.gaierror(
            socket.EAI_FAIL, "Non-recoverable failure in name resolution"
        )
    if host is None and service is None:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    if host is None:
        return _original_getaddrinfo(host, service, family, socktype, proto, flags)

    # af_for_address() is called only to find out whether host is a
    # literal address; its result is not needed.  Only that probe is
    # guarded, so a failure of the system getaddrinfo() for a literal
    # address propagates instead of triggering a DNS lookup of the
    # literal.
    try:
        dns.inet.af_for_address(host)
    except Exception:
        is_literal_address = False
    else:
        is_literal_address = True
    if is_literal_address:
        return _original_getaddrinfo(host, service, family, socktype, proto, flags)

    try:
        answers = _resolver.resolve_name(host, family)
        addrs = answers.addresses_and_families()
        canonical_name = answers.canonical_name().to_text(True)
    except dns.resolver.NXDOMAIN:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    except Exception:
        # Every other failure is reported as a temporary one.
        raise socket.gaierror(socket.EAI_AGAIN, "Temporary failure in name resolution")

    # Work out the port: no service means 0, a numeric service is used
    # directly, and a service name is looked up unless AI_NUMERICSERV
    # forbids it.
    port = None
    if service is None:
        port = 0
    else:
        try:
            port = int(service)
        except Exception:
            if flags & socket.AI_NUMERICSERV == 0:
                try:
                    port = socket.getservbyname(service)
                except Exception:
                    pass
    if port is None:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")

    if socktype == 0:
        socktypes = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
    else:
        socktypes = [socktype]
    cname = canonical_name if flags & socket.AI_CANONNAME != 0 else ""

    tuples = []
    for addr, af in addrs:
        for stype in socktypes:
            for protocol in _protocols_for_socktype[stype]:
                addr_tuple = dns.inet.low_level_address_tuple((addr, port), af)
                tuples.append((af, stype, protocol, cname, addr_tuple))
    if len(tuples) == 0:
        raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    return tuples
| |
|
| |
|
def _getnameinfo(sockaddr, flags=0):
    """Replacement for ``socket.getnameinfo()`` backed by the override resolver.

    *sockaddr*, a tuple.  A 4-tuple is treated as an IPv6
    ``(host, port, flowinfo, scope)`` address, anything else as an IPv4
    ``(host, port)`` address.

    *flags*, an ``int``, a combination of the ``socket.NI_*`` flags.

    Raises ``socket.error`` if *sockaddr* resolves to more than one
    address, and ``socket.gaierror`` with ``EAI_NONAME`` if
    ``NI_NAMEREQD`` is set and no PTR record is found.

    Returns a ``(hostname, service)`` tuple.  If the port has no service
    name, the numeric port is returned as the service.
    """
    host = sockaddr[0]
    port = sockaddr[1]
    if len(sockaddr) == 4:
        scope = sockaddr[3]
        family = socket.AF_INET6
    else:
        scope = None
        family = socket.AF_INET
    tuples = _getaddrinfo(host, port, family, socket.SOCK_STREAM, socket.SOL_TCP, 0)
    if len(tuples) > 1:
        raise socket.error("sockaddr resolved to multiple addresses")
    addr = tuples[0][4][0]
    pname = "udp" if flags & socket.NI_DGRAM else "tcp"
    qname = dns.reversename.from_address(addr)

    hostname = None
    if flags & socket.NI_NUMERICHOST == 0:
        try:
            answer = _resolver.resolve(qname, "PTR")
            hostname = answer.rrset[0].target.to_text(True)
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            if flags & socket.NI_NAMEREQD:
                raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
    if hostname is None:
        # Numeric form: either requested explicitly or used because no
        # PTR record exists.  The scope, if any, is appended.
        hostname = addr
        if scope is not None:
            hostname += "%" + str(scope)

    if flags & socket.NI_NUMERICSERV:
        service = str(port)
    else:
        try:
            service = socket.getservbyport(port, pname)
        except OSError:
            # getservbyport() raises when the port has no service name;
            # getnameinfo() is expected to return the numeric port then.
            service = str(port)
    return (hostname, service)
| |
|
| |
|
def _getfqdn(name=None):
    """Replacement for ``socket.getfqdn()``.

    *name*, the name to qualify; if ``None``, ``socket.gethostname()`` is
    used.

    Returns the primary name reported by ``_gethostbyaddr()``, or the
    starting name unchanged if that lookup fails for any reason.
    """
    hostname = socket.gethostname() if name is None else name
    try:
        canonical, _aliases, _addresses = _gethostbyaddr(hostname)
    except Exception:
        # Best effort: fall back to the name we started with.
        return hostname
    return canonical
| |
|
| |
|
def _gethostbyname(name):
    """Replacement for ``socket.gethostbyname()``.

    Returns the first address from ``_gethostbyname_ex(name)``.
    """
    addresses = _gethostbyname_ex(name)[2]
    return addresses[0]
| |
|
| |
|
def _gethostbyname_ex(name):
    """Replacement for ``socket.gethostbyname_ex()``.

    Performs an IPv4 ``_getaddrinfo()`` lookup with ``AI_CANONNAME``.

    Returns a ``(canonical_name, aliases, addresses)`` tuple; the alias
    list is always empty.
    """
    results = _getaddrinfo(
        name, 0, socket.AF_INET, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME
    )
    # Each result is (family, socktype, proto, canonname, sockaddr).
    canonical = results[0][3]
    addresses = [entry[4][0] for entry in results]
    aliases = []
    return (canonical, aliases, addresses)
| |
|
| |
|
def _gethostbyaddr(ip):
    """Replacement for ``socket.gethostbyaddr()``.

    *ip*, a ``str``, an IPv6 or IPv4 address in text form.

    Raises ``socket.gaierror`` with ``EAI_NONAME`` if *ip* is neither, or
    if no name can be found for it.

    Returns a ``(canonical_name, aliases, addresses)`` tuple; the alias
    list is always empty and *addresses* holds only the forward-lookup
    results equal to *ip*.
    """
    # Try IPv6 first, then IPv4.
    try:
        dns.ipv6.inet_aton(ip)
    except Exception:
        try:
            dns.ipv4.inet_aton(ip)
        except Exception:
            raise socket.gaierror(socket.EAI_NONAME, "Name or service not known")
        sockaddr = (ip, 80)
        family = socket.AF_INET
    else:
        sockaddr = (ip, 80, 0, 0)
        family = socket.AF_INET6

    # Reverse lookup (a name is required), then a forward lookup of that
    # name to obtain the canonical name and candidate addresses.
    (hostname, _) = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
    results = _getaddrinfo(
        hostname, 0, family, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME
    )
    canonical = results[0][3]

    # Compare in binary form so that different spellings of the same
    # address still match.
    wanted = dns.inet.inet_pton(family, ip)
    addresses = []
    for entry in results:
        candidate = entry[4][0]
        if wanted == dns.inet.inet_pton(family, candidate):
            addresses.append(candidate)
    aliases = []
    return (canonical, aliases, addresses)
| |
|
| |
|
def override_system_resolver(resolver: Optional[Resolver] = None) -> None:
    """Override the system resolver routines in the socket module with
    versions which use dnspython's resolver.

    This can be useful in testing situations where you want to control
    the resolution behavior of python code without having to change
    the system's resolver settings (e.g. /etc/resolv.conf).

    *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to
    use.  If ``None``, the default resolver is used.
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver
    # Install the dnspython-backed replacements on the socket module.
    socket.getaddrinfo = _getaddrinfo
    socket.getnameinfo = _getnameinfo
    socket.getfqdn = _getfqdn
    socket.gethostbyname = _gethostbyname
    socket.gethostbyname_ex = _gethostbyname_ex
    socket.gethostbyaddr = _gethostbyaddr
| |
|
| |
|
def restore_system_resolver() -> None:
    """Undo the effects of prior override_system_resolver().

    Clears the override resolver and puts back the socket module
    functions that were saved when this module was imported.
    """
    global _resolver
    _resolver = None
    socket.getaddrinfo = _original_getaddrinfo
    socket.getnameinfo = _original_getnameinfo
    socket.getfqdn = _original_getfqdn
    socket.gethostbyname = _original_gethostbyname
    socket.gethostbyname_ex = _original_gethostbyname_ex
    socket.gethostbyaddr = _original_gethostbyaddr
| |
|