Spaces:
Paused
Paused
| # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
| # Copyright (C) 2009-2017 Nominum, Inc. | |
| # | |
| # Permission to use, copy, modify, and distribute this software and its | |
| # documentation for any purpose with or without fee is hereby granted, | |
| # provided that the above copyright notice and this permission notice | |
| # appear in all copies. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
| # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
| # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
| # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
| # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
| # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
| # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
| """EDNS Options""" | |
| import binascii | |
| import math | |
| import socket | |
| import struct | |
| from typing import Any, Dict, Optional, Union | |
| import dns.enum | |
| import dns.inet | |
| import dns.rdata | |
| import dns.wire | |
| class OptionType(dns.enum.IntEnum): | |
| #: NSID | |
| NSID = 3 | |
| #: DAU | |
| DAU = 5 | |
| #: DHU | |
| DHU = 6 | |
| #: N3U | |
| N3U = 7 | |
| #: ECS (client-subnet) | |
| ECS = 8 | |
| #: EXPIRE | |
| EXPIRE = 9 | |
| #: COOKIE | |
| COOKIE = 10 | |
| #: KEEPALIVE | |
| KEEPALIVE = 11 | |
| #: PADDING | |
| PADDING = 12 | |
| #: CHAIN | |
| CHAIN = 13 | |
| #: EDE (extended-dns-error) | |
| EDE = 15 | |
| def _maximum(cls): | |
| return 65535 | |
| class Option: | |
| """Base class for all EDNS option types.""" | |
| def __init__(self, otype: Union[OptionType, str]): | |
| """Initialize an option. | |
| *otype*, a ``dns.edns.OptionType``, is the option type. | |
| """ | |
| self.otype = OptionType.make(otype) | |
| def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: | |
| """Convert an option to wire format. | |
| Returns a ``bytes`` or ``None``. | |
| """ | |
| raise NotImplementedError # pragma: no cover | |
| def to_text(self) -> str: | |
| raise NotImplementedError # pragma: no cover | |
| def from_wire_parser(cls, otype: OptionType, parser: "dns.wire.Parser") -> "Option": | |
| """Build an EDNS option object from wire format. | |
| *otype*, a ``dns.edns.OptionType``, is the option type. | |
| *parser*, a ``dns.wire.Parser``, the parser, which should be | |
| restructed to the option length. | |
| Returns a ``dns.edns.Option``. | |
| """ | |
| raise NotImplementedError # pragma: no cover | |
| def _cmp(self, other): | |
| """Compare an EDNS option with another option of the same type. | |
| Returns < 0 if < *other*, 0 if == *other*, and > 0 if > *other*. | |
| """ | |
| wire = self.to_wire() | |
| owire = other.to_wire() | |
| if wire == owire: | |
| return 0 | |
| if wire > owire: | |
| return 1 | |
| return -1 | |
| def __eq__(self, other): | |
| if not isinstance(other, Option): | |
| return False | |
| if self.otype != other.otype: | |
| return False | |
| return self._cmp(other) == 0 | |
| def __ne__(self, other): | |
| if not isinstance(other, Option): | |
| return True | |
| if self.otype != other.otype: | |
| return True | |
| return self._cmp(other) != 0 | |
| def __lt__(self, other): | |
| if not isinstance(other, Option) or self.otype != other.otype: | |
| return NotImplemented | |
| return self._cmp(other) < 0 | |
| def __le__(self, other): | |
| if not isinstance(other, Option) or self.otype != other.otype: | |
| return NotImplemented | |
| return self._cmp(other) <= 0 | |
| def __ge__(self, other): | |
| if not isinstance(other, Option) or self.otype != other.otype: | |
| return NotImplemented | |
| return self._cmp(other) >= 0 | |
| def __gt__(self, other): | |
| if not isinstance(other, Option) or self.otype != other.otype: | |
| return NotImplemented | |
| return self._cmp(other) > 0 | |
| def __str__(self): | |
| return self.to_text() | |
| class GenericOption(Option): # lgtm[py/missing-equals] | |
| """Generic Option Class | |
| This class is used for EDNS option types for which we have no better | |
| implementation. | |
| """ | |
| def __init__(self, otype: Union[OptionType, str], data: Union[bytes, str]): | |
| super().__init__(otype) | |
| self.data = dns.rdata.Rdata._as_bytes(data, True) | |
| def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: | |
| if file: | |
| file.write(self.data) | |
| return None | |
| else: | |
| return self.data | |
| def to_text(self) -> str: | |
| return "Generic %d" % self.otype | |
| def from_wire_parser( | |
| cls, otype: Union[OptionType, str], parser: "dns.wire.Parser" | |
| ) -> Option: | |
| return cls(otype, parser.get_remaining()) | |
| class ECSOption(Option): # lgtm[py/missing-equals] | |
| """EDNS Client Subnet (ECS, RFC7871)""" | |
| def __init__(self, address: str, srclen: Optional[int] = None, scopelen: int = 0): | |
| """*address*, a ``str``, is the client address information. | |
| *srclen*, an ``int``, the source prefix length, which is the | |
| leftmost number of bits of the address to be used for the | |
| lookup. The default is 24 for IPv4 and 56 for IPv6. | |
| *scopelen*, an ``int``, the scope prefix length. This value | |
| must be 0 in queries, and should be set in responses. | |
| """ | |
| super().__init__(OptionType.ECS) | |
| af = dns.inet.af_for_address(address) | |
| if af == socket.AF_INET6: | |
| self.family = 2 | |
| if srclen is None: | |
| srclen = 56 | |
| address = dns.rdata.Rdata._as_ipv6_address(address) | |
| srclen = dns.rdata.Rdata._as_int(srclen, 0, 128) | |
| scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 128) | |
| elif af == socket.AF_INET: | |
| self.family = 1 | |
| if srclen is None: | |
| srclen = 24 | |
| address = dns.rdata.Rdata._as_ipv4_address(address) | |
| srclen = dns.rdata.Rdata._as_int(srclen, 0, 32) | |
| scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 32) | |
| else: # pragma: no cover (this will never happen) | |
| raise ValueError("Bad address family") | |
| assert srclen is not None | |
| self.address = address | |
| self.srclen = srclen | |
| self.scopelen = scopelen | |
| addrdata = dns.inet.inet_pton(af, address) | |
| nbytes = int(math.ceil(srclen / 8.0)) | |
| # Truncate to srclen and pad to the end of the last octet needed | |
| # See RFC section 6 | |
| self.addrdata = addrdata[:nbytes] | |
| nbits = srclen % 8 | |
| if nbits != 0: | |
| last = struct.pack("B", ord(self.addrdata[-1:]) & (0xFF << (8 - nbits))) | |
| self.addrdata = self.addrdata[:-1] + last | |
| def to_text(self) -> str: | |
| return "ECS {}/{} scope/{}".format(self.address, self.srclen, self.scopelen) | |
| def from_text(text: str) -> Option: | |
| """Convert a string into a `dns.edns.ECSOption` | |
| *text*, a `str`, the text form of the option. | |
| Returns a `dns.edns.ECSOption`. | |
| Examples: | |
| >>> import dns.edns | |
| >>> | |
| >>> # basic example | |
| >>> dns.edns.ECSOption.from_text('1.2.3.4/24') | |
| >>> | |
| >>> # also understands scope | |
| >>> dns.edns.ECSOption.from_text('1.2.3.4/24/32') | |
| >>> | |
| >>> # IPv6 | |
| >>> dns.edns.ECSOption.from_text('2001:4b98::1/64/64') | |
| >>> | |
| >>> # it understands results from `dns.edns.ECSOption.to_text()` | |
| >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24/32') | |
| """ | |
| optional_prefix = "ECS" | |
| tokens = text.split() | |
| ecs_text = None | |
| if len(tokens) == 1: | |
| ecs_text = tokens[0] | |
| elif len(tokens) == 2: | |
| if tokens[0] != optional_prefix: | |
| raise ValueError('could not parse ECS from "{}"'.format(text)) | |
| ecs_text = tokens[1] | |
| else: | |
| raise ValueError('could not parse ECS from "{}"'.format(text)) | |
| n_slashes = ecs_text.count("/") | |
| if n_slashes == 1: | |
| address, tsrclen = ecs_text.split("/") | |
| tscope = "0" | |
| elif n_slashes == 2: | |
| address, tsrclen, tscope = ecs_text.split("/") | |
| else: | |
| raise ValueError('could not parse ECS from "{}"'.format(text)) | |
| try: | |
| scope = int(tscope) | |
| except ValueError: | |
| raise ValueError( | |
| "invalid scope " + '"{}": scope must be an integer'.format(tscope) | |
| ) | |
| try: | |
| srclen = int(tsrclen) | |
| except ValueError: | |
| raise ValueError( | |
| "invalid srclen " + '"{}": srclen must be an integer'.format(tsrclen) | |
| ) | |
| return ECSOption(address, srclen, scope) | |
| def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: | |
| value = ( | |
| struct.pack("!HBB", self.family, self.srclen, self.scopelen) + self.addrdata | |
| ) | |
| if file: | |
| file.write(value) | |
| return None | |
| else: | |
| return value | |
| def from_wire_parser( | |
| cls, otype: Union[OptionType, str], parser: "dns.wire.Parser" | |
| ) -> Option: | |
| family, src, scope = parser.get_struct("!HBB") | |
| addrlen = int(math.ceil(src / 8.0)) | |
| prefix = parser.get_bytes(addrlen) | |
| if family == 1: | |
| pad = 4 - addrlen | |
| addr = dns.ipv4.inet_ntoa(prefix + b"\x00" * pad) | |
| elif family == 2: | |
| pad = 16 - addrlen | |
| addr = dns.ipv6.inet_ntoa(prefix + b"\x00" * pad) | |
| else: | |
| raise ValueError("unsupported family") | |
| return cls(addr, src, scope) | |
| class EDECode(dns.enum.IntEnum): | |
| OTHER = 0 | |
| UNSUPPORTED_DNSKEY_ALGORITHM = 1 | |
| UNSUPPORTED_DS_DIGEST_TYPE = 2 | |
| STALE_ANSWER = 3 | |
| FORGED_ANSWER = 4 | |
| DNSSEC_INDETERMINATE = 5 | |
| DNSSEC_BOGUS = 6 | |
| SIGNATURE_EXPIRED = 7 | |
| SIGNATURE_NOT_YET_VALID = 8 | |
| DNSKEY_MISSING = 9 | |
| RRSIGS_MISSING = 10 | |
| NO_ZONE_KEY_BIT_SET = 11 | |
| NSEC_MISSING = 12 | |
| CACHED_ERROR = 13 | |
| NOT_READY = 14 | |
| BLOCKED = 15 | |
| CENSORED = 16 | |
| FILTERED = 17 | |
| PROHIBITED = 18 | |
| STALE_NXDOMAIN_ANSWER = 19 | |
| NOT_AUTHORITATIVE = 20 | |
| NOT_SUPPORTED = 21 | |
| NO_REACHABLE_AUTHORITY = 22 | |
| NETWORK_ERROR = 23 | |
| INVALID_DATA = 24 | |
| def _maximum(cls): | |
| return 65535 | |
| class EDEOption(Option): # lgtm[py/missing-equals] | |
| """Extended DNS Error (EDE, RFC8914)""" | |
| _preserve_case = {"DNSKEY", "DS", "DNSSEC", "RRSIGs", "NSEC", "NXDOMAIN"} | |
| def __init__(self, code: Union[EDECode, str], text: Optional[str] = None): | |
| """*code*, a ``dns.edns.EDECode`` or ``str``, the info code of the | |
| extended error. | |
| *text*, a ``str`` or ``None``, specifying additional information about | |
| the error. | |
| """ | |
| super().__init__(OptionType.EDE) | |
| self.code = EDECode.make(code) | |
| if text is not None and not isinstance(text, str): | |
| raise ValueError("text must be string or None") | |
| self.text = text | |
| def to_text(self) -> str: | |
| output = f"EDE {self.code}" | |
| if self.code in EDECode: | |
| desc = EDECode.to_text(self.code) | |
| desc = " ".join( | |
| word if word in self._preserve_case else word.title() | |
| for word in desc.split("_") | |
| ) | |
| output += f" ({desc})" | |
| if self.text is not None: | |
| output += f": {self.text}" | |
| return output | |
| def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]: | |
| value = struct.pack("!H", self.code) | |
| if self.text is not None: | |
| value += self.text.encode("utf8") | |
| if file: | |
| file.write(value) | |
| return None | |
| else: | |
| return value | |
| def from_wire_parser( | |
| cls, otype: Union[OptionType, str], parser: "dns.wire.Parser" | |
| ) -> Option: | |
| code = EDECode.make(parser.get_uint16()) | |
| text = parser.get_remaining() | |
| if text: | |
| if text[-1] == 0: # text MAY be null-terminated | |
| text = text[:-1] | |
| btext = text.decode("utf8") | |
| else: | |
| btext = None | |
| return cls(code, btext) | |
| class NSIDOption(Option): | |
| def __init__(self, nsid: bytes): | |
| super().__init__(OptionType.NSID) | |
| self.nsid = nsid | |
| def to_wire(self, file: Any = None) -> Optional[bytes]: | |
| if file: | |
| file.write(self.nsid) | |
| return None | |
| else: | |
| return self.nsid | |
| def to_text(self) -> str: | |
| if all(c >= 0x20 and c <= 0x7E for c in self.nsid): | |
| # All ASCII printable, so it's probably a string. | |
| value = self.nsid.decode() | |
| else: | |
| value = binascii.hexlify(self.nsid).decode() | |
| return f"NSID {value}" | |
| def from_wire_parser( | |
| cls, otype: Union[OptionType, str], parser: dns.wire.Parser | |
| ) -> Option: | |
| return cls(parser.get_remaining()) | |
| _type_to_class: Dict[OptionType, Any] = { | |
| OptionType.ECS: ECSOption, | |
| OptionType.EDE: EDEOption, | |
| OptionType.NSID: NSIDOption, | |
| } | |
| def get_option_class(otype: OptionType) -> Any: | |
| """Return the class for the specified option type. | |
| The GenericOption class is used if a more specific class is not | |
| known. | |
| """ | |
| cls = _type_to_class.get(otype) | |
| if cls is None: | |
| cls = GenericOption | |
| return cls | |
| def option_from_wire_parser( | |
| otype: Union[OptionType, str], parser: "dns.wire.Parser" | |
| ) -> Option: | |
| """Build an EDNS option object from wire format. | |
| *otype*, an ``int``, is the option type. | |
| *parser*, a ``dns.wire.Parser``, the parser, which should be | |
| restricted to the option length. | |
| Returns an instance of a subclass of ``dns.edns.Option``. | |
| """ | |
| otype = OptionType.make(otype) | |
| cls = get_option_class(otype) | |
| return cls.from_wire_parser(otype, parser) | |
| def option_from_wire( | |
| otype: Union[OptionType, str], wire: bytes, current: int, olen: int | |
| ) -> Option: | |
| """Build an EDNS option object from wire format. | |
| *otype*, an ``int``, is the option type. | |
| *wire*, a ``bytes``, is the wire-format message. | |
| *current*, an ``int``, is the offset in *wire* of the beginning | |
| of the rdata. | |
| *olen*, an ``int``, is the length of the wire-format option data | |
| Returns an instance of a subclass of ``dns.edns.Option``. | |
| """ | |
| parser = dns.wire.Parser(wire, current) | |
| with parser.restrict_to(olen): | |
| return option_from_wire_parser(otype, parser) | |
| def register_type(implementation: Any, otype: OptionType) -> None: | |
| """Register the implementation of an option type. | |
| *implementation*, a ``class``, is a subclass of ``dns.edns.Option``. | |
| *otype*, an ``int``, is the option type. | |
| """ | |
| _type_to_class[otype] = implementation | |
| ### BEGIN generated OptionType constants | |
| NSID = OptionType.NSID | |
| DAU = OptionType.DAU | |
| DHU = OptionType.DHU | |
| N3U = OptionType.N3U | |
| ECS = OptionType.ECS | |
| EXPIRE = OptionType.EXPIRE | |
| COOKIE = OptionType.COOKIE | |
| KEEPALIVE = OptionType.KEEPALIVE | |
| PADDING = OptionType.PADDING | |
| CHAIN = OptionType.CHAIN | |
| EDE = OptionType.EDE | |
| ### END generated OptionType constants | |