|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""DNS Messages""" |
|
|
|
import contextlib |
|
import io |
|
import time |
|
from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
|
import dns.edns |
|
import dns.entropy |
|
import dns.enum |
|
import dns.exception |
|
import dns.flags |
|
import dns.name |
|
import dns.opcode |
|
import dns.rcode |
|
import dns.rdata |
|
import dns.rdataclass |
|
import dns.rdatatype |
|
import dns.rdtypes.ANY.OPT |
|
import dns.rdtypes.ANY.TSIG |
|
import dns.renderer |
|
import dns.rrset |
|
import dns.tsig |
|
import dns.ttl |
|
import dns.wire |
|
|
|
|
|
class ShortHeader(dns.exception.FormError): |
|
"""The DNS packet passed to from_wire() is too short.""" |
|
|
|
|
|
class TrailingJunk(dns.exception.FormError): |
|
"""The DNS packet passed to from_wire() has extra junk at the end of it.""" |
|
|
|
|
|
class UnknownHeaderField(dns.exception.DNSException): |
|
"""The header field name was not recognized when converting from text |
|
into a message.""" |
|
|
|
|
|
class BadEDNS(dns.exception.FormError): |
|
"""An OPT record occurred somewhere other than |
|
the additional data section.""" |
|
|
|
|
|
class BadTSIG(dns.exception.FormError): |
|
"""A TSIG record occurred somewhere other than the end of |
|
the additional data section.""" |
|
|
|
|
|
class UnknownTSIGKey(dns.exception.DNSException): |
|
"""A TSIG with an unknown key was received.""" |
|
|
|
|
|
class Truncated(dns.exception.DNSException): |
|
"""The truncated flag is set.""" |
|
|
|
supp_kwargs = {"message"} |
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
|
|
def message(self): |
|
"""As much of the message as could be processed. |
|
|
|
Returns a ``dns.message.Message``. |
|
""" |
|
return self.kwargs["message"] |
|
|
|
|
|
class NotQueryResponse(dns.exception.DNSException): |
|
"""Message is not a response to a query.""" |
|
|
|
|
|
class ChainTooLong(dns.exception.DNSException): |
|
"""The CNAME chain is too long.""" |
|
|
|
|
|
class AnswerForNXDOMAIN(dns.exception.DNSException): |
|
"""The rcode is NXDOMAIN but an answer was found.""" |
|
|
|
|
|
class NoPreviousName(dns.exception.SyntaxError): |
|
"""No previous name was known.""" |
|
|
|
|
|
class MessageSection(dns.enum.IntEnum): |
|
"""Message sections""" |
|
|
|
QUESTION = 0 |
|
ANSWER = 1 |
|
AUTHORITY = 2 |
|
ADDITIONAL = 3 |
|
|
|
@classmethod |
|
def _maximum(cls): |
|
return 3 |
|
|
|
|
|
class MessageError: |
|
def __init__(self, exception: Exception, offset: int): |
|
self.exception = exception |
|
self.offset = offset |
|
|
|
|
|
DEFAULT_EDNS_PAYLOAD = 1232 |
|
MAX_CHAIN = 16 |
|
|
|
IndexKeyType = Tuple[ |
|
int, |
|
dns.name.Name, |
|
dns.rdataclass.RdataClass, |
|
dns.rdatatype.RdataType, |
|
Optional[dns.rdatatype.RdataType], |
|
Optional[dns.rdataclass.RdataClass], |
|
] |
|
IndexType = Dict[IndexKeyType, dns.rrset.RRset] |
|
SectionType = Union[int, str, List[dns.rrset.RRset]] |
|
|
|
|
|
class Message: |
|
"""A DNS message.""" |
|
|
|
_section_enum = MessageSection |
|
|
|
def __init__(self, id: Optional[int] = None): |
|
if id is None: |
|
self.id = dns.entropy.random_16() |
|
else: |
|
self.id = id |
|
self.flags = 0 |
|
self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []] |
|
self.opt: Optional[dns.rrset.RRset] = None |
|
self.request_payload = 0 |
|
self.pad = 0 |
|
self.keyring: Any = None |
|
self.tsig: Optional[dns.rrset.RRset] = None |
|
self.request_mac = b"" |
|
self.xfr = False |
|
self.origin: Optional[dns.name.Name] = None |
|
self.tsig_ctx: Optional[Any] = None |
|
self.index: IndexType = {} |
|
self.errors: List[MessageError] = [] |
|
self.time = 0.0 |
|
|
|
@property |
|
def question(self) -> List[dns.rrset.RRset]: |
|
"""The question section.""" |
|
return self.sections[0] |
|
|
|
@question.setter |
|
def question(self, v): |
|
self.sections[0] = v |
|
|
|
@property |
|
def answer(self) -> List[dns.rrset.RRset]: |
|
"""The answer section.""" |
|
return self.sections[1] |
|
|
|
@answer.setter |
|
def answer(self, v): |
|
self.sections[1] = v |
|
|
|
@property |
|
def authority(self) -> List[dns.rrset.RRset]: |
|
"""The authority section.""" |
|
return self.sections[2] |
|
|
|
@authority.setter |
|
def authority(self, v): |
|
self.sections[2] = v |
|
|
|
@property |
|
def additional(self) -> List[dns.rrset.RRset]: |
|
"""The additional data section.""" |
|
return self.sections[3] |
|
|
|
@additional.setter |
|
def additional(self, v): |
|
self.sections[3] = v |
|
|
|
def __repr__(self): |
|
return "<DNS message, ID " + repr(self.id) + ">" |
|
|
|
def __str__(self): |
|
return self.to_text() |
|
|
|
def to_text( |
|
self, |
|
origin: Optional[dns.name.Name] = None, |
|
relativize: bool = True, |
|
**kw: Dict[str, Any], |
|
) -> str: |
|
"""Convert the message to text. |
|
|
|
The *origin*, *relativize*, and any other keyword |
|
arguments are passed to the RRset ``to_wire()`` method. |
|
|
|
Returns a ``str``. |
|
""" |
|
|
|
s = io.StringIO() |
|
s.write("id %d\n" % self.id) |
|
s.write("opcode %s\n" % dns.opcode.to_text(self.opcode())) |
|
s.write("rcode %s\n" % dns.rcode.to_text(self.rcode())) |
|
s.write("flags %s\n" % dns.flags.to_text(self.flags)) |
|
if self.edns >= 0: |
|
s.write("edns %s\n" % self.edns) |
|
if self.ednsflags != 0: |
|
s.write("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags)) |
|
s.write("payload %d\n" % self.payload) |
|
for opt in self.options: |
|
s.write("option %s\n" % opt.to_text()) |
|
for name, which in self._section_enum.__members__.items(): |
|
s.write(f";{name}\n") |
|
for rrset in self.section_from_number(which): |
|
s.write(rrset.to_text(origin, relativize, **kw)) |
|
s.write("\n") |
|
|
|
|
|
|
|
|
|
|
|
return s.getvalue()[:-1] |
|
|
|
def __eq__(self, other): |
|
"""Two messages are equal if they have the same content in the |
|
header, question, answer, and authority sections. |
|
|
|
Returns a ``bool``. |
|
""" |
|
|
|
if not isinstance(other, Message): |
|
return False |
|
if self.id != other.id: |
|
return False |
|
if self.flags != other.flags: |
|
return False |
|
for i, section in enumerate(self.sections): |
|
other_section = other.sections[i] |
|
for n in section: |
|
if n not in other_section: |
|
return False |
|
for n in other_section: |
|
if n not in section: |
|
return False |
|
return True |
|
|
|
def __ne__(self, other): |
|
return not self.__eq__(other) |
|
|
|
def is_response(self, other: "Message") -> bool: |
|
"""Is *other*, also a ``dns.message.Message``, a response to this |
|
message? |
|
|
|
Returns a ``bool``. |
|
""" |
|
|
|
if ( |
|
other.flags & dns.flags.QR == 0 |
|
or self.id != other.id |
|
or dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags) |
|
): |
|
return False |
|
if other.rcode() in { |
|
dns.rcode.FORMERR, |
|
dns.rcode.SERVFAIL, |
|
dns.rcode.NOTIMP, |
|
dns.rcode.REFUSED, |
|
}: |
|
|
|
|
|
|
|
if len(other.question) == 0: |
|
return True |
|
if dns.opcode.is_update(self.flags): |
|
|
|
|
|
|
|
|
|
return True |
|
for n in self.question: |
|
if n not in other.question: |
|
return False |
|
for n in other.question: |
|
if n not in self.question: |
|
return False |
|
return True |
|
|
|
def section_number(self, section: List[dns.rrset.RRset]) -> int: |
|
"""Return the "section number" of the specified section for use |
|
in indexing. |
|
|
|
*section* is one of the section attributes of this message. |
|
|
|
Raises ``ValueError`` if the section isn't known. |
|
|
|
Returns an ``int``. |
|
""" |
|
|
|
for i, our_section in enumerate(self.sections): |
|
if section is our_section: |
|
return self._section_enum(i) |
|
raise ValueError("unknown section") |
|
|
|
def section_from_number(self, number: int) -> List[dns.rrset.RRset]: |
|
"""Return the section list associated with the specified section |
|
number. |
|
|
|
*number* is a section number `int` or the text form of a section |
|
name. |
|
|
|
Raises ``ValueError`` if the section isn't known. |
|
|
|
Returns a ``list``. |
|
""" |
|
|
|
section = self._section_enum.make(number) |
|
return self.sections[section] |
|
|
|
def find_rrset( |
|
self, |
|
section: SectionType, |
|
name: dns.name.Name, |
|
rdclass: dns.rdataclass.RdataClass, |
|
rdtype: dns.rdatatype.RdataType, |
|
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, |
|
deleting: Optional[dns.rdataclass.RdataClass] = None, |
|
create: bool = False, |
|
force_unique: bool = False, |
|
idna_codec: Optional[dns.name.IDNACodec] = None, |
|
) -> dns.rrset.RRset: |
|
"""Find the RRset with the given attributes in the specified section. |
|
|
|
*section*, an ``int`` section number, a ``str`` section name, or one of |
|
the section attributes of this message. This specifies the |
|
the section of the message to search. For example:: |
|
|
|
my_message.find_rrset(my_message.answer, name, rdclass, rdtype) |
|
my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype) |
|
my_message.find_rrset("ANSWER", name, rdclass, rdtype) |
|
|
|
*name*, a ``dns.name.Name`` or ``str``, the name of the RRset. |
|
|
|
*rdclass*, an ``int`` or ``str``, the class of the RRset. |
|
|
|
*rdtype*, an ``int`` or ``str``, the type of the RRset. |
|
|
|
*covers*, an ``int`` or ``str``, the covers value of the RRset. |
|
The default is ``dns.rdatatype.NONE``. |
|
|
|
*deleting*, an ``int``, ``str``, or ``None``, the deleting value of the |
|
RRset. The default is ``None``. |
|
|
|
*create*, a ``bool``. If ``True``, create the RRset if it is not found. |
|
The created RRset is appended to *section*. |
|
|
|
*force_unique*, a ``bool``. If ``True`` and *create* is also ``True``, |
|
create a new RRset regardless of whether a matching RRset exists |
|
already. The default is ``False``. This is useful when creating |
|
DDNS Update messages, as order matters for them. |
|
|
|
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA |
|
encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder |
|
is used. |
|
|
|
Raises ``KeyError`` if the RRset was not found and create was |
|
``False``. |
|
|
|
Returns a ``dns.rrset.RRset object``. |
|
""" |
|
|
|
if isinstance(section, int): |
|
section_number = section |
|
section = self.section_from_number(section_number) |
|
elif isinstance(section, str): |
|
section_number = self._section_enum.from_text(section) |
|
section = self.section_from_number(section_number) |
|
else: |
|
section_number = self.section_number(section) |
|
if isinstance(name, str): |
|
name = dns.name.from_text(name, idna_codec=idna_codec) |
|
rdtype = dns.rdatatype.RdataType.make(rdtype) |
|
rdclass = dns.rdataclass.RdataClass.make(rdclass) |
|
covers = dns.rdatatype.RdataType.make(covers) |
|
if deleting is not None: |
|
deleting = dns.rdataclass.RdataClass.make(deleting) |
|
key = (section_number, name, rdclass, rdtype, covers, deleting) |
|
if not force_unique: |
|
if self.index is not None: |
|
rrset = self.index.get(key) |
|
if rrset is not None: |
|
return rrset |
|
else: |
|
for rrset in section: |
|
if rrset.full_match(name, rdclass, rdtype, covers, deleting): |
|
return rrset |
|
if not create: |
|
raise KeyError |
|
rrset = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting) |
|
section.append(rrset) |
|
if self.index is not None: |
|
self.index[key] = rrset |
|
return rrset |
|
|
|
def get_rrset( |
|
self, |
|
section: SectionType, |
|
name: dns.name.Name, |
|
rdclass: dns.rdataclass.RdataClass, |
|
rdtype: dns.rdatatype.RdataType, |
|
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, |
|
deleting: Optional[dns.rdataclass.RdataClass] = None, |
|
create: bool = False, |
|
force_unique: bool = False, |
|
idna_codec: Optional[dns.name.IDNACodec] = None, |
|
) -> Optional[dns.rrset.RRset]: |
|
"""Get the RRset with the given attributes in the specified section. |
|
|
|
If the RRset is not found, None is returned. |
|
|
|
*section*, an ``int`` section number, a ``str`` section name, or one of |
|
the section attributes of this message. This specifies the |
|
the section of the message to search. For example:: |
|
|
|
my_message.get_rrset(my_message.answer, name, rdclass, rdtype) |
|
my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype) |
|
my_message.get_rrset("ANSWER", name, rdclass, rdtype) |
|
|
|
*name*, a ``dns.name.Name`` or ``str``, the name of the RRset. |
|
|
|
*rdclass*, an ``int`` or ``str``, the class of the RRset. |
|
|
|
*rdtype*, an ``int`` or ``str``, the type of the RRset. |
|
|
|
*covers*, an ``int`` or ``str``, the covers value of the RRset. |
|
The default is ``dns.rdatatype.NONE``. |
|
|
|
*deleting*, an ``int``, ``str``, or ``None``, the deleting value of the |
|
RRset. The default is ``None``. |
|
|
|
*create*, a ``bool``. If ``True``, create the RRset if it is not found. |
|
The created RRset is appended to *section*. |
|
|
|
*force_unique*, a ``bool``. If ``True`` and *create* is also ``True``, |
|
create a new RRset regardless of whether a matching RRset exists |
|
already. The default is ``False``. This is useful when creating |
|
DDNS Update messages, as order matters for them. |
|
|
|
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA |
|
encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder |
|
is used. |
|
|
|
Returns a ``dns.rrset.RRset object`` or ``None``. |
|
""" |
|
|
|
try: |
|
rrset = self.find_rrset( |
|
section, |
|
name, |
|
rdclass, |
|
rdtype, |
|
covers, |
|
deleting, |
|
create, |
|
force_unique, |
|
idna_codec, |
|
) |
|
except KeyError: |
|
rrset = None |
|
return rrset |
|
|
|
def section_count(self, section: SectionType) -> int: |
|
"""Returns the number of records in the specified section. |
|
|
|
*section*, an ``int`` section number, a ``str`` section name, or one of |
|
the section attributes of this message. This specifies the |
|
the section of the message to count. For example:: |
|
|
|
my_message.section_count(my_message.answer) |
|
my_message.section_count(dns.message.ANSWER) |
|
my_message.section_count("ANSWER") |
|
""" |
|
|
|
if isinstance(section, int): |
|
section_number = section |
|
section = self.section_from_number(section_number) |
|
elif isinstance(section, str): |
|
section_number = self._section_enum.from_text(section) |
|
section = self.section_from_number(section_number) |
|
else: |
|
section_number = self.section_number(section) |
|
count = sum(max(1, len(rrs)) for rrs in section) |
|
if section_number == MessageSection.ADDITIONAL: |
|
if self.opt is not None: |
|
count += 1 |
|
if self.tsig is not None: |
|
count += 1 |
|
return count |
|
|
|
def _compute_opt_reserve(self) -> int: |
|
"""Compute the size required for the OPT RR, padding excluded""" |
|
if not self.opt: |
|
return 0 |
|
|
|
size = 11 |
|
|
|
|
|
|
|
|
|
for option in self.opt[0].options: |
|
wire = option.to_wire() |
|
|
|
size += len(wire) + 4 |
|
if self.pad: |
|
|
|
size += 4 |
|
return size |
|
|
|
def _compute_tsig_reserve(self) -> int: |
|
"""Compute the size required for the TSIG RR""" |
|
|
|
|
|
|
|
|
|
|
|
if not self.tsig: |
|
return 0 |
|
f = io.BytesIO() |
|
self.tsig.to_wire(f) |
|
return len(f.getvalue()) |
|
|
|
def to_wire( |
|
self, |
|
origin: Optional[dns.name.Name] = None, |
|
max_size: int = 0, |
|
multi: bool = False, |
|
tsig_ctx: Optional[Any] = None, |
|
prepend_length: bool = False, |
|
prefer_truncation: bool = False, |
|
**kw: Dict[str, Any], |
|
) -> bytes: |
|
"""Return a string containing the message in DNS compressed wire |
|
format. |
|
|
|
Additional keyword arguments are passed to the RRset ``to_wire()`` |
|
method. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``, the origin to be appended |
|
to any relative names. If ``None``, and the message has an origin |
|
attribute that is not ``None``, then it will be used. |
|
|
|
*max_size*, an ``int``, the maximum size of the wire format |
|
output; default is 0, which means "the message's request |
|
payload, if nonzero, or 65535". |
|
|
|
*multi*, a ``bool``, should be set to ``True`` if this message is |
|
part of a multiple message sequence. |
|
|
|
*tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the |
|
ongoing TSIG context, used when signing zone transfers. |
|
|
|
*prepend_length*, a ``bool``, should be set to ``True`` if the caller |
|
wants the message length prepended to the message itself. This is |
|
useful for messages sent over TCP, TLS (DoT), or QUIC (DoQ). |
|
|
|
*prefer_truncation*, a ``bool``, should be set to ``True`` if the caller |
|
wants the message to be truncated if it would otherwise exceed the |
|
maximum length. If the truncation occurs before the additional section, |
|
the TC bit will be set. |
|
|
|
Raises ``dns.exception.TooBig`` if *max_size* was exceeded. |
|
|
|
Returns a ``bytes``. |
|
""" |
|
|
|
if origin is None and self.origin is not None: |
|
origin = self.origin |
|
if max_size == 0: |
|
if self.request_payload != 0: |
|
max_size = self.request_payload |
|
else: |
|
max_size = 65535 |
|
if max_size < 512: |
|
max_size = 512 |
|
elif max_size > 65535: |
|
max_size = 65535 |
|
r = dns.renderer.Renderer(self.id, self.flags, max_size, origin) |
|
opt_reserve = self._compute_opt_reserve() |
|
r.reserve(opt_reserve) |
|
tsig_reserve = self._compute_tsig_reserve() |
|
r.reserve(tsig_reserve) |
|
try: |
|
for rrset in self.question: |
|
r.add_question(rrset.name, rrset.rdtype, rrset.rdclass) |
|
for rrset in self.answer: |
|
r.add_rrset(dns.renderer.ANSWER, rrset, **kw) |
|
for rrset in self.authority: |
|
r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw) |
|
for rrset in self.additional: |
|
r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw) |
|
except dns.exception.TooBig: |
|
if prefer_truncation: |
|
if r.section < dns.renderer.ADDITIONAL: |
|
r.flags |= dns.flags.TC |
|
else: |
|
raise |
|
r.release_reserved() |
|
if self.opt is not None: |
|
r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve) |
|
r.write_header() |
|
if self.tsig is not None: |
|
(new_tsig, ctx) = dns.tsig.sign( |
|
r.get_wire(), |
|
self.keyring, |
|
self.tsig[0], |
|
int(time.time()), |
|
self.request_mac, |
|
tsig_ctx, |
|
multi, |
|
) |
|
self.tsig.clear() |
|
self.tsig.add(new_tsig) |
|
r.add_rrset(dns.renderer.ADDITIONAL, self.tsig) |
|
r.write_header() |
|
if multi: |
|
self.tsig_ctx = ctx |
|
wire = r.get_wire() |
|
if prepend_length: |
|
wire = len(wire).to_bytes(2, "big") + wire |
|
return wire |
|
|
|
@staticmethod |
|
def _make_tsig( |
|
keyname, algorithm, time_signed, fudge, mac, original_id, error, other |
|
): |
|
tsig = dns.rdtypes.ANY.TSIG.TSIG( |
|
dns.rdataclass.ANY, |
|
dns.rdatatype.TSIG, |
|
algorithm, |
|
time_signed, |
|
fudge, |
|
mac, |
|
original_id, |
|
error, |
|
other, |
|
) |
|
return dns.rrset.from_rdata(keyname, 0, tsig) |
|
|
|
def use_tsig( |
|
self, |
|
keyring: Any, |
|
keyname: Optional[Union[dns.name.Name, str]] = None, |
|
fudge: int = 300, |
|
original_id: Optional[int] = None, |
|
tsig_error: int = 0, |
|
other_data: bytes = b"", |
|
algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, |
|
) -> None: |
|
"""When sending, a TSIG signature using the specified key |
|
should be added. |
|
|
|
*key*, a ``dns.tsig.Key`` is the key to use. If a key is specified, |
|
the *keyring* and *algorithm* fields are not used. |
|
|
|
*keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either |
|
the TSIG keyring or key to use. |
|
|
|
The format of a keyring dict is a mapping from TSIG key name, as |
|
``dns.name.Name`` to ``dns.tsig.Key`` or a TSIG secret, a ``bytes``. |
|
If a ``dict`` *keyring* is specified but a *keyname* is not, the key |
|
used will be the first key in the *keyring*. Note that the order of |
|
keys in a dictionary is not defined, so applications should supply a |
|
keyname when a ``dict`` keyring is used, unless they know the keyring |
|
contains only one key. If a ``callable`` keyring is specified, the |
|
callable will be called with the message and the keyname, and is |
|
expected to return a key. |
|
|
|
*keyname*, a ``dns.name.Name``, ``str`` or ``None``, the name of |
|
this TSIG key to use; defaults to ``None``. If *keyring* is a |
|
``dict``, the key must be defined in it. If *keyring* is a |
|
``dns.tsig.Key``, this is ignored. |
|
|
|
*fudge*, an ``int``, the TSIG time fudge. |
|
|
|
*original_id*, an ``int``, the TSIG original id. If ``None``, |
|
the message's id is used. |
|
|
|
*tsig_error*, an ``int``, the TSIG error code. |
|
|
|
*other_data*, a ``bytes``, the TSIG other data. |
|
|
|
*algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use. This is |
|
only used if *keyring* is a ``dict``, and the key entry is a ``bytes``. |
|
""" |
|
|
|
if isinstance(keyring, dns.tsig.Key): |
|
key = keyring |
|
keyname = key.name |
|
elif callable(keyring): |
|
key = keyring(self, keyname) |
|
else: |
|
if isinstance(keyname, str): |
|
keyname = dns.name.from_text(keyname) |
|
if keyname is None: |
|
keyname = next(iter(keyring)) |
|
key = keyring[keyname] |
|
if isinstance(key, bytes): |
|
key = dns.tsig.Key(keyname, key, algorithm) |
|
self.keyring = key |
|
if original_id is None: |
|
original_id = self.id |
|
self.tsig = self._make_tsig( |
|
keyname, |
|
self.keyring.algorithm, |
|
0, |
|
fudge, |
|
b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm], |
|
original_id, |
|
tsig_error, |
|
other_data, |
|
) |
|
|
|
@property |
|
def keyname(self) -> Optional[dns.name.Name]: |
|
if self.tsig: |
|
return self.tsig.name |
|
else: |
|
return None |
|
|
|
@property |
|
def keyalgorithm(self) -> Optional[dns.name.Name]: |
|
if self.tsig: |
|
return self.tsig[0].algorithm |
|
else: |
|
return None |
|
|
|
@property |
|
def mac(self) -> Optional[bytes]: |
|
if self.tsig: |
|
return self.tsig[0].mac |
|
else: |
|
return None |
|
|
|
@property |
|
def tsig_error(self) -> Optional[int]: |
|
if self.tsig: |
|
return self.tsig[0].error |
|
else: |
|
return None |
|
|
|
@property |
|
def had_tsig(self) -> bool: |
|
return bool(self.tsig) |
|
|
|
@staticmethod |
|
def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None): |
|
opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ()) |
|
return dns.rrset.from_rdata(dns.name.root, int(flags), opt) |
|
|
|
def use_edns( |
|
self, |
|
edns: Optional[Union[int, bool]] = 0, |
|
ednsflags: int = 0, |
|
payload: int = DEFAULT_EDNS_PAYLOAD, |
|
request_payload: Optional[int] = None, |
|
options: Optional[List[dns.edns.Option]] = None, |
|
pad: int = 0, |
|
) -> None: |
|
"""Configure EDNS behavior. |
|
|
|
*edns*, an ``int``, is the EDNS level to use. Specifying ``None``, ``False``, |
|
or ``-1`` means "do not use EDNS", and in this case the other parameters are |
|
ignored. Specifying ``True`` is equivalent to specifying 0, i.e. "use EDNS0". |
|
|
|
*ednsflags*, an ``int``, the EDNS flag values. |
|
|
|
*payload*, an ``int``, is the EDNS sender's payload field, which is the maximum |
|
size of UDP datagram the sender can handle. I.e. how big a response to this |
|
message can be. |
|
|
|
*request_payload*, an ``int``, is the EDNS payload size to use when sending this |
|
message. If not specified, defaults to the value of *payload*. |
|
|
|
*options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options. |
|
|
|
*pad*, a non-negative ``int``. If 0, the default, do not pad; otherwise add |
|
padding bytes to make the message size a multiple of *pad*. Note that if |
|
padding is non-zero, an EDNS PADDING option will always be added to the |
|
message. |
|
""" |
|
|
|
if edns is None or edns is False: |
|
edns = -1 |
|
elif edns is True: |
|
edns = 0 |
|
if edns < 0: |
|
self.opt = None |
|
self.request_payload = 0 |
|
else: |
|
|
|
ednsflags &= 0xFF00FFFF |
|
ednsflags |= edns << 16 |
|
if options is None: |
|
options = [] |
|
self.opt = self._make_opt(ednsflags, payload, options) |
|
if request_payload is None: |
|
request_payload = payload |
|
self.request_payload = request_payload |
|
if pad < 0: |
|
raise ValueError("pad must be non-negative") |
|
self.pad = pad |
|
|
|
@property |
|
def edns(self) -> int: |
|
if self.opt: |
|
return (self.ednsflags & 0xFF0000) >> 16 |
|
else: |
|
return -1 |
|
|
|
@property |
|
def ednsflags(self) -> int: |
|
if self.opt: |
|
return self.opt.ttl |
|
else: |
|
return 0 |
|
|
|
@ednsflags.setter |
|
def ednsflags(self, v): |
|
if self.opt: |
|
self.opt.ttl = v |
|
elif v: |
|
self.opt = self._make_opt(v) |
|
|
|
@property |
|
def payload(self) -> int: |
|
if self.opt: |
|
return self.opt[0].payload |
|
else: |
|
return 0 |
|
|
|
@property |
|
def options(self) -> Tuple: |
|
if self.opt: |
|
return self.opt[0].options |
|
else: |
|
return () |
|
|
|
def want_dnssec(self, wanted: bool = True) -> None: |
|
"""Enable or disable 'DNSSEC desired' flag in requests. |
|
|
|
*wanted*, a ``bool``. If ``True``, then DNSSEC data is |
|
desired in the response, EDNS is enabled if required, and then |
|
the DO bit is set. If ``False``, the DO bit is cleared if |
|
EDNS is enabled. |
|
""" |
|
|
|
if wanted: |
|
self.ednsflags |= dns.flags.DO |
|
elif self.opt: |
|
self.ednsflags &= ~int(dns.flags.DO) |
|
|
|
def rcode(self) -> dns.rcode.Rcode: |
|
"""Return the rcode. |
|
|
|
Returns a ``dns.rcode.Rcode``. |
|
""" |
|
return dns.rcode.from_flags(int(self.flags), int(self.ednsflags)) |
|
|
|
def set_rcode(self, rcode: dns.rcode.Rcode) -> None: |
|
"""Set the rcode. |
|
|
|
*rcode*, a ``dns.rcode.Rcode``, is the rcode to set. |
|
""" |
|
(value, evalue) = dns.rcode.to_flags(rcode) |
|
self.flags &= 0xFFF0 |
|
self.flags |= value |
|
self.ednsflags &= 0x00FFFFFF |
|
self.ednsflags |= evalue |
|
|
|
def opcode(self) -> dns.opcode.Opcode: |
|
"""Return the opcode. |
|
|
|
Returns a ``dns.opcode.Opcode``. |
|
""" |
|
return dns.opcode.from_flags(int(self.flags)) |
|
|
|
def set_opcode(self, opcode: dns.opcode.Opcode) -> None: |
|
"""Set the opcode. |
|
|
|
*opcode*, a ``dns.opcode.Opcode``, is the opcode to set. |
|
""" |
|
self.flags &= 0x87FF |
|
self.flags |= dns.opcode.to_flags(opcode) |
|
|
|
def _get_one_rr_per_rrset(self, value): |
|
|
|
return value |
|
|
|
|
|
|
|
def _parse_rr_header(self, section, name, rdclass, rdtype): |
|
return (rdclass, rdtype, None, False) |
|
|
|
|
|
|
|
def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype): |
|
if rdtype == dns.rdatatype.OPT: |
|
if ( |
|
section != MessageSection.ADDITIONAL |
|
or self.opt |
|
or name != dns.name.root |
|
): |
|
raise BadEDNS |
|
elif rdtype == dns.rdatatype.TSIG: |
|
if ( |
|
section != MessageSection.ADDITIONAL |
|
or rdclass != dns.rdatatype.ANY |
|
or position != count - 1 |
|
): |
|
raise BadTSIG |
|
return (rdclass, rdtype, None, False) |
|
|
|
|
|
class ChainingResult: |
|
"""The result of a call to dns.message.QueryMessage.resolve_chaining(). |
|
|
|
The ``answer`` attribute is the answer RRSet, or ``None`` if it doesn't |
|
exist. |
|
|
|
The ``canonical_name`` attribute is the canonical name after all |
|
chaining has been applied (this is the same name as ``rrset.name`` in cases |
|
where rrset is not ``None``). |
|
|
|
The ``minimum_ttl`` attribute is the minimum TTL, i.e. the TTL to |
|
use if caching the data. It is the smallest of all the CNAME TTLs |
|
and either the answer TTL if it exists or the SOA TTL and SOA |
|
minimum values for negative answers. |
|
|
|
The ``cnames`` attribute is a list of all the CNAME RRSets followed to |
|
get to the canonical name. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
canonical_name: dns.name.Name, |
|
answer: Optional[dns.rrset.RRset], |
|
minimum_ttl: int, |
|
cnames: List[dns.rrset.RRset], |
|
): |
|
self.canonical_name = canonical_name |
|
self.answer = answer |
|
self.minimum_ttl = minimum_ttl |
|
self.cnames = cnames |
|
|
|
|
|
class QueryMessage(Message): |
|
def resolve_chaining(self) -> ChainingResult: |
|
"""Follow the CNAME chain in the response to determine the answer |
|
RRset. |
|
|
|
Raises ``dns.message.NotQueryResponse`` if the message is not |
|
a response. |
|
|
|
Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long. |
|
|
|
Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN |
|
but an answer was found. |
|
|
|
Raises ``dns.exception.FormError`` if the question count is not 1. |
|
|
|
Returns a ChainingResult object. |
|
""" |
|
if self.flags & dns.flags.QR == 0: |
|
raise NotQueryResponse |
|
if len(self.question) != 1: |
|
raise dns.exception.FormError |
|
question = self.question[0] |
|
qname = question.name |
|
min_ttl = dns.ttl.MAX_TTL |
|
answer = None |
|
count = 0 |
|
cnames = [] |
|
while count < MAX_CHAIN: |
|
try: |
|
answer = self.find_rrset( |
|
self.answer, qname, question.rdclass, question.rdtype |
|
) |
|
min_ttl = min(min_ttl, answer.ttl) |
|
break |
|
except KeyError: |
|
if question.rdtype != dns.rdatatype.CNAME: |
|
try: |
|
crrset = self.find_rrset( |
|
self.answer, qname, question.rdclass, dns.rdatatype.CNAME |
|
) |
|
cnames.append(crrset) |
|
min_ttl = min(min_ttl, crrset.ttl) |
|
for rd in crrset: |
|
qname = rd.target |
|
break |
|
count += 1 |
|
continue |
|
except KeyError: |
|
|
|
break |
|
else: |
|
|
|
break |
|
if count >= MAX_CHAIN: |
|
raise ChainTooLong |
|
if self.rcode() == dns.rcode.NXDOMAIN and answer is not None: |
|
raise AnswerForNXDOMAIN |
|
if answer is None: |
|
|
|
auname = qname |
|
while True: |
|
|
|
|
|
try: |
|
srrset = self.find_rrset( |
|
self.authority, auname, question.rdclass, dns.rdatatype.SOA |
|
) |
|
min_ttl = min(min_ttl, srrset.ttl, srrset[0].minimum) |
|
break |
|
except KeyError: |
|
try: |
|
auname = auname.parent() |
|
except dns.name.NoParent: |
|
break |
|
return ChainingResult(qname, answer, min_ttl, cnames) |
|
|
|
def canonical_name(self) -> dns.name.Name: |
|
"""Return the canonical name of the first name in the question |
|
section. |
|
|
|
Raises ``dns.message.NotQueryResponse`` if the message is not |
|
a response. |
|
|
|
Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long. |
|
|
|
Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN |
|
but an answer was found. |
|
|
|
Raises ``dns.exception.FormError`` if the question count is not 1. |
|
""" |
|
return self.resolve_chaining().canonical_name |
|
|
|
|
|
def _maybe_import_update(): |
|
|
|
|
|
|
|
|
|
|
|
import dns.update |
|
|
|
|
|
def _message_factory_from_opcode(opcode): |
|
if opcode == dns.opcode.QUERY: |
|
return QueryMessage |
|
elif opcode == dns.opcode.UPDATE: |
|
_maybe_import_update() |
|
return dns.update.UpdateMessage |
|
else: |
|
return Message |
|
|
|
|
|
class _WireReader: |
|
"""Wire format reader. |
|
|
|
parser: the binary parser |
|
message: The message object being built |
|
initialize_message: Callback to set message parsing options |
|
question_only: Are we only reading the question? |
|
one_rr_per_rrset: Put each RR into its own RRset? |
|
keyring: TSIG keyring |
|
ignore_trailing: Ignore trailing junk at end of request? |
|
multi: Is this message part of a multi-message sequence? |
|
DNS dynamic updates. |
|
continue_on_error: try to extract as much information as possible from |
|
the message, accumulating MessageErrors in the *errors* attribute instead of |
|
raising them. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
wire, |
|
initialize_message, |
|
question_only=False, |
|
one_rr_per_rrset=False, |
|
ignore_trailing=False, |
|
keyring=None, |
|
multi=False, |
|
continue_on_error=False, |
|
): |
|
self.parser = dns.wire.Parser(wire) |
|
self.message = None |
|
self.initialize_message = initialize_message |
|
self.question_only = question_only |
|
self.one_rr_per_rrset = one_rr_per_rrset |
|
self.ignore_trailing = ignore_trailing |
|
self.keyring = keyring |
|
self.multi = multi |
|
self.continue_on_error = continue_on_error |
|
self.errors = [] |
|
|
|
def _get_question(self, section_number, qcount): |
|
"""Read the next *qcount* records from the wire data and add them to |
|
the question section. |
|
""" |
|
assert self.message is not None |
|
section = self.message.sections[section_number] |
|
for _ in range(qcount): |
|
qname = self.parser.get_name(self.message.origin) |
|
(rdtype, rdclass) = self.parser.get_struct("!HH") |
|
(rdclass, rdtype, _, _) = self.message._parse_rr_header( |
|
section_number, qname, rdclass, rdtype |
|
) |
|
self.message.find_rrset( |
|
section, qname, rdclass, rdtype, create=True, force_unique=True |
|
) |
|
|
|
def _add_error(self, e): |
|
self.errors.append(MessageError(e, self.parser.current)) |
|
|
|
def _get_section(self, section_number, count): |
|
"""Read the next I{count} records from the wire data and add them to |
|
the specified section. |
|
|
|
section_number: the section of the message to which to add records |
|
count: the number of records to read |
|
""" |
|
assert self.message is not None |
|
section = self.message.sections[section_number] |
|
force_unique = self.one_rr_per_rrset |
|
for i in range(count): |
|
rr_start = self.parser.current |
|
absolute_name = self.parser.get_name() |
|
if self.message.origin is not None: |
|
name = absolute_name.relativize(self.message.origin) |
|
else: |
|
name = absolute_name |
|
(rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH") |
|
if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG): |
|
( |
|
rdclass, |
|
rdtype, |
|
deleting, |
|
empty, |
|
) = self.message._parse_special_rr_header( |
|
section_number, count, i, name, rdclass, rdtype |
|
) |
|
else: |
|
(rdclass, rdtype, deleting, empty) = self.message._parse_rr_header( |
|
section_number, name, rdclass, rdtype |
|
) |
|
rdata_start = self.parser.current |
|
try: |
|
if empty: |
|
if rdlen > 0: |
|
raise dns.exception.FormError |
|
rd = None |
|
covers = dns.rdatatype.NONE |
|
else: |
|
with self.parser.restrict_to(rdlen): |
|
rd = dns.rdata.from_wire_parser( |
|
rdclass, rdtype, self.parser, self.message.origin |
|
) |
|
covers = rd.covers() |
|
if self.message.xfr and rdtype == dns.rdatatype.SOA: |
|
force_unique = True |
|
if rdtype == dns.rdatatype.OPT: |
|
self.message.opt = dns.rrset.from_rdata(name, ttl, rd) |
|
elif rdtype == dns.rdatatype.TSIG: |
|
if self.keyring is None: |
|
raise UnknownTSIGKey("got signed message without keyring") |
|
if isinstance(self.keyring, dict): |
|
key = self.keyring.get(absolute_name) |
|
if isinstance(key, bytes): |
|
key = dns.tsig.Key(absolute_name, key, rd.algorithm) |
|
elif callable(self.keyring): |
|
key = self.keyring(self.message, absolute_name) |
|
else: |
|
key = self.keyring |
|
if key is None: |
|
raise UnknownTSIGKey("key '%s' unknown" % name) |
|
self.message.keyring = key |
|
self.message.tsig_ctx = dns.tsig.validate( |
|
self.parser.wire, |
|
key, |
|
absolute_name, |
|
rd, |
|
int(time.time()), |
|
self.message.request_mac, |
|
rr_start, |
|
self.message.tsig_ctx, |
|
self.multi, |
|
) |
|
self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd) |
|
else: |
|
rrset = self.message.find_rrset( |
|
section, |
|
name, |
|
rdclass, |
|
rdtype, |
|
covers, |
|
deleting, |
|
True, |
|
force_unique, |
|
) |
|
if rd is not None: |
|
if ttl > 0x7FFFFFFF: |
|
ttl = 0 |
|
rrset.add(rd, ttl) |
|
except Exception as e: |
|
if self.continue_on_error: |
|
self._add_error(e) |
|
self.parser.seek(rdata_start + rdlen) |
|
else: |
|
raise |
|
|
|
def read(self): |
|
"""Read a wire format DNS message and build a dns.message.Message |
|
object.""" |
|
|
|
if self.parser.remaining() < 12: |
|
raise ShortHeader |
|
(id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct( |
|
"!HHHHHH" |
|
) |
|
factory = _message_factory_from_opcode(dns.opcode.from_flags(flags)) |
|
self.message = factory(id=id) |
|
self.message.flags = dns.flags.Flag(flags) |
|
self.initialize_message(self.message) |
|
self.one_rr_per_rrset = self.message._get_one_rr_per_rrset( |
|
self.one_rr_per_rrset |
|
) |
|
try: |
|
self._get_question(MessageSection.QUESTION, qcount) |
|
if self.question_only: |
|
return self.message |
|
self._get_section(MessageSection.ANSWER, ancount) |
|
self._get_section(MessageSection.AUTHORITY, aucount) |
|
self._get_section(MessageSection.ADDITIONAL, adcount) |
|
if not self.ignore_trailing and self.parser.remaining() != 0: |
|
raise TrailingJunk |
|
if self.multi and self.message.tsig_ctx and not self.message.had_tsig: |
|
self.message.tsig_ctx.update(self.parser.wire) |
|
except Exception as e: |
|
if self.continue_on_error: |
|
self._add_error(e) |
|
else: |
|
raise |
|
return self.message |
|
|
|
|
|
def from_wire( |
|
wire: bytes, |
|
keyring: Optional[Any] = None, |
|
request_mac: Optional[bytes] = b"", |
|
xfr: bool = False, |
|
origin: Optional[dns.name.Name] = None, |
|
tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None, |
|
multi: bool = False, |
|
question_only: bool = False, |
|
one_rr_per_rrset: bool = False, |
|
ignore_trailing: bool = False, |
|
raise_on_truncation: bool = False, |
|
continue_on_error: bool = False, |
|
) -> Message: |
|
"""Convert a DNS wire format message into a message object. |
|
|
|
*keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the message |
|
is signed. |
|
|
|
*request_mac*, a ``bytes`` or ``None``. If the message is a response to a |
|
TSIG-signed request, *request_mac* should be set to the MAC of that request. |
|
|
|
*xfr*, a ``bool``, should be set to ``True`` if this message is part of a zone |
|
transfer. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``. If the message is part of a zone |
|
transfer, *origin* should be the origin name of the zone. If not ``None``, names |
|
will be relativized to the origin. |
|
|
|
*tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the ongoing TSIG |
|
context, used when validating zone transfers. |
|
|
|
*multi*, a ``bool``, should be set to ``True`` if this message is part of a multiple |
|
message sequence. |
|
|
|
*question_only*, a ``bool``. If ``True``, read only up to the end of the question |
|
section. |
|
|
|
*one_rr_per_rrset*, a ``bool``. If ``True``, put each RR into its own RRset. |
|
|
|
*ignore_trailing*, a ``bool``. If ``True``, ignore trailing junk at end of the |
|
message. |
|
|
|
*raise_on_truncation*, a ``bool``. If ``True``, raise an exception if the TC bit is |
|
set. |
|
|
|
*continue_on_error*, a ``bool``. If ``True``, try to continue parsing even if |
|
errors occur. Erroneous rdata will be ignored. Errors will be accumulated as a |
|
list of MessageError objects in the message's ``errors`` attribute. This option is |
|
recommended only for DNS analysis tools, or for use in a server as part of an error |
|
handling path. The default is ``False``. |
|
|
|
Raises ``dns.message.ShortHeader`` if the message is less than 12 octets long. |
|
|
|
Raises ``dns.message.TrailingJunk`` if there were octets in the message past the end |
|
of the proper DNS message, and *ignore_trailing* is ``False``. |
|
|
|
Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section, or |
|
occurred more than once. |
|
|
|
Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record of the |
|
additional data section. |
|
|
|
Raises ``dns.message.Truncated`` if the TC flag is set and *raise_on_truncation* is |
|
``True``. |
|
|
|
Returns a ``dns.message.Message``. |
|
""" |
|
|
|
|
|
if request_mac is None: |
|
request_mac = b"" |
|
|
|
def initialize_message(message): |
|
message.request_mac = request_mac |
|
message.xfr = xfr |
|
message.origin = origin |
|
message.tsig_ctx = tsig_ctx |
|
|
|
reader = _WireReader( |
|
wire, |
|
initialize_message, |
|
question_only, |
|
one_rr_per_rrset, |
|
ignore_trailing, |
|
keyring, |
|
multi, |
|
continue_on_error, |
|
) |
|
try: |
|
m = reader.read() |
|
except dns.exception.FormError: |
|
if ( |
|
reader.message |
|
and (reader.message.flags & dns.flags.TC) |
|
and raise_on_truncation |
|
): |
|
raise Truncated(message=reader.message) |
|
else: |
|
raise |
|
|
|
|
|
if m.flags & dns.flags.TC and raise_on_truncation: |
|
raise Truncated(message=m) |
|
if continue_on_error: |
|
m.errors = reader.errors |
|
|
|
return m |
|
|
|
|
|
class _TextReader: |
|
"""Text format reader. |
|
|
|
tok: the tokenizer. |
|
message: The message object being built. |
|
DNS dynamic updates. |
|
last_name: The most recently read name when building a message object. |
|
one_rr_per_rrset: Put each RR into its own RRset? |
|
origin: The origin for relative names |
|
relativize: relativize names? |
|
relativize_to: the origin to relativize to. |
|
""" |
|
|
|
def __init__( |
|
self, |
|
text, |
|
idna_codec, |
|
one_rr_per_rrset=False, |
|
origin=None, |
|
relativize=True, |
|
relativize_to=None, |
|
): |
|
self.message = None |
|
self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec) |
|
self.last_name = None |
|
self.one_rr_per_rrset = one_rr_per_rrset |
|
self.origin = origin |
|
self.relativize = relativize |
|
self.relativize_to = relativize_to |
|
self.id = None |
|
self.edns = -1 |
|
self.ednsflags = 0 |
|
self.payload = DEFAULT_EDNS_PAYLOAD |
|
self.rcode = None |
|
self.opcode = dns.opcode.QUERY |
|
self.flags = 0 |
|
|
|
def _header_line(self, _): |
|
"""Process one line from the text format header section.""" |
|
|
|
token = self.tok.get() |
|
what = token.value |
|
if what == "id": |
|
self.id = self.tok.get_int() |
|
elif what == "flags": |
|
while True: |
|
token = self.tok.get() |
|
if not token.is_identifier(): |
|
self.tok.unget(token) |
|
break |
|
self.flags = self.flags | dns.flags.from_text(token.value) |
|
elif what == "edns": |
|
self.edns = self.tok.get_int() |
|
self.ednsflags = self.ednsflags | (self.edns << 16) |
|
elif what == "eflags": |
|
if self.edns < 0: |
|
self.edns = 0 |
|
while True: |
|
token = self.tok.get() |
|
if not token.is_identifier(): |
|
self.tok.unget(token) |
|
break |
|
self.ednsflags = self.ednsflags | dns.flags.edns_from_text(token.value) |
|
elif what == "payload": |
|
self.payload = self.tok.get_int() |
|
if self.edns < 0: |
|
self.edns = 0 |
|
elif what == "opcode": |
|
text = self.tok.get_string() |
|
self.opcode = dns.opcode.from_text(text) |
|
self.flags = self.flags | dns.opcode.to_flags(self.opcode) |
|
elif what == "rcode": |
|
text = self.tok.get_string() |
|
self.rcode = dns.rcode.from_text(text) |
|
else: |
|
raise UnknownHeaderField |
|
self.tok.get_eol() |
|
|
|
def _question_line(self, section_number): |
|
"""Process one line from the text format question section.""" |
|
|
|
section = self.message.sections[section_number] |
|
token = self.tok.get(want_leading=True) |
|
if not token.is_whitespace(): |
|
self.last_name = self.tok.as_name( |
|
token, self.message.origin, self.relativize, self.relativize_to |
|
) |
|
name = self.last_name |
|
if name is None: |
|
raise NoPreviousName |
|
token = self.tok.get() |
|
if not token.is_identifier(): |
|
raise dns.exception.SyntaxError |
|
|
|
try: |
|
rdclass = dns.rdataclass.from_text(token.value) |
|
token = self.tok.get() |
|
if not token.is_identifier(): |
|
raise dns.exception.SyntaxError |
|
except dns.exception.SyntaxError: |
|
raise dns.exception.SyntaxError |
|
except Exception: |
|
rdclass = dns.rdataclass.IN |
|
|
|
rdtype = dns.rdatatype.from_text(token.value) |
|
(rdclass, rdtype, _, _) = self.message._parse_rr_header( |
|
section_number, name, rdclass, rdtype |
|
) |
|
self.message.find_rrset( |
|
section, name, rdclass, rdtype, create=True, force_unique=True |
|
) |
|
self.tok.get_eol() |
|
|
|
def _rr_line(self, section_number): |
|
"""Process one line from the text format answer, authority, or |
|
additional data sections. |
|
""" |
|
|
|
section = self.message.sections[section_number] |
|
|
|
token = self.tok.get(want_leading=True) |
|
if not token.is_whitespace(): |
|
self.last_name = self.tok.as_name( |
|
token, self.message.origin, self.relativize, self.relativize_to |
|
) |
|
name = self.last_name |
|
if name is None: |
|
raise NoPreviousName |
|
token = self.tok.get() |
|
if not token.is_identifier(): |
|
raise dns.exception.SyntaxError |
|
|
|
try: |
|
ttl = int(token.value, 0) |
|
token = self.tok.get() |
|
if not token.is_identifier(): |
|
raise dns.exception.SyntaxError |
|
except dns.exception.SyntaxError: |
|
raise dns.exception.SyntaxError |
|
except Exception: |
|
ttl = 0 |
|
|
|
try: |
|
rdclass = dns.rdataclass.from_text(token.value) |
|
token = self.tok.get() |
|
if not token.is_identifier(): |
|
raise dns.exception.SyntaxError |
|
except dns.exception.SyntaxError: |
|
raise dns.exception.SyntaxError |
|
except Exception: |
|
rdclass = dns.rdataclass.IN |
|
|
|
rdtype = dns.rdatatype.from_text(token.value) |
|
(rdclass, rdtype, deleting, empty) = self.message._parse_rr_header( |
|
section_number, name, rdclass, rdtype |
|
) |
|
token = self.tok.get() |
|
if empty and not token.is_eol_or_eof(): |
|
raise dns.exception.SyntaxError |
|
if not empty and token.is_eol_or_eof(): |
|
raise dns.exception.UnexpectedEnd |
|
if not token.is_eol_or_eof(): |
|
self.tok.unget(token) |
|
rd = dns.rdata.from_text( |
|
rdclass, |
|
rdtype, |
|
self.tok, |
|
self.message.origin, |
|
self.relativize, |
|
self.relativize_to, |
|
) |
|
covers = rd.covers() |
|
else: |
|
rd = None |
|
covers = dns.rdatatype.NONE |
|
rrset = self.message.find_rrset( |
|
section, |
|
name, |
|
rdclass, |
|
rdtype, |
|
covers, |
|
deleting, |
|
True, |
|
self.one_rr_per_rrset, |
|
) |
|
if rd is not None: |
|
rrset.add(rd, ttl) |
|
|
|
def _make_message(self): |
|
factory = _message_factory_from_opcode(self.opcode) |
|
message = factory(id=self.id) |
|
message.flags = self.flags |
|
if self.edns >= 0: |
|
message.use_edns(self.edns, self.ednsflags, self.payload) |
|
if self.rcode: |
|
message.set_rcode(self.rcode) |
|
if self.origin: |
|
message.origin = self.origin |
|
return message |
|
|
|
def read(self): |
|
"""Read a text format DNS message and build a dns.message.Message |
|
object.""" |
|
|
|
line_method = self._header_line |
|
section_number = None |
|
while 1: |
|
token = self.tok.get(True, True) |
|
if token.is_eol_or_eof(): |
|
break |
|
if token.is_comment(): |
|
u = token.value.upper() |
|
if u == "HEADER": |
|
line_method = self._header_line |
|
|
|
if self.message: |
|
message = self.message |
|
else: |
|
|
|
|
|
message = self._make_message() |
|
try: |
|
section_number = message._section_enum.from_text(u) |
|
|
|
|
|
if not self.message: |
|
self.message = message |
|
self.one_rr_per_rrset = message._get_one_rr_per_rrset( |
|
self.one_rr_per_rrset |
|
) |
|
if section_number == MessageSection.QUESTION: |
|
line_method = self._question_line |
|
else: |
|
line_method = self._rr_line |
|
except Exception: |
|
|
|
pass |
|
self.tok.get_eol() |
|
continue |
|
self.tok.unget(token) |
|
line_method(section_number) |
|
if not self.message: |
|
self.message = self._make_message() |
|
return self.message |
|
|
|
|
|
def from_text( |
|
text: str, |
|
idna_codec: Optional[dns.name.IDNACodec] = None, |
|
one_rr_per_rrset: bool = False, |
|
origin: Optional[dns.name.Name] = None, |
|
relativize: bool = True, |
|
relativize_to: Optional[dns.name.Name] = None, |
|
) -> Message: |
|
"""Convert the text format message into a message object. |
|
|
|
The reader stops after reading the first blank line in the input to |
|
facilitate reading multiple messages from a single file with |
|
``dns.message.from_file()``. |
|
|
|
*text*, a ``str``, the text format message. |
|
|
|
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA |
|
encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder |
|
is used. |
|
|
|
*one_rr_per_rrset*, a ``bool``. If ``True``, then each RR is put |
|
into its own rrset. The default is ``False``. |
|
|
|
*origin*, a ``dns.name.Name`` (or ``None``), the |
|
origin to use for relative names. |
|
|
|
*relativize*, a ``bool``. If true, name will be relativized. |
|
|
|
*relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use |
|
when relativizing names. If not set, the *origin* value will be used. |
|
|
|
Raises ``dns.message.UnknownHeaderField`` if a header is unknown. |
|
|
|
Raises ``dns.exception.SyntaxError`` if the text is badly formed. |
|
|
|
Returns a ``dns.message.Message object`` |
|
""" |
|
|
|
|
|
|
|
|
|
|
|
reader = _TextReader( |
|
text, idna_codec, one_rr_per_rrset, origin, relativize, relativize_to |
|
) |
|
return reader.read() |
|
|
|
|
|
def from_file( |
|
f: Any, |
|
idna_codec: Optional[dns.name.IDNACodec] = None, |
|
one_rr_per_rrset: bool = False, |
|
) -> Message: |
|
"""Read the next text format message from the specified file. |
|
|
|
Message blocks are separated by a single blank line. |
|
|
|
*f*, a ``file`` or ``str``. If *f* is text, it is treated as the |
|
pathname of a file to open. |
|
|
|
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA |
|
encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder |
|
is used. |
|
|
|
*one_rr_per_rrset*, a ``bool``. If ``True``, then each RR is put |
|
into its own rrset. The default is ``False``. |
|
|
|
Raises ``dns.message.UnknownHeaderField`` if a header is unknown. |
|
|
|
Raises ``dns.exception.SyntaxError`` if the text is badly formed. |
|
|
|
Returns a ``dns.message.Message object`` |
|
""" |
|
|
|
if isinstance(f, str): |
|
cm: contextlib.AbstractContextManager = open(f) |
|
else: |
|
cm = contextlib.nullcontext(f) |
|
with cm as f: |
|
return from_text(f, idna_codec, one_rr_per_rrset) |
|
assert False |
|
|
|
|
|
def make_query( |
|
qname: Union[dns.name.Name, str], |
|
rdtype: Union[dns.rdatatype.RdataType, str], |
|
rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, |
|
use_edns: Optional[Union[int, bool]] = None, |
|
want_dnssec: bool = False, |
|
ednsflags: Optional[int] = None, |
|
payload: Optional[int] = None, |
|
request_payload: Optional[int] = None, |
|
options: Optional[List[dns.edns.Option]] = None, |
|
idna_codec: Optional[dns.name.IDNACodec] = None, |
|
id: Optional[int] = None, |
|
flags: int = dns.flags.RD, |
|
pad: int = 0, |
|
) -> QueryMessage: |
|
"""Make a query message. |
|
|
|
The query name, type, and class may all be specified either |
|
as objects of the appropriate type, or as strings. |
|
|
|
The query will have a randomly chosen query id, and its DNS flags |
|
will be set to dns.flags.RD. |
|
|
|
qname, a ``dns.name.Name`` or ``str``, the query name. |
|
|
|
*rdtype*, an ``int`` or ``str``, the desired rdata type. |
|
|
|
*rdclass*, an ``int`` or ``str``, the desired rdata class; the default |
|
is class IN. |
|
|
|
*use_edns*, an ``int``, ``bool`` or ``None``. The EDNS level to use; the |
|
default is ``None``. If ``None``, EDNS will be enabled only if other |
|
parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are |
|
set. |
|
See the description of dns.message.Message.use_edns() for the possible |
|
values for use_edns and their meanings. |
|
|
|
*want_dnssec*, a ``bool``. If ``True``, DNSSEC data is desired. |
|
|
|
*ednsflags*, an ``int``, the EDNS flag values. |
|
|
|
*payload*, an ``int``, is the EDNS sender's payload field, which is the |
|
maximum size of UDP datagram the sender can handle. I.e. how big |
|
a response to this message can be. |
|
|
|
*request_payload*, an ``int``, is the EDNS payload size to use when |
|
sending this message. If not specified, defaults to the value of |
|
*payload*. |
|
|
|
*options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS |
|
options. |
|
|
|
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA |
|
encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder |
|
is used. |
|
|
|
*id*, an ``int`` or ``None``, the desired query id. The default is |
|
``None``, which generates a random query id. |
|
|
|
*flags*, an ``int``, the desired query flags. The default is |
|
``dns.flags.RD``. |
|
|
|
*pad*, a non-negative ``int``. If 0, the default, do not pad; otherwise add |
|
padding bytes to make the message size a multiple of *pad*. Note that if |
|
padding is non-zero, an EDNS PADDING option will always be added to the |
|
message. |
|
|
|
Returns a ``dns.message.QueryMessage`` |
|
""" |
|
|
|
if isinstance(qname, str): |
|
qname = dns.name.from_text(qname, idna_codec=idna_codec) |
|
rdtype = dns.rdatatype.RdataType.make(rdtype) |
|
rdclass = dns.rdataclass.RdataClass.make(rdclass) |
|
m = QueryMessage(id=id) |
|
m.flags = dns.flags.Flag(flags) |
|
m.find_rrset(m.question, qname, rdclass, rdtype, create=True, force_unique=True) |
|
|
|
|
|
|
|
kwargs: Dict[str, Any] = {} |
|
if ednsflags is not None: |
|
kwargs["ednsflags"] = ednsflags |
|
if payload is not None: |
|
kwargs["payload"] = payload |
|
if request_payload is not None: |
|
kwargs["request_payload"] = request_payload |
|
if options is not None: |
|
kwargs["options"] = options |
|
if kwargs and use_edns is None: |
|
use_edns = 0 |
|
kwargs["edns"] = use_edns |
|
kwargs["pad"] = pad |
|
m.use_edns(**kwargs) |
|
m.want_dnssec(want_dnssec) |
|
return m |
|
|
|
|
|
def make_response( |
|
query: Message, |
|
recursion_available: bool = False, |
|
our_payload: int = 8192, |
|
fudge: int = 300, |
|
tsig_error: int = 0, |
|
pad: Optional[int] = None, |
|
) -> Message: |
|
"""Make a message which is a response for the specified query. |
|
The message returned is really a response skeleton; it has all of the infrastructure |
|
required of a response, but none of the content. |
|
|
|
The response's question section is a shallow copy of the query's question section, |
|
so the query's question RRsets should not be changed. |
|
|
|
*query*, a ``dns.message.Message``, the query to respond to. |
|
|
|
*recursion_available*, a ``bool``, should RA be set in the response? |
|
|
|
*our_payload*, an ``int``, the payload size to advertise in EDNS responses. |
|
|
|
*fudge*, an ``int``, the TSIG time fudge. |
|
|
|
*tsig_error*, an ``int``, the TSIG error. |
|
|
|
*pad*, a non-negative ``int`` or ``None``. If 0, the default, do not pad; otherwise |
|
if not ``None`` add padding bytes to make the message size a multiple of *pad*. |
|
Note that if padding is non-zero, an EDNS PADDING option will always be added to the |
|
message. If ``None``, add padding following RFC 8467, namely if the request is |
|
padded, pad the response to 468 otherwise do not pad. |
|
|
|
Returns a ``dns.message.Message`` object whose specific class is appropriate for the |
|
query. For example, if query is a ``dns.update.UpdateMessage``, response will be |
|
too. |
|
""" |
|
|
|
if query.flags & dns.flags.QR: |
|
raise dns.exception.FormError("specified query message is not a query") |
|
factory = _message_factory_from_opcode(query.opcode()) |
|
response = factory(id=query.id) |
|
response.flags = dns.flags.QR | (query.flags & dns.flags.RD) |
|
if recursion_available: |
|
response.flags |= dns.flags.RA |
|
response.set_opcode(query.opcode()) |
|
response.question = list(query.question) |
|
if query.edns >= 0: |
|
if pad is None: |
|
|
|
pad = 0 |
|
for option in query.options: |
|
if option.otype == dns.edns.OptionType.PADDING: |
|
pad = 468 |
|
response.use_edns(0, 0, our_payload, query.payload, pad=pad) |
|
if query.had_tsig: |
|
response.use_tsig( |
|
query.keyring, |
|
query.keyname, |
|
fudge, |
|
None, |
|
tsig_error, |
|
b"", |
|
query.keyalgorithm, |
|
) |
|
response.request_mac = query.mac |
|
return response |
|
|
|
|
|
|
|
|
|
QUESTION = MessageSection.QUESTION |
|
ANSWER = MessageSection.ANSWER |
|
AUTHORITY = MessageSection.AUTHORITY |
|
ADDITIONAL = MessageSection.ADDITIONAL |
|
|
|
|
|
|