Spaces:
Running
Running
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
# Copyright (C) 2001-2007, 2009-2011 Nominum, Inc. | |
# | |
# Permission to use, copy, modify, and distribute this software and its | |
# documentation for any purpose with or without fee is hereby granted, | |
# provided that the above copyright notice and this permission notice | |
# appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
"""DNS TSIG support.""" | |
import base64 | |
import hashlib | |
import hmac | |
import struct | |
import dns.exception | |
import dns.name | |
import dns.rcode | |
import dns.rdataclass | |
class BadTime(dns.exception.DNSException): | |
"""The current time is not within the TSIG's validity time.""" | |
class BadSignature(dns.exception.DNSException): | |
"""The TSIG signature fails to verify.""" | |
class BadKey(dns.exception.DNSException): | |
"""The TSIG record owner name does not match the key.""" | |
class BadAlgorithm(dns.exception.DNSException): | |
"""The TSIG algorithm does not match the key.""" | |
class PeerError(dns.exception.DNSException): | |
"""Base class for all TSIG errors generated by the remote peer""" | |
class PeerBadKey(PeerError): | |
"""The peer didn't know the key we used""" | |
class PeerBadSignature(PeerError): | |
"""The peer didn't like the signature we sent""" | |
class PeerBadTime(PeerError): | |
"""The peer didn't like the time we sent""" | |
class PeerBadTruncation(PeerError): | |
"""The peer didn't like amount of truncation in the TSIG we sent""" | |
# TSIG Algorithms | |
HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT") | |
HMAC_SHA1 = dns.name.from_text("hmac-sha1") | |
HMAC_SHA224 = dns.name.from_text("hmac-sha224") | |
HMAC_SHA256 = dns.name.from_text("hmac-sha256") | |
HMAC_SHA256_128 = dns.name.from_text("hmac-sha256-128") | |
HMAC_SHA384 = dns.name.from_text("hmac-sha384") | |
HMAC_SHA384_192 = dns.name.from_text("hmac-sha384-192") | |
HMAC_SHA512 = dns.name.from_text("hmac-sha512") | |
HMAC_SHA512_256 = dns.name.from_text("hmac-sha512-256") | |
GSS_TSIG = dns.name.from_text("gss-tsig") | |
default_algorithm = HMAC_SHA256 | |
mac_sizes = { | |
HMAC_SHA1: 20, | |
HMAC_SHA224: 28, | |
HMAC_SHA256: 32, | |
HMAC_SHA256_128: 16, | |
HMAC_SHA384: 48, | |
HMAC_SHA384_192: 24, | |
HMAC_SHA512: 64, | |
HMAC_SHA512_256: 32, | |
HMAC_MD5: 16, | |
GSS_TSIG: 128, # This is what we assume to be the worst case! | |
} | |
class GSSTSig: | |
""" | |
GSS-TSIG TSIG implementation. This uses the GSS-API context established | |
in the TKEY message handshake to sign messages using GSS-API message | |
integrity codes, per the RFC. | |
In order to avoid a direct GSSAPI dependency, the keyring holds a ref | |
to the GSSAPI object required, rather than the key itself. | |
""" | |
def __init__(self, gssapi_context): | |
self.gssapi_context = gssapi_context | |
self.data = b"" | |
self.name = "gss-tsig" | |
def update(self, data): | |
self.data += data | |
def sign(self): | |
# defer to the GSSAPI function to sign | |
return self.gssapi_context.get_signature(self.data) | |
def verify(self, expected): | |
try: | |
# defer to the GSSAPI function to verify | |
return self.gssapi_context.verify_signature(self.data, expected) | |
except Exception: | |
# note the usage of a bare exception | |
raise BadSignature | |
class GSSTSigAdapter: | |
def __init__(self, keyring): | |
self.keyring = keyring | |
def __call__(self, message, keyname): | |
if keyname in self.keyring: | |
key = self.keyring[keyname] | |
if isinstance(key, Key) and key.algorithm == GSS_TSIG: | |
if message: | |
GSSTSigAdapter.parse_tkey_and_step(key, message, keyname) | |
return key | |
else: | |
return None | |
def parse_tkey_and_step(cls, key, message, keyname): | |
# if the message is a TKEY type, absorb the key material | |
# into the context using step(); this is used to allow the | |
# client to complete the GSSAPI negotiation before attempting | |
# to verify the signed response to a TKEY message exchange | |
try: | |
rrset = message.find_rrset( | |
message.answer, keyname, dns.rdataclass.ANY, dns.rdatatype.TKEY | |
) | |
if rrset: | |
token = rrset[0].key | |
gssapi_context = key.secret | |
return gssapi_context.step(token) | |
except KeyError: | |
pass | |
class HMACTSig: | |
""" | |
HMAC TSIG implementation. This uses the HMAC python module to handle the | |
sign/verify operations. | |
""" | |
_hashes = { | |
HMAC_SHA1: hashlib.sha1, | |
HMAC_SHA224: hashlib.sha224, | |
HMAC_SHA256: hashlib.sha256, | |
HMAC_SHA256_128: (hashlib.sha256, 128), | |
HMAC_SHA384: hashlib.sha384, | |
HMAC_SHA384_192: (hashlib.sha384, 192), | |
HMAC_SHA512: hashlib.sha512, | |
HMAC_SHA512_256: (hashlib.sha512, 256), | |
HMAC_MD5: hashlib.md5, | |
} | |
def __init__(self, key, algorithm): | |
try: | |
hashinfo = self._hashes[algorithm] | |
except KeyError: | |
raise NotImplementedError(f"TSIG algorithm {algorithm} is not supported") | |
# create the HMAC context | |
if isinstance(hashinfo, tuple): | |
self.hmac_context = hmac.new(key, digestmod=hashinfo[0]) | |
self.size = hashinfo[1] | |
else: | |
self.hmac_context = hmac.new(key, digestmod=hashinfo) | |
self.size = None | |
self.name = self.hmac_context.name | |
if self.size: | |
self.name += f"-{self.size}" | |
def update(self, data): | |
return self.hmac_context.update(data) | |
def sign(self): | |
# defer to the HMAC digest() function for that digestmod | |
digest = self.hmac_context.digest() | |
if self.size: | |
digest = digest[: (self.size // 8)] | |
return digest | |
def verify(self, expected): | |
# re-digest and compare the results | |
mac = self.sign() | |
if not hmac.compare_digest(mac, expected): | |
raise BadSignature | |
def _digest(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=None): | |
"""Return a context containing the TSIG rdata for the input parameters | |
@rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object | |
@raises ValueError: I{other_data} is too long | |
@raises NotImplementedError: I{algorithm} is not supported | |
""" | |
first = not (ctx and multi) | |
if first: | |
ctx = get_context(key) | |
if request_mac: | |
ctx.update(struct.pack("!H", len(request_mac))) | |
ctx.update(request_mac) | |
ctx.update(struct.pack("!H", rdata.original_id)) | |
ctx.update(wire[2:]) | |
if first: | |
ctx.update(key.name.to_digestable()) | |
ctx.update(struct.pack("!H", dns.rdataclass.ANY)) | |
ctx.update(struct.pack("!I", 0)) | |
if time is None: | |
time = rdata.time_signed | |
upper_time = (time >> 32) & 0xFFFF | |
lower_time = time & 0xFFFFFFFF | |
time_encoded = struct.pack("!HIH", upper_time, lower_time, rdata.fudge) | |
other_len = len(rdata.other) | |
if other_len > 65535: | |
raise ValueError("TSIG Other Data is > 65535 bytes") | |
if first: | |
ctx.update(key.algorithm.to_digestable() + time_encoded) | |
ctx.update(struct.pack("!HH", rdata.error, other_len) + rdata.other) | |
else: | |
ctx.update(time_encoded) | |
return ctx | |
def _maybe_start_digest(key, mac, multi): | |
"""If this is the first message in a multi-message sequence, | |
start a new context. | |
@rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object | |
""" | |
if multi: | |
ctx = get_context(key) | |
ctx.update(struct.pack("!H", len(mac))) | |
ctx.update(mac) | |
return ctx | |
else: | |
return None | |
def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False): | |
"""Return a (tsig_rdata, mac, ctx) tuple containing the HMAC TSIG rdata | |
for the input parameters, the HMAC MAC calculated by applying the | |
TSIG signature algorithm, and the TSIG digest context. | |
@rtype: (string, dns.tsig.HMACTSig or dns.tsig.GSSTSig object) | |
@raises ValueError: I{other_data} is too long | |
@raises NotImplementedError: I{algorithm} is not supported | |
""" | |
ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi) | |
mac = ctx.sign() | |
tsig = rdata.replace(time_signed=time, mac=mac) | |
return (tsig, _maybe_start_digest(key, mac, multi)) | |
def validate( | |
wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None, multi=False | |
): | |
"""Validate the specified TSIG rdata against the other input parameters. | |
@raises FormError: The TSIG is badly formed. | |
@raises BadTime: There is too much time skew between the client and the | |
server. | |
@raises BadSignature: The TSIG signature did not validate | |
@rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object""" | |
(adcount,) = struct.unpack("!H", wire[10:12]) | |
if adcount == 0: | |
raise dns.exception.FormError | |
adcount -= 1 | |
new_wire = wire[0:10] + struct.pack("!H", adcount) + wire[12:tsig_start] | |
if rdata.error != 0: | |
if rdata.error == dns.rcode.BADSIG: | |
raise PeerBadSignature | |
elif rdata.error == dns.rcode.BADKEY: | |
raise PeerBadKey | |
elif rdata.error == dns.rcode.BADTIME: | |
raise PeerBadTime | |
elif rdata.error == dns.rcode.BADTRUNC: | |
raise PeerBadTruncation | |
else: | |
raise PeerError("unknown TSIG error code %d" % rdata.error) | |
if abs(rdata.time_signed - now) > rdata.fudge: | |
raise BadTime | |
if key.name != owner: | |
raise BadKey | |
if key.algorithm != rdata.algorithm: | |
raise BadAlgorithm | |
ctx = _digest(new_wire, key, rdata, None, request_mac, ctx, multi) | |
ctx.verify(rdata.mac) | |
return _maybe_start_digest(key, rdata.mac, multi) | |
def get_context(key): | |
"""Returns an HMAC context for the specified key. | |
@rtype: HMAC context | |
@raises NotImplementedError: I{algorithm} is not supported | |
""" | |
if key.algorithm == GSS_TSIG: | |
return GSSTSig(key.secret) | |
else: | |
return HMACTSig(key.secret, key.algorithm) | |
class Key: | |
def __init__(self, name, secret, algorithm=default_algorithm): | |
if isinstance(name, str): | |
name = dns.name.from_text(name) | |
self.name = name | |
if isinstance(secret, str): | |
secret = base64.decodebytes(secret.encode()) | |
self.secret = secret | |
if isinstance(algorithm, str): | |
algorithm = dns.name.from_text(algorithm) | |
self.algorithm = algorithm | |
def __eq__(self, other): | |
return ( | |
isinstance(other, Key) | |
and self.name == other.name | |
and self.secret == other.secret | |
and self.algorithm == other.algorithm | |
) | |
def __repr__(self): | |
r = f"<DNS key name='{self.name}', " + f"algorithm='{self.algorithm}'" | |
if self.algorithm != GSS_TSIG: | |
r += f", secret='{base64.b64encode(self.secret).decode()}'" | |
r += ">" | |
return r | |