|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""DNS rdata.""" |
|
|
|
import base64 |
|
import binascii |
|
import inspect |
|
import io |
|
import itertools |
|
import random |
|
from importlib import import_module |
|
from typing import Any, Dict, Optional, Tuple, Union |
|
|
|
import dns.exception |
|
import dns.immutable |
|
import dns.ipv4 |
|
import dns.ipv6 |
|
import dns.name |
|
import dns.rdataclass |
|
import dns.rdatatype |
|
import dns.tokenizer |
|
import dns.ttl |
|
import dns.wire |
|
|
|
_chunksize = 32 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_allow_relative_comparisons = True |
|
|
|
|
|
class NoRelativeRdataOrdering(dns.exception.DNSException): |
|
"""An attempt was made to do an ordered comparison of one or more |
|
rdata with relative names. The only reliable way of sorting rdata |
|
is to use non-relativized rdata. |
|
|
|
""" |
|
|
|
|
|
def _wordbreak(data, chunksize=_chunksize, separator=b" "): |
|
"""Break a binary string into chunks of chunksize characters separated by |
|
a space. |
|
""" |
|
|
|
if not chunksize: |
|
return data.decode() |
|
return separator.join( |
|
[data[i : i + chunksize] for i in range(0, len(data), chunksize)] |
|
).decode() |
|
|
|
|
|
|
|
|
|
|
|
def _hexify(data, chunksize=_chunksize, separator=b" ", **kw): |
|
"""Convert a binary string into its hex encoding, broken up into chunks |
|
of chunksize characters separated by a separator. |
|
""" |
|
|
|
return _wordbreak(binascii.hexlify(data), chunksize, separator) |
|
|
|
|
|
def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw): |
|
"""Convert a binary string into its base64 encoding, broken up into chunks |
|
of chunksize characters separated by a separator. |
|
""" |
|
|
|
return _wordbreak(base64.b64encode(data), chunksize, separator) |
|
|
|
|
|
|
|
|
|
__escaped = b'"\\' |
|
|
|
|
|
def _escapify(qstring): |
|
"""Escape the characters in a quoted string which need it.""" |
|
|
|
if isinstance(qstring, str): |
|
qstring = qstring.encode() |
|
if not isinstance(qstring, bytearray): |
|
qstring = bytearray(qstring) |
|
|
|
text = "" |
|
for c in qstring: |
|
if c in __escaped: |
|
text += "\\" + chr(c) |
|
elif c >= 0x20 and c < 0x7F: |
|
text += chr(c) |
|
else: |
|
text += "\\%03d" % c |
|
return text |
|
|
|
|
|
def _truncate_bitmap(what): |
|
"""Determine the index of greatest byte that isn't all zeros, and |
|
return the bitmap that contains all the bytes less than that index. |
|
""" |
|
|
|
for i in range(len(what) - 1, -1, -1): |
|
if what[i] != 0: |
|
return what[0 : i + 1] |
|
return what[0:1] |
|
|
|
|
|
|
|
_constify = dns.immutable.constify |
|
|
|
|
|
@dns.immutable.immutable |
|
class Rdata: |
|
"""Base class for all DNS rdata types.""" |
|
|
|
__slots__ = ["rdclass", "rdtype", "rdcomment"] |
|
|
|
def __init__(self, rdclass, rdtype): |
|
"""Initialize an rdata. |
|
|
|
*rdclass*, an ``int`` is the rdataclass of the Rdata. |
|
|
|
*rdtype*, an ``int`` is the rdatatype of the Rdata. |
|
""" |
|
|
|
self.rdclass = self._as_rdataclass(rdclass) |
|
self.rdtype = self._as_rdatatype(rdtype) |
|
self.rdcomment = None |
|
|
|
def _get_all_slots(self): |
|
return itertools.chain.from_iterable( |
|
getattr(cls, "__slots__", []) for cls in self.__class__.__mro__ |
|
) |
|
|
|
def __getstate__(self): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
state = {} |
|
for slot in self._get_all_slots(): |
|
state[slot] = getattr(self, slot) |
|
return state |
|
|
|
def __setstate__(self, state): |
|
for slot, val in state.items(): |
|
object.__setattr__(self, slot, val) |
|
if not hasattr(self, "rdcomment"): |
|
|
|
|
|
object.__setattr__(self, "rdcomment", None) |
|
|
|
def covers(self) -> dns.rdatatype.RdataType: |
|
"""Return the type a Rdata covers. |
|
|
|
DNS SIG/RRSIG rdatas apply to a specific type; this type is |
|
returned by the covers() function. If the rdata type is not |
|
SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when |
|
creating rdatasets, allowing the rdataset to contain only RRSIGs |
|
of a particular type, e.g. RRSIG(NS). |
|
|
|
Returns a ``dns.rdatatype.RdataType``. |
|
""" |
|
|
|
return dns.rdatatype.NONE |
|
|
|
def extended_rdatatype(self) -> int: |
|
"""Return a 32-bit type value, the least significant 16 bits of |
|
which are the ordinary DNS type, and the upper 16 bits of which are |
|
the "covered" type, if any. |
|
|
|
Returns an ``int``. |
|
""" |
|
|
|
return self.covers() << 16 | self.rdtype |
|
|
|
def to_text( |
|
self, |
|
origin: Optional[dns.name.Name] = None, |
|
relativize: bool = True, |
|
**kw: Dict[str, Any], |
|
) -> str: |
|
"""Convert an rdata to text format. |
|
|
|
Returns a ``str``. |
|
""" |
|
|
|
raise NotImplementedError |
|
|
|
def _to_wire( |
|
self, |
|
file: Optional[Any], |
|
compress: Optional[dns.name.CompressType] = None, |
|
origin: Optional[dns.name.Name] = None, |
|
canonicalize: bool = False, |
|
) -> bytes: |
|
raise NotImplementedError |
|
|
|
def to_wire( |
|
self, |
|
file: Optional[Any] = None, |
|
compress: Optional[dns.name.CompressType] = None, |
|
origin: Optional[dns.name.Name] = None, |
|
canonicalize: bool = False, |
|
) -> bytes: |
|
"""Convert an rdata to wire format. |
|
|
|
Returns a ``bytes`` or ``None``. |
|
""" |
|
|
|
if file: |
|
return self._to_wire(file, compress, origin, canonicalize) |
|
else: |
|
f = io.BytesIO() |
|
self._to_wire(f, compress, origin, canonicalize) |
|
return f.getvalue() |
|
|
|
def to_generic( |
|
self, origin: Optional[dns.name.Name] = None |
|
) -> "dns.rdata.GenericRdata": |
|
"""Creates a dns.rdata.GenericRdata equivalent of this rdata. |
|
|
|
Returns a ``dns.rdata.GenericRdata``. |
|
""" |
|
return dns.rdata.GenericRdata( |
|
self.rdclass, self.rdtype, self.to_wire(origin=origin) |
|
) |
|
|
|
def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes: |
|
"""Convert rdata to a format suitable for digesting in hashes. This |
|
is also the DNSSEC canonical form. |
|
|
|
Returns a ``bytes``. |
|
""" |
|
|
|
return self.to_wire(origin=origin, canonicalize=True) |
|
|
|
def __repr__(self): |
|
covers = self.covers() |
|
if covers == dns.rdatatype.NONE: |
|
ctext = "" |
|
else: |
|
ctext = "(" + dns.rdatatype.to_text(covers) + ")" |
|
return ( |
|
"<DNS " |
|
+ dns.rdataclass.to_text(self.rdclass) |
|
+ " " |
|
+ dns.rdatatype.to_text(self.rdtype) |
|
+ ctext |
|
+ " rdata: " |
|
+ str(self) |
|
+ ">" |
|
) |
|
|
|
def __str__(self): |
|
return self.to_text() |
|
|
|
def _cmp(self, other): |
|
"""Compare an rdata with another rdata of the same rdtype and |
|
rdclass. |
|
|
|
For rdata with only absolute names: |
|
Return < 0 if self < other in the DNSSEC ordering, 0 if self |
|
== other, and > 0 if self > other. |
|
For rdata with at least one relative names: |
|
The rdata sorts before any rdata with only absolute names. |
|
When compared with another relative rdata, all names are |
|
made absolute as if they were relative to the root, as the |
|
proper origin is not available. While this creates a stable |
|
ordering, it is NOT guaranteed to be the DNSSEC ordering. |
|
In the future, all ordering comparisons for rdata with |
|
relative names will be disallowed. |
|
""" |
|
try: |
|
our = self.to_digestable() |
|
our_relative = False |
|
except dns.name.NeedAbsoluteNameOrOrigin: |
|
if _allow_relative_comparisons: |
|
our = self.to_digestable(dns.name.root) |
|
our_relative = True |
|
try: |
|
their = other.to_digestable() |
|
their_relative = False |
|
except dns.name.NeedAbsoluteNameOrOrigin: |
|
if _allow_relative_comparisons: |
|
their = other.to_digestable(dns.name.root) |
|
their_relative = True |
|
if _allow_relative_comparisons: |
|
if our_relative != their_relative: |
|
|
|
|
|
if our_relative: |
|
return -1 |
|
else: |
|
return 1 |
|
elif our_relative or their_relative: |
|
raise NoRelativeRdataOrdering |
|
if our == their: |
|
return 0 |
|
elif our > their: |
|
return 1 |
|
else: |
|
return -1 |
|
|
|
def __eq__(self, other): |
|
if not isinstance(other, Rdata): |
|
return False |
|
if self.rdclass != other.rdclass or self.rdtype != other.rdtype: |
|
return False |
|
our_relative = False |
|
their_relative = False |
|
try: |
|
our = self.to_digestable() |
|
except dns.name.NeedAbsoluteNameOrOrigin: |
|
our = self.to_digestable(dns.name.root) |
|
our_relative = True |
|
try: |
|
their = other.to_digestable() |
|
except dns.name.NeedAbsoluteNameOrOrigin: |
|
their = other.to_digestable(dns.name.root) |
|
their_relative = True |
|
if our_relative != their_relative: |
|
return False |
|
return our == their |
|
|
|
def __ne__(self, other): |
|
if not isinstance(other, Rdata): |
|
return True |
|
if self.rdclass != other.rdclass or self.rdtype != other.rdtype: |
|
return True |
|
return not self.__eq__(other) |
|
|
|
def __lt__(self, other): |
|
if ( |
|
not isinstance(other, Rdata) |
|
or self.rdclass != other.rdclass |
|
or self.rdtype != other.rdtype |
|
): |
|
return NotImplemented |
|
return self._cmp(other) < 0 |
|
|
|
def __le__(self, other): |
|
if ( |
|
not isinstance(other, Rdata) |
|
or self.rdclass != other.rdclass |
|
or self.rdtype != other.rdtype |
|
): |
|
return NotImplemented |
|
return self._cmp(other) <= 0 |
|
|
|
def __ge__(self, other): |
|
if ( |
|
not isinstance(other, Rdata) |
|
or self.rdclass != other.rdclass |
|
or self.rdtype != other.rdtype |
|
): |
|
return NotImplemented |
|
return self._cmp(other) >= 0 |
|
|
|
def __gt__(self, other): |
|
if ( |
|
not isinstance(other, Rdata) |
|
or self.rdclass != other.rdclass |
|
or self.rdtype != other.rdtype |
|
): |
|
return NotImplemented |
|
return self._cmp(other) > 0 |
|
|
|
def __hash__(self): |
|
return hash(self.to_digestable(dns.name.root)) |
|
|
|
@classmethod |
|
def from_text( |
|
cls, |
|
rdclass: dns.rdataclass.RdataClass, |
|
rdtype: dns.rdatatype.RdataType, |
|
tok: dns.tokenizer.Tokenizer, |
|
origin: Optional[dns.name.Name] = None, |
|
relativize: bool = True, |
|
relativize_to: Optional[dns.name.Name] = None, |
|
) -> "Rdata": |
|
raise NotImplementedError |
|
|
|
@classmethod |
|
def from_wire_parser( |
|
cls, |
|
rdclass: dns.rdataclass.RdataClass, |
|
rdtype: dns.rdatatype.RdataType, |
|
parser: dns.wire.Parser, |
|
origin: Optional[dns.name.Name] = None, |
|
) -> "Rdata": |
|
raise NotImplementedError |
|
|
|
def replace(self, **kwargs: Any) -> "Rdata": |
|
""" |
|
Create a new Rdata instance based on the instance replace was |
|
invoked on. It is possible to pass different parameters to |
|
override the corresponding properties of the base Rdata. |
|
|
|
Any field specific to the Rdata type can be replaced, but the |
|
*rdtype* and *rdclass* fields cannot. |
|
|
|
Returns an instance of the same Rdata subclass as *self*. |
|
""" |
|
|
|
|
|
parameters = inspect.signature(self.__init__).parameters |
|
|
|
|
|
|
|
for key in kwargs: |
|
if key == "rdcomment": |
|
continue |
|
if key not in parameters: |
|
raise AttributeError( |
|
"'{}' object has no attribute '{}'".format( |
|
self.__class__.__name__, key |
|
) |
|
) |
|
if key in ("rdclass", "rdtype"): |
|
raise AttributeError( |
|
"Cannot overwrite '{}' attribute '{}'".format( |
|
self.__class__.__name__, key |
|
) |
|
) |
|
|
|
|
|
|
|
args = (kwargs.get(key, getattr(self, key)) for key in parameters) |
|
|
|
|
|
rd = self.__class__(*args) |
|
|
|
|
|
rdcomment = kwargs.get("rdcomment", self.rdcomment) |
|
if rdcomment is not None: |
|
object.__setattr__(rd, "rdcomment", rdcomment) |
|
return rd |
|
|
|
|
|
|
|
|
|
@classmethod |
|
def _as_rdataclass(cls, value): |
|
return dns.rdataclass.RdataClass.make(value) |
|
|
|
@classmethod |
|
def _as_rdatatype(cls, value): |
|
return dns.rdatatype.RdataType.make(value) |
|
|
|
@classmethod |
|
def _as_bytes( |
|
cls, |
|
value: Any, |
|
encode: bool = False, |
|
max_length: Optional[int] = None, |
|
empty_ok: bool = True, |
|
) -> bytes: |
|
if encode and isinstance(value, str): |
|
bvalue = value.encode() |
|
elif isinstance(value, bytearray): |
|
bvalue = bytes(value) |
|
elif isinstance(value, bytes): |
|
bvalue = value |
|
else: |
|
raise ValueError("not bytes") |
|
if max_length is not None and len(bvalue) > max_length: |
|
raise ValueError("too long") |
|
if not empty_ok and len(bvalue) == 0: |
|
raise ValueError("empty bytes not allowed") |
|
return bvalue |
|
|
|
@classmethod |
|
def _as_name(cls, value): |
|
|
|
|
|
|
|
if isinstance(value, str): |
|
return dns.name.from_text(value) |
|
elif not isinstance(value, dns.name.Name): |
|
raise ValueError("not a name") |
|
return value |
|
|
|
@classmethod |
|
def _as_uint8(cls, value): |
|
if not isinstance(value, int): |
|
raise ValueError("not an integer") |
|
if value < 0 or value > 255: |
|
raise ValueError("not a uint8") |
|
return value |
|
|
|
@classmethod |
|
def _as_uint16(cls, value): |
|
if not isinstance(value, int): |
|
raise ValueError("not an integer") |
|
if value < 0 or value > 65535: |
|
raise ValueError("not a uint16") |
|
return value |
|
|
|
@classmethod |
|
def _as_uint32(cls, value): |
|
if not isinstance(value, int): |
|
raise ValueError("not an integer") |
|
if value < 0 or value > 4294967295: |
|
raise ValueError("not a uint32") |
|
return value |
|
|
|
@classmethod |
|
def _as_uint48(cls, value): |
|
if not isinstance(value, int): |
|
raise ValueError("not an integer") |
|
if value < 0 or value > 281474976710655: |
|
raise ValueError("not a uint48") |
|
return value |
|
|
|
@classmethod |
|
def _as_int(cls, value, low=None, high=None): |
|
if not isinstance(value, int): |
|
raise ValueError("not an integer") |
|
if low is not None and value < low: |
|
raise ValueError("value too small") |
|
if high is not None and value > high: |
|
raise ValueError("value too large") |
|
return value |
|
|
|
@classmethod |
|
def _as_ipv4_address(cls, value): |
|
if isinstance(value, str): |
|
return dns.ipv4.canonicalize(value) |
|
elif isinstance(value, bytes): |
|
return dns.ipv4.inet_ntoa(value) |
|
else: |
|
raise ValueError("not an IPv4 address") |
|
|
|
@classmethod |
|
def _as_ipv6_address(cls, value): |
|
if isinstance(value, str): |
|
return dns.ipv6.canonicalize(value) |
|
elif isinstance(value, bytes): |
|
return dns.ipv6.inet_ntoa(value) |
|
else: |
|
raise ValueError("not an IPv6 address") |
|
|
|
@classmethod |
|
def _as_bool(cls, value): |
|
if isinstance(value, bool): |
|
return value |
|
else: |
|
raise ValueError("not a boolean") |
|
|
|
@classmethod |
|
def _as_ttl(cls, value): |
|
if isinstance(value, int): |
|
return cls._as_int(value, 0, dns.ttl.MAX_TTL) |
|
elif isinstance(value, str): |
|
return dns.ttl.from_text(value) |
|
else: |
|
raise ValueError("not a TTL") |
|
|
|
@classmethod |
|
def _as_tuple(cls, value, as_value): |
|
try: |
|
|
|
|
|
return (as_value(value),) |
|
except Exception: |
|
|
|
|
|
return tuple(as_value(v) for v in value) |
|
|
|
|
|
|
|
@classmethod |
|
def _processing_order(cls, iterable): |
|
items = list(iterable) |
|
random.shuffle(items) |
|
return items |
|
|
|
|
|
@dns.immutable.immutable |
|
class GenericRdata(Rdata): |
|
"""Generic Rdata Class |
|
|
|
This class is used for rdata types for which we have no better |
|
implementation. It implements the DNS "unknown RRs" scheme. |
|
""" |
|
|
|
__slots__ = ["data"] |
|
|
|
def __init__(self, rdclass, rdtype, data): |
|
super().__init__(rdclass, rdtype) |
|
self.data = data |
|
|
|
def to_text( |
|
self, |
|
origin: Optional[dns.name.Name] = None, |
|
relativize: bool = True, |
|
**kw: Dict[str, Any], |
|
) -> str: |
|
return r"\# %d " % len(self.data) + _hexify(self.data, **kw) |
|
|
|
@classmethod |
|
def from_text( |
|
cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None |
|
): |
|
token = tok.get() |
|
if not token.is_identifier() or token.value != r"\#": |
|
raise dns.exception.SyntaxError(r"generic rdata does not start with \#") |
|
length = tok.get_int() |
|
hex = tok.concatenate_remaining_identifiers(True).encode() |
|
data = binascii.unhexlify(hex) |
|
if len(data) != length: |
|
raise dns.exception.SyntaxError("generic rdata hex data has wrong length") |
|
return cls(rdclass, rdtype, data) |
|
|
|
def _to_wire(self, file, compress=None, origin=None, canonicalize=False): |
|
file.write(self.data) |
|
|
|
@classmethod |
|
def from_wire_parser(cls, rdclass, rdtype, parser, origin=None): |
|
return cls(rdclass, rdtype, parser.get_remaining()) |
|
|
|
|
|
_rdata_classes: Dict[Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any] = ( |
|
{} |
|
) |
|
_module_prefix = "dns.rdtypes" |
|
|
|
|
|
def get_rdata_class(rdclass, rdtype): |
|
cls = _rdata_classes.get((rdclass, rdtype)) |
|
if not cls: |
|
cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype)) |
|
if not cls: |
|
rdclass_text = dns.rdataclass.to_text(rdclass) |
|
rdtype_text = dns.rdatatype.to_text(rdtype) |
|
rdtype_text = rdtype_text.replace("-", "_") |
|
try: |
|
mod = import_module( |
|
".".join([_module_prefix, rdclass_text, rdtype_text]) |
|
) |
|
cls = getattr(mod, rdtype_text) |
|
_rdata_classes[(rdclass, rdtype)] = cls |
|
except ImportError: |
|
try: |
|
mod = import_module(".".join([_module_prefix, "ANY", rdtype_text])) |
|
cls = getattr(mod, rdtype_text) |
|
_rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls |
|
_rdata_classes[(rdclass, rdtype)] = cls |
|
except ImportError: |
|
pass |
|
if not cls: |
|
cls = GenericRdata |
|
_rdata_classes[(rdclass, rdtype)] = cls |
|
return cls |
|
|
|
|
|
def from_text( |
|
rdclass: Union[dns.rdataclass.RdataClass, str], |
|
rdtype: Union[dns.rdatatype.RdataType, str], |
|
tok: Union[dns.tokenizer.Tokenizer, str], |
|
origin: Optional[dns.name.Name] = None, |
|
relativize: bool = True, |
|
relativize_to: Optional[dns.name.Name] = None, |
|
idna_codec: Optional[dns.name.IDNACodec] = None, |
|
) -> Rdata: |
|
"""Build an rdata object from text format. |
|
|
|
This function attempts to dynamically load a class which |
|
implements the specified rdata class and type. If there is no |
|
class-and-type-specific implementation, the GenericRdata class |
|
is used. |
|
|
|
Once a class is chosen, its from_text() class method is called |
|
with the parameters to this function. |
|
|
|
If *tok* is a ``str``, then a tokenizer is created and the string |
|
is used as its input. |
|
|
|
*rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass. |
|
|
|
*rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype. |
|
|
|
*tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``. |
|
|
|
*origin*, a ``dns.name.Name`` (or ``None``), the |
|
origin to use for relative names. |
|
|
|
*relativize*, a ``bool``. If true, name will be relativized. |
|
|
|
*relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use |
|
when relativizing names. If not set, the *origin* value will be used. |
|
|
|
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA |
|
encoder/decoder to use if a tokenizer needs to be created. If |
|
``None``, the default IDNA 2003 encoder/decoder is used. If a |
|
tokenizer is not created, then the codec associated with the tokenizer |
|
is the one that is used. |
|
|
|
Returns an instance of the chosen Rdata subclass. |
|
|
|
""" |
|
if isinstance(tok, str): |
|
tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec) |
|
rdclass = dns.rdataclass.RdataClass.make(rdclass) |
|
rdtype = dns.rdatatype.RdataType.make(rdtype) |
|
cls = get_rdata_class(rdclass, rdtype) |
|
with dns.exception.ExceptionWrapper(dns.exception.SyntaxError): |
|
rdata = None |
|
if cls != GenericRdata: |
|
|
|
token = tok.get() |
|
tok.unget(token) |
|
if token.is_identifier() and token.value == r"\#": |
|
|
|
|
|
|
|
|
|
|
|
grdata = GenericRdata.from_text( |
|
rdclass, rdtype, tok, origin, relativize, relativize_to |
|
) |
|
rdata = from_wire( |
|
rdclass, rdtype, grdata.data, 0, len(grdata.data), origin |
|
) |
|
|
|
|
|
|
|
|
|
|
|
rwire = rdata.to_wire() |
|
if rwire != grdata.data: |
|
raise dns.exception.SyntaxError( |
|
"compressed data in " |
|
"generic syntax form " |
|
"of known rdatatype" |
|
) |
|
if rdata is None: |
|
rdata = cls.from_text( |
|
rdclass, rdtype, tok, origin, relativize, relativize_to |
|
) |
|
token = tok.get_eol_as_token() |
|
if token.comment is not None: |
|
object.__setattr__(rdata, "rdcomment", token.comment) |
|
return rdata |
|
|
|
|
|
def from_wire_parser( |
|
rdclass: Union[dns.rdataclass.RdataClass, str], |
|
rdtype: Union[dns.rdatatype.RdataType, str], |
|
parser: dns.wire.Parser, |
|
origin: Optional[dns.name.Name] = None, |
|
) -> Rdata: |
|
"""Build an rdata object from wire format |
|
|
|
This function attempts to dynamically load a class which |
|
implements the specified rdata class and type. If there is no |
|
class-and-type-specific implementation, the GenericRdata class |
|
is used. |
|
|
|
Once a class is chosen, its from_wire() class method is called |
|
with the parameters to this function. |
|
|
|
*rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass. |
|
|
|
*rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype. |
|
|
|
*parser*, a ``dns.wire.Parser``, the parser, which should be |
|
restricted to the rdata length. |
|
|
|
*origin*, a ``dns.name.Name`` (or ``None``). If not ``None``, |
|
then names will be relativized to this origin. |
|
|
|
Returns an instance of the chosen Rdata subclass. |
|
""" |
|
|
|
rdclass = dns.rdataclass.RdataClass.make(rdclass) |
|
rdtype = dns.rdatatype.RdataType.make(rdtype) |
|
cls = get_rdata_class(rdclass, rdtype) |
|
with dns.exception.ExceptionWrapper(dns.exception.FormError): |
|
return cls.from_wire_parser(rdclass, rdtype, parser, origin) |
|
|
|
|
|
def from_wire( |
|
rdclass: Union[dns.rdataclass.RdataClass, str], |
|
rdtype: Union[dns.rdatatype.RdataType, str], |
|
wire: bytes, |
|
current: int, |
|
rdlen: int, |
|
origin: Optional[dns.name.Name] = None, |
|
) -> Rdata: |
|
"""Build an rdata object from wire format |
|
|
|
This function attempts to dynamically load a class which |
|
implements the specified rdata class and type. If there is no |
|
class-and-type-specific implementation, the GenericRdata class |
|
is used. |
|
|
|
Once a class is chosen, its from_wire() class method is called |
|
with the parameters to this function. |
|
|
|
*rdclass*, an ``int``, the rdataclass. |
|
|
|
*rdtype*, an ``int``, the rdatatype. |
|
|
|
*wire*, a ``bytes``, the wire-format message. |
|
|
|
*current*, an ``int``, the offset in wire of the beginning of |
|
the rdata. |
|
|
|
*rdlen*, an ``int``, the length of the wire-format rdata |
|
|
|
*origin*, a ``dns.name.Name`` (or ``None``). If not ``None``, |
|
then names will be relativized to this origin. |
|
|
|
Returns an instance of the chosen Rdata subclass. |
|
""" |
|
parser = dns.wire.Parser(wire, current) |
|
with parser.restrict_to(rdlen): |
|
return from_wire_parser(rdclass, rdtype, parser, origin) |
|
|
|
|
|
class RdatatypeExists(dns.exception.DNSException): |
|
"""DNS rdatatype already exists.""" |
|
|
|
supp_kwargs = {"rdclass", "rdtype"} |
|
fmt = ( |
|
"The rdata type with class {rdclass:d} and rdtype {rdtype:d} " |
|
+ "already exists." |
|
) |
|
|
|
|
|
def register_type( |
|
implementation: Any, |
|
rdtype: int, |
|
rdtype_text: str, |
|
is_singleton: bool = False, |
|
rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, |
|
) -> None: |
|
"""Dynamically register a module to handle an rdatatype. |
|
|
|
*implementation*, a module implementing the type in the usual dnspython |
|
way. |
|
|
|
*rdtype*, an ``int``, the rdatatype to register. |
|
|
|
*rdtype_text*, a ``str``, the textual form of the rdatatype. |
|
|
|
*is_singleton*, a ``bool``, indicating if the type is a singleton (i.e. |
|
RRsets of the type can have only one member.) |
|
|
|
*rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if |
|
it applies to all classes. |
|
""" |
|
|
|
rdtype = dns.rdatatype.RdataType.make(rdtype) |
|
existing_cls = get_rdata_class(rdclass, rdtype) |
|
if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype): |
|
raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype) |
|
_rdata_classes[(rdclass, rdtype)] = getattr( |
|
implementation, rdtype_text.replace("-", "_") |
|
) |
|
dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton) |
|
|