Spaces:
Build error
Build error
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
import uuid | |
from typing import Dict, List, Union | |
from unittest.mock import Mock | |
import pytest | |
from botbuilder.schema import Activity, ConversationReference, ChannelAccount, RoleTypes | |
from botframework.connector import Channels | |
from botframework.connector.auth import ( | |
AuthenticationConfiguration, | |
AuthenticationConstants, | |
JwtTokenValidation, | |
SimpleCredentialProvider, | |
EmulatorValidation, | |
EnterpriseChannelValidation, | |
ChannelValidation, | |
ClaimsIdentity, | |
MicrosoftAppCredentials, | |
# GovernmentConstants, | |
GovernmentChannelValidation, | |
SimpleChannelProvider, | |
ChannelProvider, | |
# AppCredentials, | |
) | |
async def jwt_token_validation_validate_auth_header_with_channel_service_succeeds( | |
app_id: str, | |
pwd: str, | |
channel_service_or_provider: Union[str, ChannelProvider], | |
header: str = None, | |
): | |
if header is None: | |
header = f"Bearer {MicrosoftAppCredentials(app_id, pwd).get_access_token()}" | |
credentials = SimpleCredentialProvider(app_id, pwd) | |
result = await JwtTokenValidation.validate_auth_header( | |
header, | |
credentials, | |
channel_service_or_provider, | |
"", | |
"https://webchat.botframework.com/", | |
) | |
assert result.is_authenticated | |
# TODO: Consider changing to unittest to use ddt for Credentials tests | |
class TestAuth: | |
EmulatorValidation.TO_BOT_FROM_EMULATOR_TOKEN_VALIDATION_PARAMETERS.ignore_expiration = ( | |
True | |
) | |
ChannelValidation.TO_BOT_FROM_CHANNEL_TOKEN_VALIDATION_PARAMETERS.ignore_expiration = ( | |
True | |
) | |
async def test_claims_validation(self): | |
claims: List[Dict] = {} | |
default_auth_config = AuthenticationConfiguration() | |
# No validator should pass. | |
await JwtTokenValidation.validate_claims(default_auth_config, claims) | |
mock_validator = Mock() | |
auth_with_validator = AuthenticationConfiguration( | |
claims_validator=mock_validator | |
) | |
# Configure IClaimsValidator to fail | |
mock_validator.side_effect = PermissionError("Invalid claims.") | |
with pytest.raises(PermissionError) as excinfo: | |
await JwtTokenValidation.validate_claims(auth_with_validator, claims) | |
assert "Invalid claims." in str(excinfo.value) | |
# No validator with not skill cliams should pass. | |
default_auth_config.claims_validator = None | |
claims: List[Dict] = { | |
AuthenticationConstants.VERSION_CLAIM: "1.0", | |
AuthenticationConstants.AUDIENCE_CLAIM: "this_bot_id", | |
AuthenticationConstants.APP_ID_CLAIM: "this_bot_id", # Skill claims aud!=azp | |
} | |
await JwtTokenValidation.validate_claims(default_auth_config, claims) | |
# No validator with skill cliams should fail. | |
claims: List[Dict] = { | |
AuthenticationConstants.VERSION_CLAIM: "1.0", | |
AuthenticationConstants.AUDIENCE_CLAIM: "this_bot_id", | |
AuthenticationConstants.APP_ID_CLAIM: "not_this_bot_id", # Skill claims aud!=azp | |
} | |
mock_validator.side_effect = PermissionError( | |
"Unauthorized Access. Request is not authorized. Skill Claims require validation." | |
) | |
with pytest.raises(PermissionError) as excinfo_skill: | |
await JwtTokenValidation.validate_claims(auth_with_validator, claims) | |
assert ( | |
"Unauthorized Access. Request is not authorized. Skill Claims require validation." | |
in str(excinfo_skill.value) | |
) | |
# @pytest.mark.asyncio | |
# async def test_connector_auth_header_correct_app_id_and_service_url_should_validate( | |
# self, | |
# ): | |
# header = ( | |
# "Bearer " | |
# MicrosoftAppCredentials( | |
# "", "" | |
# ).get_access_token() | |
# ) | |
# credentials = SimpleCredentialProvider( | |
# "", "" | |
# ) | |
# result = await JwtTokenValidation.validate_auth_header( | |
# header, credentials, "", "https://webchat.botframework.com/" | |
# ) | |
# | |
# result_with_provider = await JwtTokenValidation.validate_auth_header( | |
# header, | |
# credentials, | |
# SimpleChannelProvider(), | |
# "https://webchat.botframework.com/", | |
# ) | |
# | |
# assert result | |
# assert result_with_provider | |
# @pytest.mark.asyncio | |
# async def test_connector_auth_header_with_different_bot_app_id_should_not_validate( | |
# self, | |
# ): | |
# header = ( | |
# "Bearer " | |
# MicrosoftAppCredentials( | |
# "", "" | |
# ).get_access_token() | |
# ) | |
# credentials = SimpleCredentialProvider( | |
# "00000000-0000-0000-0000-000000000000", "" | |
# ) | |
# with pytest.raises(Exception) as excinfo: | |
# await JwtTokenValidation.validate_auth_header( | |
# header, credentials, "", "https://webchat.botframework.com/" | |
# ) | |
# assert "Unauthorized" in str(excinfo.value) | |
# | |
# with pytest.raises(Exception) as excinfo2: | |
# await JwtTokenValidation.validate_auth_header( | |
# header, | |
# credentials, | |
# SimpleChannelProvider(), | |
# "https://webchat.botframework.com/", | |
# ) | |
# assert "Unauthorized" in str(excinfo2.value) | |
# @pytest.mark.asyncio | |
# async def test_connector_auth_header_and_no_credential_should_not_validate(self): | |
# header = ( | |
# "Bearer " | |
# MicrosoftAppCredentials( | |
# "", "" | |
# ).get_access_token() | |
# ) | |
# credentials = SimpleCredentialProvider("", "") | |
# with pytest.raises(Exception) as excinfo: | |
# await JwtTokenValidation.validate_auth_header( | |
# header, credentials, "", "https://webchat.botframework.com/" | |
# ) | |
# assert "Unauthorized" in str(excinfo.value) | |
# | |
# with pytest.raises(Exception) as excinfo2: | |
# await JwtTokenValidation.validate_auth_header( | |
# header, | |
# credentials, | |
# SimpleChannelProvider(), | |
# "https://webchat.botframework.com/", | |
# ) | |
# assert "Unauthorized" in str(excinfo2.value) | |
async def test_empty_header_and_no_credential_should_throw(self): | |
header = "" | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await JwtTokenValidation.validate_auth_header(header, credentials, "", None) | |
assert "auth_header" in str(excinfo.value) | |
with pytest.raises(Exception) as excinfo2: | |
await JwtTokenValidation.validate_auth_header( | |
header, credentials, SimpleChannelProvider(), None | |
) | |
assert "auth_header" in str(excinfo2.value) | |
# @pytest.mark.asyncio | |
# async def test_emulator_msa_header_correct_app_id_and_service_url_should_validate( | |
# self, | |
# ): | |
# header = ( | |
# "Bearer " | |
# MicrosoftAppCredentials( | |
# "", "" | |
# ).get_access_token() | |
# ) | |
# credentials = SimpleCredentialProvider( | |
# "", "" | |
# ) | |
# result = await JwtTokenValidation.validate_auth_header( | |
# header, credentials, "", "https://webchat.botframework.com/" | |
# ) | |
# | |
# result_with_provider = await JwtTokenValidation.validate_auth_header( | |
# header, | |
# credentials, | |
# SimpleChannelProvider(), | |
# "https://webchat.botframework.com/", | |
# ) | |
# | |
# assert result | |
# assert result_with_provider | |
# @pytest.mark.asyncio | |
# async def test_emulator_msa_header_and_no_credential_should_not_validate(self): | |
# # pylint: disable=protected-access | |
# header = ( | |
# "Bearer " | |
# MicrosoftAppCredentials( | |
# "", "" | |
# ).get_access_token() | |
# ) | |
# credentials = SimpleCredentialProvider( | |
# "00000000-0000-0000-0000-000000000000", "" | |
# ) | |
# with pytest.raises(Exception) as excinfo: | |
# await JwtTokenValidation.validate_auth_header(header, credentials, "", None) | |
# assert "Unauthorized" in str(excinfo._excinfo) | |
# | |
# with pytest.raises(Exception) as excinfo2: | |
# await JwtTokenValidation.validate_auth_header( | |
# header, credentials, SimpleChannelProvider(), None | |
# ) | |
# assert "Unauthorized" in str(excinfo2._excinfo) | |
# Tests with a valid Token and service url; and ensures that Service url is added to Trusted service url list. | |
# @pytest.mark.asyncio | |
# async def test_channel_msa_header_valid_service_url_should_be_trusted(self): | |
# activity = Activity( | |
# service_url="https://smba.trafficmanager.net/amer-client-ss.msg/" | |
# ) | |
# header = ( | |
# "Bearer " | |
# MicrosoftAppCredentials( | |
# "", "" | |
# ).get_access_token() | |
# ) | |
# credentials = SimpleCredentialProvider( | |
# "", "" | |
# ) | |
# | |
# await JwtTokenValidation.authenticate_request(activity, header, credentials) | |
# | |
# assert AppCredentials.is_trusted_service( | |
# "https://smba.trafficmanager.net/amer-client-ss.msg/" | |
# ) | |
# @pytest.mark.asyncio | |
# # Tests with a valid Token and invalid service url and ensures that Service url is NOT added to | |
# # Trusted service url list. | |
# async def test_channel_msa_header_invalid_service_url_should_not_be_trusted(self): | |
# activity = Activity(service_url="https://webchat.botframework.com/") | |
# header = ( | |
# "Bearer " | |
# MicrosoftAppCredentials( | |
# "", "" | |
# ).get_access_token() | |
# ) | |
# credentials = SimpleCredentialProvider( | |
# "7f74513e-6f96-4dbc-be9d-9a81fea22b88", "" | |
# ) | |
# | |
# with pytest.raises(Exception) as excinfo: | |
# await JwtTokenValidation.authenticate_request(activity, header, credentials) | |
# assert "Unauthorized" in str(excinfo.value) | |
# | |
# assert not MicrosoftAppCredentials.is_trusted_service( | |
# "https://webchat.botframework.com/" | |
# ) | |
# Tests with a valid Token and invalid service url and ensures that Service url is NOT added to | |
# Trusted service url list. | |
async def test_channel_authentication_disabled_and_skill_should_be_anonymous(self): | |
activity = Activity( | |
channel_id=Channels.emulator, | |
service_url="https://webchat.botframework.com/", | |
relates_to=ConversationReference(), | |
recipient=ChannelAccount(role=RoleTypes.skill), | |
) | |
header = "" | |
credentials = SimpleCredentialProvider("", "") | |
claims_principal = await JwtTokenValidation.authenticate_request( | |
activity, header, credentials | |
) | |
assert ( | |
claims_principal.authentication_type | |
== AuthenticationConstants.ANONYMOUS_AUTH_TYPE | |
) | |
assert ( | |
JwtTokenValidation.get_app_id_from_claims(claims_principal.claims) | |
== AuthenticationConstants.ANONYMOUS_SKILL_APP_ID | |
) | |
# @pytest.mark.asyncio | |
# async def test_channel_msa_header_from_user_specified_tenant(self): | |
# activity = Activity( | |
# service_url="https://smba.trafficmanager.net/amer-client-ss.msg/" | |
# ) | |
# header = "Bearer " MicrosoftAppCredentials( | |
# "", "", "microsoft.com" | |
# ).get_access_token(True) | |
# credentials = SimpleCredentialProvider( | |
# "", "" | |
# ) | |
# | |
# claims = await JwtTokenValidation.authenticate_request( | |
# activity, header, credentials | |
# ) | |
# | |
# assert claims.get_claim_value("tid") == "72f988bf-86f1-41af-91ab-2d7cd011db47" | |
# Tests with no authentication header and makes sure the service URL is not added to the trusted list. | |
async def test_channel_authentication_disabled_should_be_anonymous(self): | |
activity = Activity(service_url="https://webchat.botframework.com/") | |
header = "" | |
credentials = SimpleCredentialProvider("", "") | |
claims_principal = await JwtTokenValidation.authenticate_request( | |
activity, header, credentials | |
) | |
assert ( | |
claims_principal.authentication_type | |
== AuthenticationConstants.ANONYMOUS_AUTH_TYPE | |
) | |
async def test_government_channel_validation_succeeds(self): | |
credentials = SimpleCredentialProvider("", "") | |
await GovernmentChannelValidation.validate_identity( | |
ClaimsIdentity( | |
{"iss": "https://api.botframework.us", "aud": credentials.app_id}, True | |
), | |
credentials, | |
) | |
async def test_government_channel_validation_no_authentication_fails(self): | |
with pytest.raises(Exception) as excinfo: | |
await GovernmentChannelValidation.validate_identity( | |
ClaimsIdentity({}, False), None | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
async def test_government_channel_validation_no_issuer_fails(self): | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await GovernmentChannelValidation.validate_identity( | |
ClaimsIdentity({"peanut": "peanut"}, True), credentials | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
async def test_government_channel_validation_wrong_issuer_fails(self): | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await GovernmentChannelValidation.validate_identity( | |
ClaimsIdentity({"iss": "peanut"}, True), credentials | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
# @pytest.mark.asyncio | |
# async def test_government_channel_validation_no_audience_fails(self): | |
# credentials = SimpleCredentialProvider( | |
# "", "" | |
# ) | |
# with pytest.raises(Exception) as excinfo: | |
# await GovernmentChannelValidation.validate_identity( | |
# ClaimsIdentity({"iss": "https://api.botframework.us"}, True), | |
# credentials, | |
# ) | |
# assert "Unauthorized" in str(excinfo.value) | |
async def test_government_channel_validation_wrong_audience_fails(self): | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await GovernmentChannelValidation.validate_identity( | |
ClaimsIdentity( | |
{"iss": "https://api.botframework.us", "aud": "peanut"}, True | |
), | |
credentials, | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
async def test_enterprise_channel_validation_succeeds(self): | |
credentials = SimpleCredentialProvider("", "") | |
await EnterpriseChannelValidation.validate_identity( | |
ClaimsIdentity( | |
{"iss": "https://api.botframework.com", "aud": credentials.app_id}, True | |
), | |
credentials, | |
) | |
async def test_enterprise_channel_validation_no_authentication_fails(self): | |
with pytest.raises(Exception) as excinfo: | |
await EnterpriseChannelValidation.validate_identity( | |
ClaimsIdentity({}, False), None | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
async def test_enterprise_channel_validation_no_issuer_fails(self): | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await EnterpriseChannelValidation.validate_identity( | |
ClaimsIdentity({"peanut": "peanut"}, True), credentials | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
async def test_enterprise_channel_validation_wrong_issuer_fails(self): | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await EnterpriseChannelValidation.validate_identity( | |
ClaimsIdentity({"iss": "peanut"}, True), credentials | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
async def test_enterprise_channel_validation_no_audience_fails(self): | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await GovernmentChannelValidation.validate_identity( | |
ClaimsIdentity({"iss": "https://api.botframework.com"}, True), | |
credentials, | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
async def test_enterprise_channel_validation_wrong_audience_fails(self): | |
credentials = SimpleCredentialProvider("", "") | |
with pytest.raises(Exception) as excinfo: | |
await GovernmentChannelValidation.validate_identity( | |
ClaimsIdentity( | |
{"iss": "https://api.botframework.com", "aud": "peanut"}, True | |
), | |
credentials, | |
) | |
assert "Unauthorized" in str(excinfo.value) | |
def test_get_app_id_from_claims(self): | |
v1_claims = {} | |
v2_claims = {} | |
app_id = str(uuid.uuid4()) | |
# Empty list | |
assert not JwtTokenValidation.get_app_id_from_claims(v1_claims) | |
# AppId there but no version (assumes v1) | |
v1_claims[AuthenticationConstants.APP_ID_CLAIM] = app_id | |
assert JwtTokenValidation.get_app_id_from_claims(v1_claims) == app_id | |
# AppId there with v1 version | |
v1_claims[AuthenticationConstants.VERSION_CLAIM] = "1.0" | |
assert JwtTokenValidation.get_app_id_from_claims(v1_claims) == app_id | |
# v2 version but no azp | |
v2_claims[AuthenticationConstants.VERSION_CLAIM] = "2.0" | |
assert not JwtTokenValidation.get_app_id_from_claims(v2_claims) | |
# v2 version but no azp | |
v2_claims[AuthenticationConstants.AUTHORIZED_PARTY] = app_id | |
assert JwtTokenValidation.get_app_id_from_claims(v2_claims) == app_id | |