|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Common DNSSEC-related functions and constants.""" |
|
|
|
|
|
import base64 |
|
import contextlib |
|
import functools |
|
import hashlib |
|
import struct |
|
import time |
|
from datetime import datetime |
|
from typing import Callable, Dict, List, Optional, Set, Tuple, Union, cast |
|
|
|
import dns._features |
|
import dns.exception |
|
import dns.name |
|
import dns.node |
|
import dns.rdata |
|
import dns.rdataclass |
|
import dns.rdataset |
|
import dns.rdatatype |
|
import dns.rrset |
|
import dns.transaction |
|
import dns.zone |
|
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash |
|
from dns.exception import ( |
|
AlgorithmKeyMismatch, |
|
DeniedByPolicy, |
|
UnsupportedAlgorithm, |
|
ValidationFailure, |
|
) |
|
from dns.rdtypes.ANY.CDNSKEY import CDNSKEY |
|
from dns.rdtypes.ANY.CDS import CDS |
|
from dns.rdtypes.ANY.DNSKEY import DNSKEY |
|
from dns.rdtypes.ANY.DS import DS |
|
from dns.rdtypes.ANY.NSEC import NSEC, Bitmap |
|
from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM |
|
from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime |
|
from dns.rdtypes.dnskeybase import Flag |
|
|
|
PublicKey = Union[ |
|
"GenericPublicKey", |
|
"rsa.RSAPublicKey", |
|
"ec.EllipticCurvePublicKey", |
|
"ed25519.Ed25519PublicKey", |
|
"ed448.Ed448PublicKey", |
|
] |
|
|
|
PrivateKey = Union[ |
|
"GenericPrivateKey", |
|
"rsa.RSAPrivateKey", |
|
"ec.EllipticCurvePrivateKey", |
|
"ed25519.Ed25519PrivateKey", |
|
"ed448.Ed448PrivateKey", |
|
] |
|
|
|
RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None] |
|
|
|
|
|
def algorithm_from_text(text: str) -> Algorithm: |
|
"""Convert text into a DNSSEC algorithm value. |
|
|
|
*text*, a ``str``, the text to convert to into an algorithm value. |
|
|
|
Returns an ``int``. |
|
""" |
|
|
|
return Algorithm.from_text(text) |
|
|
|
|
|
def algorithm_to_text(value: Union[Algorithm, int]) -> str: |
|
"""Convert a DNSSEC algorithm value to text |
|
|
|
*value*, a ``dns.dnssec.Algorithm``. |
|
|
|
Returns a ``str``, the name of a DNSSEC algorithm. |
|
""" |
|
|
|
return Algorithm.to_text(value) |
|
|
|
|
|
def to_timestamp(value: Union[datetime, str, float, int]) -> int: |
|
"""Convert various format to a timestamp""" |
|
if isinstance(value, datetime): |
|
return int(value.timestamp()) |
|
elif isinstance(value, str): |
|
return sigtime_to_posixtime(value) |
|
elif isinstance(value, float): |
|
return int(value) |
|
elif isinstance(value, int): |
|
return value |
|
else: |
|
raise TypeError("Unsupported timestamp type") |
|
|
|
|
|
def key_id(key: Union[DNSKEY, CDNSKEY]) -> int: |
|
"""Return the key id (a 16-bit number) for the specified key. |
|
|
|
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` |
|
|
|
Returns an ``int`` between 0 and 65535 |
|
""" |
|
|
|
rdata = key.to_wire() |
|
if key.algorithm == Algorithm.RSAMD5: |
|
return (rdata[-3] << 8) + rdata[-2] |
|
else: |
|
total = 0 |
|
for i in range(len(rdata) // 2): |
|
total += (rdata[2 * i] << 8) + rdata[2 * i + 1] |
|
if len(rdata) % 2 != 0: |
|
total += rdata[len(rdata) - 1] << 8 |
|
total += (total >> 16) & 0xFFFF |
|
return total & 0xFFFF |
|
|
|
|
|
class Policy: |
|
def __init__(self): |
|
pass |
|
|
|
def ok_to_sign(self, _: DNSKEY) -> bool: |
|
return False |
|
|
|
def ok_to_validate(self, _: DNSKEY) -> bool: |
|
return False |
|
|
|
def ok_to_create_ds(self, _: DSDigest) -> bool: |
|
return False |
|
|
|
def ok_to_validate_ds(self, _: DSDigest) -> bool: |
|
return False |
|
|
|
|
|
class SimpleDeny(Policy): |
|
def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds): |
|
super().__init__() |
|
self._deny_sign = deny_sign |
|
self._deny_validate = deny_validate |
|
self._deny_create_ds = deny_create_ds |
|
self._deny_validate_ds = deny_validate_ds |
|
|
|
def ok_to_sign(self, key: DNSKEY) -> bool: |
|
return key.algorithm not in self._deny_sign |
|
|
|
def ok_to_validate(self, key: DNSKEY) -> bool: |
|
return key.algorithm not in self._deny_validate |
|
|
|
def ok_to_create_ds(self, algorithm: DSDigest) -> bool: |
|
return algorithm not in self._deny_create_ds |
|
|
|
def ok_to_validate_ds(self, algorithm: DSDigest) -> bool: |
|
return algorithm not in self._deny_validate_ds |
|
|
|
|
|
rfc_8624_policy = SimpleDeny( |
|
{Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST}, |
|
{Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1}, |
|
{DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST}, |
|
{DSDigest.NULL}, |
|
) |
|
|
|
allow_all_policy = SimpleDeny(set(), set(), set(), set()) |
|
|
|
|
|
default_policy = rfc_8624_policy |
|
|
|
|
|
def make_ds( |
|
name: Union[dns.name.Name, str], |
|
key: dns.rdata.Rdata, |
|
algorithm: Union[DSDigest, str], |
|
origin: Optional[dns.name.Name] = None, |
|
policy: Optional[Policy] = None, |
|
validating: bool = False, |
|
) -> DS: |
|
"""Create a DS record for a DNSSEC key. |
|
|
|
*name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record. |
|
|
|
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, |
|
the key the DS is about. |
|
|
|
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm. |
|
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case |
|
does not matter for these strings. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``. If *key* is a relative name, |
|
then it will be made absolute using the specified origin. |
|
|
|
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, |
|
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. |
|
|
|
*validating*, a ``bool``. If ``True``, then policy is checked in |
|
validating mode, i.e. "Is it ok to validate using this digest algorithm?". |
|
Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with |
|
this digest algorithm?". |
|
|
|
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. |
|
|
|
Raises ``DeniedByPolicy`` if the algorithm is denied by policy. |
|
|
|
Returns a ``dns.rdtypes.ANY.DS.DS`` |
|
""" |
|
|
|
if policy is None: |
|
policy = default_policy |
|
try: |
|
if isinstance(algorithm, str): |
|
algorithm = DSDigest[algorithm.upper()] |
|
except Exception: |
|
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) |
|
if validating: |
|
check = policy.ok_to_validate_ds |
|
else: |
|
check = policy.ok_to_create_ds |
|
if not check(algorithm): |
|
raise DeniedByPolicy |
|
if not isinstance(key, (DNSKEY, CDNSKEY)): |
|
raise ValueError("key is not a DNSKEY/CDNSKEY") |
|
if algorithm == DSDigest.SHA1: |
|
dshash = hashlib.sha1() |
|
elif algorithm == DSDigest.SHA256: |
|
dshash = hashlib.sha256() |
|
elif algorithm == DSDigest.SHA384: |
|
dshash = hashlib.sha384() |
|
else: |
|
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) |
|
|
|
if isinstance(name, str): |
|
name = dns.name.from_text(name, origin) |
|
wire = name.canonicalize().to_wire() |
|
assert wire is not None |
|
dshash.update(wire) |
|
dshash.update(key.to_wire(origin=origin)) |
|
digest = dshash.digest() |
|
|
|
dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, algorithm) + digest |
|
ds = dns.rdata.from_wire( |
|
dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0, len(dsrdata) |
|
) |
|
return cast(DS, ds) |
|
|
|
|
|
def make_cds( |
|
name: Union[dns.name.Name, str], |
|
key: dns.rdata.Rdata, |
|
algorithm: Union[DSDigest, str], |
|
origin: Optional[dns.name.Name] = None, |
|
) -> CDS: |
|
"""Create a CDS record for a DNSSEC key. |
|
|
|
*name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record. |
|
|
|
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, |
|
the key the DS is about. |
|
|
|
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm. |
|
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case |
|
does not matter for these strings. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``. If *key* is a relative name, |
|
then it will be made absolute using the specified origin. |
|
|
|
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. |
|
|
|
Returns a ``dns.rdtypes.ANY.DS.CDS`` |
|
""" |
|
|
|
ds = make_ds(name, key, algorithm, origin) |
|
return CDS( |
|
rdclass=ds.rdclass, |
|
rdtype=dns.rdatatype.CDS, |
|
key_tag=ds.key_tag, |
|
algorithm=ds.algorithm, |
|
digest_type=ds.digest_type, |
|
digest=ds.digest, |
|
) |
|
|
|
|
|
def _find_candidate_keys( |
|
keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG |
|
) -> Optional[List[DNSKEY]]: |
|
value = keys.get(rrsig.signer) |
|
if isinstance(value, dns.node.Node): |
|
rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY) |
|
else: |
|
rdataset = value |
|
if rdataset is None: |
|
return None |
|
return [ |
|
cast(DNSKEY, rd) |
|
for rd in rdataset |
|
if rd.algorithm == rrsig.algorithm |
|
and key_id(rd) == rrsig.key_tag |
|
and (rd.flags & Flag.ZONE) == Flag.ZONE |
|
and rd.protocol == 3 |
|
] |
|
|
|
|
|
def _get_rrname_rdataset( |
|
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], |
|
) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]: |
|
if isinstance(rrset, tuple): |
|
return rrset[0], rrset[1] |
|
else: |
|
return rrset.name, rrset |
|
|
|
|
|
def _validate_signature(sig: bytes, data: bytes, key: DNSKEY) -> None: |
|
public_cls = get_algorithm_cls_from_dnskey(key).public_cls |
|
try: |
|
public_key = public_cls.from_dnskey(key) |
|
except ValueError: |
|
raise ValidationFailure("invalid public key") |
|
public_key.verify(sig, data) |
|
|
|
|
|
def _validate_rrsig( |
|
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], |
|
rrsig: RRSIG, |
|
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], |
|
origin: Optional[dns.name.Name] = None, |
|
now: Optional[float] = None, |
|
policy: Optional[Policy] = None, |
|
) -> None: |
|
"""Validate an RRset against a single signature rdata, throwing an |
|
exception if validation is not successful. |
|
|
|
*rrset*, the RRset to validate. This can be a |
|
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) |
|
tuple. |
|
|
|
*rrsig*, a ``dns.rdata.Rdata``, the signature to validate. |
|
|
|
*keys*, the key dictionary, used to find the DNSKEY associated |
|
with a given name. The dictionary is keyed by a |
|
``dns.name.Name``, and has ``dns.node.Node`` or |
|
``dns.rdataset.Rdataset`` values. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative |
|
names. |
|
|
|
*now*, a ``float`` or ``None``, the time, in seconds since the epoch, to |
|
use as the current time when validating. If ``None``, the actual current |
|
time is used. |
|
|
|
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, |
|
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. |
|
|
|
Raises ``ValidationFailure`` if the signature is expired, not yet valid, |
|
the public key is invalid, the algorithm is unknown, the verification |
|
fails, etc. |
|
|
|
Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by |
|
dnspython but not implemented. |
|
""" |
|
|
|
if policy is None: |
|
policy = default_policy |
|
|
|
candidate_keys = _find_candidate_keys(keys, rrsig) |
|
if candidate_keys is None: |
|
raise ValidationFailure("unknown key") |
|
|
|
if now is None: |
|
now = time.time() |
|
if rrsig.expiration < now: |
|
raise ValidationFailure("expired") |
|
if rrsig.inception > now: |
|
raise ValidationFailure("not yet valid") |
|
|
|
data = _make_rrsig_signature_data(rrset, rrsig, origin) |
|
|
|
for candidate_key in candidate_keys: |
|
if not policy.ok_to_validate(candidate_key): |
|
continue |
|
try: |
|
_validate_signature(rrsig.signature, data, candidate_key) |
|
return |
|
except (InvalidSignature, ValidationFailure): |
|
|
|
continue |
|
|
|
raise ValidationFailure("verify failure") |
|
|
|
|
|
def _validate( |
|
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], |
|
rrsigset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], |
|
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], |
|
origin: Optional[dns.name.Name] = None, |
|
now: Optional[float] = None, |
|
policy: Optional[Policy] = None, |
|
) -> None: |
|
"""Validate an RRset against a signature RRset, throwing an exception |
|
if none of the signatures validate. |
|
|
|
*rrset*, the RRset to validate. This can be a |
|
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) |
|
tuple. |
|
|
|
*rrsigset*, the signature RRset. This can be a |
|
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) |
|
tuple. |
|
|
|
*keys*, the key dictionary, used to find the DNSKEY associated |
|
with a given name. The dictionary is keyed by a |
|
``dns.name.Name``, and has ``dns.node.Node`` or |
|
``dns.rdataset.Rdataset`` values. |
|
|
|
*origin*, a ``dns.name.Name``, the origin to use for relative names; |
|
defaults to None. |
|
|
|
*now*, an ``int`` or ``None``, the time, in seconds since the epoch, to |
|
use as the current time when validating. If ``None``, the actual current |
|
time is used. |
|
|
|
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, |
|
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. |
|
|
|
Raises ``ValidationFailure`` if the signature is expired, not yet valid, |
|
the public key is invalid, the algorithm is unknown, the verification |
|
fails, etc. |
|
""" |
|
|
|
if policy is None: |
|
policy = default_policy |
|
|
|
if isinstance(origin, str): |
|
origin = dns.name.from_text(origin, dns.name.root) |
|
|
|
if isinstance(rrset, tuple): |
|
rrname = rrset[0] |
|
else: |
|
rrname = rrset.name |
|
|
|
if isinstance(rrsigset, tuple): |
|
rrsigname = rrsigset[0] |
|
rrsigrdataset = rrsigset[1] |
|
else: |
|
rrsigname = rrsigset.name |
|
rrsigrdataset = rrsigset |
|
|
|
rrname = rrname.choose_relativity(origin) |
|
rrsigname = rrsigname.choose_relativity(origin) |
|
if rrname != rrsigname: |
|
raise ValidationFailure("owner names do not match") |
|
|
|
for rrsig in rrsigrdataset: |
|
if not isinstance(rrsig, RRSIG): |
|
raise ValidationFailure("expected an RRSIG") |
|
try: |
|
_validate_rrsig(rrset, rrsig, keys, origin, now, policy) |
|
return |
|
except (ValidationFailure, UnsupportedAlgorithm): |
|
pass |
|
raise ValidationFailure("no RRSIGs validated") |
|
|
|
|
|
def _sign( |
|
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], |
|
private_key: PrivateKey, |
|
signer: dns.name.Name, |
|
dnskey: DNSKEY, |
|
inception: Optional[Union[datetime, str, int, float]] = None, |
|
expiration: Optional[Union[datetime, str, int, float]] = None, |
|
lifetime: Optional[int] = None, |
|
verify: bool = False, |
|
policy: Optional[Policy] = None, |
|
origin: Optional[dns.name.Name] = None, |
|
) -> RRSIG: |
|
"""Sign RRset using private key. |
|
|
|
*rrset*, the RRset to validate. This can be a |
|
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) |
|
tuple. |
|
|
|
*private_key*, the private key to use for signing, a |
|
``cryptography.hazmat.primitives.asymmetric`` private key class applicable |
|
for DNSSEC. |
|
|
|
*signer*, a ``dns.name.Name``, the Signer's name. |
|
|
|
*dnskey*, a ``DNSKEY`` matching ``private_key``. |
|
|
|
*inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the |
|
signature inception time. If ``None``, the current time is used. If a ``str``, the |
|
format is "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX |
|
epoch in text form; this is the same the RRSIG rdata's text form. |
|
Values of type `int` or `float` are interpreted as seconds since the UNIX epoch. |
|
|
|
*expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature |
|
expiration time. If ``None``, the expiration time will be the inception time plus |
|
the value of the *lifetime* parameter. See the description of *inception* above |
|
for how the various parameter types are interpreted. |
|
|
|
*lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This |
|
parameter is only meaningful if *expiration* is ``None``. |
|
|
|
*verify*, a ``bool``. If set to ``True``, the signer will verify signatures |
|
after they are created; the default is ``False``. |
|
|
|
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, |
|
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``. If ``None``, the default, then all |
|
names in the rrset (including its owner name) must be absolute; otherwise the |
|
specified origin will be used to make names absolute when signing. |
|
|
|
Raises ``DeniedByPolicy`` if the signature is denied by policy. |
|
""" |
|
|
|
if policy is None: |
|
policy = default_policy |
|
if not policy.ok_to_sign(dnskey): |
|
raise DeniedByPolicy |
|
|
|
if isinstance(rrset, tuple): |
|
rdclass = rrset[1].rdclass |
|
rdtype = rrset[1].rdtype |
|
rrname = rrset[0] |
|
original_ttl = rrset[1].ttl |
|
else: |
|
rdclass = rrset.rdclass |
|
rdtype = rrset.rdtype |
|
rrname = rrset.name |
|
original_ttl = rrset.ttl |
|
|
|
if inception is not None: |
|
rrsig_inception = to_timestamp(inception) |
|
else: |
|
rrsig_inception = int(time.time()) |
|
|
|
if expiration is not None: |
|
rrsig_expiration = to_timestamp(expiration) |
|
elif lifetime is not None: |
|
rrsig_expiration = rrsig_inception + lifetime |
|
else: |
|
raise ValueError("expiration or lifetime must be specified") |
|
|
|
|
|
|
|
if origin is not None: |
|
rrname = rrname.derelativize(origin) |
|
labels = len(rrname) - 1 |
|
|
|
|
|
if rrname.is_wild(): |
|
labels -= 1 |
|
|
|
rrsig_template = RRSIG( |
|
rdclass=rdclass, |
|
rdtype=dns.rdatatype.RRSIG, |
|
type_covered=rdtype, |
|
algorithm=dnskey.algorithm, |
|
labels=labels, |
|
original_ttl=original_ttl, |
|
expiration=rrsig_expiration, |
|
inception=rrsig_inception, |
|
key_tag=key_id(dnskey), |
|
signer=signer, |
|
signature=b"", |
|
) |
|
|
|
data = dns.dnssec._make_rrsig_signature_data(rrset, rrsig_template, origin) |
|
|
|
if isinstance(private_key, GenericPrivateKey): |
|
signing_key = private_key |
|
else: |
|
try: |
|
private_cls = get_algorithm_cls_from_dnskey(dnskey) |
|
signing_key = private_cls(key=private_key) |
|
except UnsupportedAlgorithm: |
|
raise TypeError("Unsupported key algorithm") |
|
|
|
signature = signing_key.sign(data, verify) |
|
|
|
return cast(RRSIG, rrsig_template.replace(signature=signature)) |
|
|
|
|
|
def _make_rrsig_signature_data( |
|
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], |
|
rrsig: RRSIG, |
|
origin: Optional[dns.name.Name] = None, |
|
) -> bytes: |
|
"""Create signature rdata. |
|
|
|
*rrset*, the RRset to sign/validate. This can be a |
|
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) |
|
tuple. |
|
|
|
*rrsig*, a ``dns.rdata.Rdata``, the signature to validate, or the |
|
signature template used when signing. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative |
|
names. |
|
|
|
Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by |
|
dnspython but not implemented. |
|
""" |
|
|
|
if isinstance(origin, str): |
|
origin = dns.name.from_text(origin, dns.name.root) |
|
|
|
signer = rrsig.signer |
|
if not signer.is_absolute(): |
|
if origin is None: |
|
raise ValidationFailure("relative RR name without an origin specified") |
|
signer = signer.derelativize(origin) |
|
|
|
|
|
|
|
rrname, rdataset = _get_rrname_rdataset(rrset) |
|
|
|
data = b"" |
|
data += rrsig.to_wire(origin=signer)[:18] |
|
data += rrsig.signer.to_digestable(signer) |
|
|
|
|
|
if not rrname.is_absolute(): |
|
if origin is None: |
|
raise ValidationFailure("relative RR name without an origin specified") |
|
rrname = rrname.derelativize(origin) |
|
|
|
name_len = len(rrname) |
|
if rrname.is_wild() and rrsig.labels != name_len - 2: |
|
raise ValidationFailure("wild owner name has wrong label length") |
|
if name_len - 1 < rrsig.labels: |
|
raise ValidationFailure("owner name longer than RRSIG labels") |
|
elif rrsig.labels < name_len - 1: |
|
suffix = rrname.split(rrsig.labels + 1)[1] |
|
rrname = dns.name.from_text("*", suffix) |
|
rrnamebuf = rrname.to_digestable() |
|
rrfixed = struct.pack("!HHI", rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl) |
|
rdatas = [rdata.to_digestable(origin) for rdata in rdataset] |
|
for rdata in sorted(rdatas): |
|
data += rrnamebuf |
|
data += rrfixed |
|
rrlen = struct.pack("!H", len(rdata)) |
|
data += rrlen |
|
data += rdata |
|
|
|
return data |
|
|
|
|
|
def _make_dnskey( |
|
public_key: PublicKey, |
|
algorithm: Union[int, str], |
|
flags: int = Flag.ZONE, |
|
protocol: int = 3, |
|
) -> DNSKEY: |
|
"""Convert a public key to DNSKEY Rdata |
|
|
|
*public_key*, a ``PublicKey`` (``GenericPublicKey`` or |
|
``cryptography.hazmat.primitives.asymmetric``) to convert. |
|
|
|
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. |
|
|
|
*flags*: DNSKEY flags field as an integer. |
|
|
|
*protocol*: DNSKEY protocol field as an integer. |
|
|
|
Raises ``ValueError`` if the specified key algorithm parameters are not |
|
unsupported, ``TypeError`` if the key type is unsupported, |
|
`UnsupportedAlgorithm` if the algorithm is unknown and |
|
`AlgorithmKeyMismatch` if the algorithm does not match the key type. |
|
|
|
Return DNSKEY ``Rdata``. |
|
""" |
|
|
|
algorithm = Algorithm.make(algorithm) |
|
|
|
if isinstance(public_key, GenericPublicKey): |
|
return public_key.to_dnskey(flags=flags, protocol=protocol) |
|
else: |
|
public_cls = get_algorithm_cls(algorithm).public_cls |
|
return public_cls(key=public_key).to_dnskey(flags=flags, protocol=protocol) |
|
|
|
|
|
def _make_cdnskey( |
|
public_key: PublicKey, |
|
algorithm: Union[int, str], |
|
flags: int = Flag.ZONE, |
|
protocol: int = 3, |
|
) -> CDNSKEY: |
|
"""Convert a public key to CDNSKEY Rdata |
|
|
|
*public_key*, the public key to convert, a |
|
``cryptography.hazmat.primitives.asymmetric`` public key class applicable |
|
for DNSSEC. |
|
|
|
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. |
|
|
|
*flags*: DNSKEY flags field as an integer. |
|
|
|
*protocol*: DNSKEY protocol field as an integer. |
|
|
|
Raises ``ValueError`` if the specified key algorithm parameters are not |
|
unsupported, ``TypeError`` if the key type is unsupported, |
|
`UnsupportedAlgorithm` if the algorithm is unknown and |
|
`AlgorithmKeyMismatch` if the algorithm does not match the key type. |
|
|
|
Return CDNSKEY ``Rdata``. |
|
""" |
|
|
|
dnskey = _make_dnskey(public_key, algorithm, flags, protocol) |
|
|
|
return CDNSKEY( |
|
rdclass=dnskey.rdclass, |
|
rdtype=dns.rdatatype.CDNSKEY, |
|
flags=dnskey.flags, |
|
protocol=dnskey.protocol, |
|
algorithm=dnskey.algorithm, |
|
key=dnskey.key, |
|
) |
|
|
|
|
|
def nsec3_hash( |
|
domain: Union[dns.name.Name, str], |
|
salt: Optional[Union[str, bytes]], |
|
iterations: int, |
|
algorithm: Union[int, str], |
|
) -> str: |
|
""" |
|
Calculate the NSEC3 hash, according to |
|
https://tools.ietf.org/html/rfc5155#section-5 |
|
|
|
*domain*, a ``dns.name.Name`` or ``str``, the name to hash. |
|
|
|
*salt*, a ``str``, ``bytes``, or ``None``, the hash salt. If a |
|
string, it is decoded as a hex string. |
|
|
|
*iterations*, an ``int``, the number of iterations. |
|
|
|
*algorithm*, a ``str`` or ``int``, the hash algorithm. |
|
The only defined algorithm is SHA1. |
|
|
|
Returns a ``str``, the encoded NSEC3 hash. |
|
""" |
|
|
|
b32_conversion = str.maketrans( |
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567", "0123456789ABCDEFGHIJKLMNOPQRSTUV" |
|
) |
|
|
|
try: |
|
if isinstance(algorithm, str): |
|
algorithm = NSEC3Hash[algorithm.upper()] |
|
except Exception: |
|
raise ValueError("Wrong hash algorithm (only SHA1 is supported)") |
|
|
|
if algorithm != NSEC3Hash.SHA1: |
|
raise ValueError("Wrong hash algorithm (only SHA1 is supported)") |
|
|
|
if salt is None: |
|
salt_encoded = b"" |
|
elif isinstance(salt, str): |
|
if len(salt) % 2 == 0: |
|
salt_encoded = bytes.fromhex(salt) |
|
else: |
|
raise ValueError("Invalid salt length") |
|
else: |
|
salt_encoded = salt |
|
|
|
if not isinstance(domain, dns.name.Name): |
|
domain = dns.name.from_text(domain) |
|
domain_encoded = domain.canonicalize().to_wire() |
|
assert domain_encoded is not None |
|
|
|
digest = hashlib.sha1(domain_encoded + salt_encoded).digest() |
|
for _ in range(iterations): |
|
digest = hashlib.sha1(digest + salt_encoded).digest() |
|
|
|
output = base64.b32encode(digest).decode("utf-8") |
|
output = output.translate(b32_conversion) |
|
|
|
return output |
|
|
|
|
|
def make_ds_rdataset( |
|
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], |
|
algorithms: Set[Union[DSDigest, str]], |
|
origin: Optional[dns.name.Name] = None, |
|
) -> dns.rdataset.Rdataset: |
|
"""Create a DS record from DNSKEY/CDNSKEY/CDS. |
|
|
|
*rrset*, the RRset to create DS Rdataset for. This can be a |
|
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) |
|
tuple. |
|
|
|
*algorithms*, a set of ``str`` or ``int`` specifying the hash algorithms. |
|
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case |
|
does not matter for these strings. If the RRset is a CDS, only digest |
|
algorithms matching algorithms are accepted. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, |
|
then it will be made absolute using the specified origin. |
|
|
|
Raises ``UnsupportedAlgorithm`` if any of the algorithms are unknown and |
|
``ValueError`` if the given RRset is not usable. |
|
|
|
Returns a ``dns.rdataset.Rdataset`` |
|
""" |
|
|
|
rrname, rdataset = _get_rrname_rdataset(rrset) |
|
|
|
if rdataset.rdtype not in ( |
|
dns.rdatatype.DNSKEY, |
|
dns.rdatatype.CDNSKEY, |
|
dns.rdatatype.CDS, |
|
): |
|
raise ValueError("rrset not a DNSKEY/CDNSKEY/CDS") |
|
|
|
_algorithms = set() |
|
for algorithm in algorithms: |
|
try: |
|
if isinstance(algorithm, str): |
|
algorithm = DSDigest[algorithm.upper()] |
|
except Exception: |
|
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) |
|
_algorithms.add(algorithm) |
|
|
|
if rdataset.rdtype == dns.rdatatype.CDS: |
|
res = [] |
|
for rdata in cds_rdataset_to_ds_rdataset(rdataset): |
|
if rdata.digest_type in _algorithms: |
|
res.append(rdata) |
|
if len(res) == 0: |
|
raise ValueError("no acceptable CDS rdata found") |
|
return dns.rdataset.from_rdata_list(rdataset.ttl, res) |
|
|
|
res = [] |
|
for algorithm in _algorithms: |
|
res.extend(dnskey_rdataset_to_cds_rdataset(rrname, rdataset, algorithm, origin)) |
|
return dns.rdataset.from_rdata_list(rdataset.ttl, res) |
|
|
|
|
|
def cds_rdataset_to_ds_rdataset( |
|
rdataset: dns.rdataset.Rdataset, |
|
) -> dns.rdataset.Rdataset: |
|
"""Create a CDS record from DS. |
|
|
|
*rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for. |
|
|
|
Raises ``ValueError`` if the rdataset is not CDS. |
|
|
|
Returns a ``dns.rdataset.Rdataset`` |
|
""" |
|
|
|
if rdataset.rdtype != dns.rdatatype.CDS: |
|
raise ValueError("rdataset not a CDS") |
|
res = [] |
|
for rdata in rdataset: |
|
res.append( |
|
CDS( |
|
rdclass=rdata.rdclass, |
|
rdtype=dns.rdatatype.DS, |
|
key_tag=rdata.key_tag, |
|
algorithm=rdata.algorithm, |
|
digest_type=rdata.digest_type, |
|
digest=rdata.digest, |
|
) |
|
) |
|
return dns.rdataset.from_rdata_list(rdataset.ttl, res) |
|
|
|
|
|
def dnskey_rdataset_to_cds_rdataset( |
|
name: Union[dns.name.Name, str], |
|
rdataset: dns.rdataset.Rdataset, |
|
algorithm: Union[DSDigest, str], |
|
origin: Optional[dns.name.Name] = None, |
|
) -> dns.rdataset.Rdataset: |
|
"""Create a CDS record from DNSKEY/CDNSKEY. |
|
|
|
*name*, a ``dns.name.Name`` or ``str``, the owner name of the CDS record. |
|
|
|
*rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for. |
|
|
|
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm. |
|
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case |
|
does not matter for these strings. |
|
|
|
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, |
|
then it will be made absolute using the specified origin. |
|
|
|
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown or |
|
``ValueError`` if the rdataset is not DNSKEY/CDNSKEY. |
|
|
|
Returns a ``dns.rdataset.Rdataset`` |
|
""" |
|
|
|
if rdataset.rdtype not in (dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY): |
|
raise ValueError("rdataset not a DNSKEY/CDNSKEY") |
|
res = [] |
|
for rdata in rdataset: |
|
res.append(make_cds(name, rdata, algorithm, origin)) |
|
return dns.rdataset.from_rdata_list(rdataset.ttl, res) |
|
|
|
|
|
def dnskey_rdataset_to_cdnskey_rdataset( |
|
rdataset: dns.rdataset.Rdataset, |
|
) -> dns.rdataset.Rdataset: |
|
"""Create a CDNSKEY record from DNSKEY. |
|
|
|
*rdataset*, a ``dns.rdataset.Rdataset``, to create CDNSKEY Rdataset for. |
|
|
|
Returns a ``dns.rdataset.Rdataset`` |
|
""" |
|
|
|
if rdataset.rdtype != dns.rdatatype.DNSKEY: |
|
raise ValueError("rdataset not a DNSKEY") |
|
res = [] |
|
for rdata in rdataset: |
|
res.append( |
|
CDNSKEY( |
|
rdclass=rdataset.rdclass, |
|
rdtype=rdataset.rdtype, |
|
flags=rdata.flags, |
|
protocol=rdata.protocol, |
|
algorithm=rdata.algorithm, |
|
key=rdata.key, |
|
) |
|
) |
|
return dns.rdataset.from_rdata_list(rdataset.ttl, res) |
|
|
|
|
|
def default_rrset_signer( |
|
txn: dns.transaction.Transaction, |
|
rrset: dns.rrset.RRset, |
|
signer: dns.name.Name, |
|
ksks: List[Tuple[PrivateKey, DNSKEY]], |
|
zsks: List[Tuple[PrivateKey, DNSKEY]], |
|
inception: Optional[Union[datetime, str, int, float]] = None, |
|
expiration: Optional[Union[datetime, str, int, float]] = None, |
|
lifetime: Optional[int] = None, |
|
policy: Optional[Policy] = None, |
|
origin: Optional[dns.name.Name] = None, |
|
) -> None: |
|
"""Default RRset signer""" |
|
|
|
if rrset.rdtype in set( |
|
[ |
|
dns.rdatatype.RdataType.DNSKEY, |
|
dns.rdatatype.RdataType.CDS, |
|
dns.rdatatype.RdataType.CDNSKEY, |
|
] |
|
): |
|
keys = ksks |
|
else: |
|
keys = zsks |
|
|
|
for private_key, dnskey in keys: |
|
rrsig = dns.dnssec.sign( |
|
rrset=rrset, |
|
private_key=private_key, |
|
dnskey=dnskey, |
|
inception=inception, |
|
expiration=expiration, |
|
lifetime=lifetime, |
|
signer=signer, |
|
policy=policy, |
|
origin=origin, |
|
) |
|
txn.add(rrset.name, rrset.ttl, rrsig) |
|
|
|
|
|
def sign_zone( |
|
zone: dns.zone.Zone, |
|
txn: Optional[dns.transaction.Transaction] = None, |
|
keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None, |
|
add_dnskey: bool = True, |
|
dnskey_ttl: Optional[int] = None, |
|
inception: Optional[Union[datetime, str, int, float]] = None, |
|
expiration: Optional[Union[datetime, str, int, float]] = None, |
|
lifetime: Optional[int] = None, |
|
nsec3: Optional[NSEC3PARAM] = None, |
|
rrset_signer: Optional[RRsetSigner] = None, |
|
policy: Optional[Policy] = None, |
|
) -> None: |
|
"""Sign zone. |
|
|
|
*zone*, a ``dns.zone.Zone``, the zone to sign. |
|
|
|
*txn*, a ``dns.transaction.Transaction``, an optional transaction to use for |
|
signing. |
|
|
|
*keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing. KSK/ZSK |
|
roles are assigned automatically if the SEP flag is used, otherwise all RRsets are |
|
signed by all keys. |
|
|
|
*add_dnskey*, a ``bool``. If ``True``, the default, all specified DNSKEYs are |
|
automatically added to the zone on signing. |
|
|
|
*dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified the TTL |
|
of the existing DNSKEY RRset used or the TTL of the SOA RRset. |
|
|
|
*inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature |
|
inception time. If ``None``, the current time is used. If a ``str``, the format is |
|
"YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX epoch in text |
|
form; this is the same the RRSIG rdata's text form. Values of type `int` or `float` |
|
are interpreted as seconds since the UNIX epoch. |
|
|
|
*expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature |
|
expiration time. If ``None``, the expiration time will be the inception time plus |
|
the value of the *lifetime* parameter. See the description of *inception* above for |
|
how the various parameter types are interpreted. |
|
|
|
*lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This |
|
parameter is only meaningful if *expiration* is ``None``. |
|
|
|
*nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet |
|
implemented. |
|
|
|
*rrset_signer*, a ``Callable``, an optional function for signing RRsets. The |
|
function requires two arguments: transaction and RRset. If the not specified, |
|
``dns.dnssec.default_rrset_signer`` will be used. |
|
|
|
Returns ``None``. |
|
""" |
|
|
|
ksks = [] |
|
zsks = [] |
|
|
|
|
|
|
|
if keys: |
|
for key in keys: |
|
if key[1].flags & Flag.SEP: |
|
ksks.append(key) |
|
else: |
|
zsks.append(key) |
|
if not ksks: |
|
ksks = keys |
|
if not zsks: |
|
zsks = keys |
|
else: |
|
keys = [] |
|
|
|
if txn: |
|
cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn) |
|
else: |
|
cm = zone.writer() |
|
|
|
with cm as _txn: |
|
if add_dnskey: |
|
if dnskey_ttl is None: |
|
dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY) |
|
if dnskey: |
|
dnskey_ttl = dnskey.ttl |
|
else: |
|
soa = _txn.get(zone.origin, dns.rdatatype.SOA) |
|
dnskey_ttl = soa.ttl |
|
for _, dnskey in keys: |
|
_txn.add(zone.origin, dnskey_ttl, dnskey) |
|
|
|
if nsec3: |
|
raise NotImplementedError("Signing with NSEC3 not yet implemented") |
|
else: |
|
_rrset_signer = rrset_signer or functools.partial( |
|
default_rrset_signer, |
|
signer=zone.origin, |
|
ksks=ksks, |
|
zsks=zsks, |
|
inception=inception, |
|
expiration=expiration, |
|
lifetime=lifetime, |
|
policy=policy, |
|
origin=zone.origin, |
|
) |
|
return _sign_zone_nsec(zone, _txn, _rrset_signer) |
|
|
|
|
|
def _sign_zone_nsec( |
|
zone: dns.zone.Zone, |
|
txn: dns.transaction.Transaction, |
|
rrset_signer: Optional[RRsetSigner] = None, |
|
) -> None: |
|
"""NSEC zone signer""" |
|
|
|
def _txn_add_nsec( |
|
txn: dns.transaction.Transaction, |
|
name: dns.name.Name, |
|
next_secure: Optional[dns.name.Name], |
|
rdclass: dns.rdataclass.RdataClass, |
|
ttl: int, |
|
rrset_signer: Optional[RRsetSigner] = None, |
|
) -> None: |
|
"""NSEC zone signer helper""" |
|
mandatory_types = set( |
|
[dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC] |
|
) |
|
node = txn.get_node(name) |
|
if node and next_secure: |
|
types = ( |
|
set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types |
|
) |
|
windows = Bitmap.from_rdtypes(list(types)) |
|
rrset = dns.rrset.from_rdata( |
|
name, |
|
ttl, |
|
NSEC( |
|
rdclass=rdclass, |
|
rdtype=dns.rdatatype.RdataType.NSEC, |
|
next=next_secure, |
|
windows=windows, |
|
), |
|
) |
|
txn.add(rrset) |
|
if rrset_signer: |
|
rrset_signer(txn, rrset) |
|
|
|
rrsig_ttl = zone.get_soa().minimum |
|
delegation = None |
|
last_secure = None |
|
|
|
for name in sorted(txn.iterate_names()): |
|
if delegation and name.is_subdomain(delegation): |
|
|
|
continue |
|
elif txn.get(name, dns.rdatatype.NS) and name != zone.origin: |
|
|
|
delegation = name |
|
else: |
|
|
|
delegation = None |
|
|
|
if rrset_signer: |
|
node = txn.get_node(name) |
|
if node: |
|
for rdataset in node.rdatasets: |
|
if rdataset.rdtype == dns.rdatatype.RRSIG: |
|
|
|
continue |
|
elif delegation and rdataset.rdtype != dns.rdatatype.DS: |
|
|
|
continue |
|
else: |
|
rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset) |
|
rrset_signer(txn, rrset) |
|
|
|
|
|
if last_secure is not None: |
|
_txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer) |
|
last_secure = name |
|
|
|
if last_secure: |
|
_txn_add_nsec( |
|
txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer |
|
) |
|
|
|
|
|
def _need_pyca(*args, **kwargs): |
|
raise ImportError( |
|
"DNSSEC validation requires python cryptography" |
|
) |
|
|
|
|
|
if dns._features.have("dnssec"): |
|
from cryptography.exceptions import InvalidSignature |
|
from cryptography.hazmat.primitives.asymmetric import dsa |
|
from cryptography.hazmat.primitives.asymmetric import ec |
|
from cryptography.hazmat.primitives.asymmetric import ed448 |
|
from cryptography.hazmat.primitives.asymmetric import rsa |
|
from cryptography.hazmat.primitives.asymmetric import ( |
|
ed25519, |
|
) |
|
|
|
from dns.dnssecalgs import ( |
|
get_algorithm_cls, |
|
get_algorithm_cls_from_dnskey, |
|
) |
|
from dns.dnssecalgs.base import GenericPrivateKey, GenericPublicKey |
|
|
|
validate = _validate |
|
validate_rrsig = _validate_rrsig |
|
sign = _sign |
|
make_dnskey = _make_dnskey |
|
make_cdnskey = _make_cdnskey |
|
_have_pyca = True |
|
else: |
|
validate = _need_pyca |
|
validate_rrsig = _need_pyca |
|
sign = _need_pyca |
|
make_dnskey = _need_pyca |
|
make_cdnskey = _need_pyca |
|
_have_pyca = False |
|
|
|
|
|
|
|
RSAMD5 = Algorithm.RSAMD5 |
|
DH = Algorithm.DH |
|
DSA = Algorithm.DSA |
|
ECC = Algorithm.ECC |
|
RSASHA1 = Algorithm.RSASHA1 |
|
DSANSEC3SHA1 = Algorithm.DSANSEC3SHA1 |
|
RSASHA1NSEC3SHA1 = Algorithm.RSASHA1NSEC3SHA1 |
|
RSASHA256 = Algorithm.RSASHA256 |
|
RSASHA512 = Algorithm.RSASHA512 |
|
ECCGOST = Algorithm.ECCGOST |
|
ECDSAP256SHA256 = Algorithm.ECDSAP256SHA256 |
|
ECDSAP384SHA384 = Algorithm.ECDSAP384SHA384 |
|
ED25519 = Algorithm.ED25519 |
|
ED448 = Algorithm.ED448 |
|
INDIRECT = Algorithm.INDIRECT |
|
PRIVATEDNS = Algorithm.PRIVATEDNS |
|
PRIVATEOID = Algorithm.PRIVATEOID |
|
|
|
|
|
|