cohit's picture
Upload folder using huggingface_hub
0827183 verified
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import uuid
from typing import Dict, List, Union
from unittest.mock import Mock
import pytest
from botbuilder.schema import Activity, ConversationReference, ChannelAccount, RoleTypes
from botframework.connector import Channels
from botframework.connector.auth import (
AuthenticationConfiguration,
AuthenticationConstants,
JwtTokenValidation,
SimpleCredentialProvider,
EmulatorValidation,
EnterpriseChannelValidation,
ChannelValidation,
ClaimsIdentity,
MicrosoftAppCredentials,
# GovernmentConstants,
GovernmentChannelValidation,
SimpleChannelProvider,
ChannelProvider,
# AppCredentials,
)
async def jwt_token_validation_validate_auth_header_with_channel_service_succeeds(
app_id: str,
pwd: str,
channel_service_or_provider: Union[str, ChannelProvider],
header: str = None,
):
if header is None:
header = f"Bearer {MicrosoftAppCredentials(app_id, pwd).get_access_token()}"
credentials = SimpleCredentialProvider(app_id, pwd)
result = await JwtTokenValidation.validate_auth_header(
header,
credentials,
channel_service_or_provider,
"",
"https://webchat.botframework.com/",
)
assert result.is_authenticated
# TODO: Consider changing to unittest to use ddt for Credentials tests
class TestAuth:
EmulatorValidation.TO_BOT_FROM_EMULATOR_TOKEN_VALIDATION_PARAMETERS.ignore_expiration = (
True
)
ChannelValidation.TO_BOT_FROM_CHANNEL_TOKEN_VALIDATION_PARAMETERS.ignore_expiration = (
True
)
@pytest.mark.asyncio
async def test_claims_validation(self):
claims: List[Dict] = {}
default_auth_config = AuthenticationConfiguration()
# No validator should pass.
await JwtTokenValidation.validate_claims(default_auth_config, claims)
mock_validator = Mock()
auth_with_validator = AuthenticationConfiguration(
claims_validator=mock_validator
)
# Configure IClaimsValidator to fail
mock_validator.side_effect = PermissionError("Invalid claims.")
with pytest.raises(PermissionError) as excinfo:
await JwtTokenValidation.validate_claims(auth_with_validator, claims)
assert "Invalid claims." in str(excinfo.value)
# No validator with not skill cliams should pass.
default_auth_config.claims_validator = None
claims: List[Dict] = {
AuthenticationConstants.VERSION_CLAIM: "1.0",
AuthenticationConstants.AUDIENCE_CLAIM: "this_bot_id",
AuthenticationConstants.APP_ID_CLAIM: "this_bot_id", # Skill claims aud!=azp
}
await JwtTokenValidation.validate_claims(default_auth_config, claims)
# No validator with skill cliams should fail.
claims: List[Dict] = {
AuthenticationConstants.VERSION_CLAIM: "1.0",
AuthenticationConstants.AUDIENCE_CLAIM: "this_bot_id",
AuthenticationConstants.APP_ID_CLAIM: "not_this_bot_id", # Skill claims aud!=azp
}
mock_validator.side_effect = PermissionError(
"Unauthorized Access. Request is not authorized. Skill Claims require validation."
)
with pytest.raises(PermissionError) as excinfo_skill:
await JwtTokenValidation.validate_claims(auth_with_validator, claims)
assert (
"Unauthorized Access. Request is not authorized. Skill Claims require validation."
in str(excinfo_skill.value)
)
# @pytest.mark.asyncio
# async def test_connector_auth_header_correct_app_id_and_service_url_should_validate(
# self,
# ):
# header = (
# "Bearer "
# MicrosoftAppCredentials(
# "", ""
# ).get_access_token()
# )
# credentials = SimpleCredentialProvider(
# "", ""
# )
# result = await JwtTokenValidation.validate_auth_header(
# header, credentials, "", "https://webchat.botframework.com/"
# )
#
# result_with_provider = await JwtTokenValidation.validate_auth_header(
# header,
# credentials,
# SimpleChannelProvider(),
# "https://webchat.botframework.com/",
# )
#
# assert result
# assert result_with_provider
# @pytest.mark.asyncio
# async def test_connector_auth_header_with_different_bot_app_id_should_not_validate(
# self,
# ):
# header = (
# "Bearer "
# MicrosoftAppCredentials(
# "", ""
# ).get_access_token()
# )
# credentials = SimpleCredentialProvider(
# "00000000-0000-0000-0000-000000000000", ""
# )
# with pytest.raises(Exception) as excinfo:
# await JwtTokenValidation.validate_auth_header(
# header, credentials, "", "https://webchat.botframework.com/"
# )
# assert "Unauthorized" in str(excinfo.value)
#
# with pytest.raises(Exception) as excinfo2:
# await JwtTokenValidation.validate_auth_header(
# header,
# credentials,
# SimpleChannelProvider(),
# "https://webchat.botframework.com/",
# )
# assert "Unauthorized" in str(excinfo2.value)
# @pytest.mark.asyncio
# async def test_connector_auth_header_and_no_credential_should_not_validate(self):
# header = (
# "Bearer "
# MicrosoftAppCredentials(
# "", ""
# ).get_access_token()
# )
# credentials = SimpleCredentialProvider("", "")
# with pytest.raises(Exception) as excinfo:
# await JwtTokenValidation.validate_auth_header(
# header, credentials, "", "https://webchat.botframework.com/"
# )
# assert "Unauthorized" in str(excinfo.value)
#
# with pytest.raises(Exception) as excinfo2:
# await JwtTokenValidation.validate_auth_header(
# header,
# credentials,
# SimpleChannelProvider(),
# "https://webchat.botframework.com/",
# )
# assert "Unauthorized" in str(excinfo2.value)
@pytest.mark.asyncio
async def test_empty_header_and_no_credential_should_throw(self):
header = ""
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await JwtTokenValidation.validate_auth_header(header, credentials, "", None)
assert "auth_header" in str(excinfo.value)
with pytest.raises(Exception) as excinfo2:
await JwtTokenValidation.validate_auth_header(
header, credentials, SimpleChannelProvider(), None
)
assert "auth_header" in str(excinfo2.value)
# @pytest.mark.asyncio
# async def test_emulator_msa_header_correct_app_id_and_service_url_should_validate(
# self,
# ):
# header = (
# "Bearer "
# MicrosoftAppCredentials(
# "", ""
# ).get_access_token()
# )
# credentials = SimpleCredentialProvider(
# "", ""
# )
# result = await JwtTokenValidation.validate_auth_header(
# header, credentials, "", "https://webchat.botframework.com/"
# )
#
# result_with_provider = await JwtTokenValidation.validate_auth_header(
# header,
# credentials,
# SimpleChannelProvider(),
# "https://webchat.botframework.com/",
# )
#
# assert result
# assert result_with_provider
# @pytest.mark.asyncio
# async def test_emulator_msa_header_and_no_credential_should_not_validate(self):
# # pylint: disable=protected-access
# header = (
# "Bearer "
# MicrosoftAppCredentials(
# "", ""
# ).get_access_token()
# )
# credentials = SimpleCredentialProvider(
# "00000000-0000-0000-0000-000000000000", ""
# )
# with pytest.raises(Exception) as excinfo:
# await JwtTokenValidation.validate_auth_header(header, credentials, "", None)
# assert "Unauthorized" in str(excinfo._excinfo)
#
# with pytest.raises(Exception) as excinfo2:
# await JwtTokenValidation.validate_auth_header(
# header, credentials, SimpleChannelProvider(), None
# )
# assert "Unauthorized" in str(excinfo2._excinfo)
# Tests with a valid Token and service url; and ensures that Service url is added to Trusted service url list.
# @pytest.mark.asyncio
# async def test_channel_msa_header_valid_service_url_should_be_trusted(self):
# activity = Activity(
# service_url="https://smba.trafficmanager.net/amer-client-ss.msg/"
# )
# header = (
# "Bearer "
# MicrosoftAppCredentials(
# "", ""
# ).get_access_token()
# )
# credentials = SimpleCredentialProvider(
# "", ""
# )
#
# await JwtTokenValidation.authenticate_request(activity, header, credentials)
#
# assert AppCredentials.is_trusted_service(
# "https://smba.trafficmanager.net/amer-client-ss.msg/"
# )
# @pytest.mark.asyncio
# # Tests with a valid Token and invalid service url and ensures that Service url is NOT added to
# # Trusted service url list.
# async def test_channel_msa_header_invalid_service_url_should_not_be_trusted(self):
# activity = Activity(service_url="https://webchat.botframework.com/")
# header = (
# "Bearer "
# MicrosoftAppCredentials(
# "", ""
# ).get_access_token()
# )
# credentials = SimpleCredentialProvider(
# "7f74513e-6f96-4dbc-be9d-9a81fea22b88", ""
# )
#
# with pytest.raises(Exception) as excinfo:
# await JwtTokenValidation.authenticate_request(activity, header, credentials)
# assert "Unauthorized" in str(excinfo.value)
#
# assert not MicrosoftAppCredentials.is_trusted_service(
# "https://webchat.botframework.com/"
# )
@pytest.mark.asyncio
# Tests with a valid Token and invalid service url and ensures that Service url is NOT added to
# Trusted service url list.
async def test_channel_authentication_disabled_and_skill_should_be_anonymous(self):
activity = Activity(
channel_id=Channels.emulator,
service_url="https://webchat.botframework.com/",
relates_to=ConversationReference(),
recipient=ChannelAccount(role=RoleTypes.skill),
)
header = ""
credentials = SimpleCredentialProvider("", "")
claims_principal = await JwtTokenValidation.authenticate_request(
activity, header, credentials
)
assert (
claims_principal.authentication_type
== AuthenticationConstants.ANONYMOUS_AUTH_TYPE
)
assert (
JwtTokenValidation.get_app_id_from_claims(claims_principal.claims)
== AuthenticationConstants.ANONYMOUS_SKILL_APP_ID
)
# @pytest.mark.asyncio
# async def test_channel_msa_header_from_user_specified_tenant(self):
# activity = Activity(
# service_url="https://smba.trafficmanager.net/amer-client-ss.msg/"
# )
# header = "Bearer " MicrosoftAppCredentials(
# "", "", "microsoft.com"
# ).get_access_token(True)
# credentials = SimpleCredentialProvider(
# "", ""
# )
#
# claims = await JwtTokenValidation.authenticate_request(
# activity, header, credentials
# )
#
# assert claims.get_claim_value("tid") == "72f988bf-86f1-41af-91ab-2d7cd011db47"
@pytest.mark.asyncio
# Tests with no authentication header and makes sure the service URL is not added to the trusted list.
async def test_channel_authentication_disabled_should_be_anonymous(self):
activity = Activity(service_url="https://webchat.botframework.com/")
header = ""
credentials = SimpleCredentialProvider("", "")
claims_principal = await JwtTokenValidation.authenticate_request(
activity, header, credentials
)
assert (
claims_principal.authentication_type
== AuthenticationConstants.ANONYMOUS_AUTH_TYPE
)
@pytest.mark.asyncio
async def test_government_channel_validation_succeeds(self):
credentials = SimpleCredentialProvider("", "")
await GovernmentChannelValidation.validate_identity(
ClaimsIdentity(
{"iss": "https://api.botframework.us", "aud": credentials.app_id}, True
),
credentials,
)
@pytest.mark.asyncio
async def test_government_channel_validation_no_authentication_fails(self):
with pytest.raises(Exception) as excinfo:
await GovernmentChannelValidation.validate_identity(
ClaimsIdentity({}, False), None
)
assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_government_channel_validation_no_issuer_fails(self):
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await GovernmentChannelValidation.validate_identity(
ClaimsIdentity({"peanut": "peanut"}, True), credentials
)
assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_government_channel_validation_wrong_issuer_fails(self):
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await GovernmentChannelValidation.validate_identity(
ClaimsIdentity({"iss": "peanut"}, True), credentials
)
assert "Unauthorized" in str(excinfo.value)
# @pytest.mark.asyncio
# async def test_government_channel_validation_no_audience_fails(self):
# credentials = SimpleCredentialProvider(
# "", ""
# )
# with pytest.raises(Exception) as excinfo:
# await GovernmentChannelValidation.validate_identity(
# ClaimsIdentity({"iss": "https://api.botframework.us"}, True),
# credentials,
# )
# assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_government_channel_validation_wrong_audience_fails(self):
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await GovernmentChannelValidation.validate_identity(
ClaimsIdentity(
{"iss": "https://api.botframework.us", "aud": "peanut"}, True
),
credentials,
)
assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_enterprise_channel_validation_succeeds(self):
credentials = SimpleCredentialProvider("", "")
await EnterpriseChannelValidation.validate_identity(
ClaimsIdentity(
{"iss": "https://api.botframework.com", "aud": credentials.app_id}, True
),
credentials,
)
@pytest.mark.asyncio
async def test_enterprise_channel_validation_no_authentication_fails(self):
with pytest.raises(Exception) as excinfo:
await EnterpriseChannelValidation.validate_identity(
ClaimsIdentity({}, False), None
)
assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_enterprise_channel_validation_no_issuer_fails(self):
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await EnterpriseChannelValidation.validate_identity(
ClaimsIdentity({"peanut": "peanut"}, True), credentials
)
assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_enterprise_channel_validation_wrong_issuer_fails(self):
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await EnterpriseChannelValidation.validate_identity(
ClaimsIdentity({"iss": "peanut"}, True), credentials
)
assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_enterprise_channel_validation_no_audience_fails(self):
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await GovernmentChannelValidation.validate_identity(
ClaimsIdentity({"iss": "https://api.botframework.com"}, True),
credentials,
)
assert "Unauthorized" in str(excinfo.value)
@pytest.mark.asyncio
async def test_enterprise_channel_validation_wrong_audience_fails(self):
credentials = SimpleCredentialProvider("", "")
with pytest.raises(Exception) as excinfo:
await GovernmentChannelValidation.validate_identity(
ClaimsIdentity(
{"iss": "https://api.botframework.com", "aud": "peanut"}, True
),
credentials,
)
assert "Unauthorized" in str(excinfo.value)
def test_get_app_id_from_claims(self):
v1_claims = {}
v2_claims = {}
app_id = str(uuid.uuid4())
# Empty list
assert not JwtTokenValidation.get_app_id_from_claims(v1_claims)
# AppId there but no version (assumes v1)
v1_claims[AuthenticationConstants.APP_ID_CLAIM] = app_id
assert JwtTokenValidation.get_app_id_from_claims(v1_claims) == app_id
# AppId there with v1 version
v1_claims[AuthenticationConstants.VERSION_CLAIM] = "1.0"
assert JwtTokenValidation.get_app_id_from_claims(v1_claims) == app_id
# v2 version but no azp
v2_claims[AuthenticationConstants.VERSION_CLAIM] = "2.0"
assert not JwtTokenValidation.get_app_id_from_claims(v2_claims)
# v2 version but no azp
v2_claims[AuthenticationConstants.AUTHORIZED_PARTY] = app_id
assert JwtTokenValidation.get_app_id_from_claims(v2_claims) == app_id