Spaces:
Running
Running
# Copyright (c) 2019-2022 by Ron Frederick <ronf@timeheart.net> and others. | |
# | |
# This program and the accompanying materials are made available under | |
# the terms of the Eclipse Public License v2.0 which accompanies this | |
# distribution and is available at: | |
# | |
# http://www.eclipse.org/legal/epl-2.0/ | |
# | |
# This program may also be made available under the following secondary | |
# licenses when the conditions for such availability set forth in the | |
# Eclipse Public License v2.0 are satisfied: | |
# | |
# GNU General Public License, Version 2.0, or any later versions of | |
# that license | |
# | |
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later | |
# | |
# Contributors: | |
# Ron Frederick - initial implementation, API, and documentation | |
"""U2F security key handler""" | |
from hashlib import sha256 | |
import hmac | |
import time | |
from typing import Callable, List, Mapping, NoReturn, Optional | |
from typing import Sequence, Tuple, TypeVar, cast | |
_PollResult = TypeVar('_PollResult') | |
_SKResidentKey = Tuple[int, str, bytes, bytes] | |
_CTAP1_POLL_INTERVAL = 0.1 | |
_dummy_hash = 32 * b'\0' | |
# Flags | |
SSH_SK_USER_PRESENCE_REQD = 0x01 | |
# Algorithms | |
SSH_SK_ECDSA = -7 | |
SSH_SK_ED25519 = -8 | |
def _decode_public_key(alg: int, public_key: Mapping[int, object]) -> bytes: | |
"""Decode algorithm and public value from a CTAP public key""" | |
result = cast(bytes, public_key[-2]) | |
if alg == SSH_SK_ED25519: | |
return result | |
else: | |
return b'\x04' + result + cast(bytes, public_key[-3]) | |
def _ctap1_poll(poll_interval: float, func: Callable[..., _PollResult], | |
*args: object) -> _PollResult: | |
"""Poll until a CTAP1 response is received""" | |
while True: | |
try: | |
return func(*args) | |
except ApduError as exc: | |
if exc.code != APDU.USE_NOT_SATISFIED: | |
raise | |
time.sleep(poll_interval) | |
def _ctap1_enroll(dev: 'CtapHidDevice', alg: int, | |
application: bytes) -> Tuple[bytes, bytes]: | |
"""Enroll a new security key using CTAP version 1""" | |
ctap1 = Ctap1(dev) | |
if alg != SSH_SK_ECDSA: | |
raise ValueError('Unsupported algorithm') | |
app_hash = sha256(application).digest() | |
registration = _ctap1_poll(_CTAP1_POLL_INTERVAL, ctap1.register, | |
_dummy_hash, app_hash) | |
return registration.public_key, registration.key_handle | |
def _ctap2_enroll(dev: 'CtapHidDevice', alg: int, application: bytes, | |
user: str, pin: Optional[str], | |
resident: bool) -> Tuple[bytes, bytes]: | |
"""Enroll a new security key using CTAP version 2""" | |
ctap2 = Ctap2(dev) | |
application = application.decode('utf-8') | |
rp = {'id': application, 'name': application} | |
user_cred = {'id': user.encode('utf-8'), 'name': user} | |
key_params = [{'type': 'public-key', 'alg': alg}] | |
options = {'rk': resident} | |
pin_protocol: Optional[PinProtocolV1] | |
pin_auth: Optional[bytes] | |
if pin: | |
pin_protocol = PinProtocolV1() | |
pin_token = ClientPin(ctap2, pin_protocol).get_pin_token(pin) | |
pin_auth = hmac.new(pin_token, _dummy_hash, sha256).digest()[:16] | |
else: | |
pin_protocol = None | |
pin_auth = None | |
pin_version = pin_protocol.VERSION if pin_protocol else None | |
cred = ctap2.make_credential(_dummy_hash, rp, user_cred, key_params, | |
options=options, pin_uv_param=pin_auth, | |
pin_uv_protocol=pin_version) | |
cdata = cred.auth_data.credential_data | |
# pylint: disable=no-member | |
return _decode_public_key(alg, cdata.public_key), cdata.credential_id | |
def _ctap1_sign(dev: 'CtapHidDevice', message_hash: bytes, application: bytes, | |
key_handle: bytes) -> Tuple[int, int, bytes]: | |
"""Sign a message with a security key using CTAP version 1""" | |
ctap1 = Ctap1(dev) | |
app_hash = sha256(application).digest() | |
auth_response = _ctap1_poll(_CTAP1_POLL_INTERVAL, ctap1.authenticate, | |
message_hash, app_hash, key_handle) | |
flags = auth_response[0] | |
counter = int.from_bytes(auth_response[1:5], 'big') | |
sig = auth_response[5:] | |
return flags, counter, sig | |
def _ctap2_sign(dev: 'CtapHidDevice', message_hash: bytes, | |
application: bytes, key_handle: bytes, | |
touch_required: bool) -> Tuple[int, int, bytes]: | |
"""Sign a message with a security key using CTAP version 2""" | |
ctap2 = Ctap2(dev) | |
application = application.decode('utf-8') | |
allow_creds = [{'type': 'public-key', 'id': key_handle}] | |
options = {'up': touch_required} | |
assertion = ctap2.get_assertions(application, message_hash, allow_creds, | |
options=options)[0] | |
auth_data = assertion.auth_data | |
return auth_data.flags, auth_data.counter, assertion.signature | |
def sk_enroll(alg: int, application: bytes, user: str, | |
pin: Optional[str], resident: bool) -> Tuple[bytes, bytes]: | |
"""Enroll a new security key""" | |
try: | |
dev = next(CtapHidDevice.list_devices()) | |
except StopIteration: | |
raise ValueError('No security key found') from None | |
try: | |
return _ctap2_enroll(dev, alg, application, user, pin, resident) | |
except CtapError as exc: | |
if exc.code == CtapError.ERR.PUAT_REQUIRED: | |
raise ValueError('PIN required') from None | |
elif exc.code == CtapError.ERR.PIN_INVALID: | |
raise ValueError('Invalid PIN') from None | |
else: | |
raise ValueError(str(exc)) from None | |
except ValueError as exc: | |
try: | |
return _ctap1_enroll(dev, alg, application) | |
except ApduError as exc: | |
raise ValueError(str(exc)) from None | |
finally: | |
dev.close() | |
def sk_sign(message_hash: bytes, application: bytes, key_handle: bytes, | |
flags: int) -> Tuple[int, int, bytes]: | |
"""Sign a message with a security key""" | |
touch_required = bool(flags & SSH_SK_USER_PRESENCE_REQD) | |
for dev in CtapHidDevice.list_devices(): | |
try: | |
return _ctap2_sign(dev, message_hash, application, | |
key_handle, touch_required) | |
except CtapError as exc: | |
if exc.code != CtapError.ERR.NO_CREDENTIALS: | |
raise ValueError(str(exc)) from None | |
except ValueError: | |
try: | |
return _ctap1_sign(dev, message_hash, application, key_handle) | |
except ApduError as exc: | |
if exc.code != APDU.WRONG_DATA: | |
raise ValueError(str(exc)) from None | |
finally: | |
dev.close() | |
raise ValueError('Security key credential not found') | |
def sk_get_resident(application: bytes, user: Optional[str], | |
pin: str) -> Sequence[_SKResidentKey]: | |
"""Get keys resident on a security key""" | |
app_hash = sha256(application).digest() | |
result: List[_SKResidentKey] = [] | |
for dev in CtapHidDevice.list_devices(): | |
try: | |
ctap2 = Ctap2(dev) | |
pin_protocol = PinProtocolV1() | |
pin_token = ClientPin(ctap2, pin_protocol).get_pin_token(pin) | |
cred_mgmt = CredentialManagement(ctap2, pin_protocol, pin_token) | |
for cred in cred_mgmt.enumerate_creds(app_hash): | |
user_info = cast(Mapping[str, object], | |
cred[CredentialManagement.RESULT.USER]) | |
name = cast(str, user_info['name']) | |
if user and name != user: | |
continue | |
cred_id = cast(Mapping[str, object], | |
cred[CredentialManagement.RESULT.CREDENTIAL_ID]) | |
key_handle = cast(bytes, cred_id['id']) | |
public_key = cast(Mapping[int, object], | |
cred[CredentialManagement.RESULT.PUBLIC_KEY]) | |
alg = cast(int, public_key[3]) | |
public_value = _decode_public_key(alg, public_key) | |
result.append((alg, name, public_value, key_handle)) | |
except CtapError as exc: | |
if exc.code == CtapError.ERR.NO_CREDENTIALS: | |
continue | |
elif exc.code == CtapError.ERR.PIN_INVALID: | |
raise ValueError('Invalid PIN') from None | |
elif exc.code == CtapError.ERR.PIN_NOT_SET: | |
raise ValueError('PIN not set') from None | |
else: | |
raise ValueError(str(exc)) from None | |
finally: | |
dev.close() | |
return result | |
try: | |
from fido2.hid import CtapHidDevice | |
from fido2.ctap import CtapError | |
from fido2.ctap1 import Ctap1, APDU, ApduError | |
from fido2.ctap2 import Ctap2, ClientPin, PinProtocolV1 | |
from fido2.ctap2 import CredentialManagement | |
sk_available = True | |
except (ImportError, OSError, AttributeError): # pragma: no cover | |
sk_available = False | |
def _sk_not_available(*args: object, **kwargs: object) -> NoReturn: | |
"""Report that security key support is unavailable""" | |
raise ValueError('Security key support not available') | |
sk_enroll = _sk_not_available | |
sk_sign = _sk_not_available | |
sk_get_resident = _sk_not_available | |