Spaces:
Build error
Build error
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
import uuid | |
from http import HTTPStatus | |
from typing import Callable, Union, List | |
from unittest.mock import Mock | |
import aiounittest | |
from botframework.connector.token_api.models import TokenExchangeResource | |
from botbuilder.core import ( | |
ConversationState, | |
MemoryStorage, | |
InvokeResponse, | |
TurnContext, | |
MessageFactory, | |
) | |
from botbuilder.core.card_factory import ContentTypes | |
from botbuilder.core.skills import ( | |
BotFrameworkSkill, | |
ConversationIdFactoryBase, | |
SkillConversationIdFactoryOptions, | |
SkillConversationReference, | |
BotFrameworkClient, | |
) | |
from botbuilder.schema import ( | |
Activity, | |
ActivityTypes, | |
ConversationReference, | |
OAuthCard, | |
Attachment, | |
ConversationAccount, | |
ChannelAccount, | |
ExpectedReplies, | |
DeliveryModes, | |
) | |
from botbuilder.testing import DialogTestClient | |
from botbuilder.dialogs import ( | |
SkillDialog, | |
SkillDialogOptions, | |
BeginSkillDialogOptions, | |
DialogTurnStatus, | |
) | |
class SimpleConversationIdFactory( | |
ConversationIdFactoryBase | |
): # pylint: disable=abstract-method | |
def __init__(self): | |
self.conversation_refs = {} | |
self.create_count = 0 | |
async def create_skill_conversation_id( | |
self, | |
options_or_conversation_reference: Union[ | |
SkillConversationIdFactoryOptions, ConversationReference | |
], | |
) -> str: | |
self.create_count += 1 | |
key = ( | |
options_or_conversation_reference.activity.conversation.id | |
+ options_or_conversation_reference.activity.service_url | |
) | |
if key not in self.conversation_refs: | |
self.conversation_refs[key] = SkillConversationReference( | |
conversation_reference=TurnContext.get_conversation_reference( | |
options_or_conversation_reference.activity | |
), | |
oauth_scope=options_or_conversation_reference.from_bot_oauth_scope, | |
) | |
return key | |
async def get_skill_conversation_reference( | |
self, skill_conversation_id: str | |
) -> SkillConversationReference: | |
return self.conversation_refs[skill_conversation_id] | |
async def delete_conversation_reference(self, skill_conversation_id: str): | |
self.conversation_refs.pop(skill_conversation_id, None) | |
return | |
class SkillDialogTests(aiounittest.AsyncTestCase): | |
async def test_constructor_validation_test(self): | |
# missing dialog_id | |
with self.assertRaises(TypeError): | |
SkillDialog(SkillDialogOptions(), None) | |
# missing dialog options | |
with self.assertRaises(TypeError): | |
SkillDialog(None, "dialog_id") | |
async def test_begin_dialog_options_validation(self): | |
dialog_options = SkillDialogOptions() | |
sut = SkillDialog(dialog_options, dialog_id="dialog_id") | |
# empty options should raise | |
client = DialogTestClient("test", sut) | |
with self.assertRaises(TypeError): | |
await client.send_activity("irrelevant") | |
# non DialogArgs should raise | |
client = DialogTestClient("test", sut, {}) | |
with self.assertRaises(TypeError): | |
await client.send_activity("irrelevant") | |
# Activity in DialogArgs should be set | |
client = DialogTestClient("test", sut, BeginSkillDialogOptions(None)) | |
with self.assertRaises(TypeError): | |
await client.send_activity("irrelevant") | |
async def test_begin_dialog_calls_skill_no_deliverymode(self): | |
return await self.begin_dialog_calls_skill(None) | |
async def test_begin_dialog_calls_skill_expect_replies(self): | |
return await self.begin_dialog_calls_skill(DeliveryModes.expect_replies) | |
async def begin_dialog_calls_skill(self, deliver_mode: str): | |
activity_sent = None | |
from_bot_id_sent = None | |
to_bot_id_sent = None | |
to_url_sent = None | |
async def capture( | |
from_bot_id: str, | |
to_bot_id: str, | |
to_url: str, | |
service_url: str, # pylint: disable=unused-argument | |
conversation_id: str, # pylint: disable=unused-argument | |
activity: Activity, | |
): | |
nonlocal from_bot_id_sent, to_bot_id_sent, to_url_sent, activity_sent | |
from_bot_id_sent = from_bot_id | |
to_bot_id_sent = to_bot_id | |
to_url_sent = to_url | |
activity_sent = activity | |
mock_skill_client = self._create_mock_skill_client(capture) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client | |
) | |
sut = SkillDialog(dialog_options, "dialog_id") | |
activity_to_send = MessageFactory.text(str(uuid.uuid4())) | |
activity_to_send.delivery_mode = deliver_mode | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions(activity=activity_to_send), | |
conversation_state=conversation_state, | |
) | |
assert len(dialog_options.conversation_id_factory.conversation_refs) == 0 | |
# Send something to the dialog to start it | |
await client.send_activity(MessageFactory.text("irrelevant")) | |
# Assert results and data sent to the SkillClient for fist turn | |
assert len(dialog_options.conversation_id_factory.conversation_refs) == 1 | |
assert dialog_options.bot_id == from_bot_id_sent | |
assert dialog_options.skill.app_id == to_bot_id_sent | |
assert dialog_options.skill.skill_endpoint == to_url_sent | |
assert activity_to_send.text == activity_sent.text | |
assert DialogTurnStatus.Waiting == client.dialog_turn_result.status | |
# Send a second message to continue the dialog | |
await client.send_activity(MessageFactory.text("Second message")) | |
# Assert results for second turn | |
assert len(dialog_options.conversation_id_factory.conversation_refs) == 1 | |
assert activity_sent.text == "Second message" | |
assert DialogTurnStatus.Waiting == client.dialog_turn_result.status | |
# Send EndOfConversation to the dialog | |
await client.send_activity(Activity(type=ActivityTypes.end_of_conversation)) | |
# Assert we are done. | |
assert DialogTurnStatus.Complete == client.dialog_turn_result.status | |
async def test_should_handle_invoke_activities(self): | |
activity_sent = None | |
from_bot_id_sent = None | |
to_bot_id_sent = None | |
to_url_sent = None | |
async def capture( | |
from_bot_id: str, | |
to_bot_id: str, | |
to_url: str, | |
service_url: str, # pylint: disable=unused-argument | |
conversation_id: str, # pylint: disable=unused-argument | |
activity: Activity, | |
): | |
nonlocal from_bot_id_sent, to_bot_id_sent, to_url_sent, activity_sent | |
from_bot_id_sent = from_bot_id | |
to_bot_id_sent = to_bot_id | |
to_url_sent = to_url | |
activity_sent = activity | |
mock_skill_client = self._create_mock_skill_client(capture) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client | |
) | |
sut = SkillDialog(dialog_options, "dialog_id") | |
activity_to_send = Activity( | |
type=ActivityTypes.invoke, | |
name=str(uuid.uuid4()), | |
) | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions(activity=activity_to_send), | |
conversation_state=conversation_state, | |
) | |
# Send something to the dialog to start it | |
await client.send_activity(MessageFactory.text("irrelevant")) | |
# Assert results and data sent to the SkillClient for fist turn | |
assert dialog_options.bot_id == from_bot_id_sent | |
assert dialog_options.skill.app_id == to_bot_id_sent | |
assert dialog_options.skill.skill_endpoint == to_url_sent | |
assert activity_to_send.text == activity_sent.text | |
assert DialogTurnStatus.Waiting == client.dialog_turn_result.status | |
# Send a second message to continue the dialog | |
await client.send_activity(MessageFactory.text("Second message")) | |
# Assert results for second turn | |
assert activity_sent.text == "Second message" | |
assert DialogTurnStatus.Waiting == client.dialog_turn_result.status | |
# Send EndOfConversation to the dialog | |
await client.send_activity(Activity(type=ActivityTypes.end_of_conversation)) | |
# Assert we are done. | |
assert DialogTurnStatus.Complete == client.dialog_turn_result.status | |
async def test_cancel_dialog_sends_eoc(self): | |
activity_sent = None | |
async def capture( | |
from_bot_id: str, # pylint: disable=unused-argument | |
to_bot_id: str, # pylint: disable=unused-argument | |
to_url: str, # pylint: disable=unused-argument | |
service_url: str, # pylint: disable=unused-argument | |
conversation_id: str, # pylint: disable=unused-argument | |
activity: Activity, | |
): | |
nonlocal activity_sent | |
activity_sent = activity | |
mock_skill_client = self._create_mock_skill_client(capture) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client | |
) | |
sut = SkillDialog(dialog_options, "dialog_id") | |
activity_to_send = MessageFactory.text(str(uuid.uuid4())) | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions(activity=activity_to_send), | |
conversation_state=conversation_state, | |
) | |
# Send something to the dialog to start it | |
await client.send_activity(MessageFactory.text("irrelevant")) | |
# Cancel the dialog so it sends an EoC to the skill | |
await client.dialog_context.cancel_all_dialogs() | |
assert activity_sent | |
assert activity_sent.type == ActivityTypes.end_of_conversation | |
async def test_should_throw_on_post_failure(self): | |
# This mock client will fail | |
mock_skill_client = self._create_mock_skill_client(None, 500) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client | |
) | |
sut = SkillDialog(dialog_options, "dialog_id") | |
activity_to_send = MessageFactory.text(str(uuid.uuid4())) | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions(activity=activity_to_send), | |
conversation_state=conversation_state, | |
) | |
# A send should raise an exception | |
with self.assertRaises(Exception): | |
await client.send_activity("irrelevant") | |
async def test_should_intercept_oauth_cards_for_sso(self): | |
connection_name = "connectionName" | |
first_response = ExpectedReplies( | |
activities=[ | |
SkillDialogTests.create_oauth_card_attachment_activity("https://test") | |
] | |
) | |
sequence = 0 | |
async def post_return(): | |
nonlocal sequence | |
if sequence == 0: | |
result = InvokeResponse(body=first_response, status=HTTPStatus.OK) | |
else: | |
result = InvokeResponse(status=HTTPStatus.OK) | |
sequence += 1 | |
return result | |
mock_skill_client = self._create_mock_skill_client(None, post_return) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client, connection_name | |
) | |
sut = SkillDialog(dialog_options, dialog_id="dialog") | |
activity_to_send = SkillDialogTests.create_send_activity() | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions( | |
activity=activity_to_send, | |
), | |
conversation_state=conversation_state, | |
) | |
client.test_adapter.add_exchangeable_token( | |
connection_name, "test", "User1", "https://test", "https://test1" | |
) | |
final_activity = await client.send_activity(MessageFactory.text("irrelevant")) | |
self.assertIsNone(final_activity) | |
async def test_should_not_intercept_oauth_cards_for_empty_connection_name(self): | |
connection_name = "connectionName" | |
first_response = ExpectedReplies( | |
activities=[ | |
SkillDialogTests.create_oauth_card_attachment_activity("https://test") | |
] | |
) | |
sequence = 0 | |
async def post_return(): | |
nonlocal sequence | |
if sequence == 0: | |
result = InvokeResponse(body=first_response, status=HTTPStatus.OK) | |
else: | |
result = InvokeResponse(status=HTTPStatus.OK) | |
sequence += 1 | |
return result | |
mock_skill_client = self._create_mock_skill_client(None, post_return) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client | |
) | |
sut = SkillDialog(dialog_options, dialog_id="dialog") | |
activity_to_send = SkillDialogTests.create_send_activity() | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions( | |
activity=activity_to_send, | |
), | |
conversation_state=conversation_state, | |
) | |
client.test_adapter.add_exchangeable_token( | |
connection_name, "test", "User1", "https://test", "https://test1" | |
) | |
final_activity = await client.send_activity(MessageFactory.text("irrelevant")) | |
self.assertIsNotNone(final_activity) | |
self.assertEqual(len(final_activity.attachments), 1) | |
async def test_should_not_intercept_oauth_cards_for_empty_token(self): | |
first_response = ExpectedReplies( | |
activities=[ | |
SkillDialogTests.create_oauth_card_attachment_activity("https://test") | |
] | |
) | |
sequence = 0 | |
async def post_return(): | |
nonlocal sequence | |
if sequence == 0: | |
result = InvokeResponse(body=first_response, status=HTTPStatus.OK) | |
else: | |
result = InvokeResponse(status=HTTPStatus.OK) | |
sequence += 1 | |
return result | |
mock_skill_client = self._create_mock_skill_client(None, post_return) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client | |
) | |
sut = SkillDialog(dialog_options, dialog_id="dialog") | |
activity_to_send = SkillDialogTests.create_send_activity() | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions( | |
activity=activity_to_send, | |
), | |
conversation_state=conversation_state, | |
) | |
# Don't add exchangeable token to test adapter | |
final_activity = await client.send_activity(MessageFactory.text("irrelevant")) | |
self.assertIsNotNone(final_activity) | |
self.assertEqual(len(final_activity.attachments), 1) | |
async def test_should_not_intercept_oauth_cards_for_token_exception(self): | |
connection_name = "connectionName" | |
first_response = ExpectedReplies( | |
activities=[ | |
SkillDialogTests.create_oauth_card_attachment_activity("https://test") | |
] | |
) | |
sequence = 0 | |
async def post_return(): | |
nonlocal sequence | |
if sequence == 0: | |
result = InvokeResponse(body=first_response, status=HTTPStatus.OK) | |
else: | |
result = InvokeResponse(status=HTTPStatus.OK) | |
sequence += 1 | |
return result | |
mock_skill_client = self._create_mock_skill_client(None, post_return) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client, connection_name | |
) | |
sut = SkillDialog(dialog_options, dialog_id="dialog") | |
activity_to_send = SkillDialogTests.create_send_activity() | |
initial_dialog_options = BeginSkillDialogOptions( | |
activity=activity_to_send, | |
) | |
client = DialogTestClient( | |
"test", | |
sut, | |
initial_dialog_options, | |
conversation_state=conversation_state, | |
) | |
client.test_adapter.throw_on_exchange_request( | |
connection_name, "test", "User1", "https://test" | |
) | |
final_activity = await client.send_activity(MessageFactory.text("irrelevant")) | |
self.assertIsNotNone(final_activity) | |
self.assertEqual(len(final_activity.attachments), 1) | |
async def test_should_not_intercept_oauth_cards_for_bad_request(self): | |
connection_name = "connectionName" | |
first_response = ExpectedReplies( | |
activities=[ | |
SkillDialogTests.create_oauth_card_attachment_activity("https://test") | |
] | |
) | |
sequence = 0 | |
async def post_return(): | |
nonlocal sequence | |
if sequence == 0: | |
result = InvokeResponse(body=first_response, status=HTTPStatus.OK) | |
else: | |
result = InvokeResponse(status=HTTPStatus.CONFLICT) | |
sequence += 1 | |
return result | |
mock_skill_client = self._create_mock_skill_client(None, post_return) | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = SkillDialogTests.create_skill_dialog_options( | |
conversation_state, mock_skill_client, connection_name | |
) | |
sut = SkillDialog(dialog_options, dialog_id="dialog") | |
activity_to_send = SkillDialogTests.create_send_activity() | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions( | |
activity=activity_to_send, | |
), | |
conversation_state=conversation_state, | |
) | |
client.test_adapter.add_exchangeable_token( | |
connection_name, "test", "User1", "https://test", "https://test1" | |
) | |
final_activity = await client.send_activity(MessageFactory.text("irrelevant")) | |
self.assertIsNotNone(final_activity) | |
self.assertEqual(len(final_activity.attachments), 1) | |
async def test_end_of_conversation_from_expect_replies_calls_delete_conversation_reference( | |
self, | |
): | |
activity_sent: Activity = None | |
# Callback to capture the parameters sent to the skill | |
async def capture_action( | |
from_bot_id: str, # pylint: disable=unused-argument | |
to_bot_id: str, # pylint: disable=unused-argument | |
to_uri: str, # pylint: disable=unused-argument | |
service_url: str, # pylint: disable=unused-argument | |
conversation_id: str, # pylint: disable=unused-argument | |
activity: Activity, | |
): | |
# Capture values sent to the skill so we can assert the right parameters were used. | |
nonlocal activity_sent | |
activity_sent = activity | |
eoc = Activity.create_end_of_conversation_activity() | |
expected_replies = list([eoc]) | |
# Create a mock skill client to intercept calls and capture what is sent. | |
mock_skill_client = self._create_mock_skill_client( | |
capture_action, expected_replies=expected_replies | |
) | |
# Use Memory for conversation state | |
conversation_state = ConversationState(MemoryStorage()) | |
dialog_options = self.create_skill_dialog_options( | |
conversation_state, mock_skill_client | |
) | |
# Create the SkillDialogInstance and the activity to send. | |
sut = SkillDialog(dialog_options, dialog_id="dialog") | |
activity_to_send = Activity.create_message_activity() | |
activity_to_send.delivery_mode = DeliveryModes.expect_replies | |
activity_to_send.text = str(uuid.uuid4()) | |
client = DialogTestClient( | |
"test", | |
sut, | |
BeginSkillDialogOptions(activity_to_send), | |
conversation_state=conversation_state, | |
) | |
# Send something to the dialog to start it | |
await client.send_activity("hello") | |
simple_id_factory: SimpleConversationIdFactory = ( | |
dialog_options.conversation_id_factory | |
) | |
self.assertEqual(0, len(simple_id_factory.conversation_refs)) | |
self.assertEqual(1, simple_id_factory.create_count) | |
def create_skill_dialog_options( | |
conversation_state: ConversationState, | |
skill_client: BotFrameworkClient, | |
connection_name: str = None, | |
): | |
return SkillDialogOptions( | |
bot_id=str(uuid.uuid4()), | |
skill_host_endpoint="http://test.contoso.com/skill/messages", | |
conversation_id_factory=SimpleConversationIdFactory(), | |
conversation_state=conversation_state, | |
skill_client=skill_client, | |
skill=BotFrameworkSkill( | |
app_id=str(uuid.uuid4()), | |
skill_endpoint="http://testskill.contoso.com/api/messages", | |
), | |
connection_name=connection_name, | |
) | |
def create_send_activity() -> Activity: | |
return Activity( | |
type=ActivityTypes.message, | |
delivery_mode=DeliveryModes.expect_replies, | |
text=str(uuid.uuid4()), | |
) | |
def create_oauth_card_attachment_activity(uri: str) -> Activity: | |
oauth_card = OAuthCard(token_exchange_resource=TokenExchangeResource(uri=uri)) | |
attachment = Attachment( | |
content_type=ContentTypes.oauth_card, | |
content=oauth_card, | |
) | |
attachment_activity = MessageFactory.attachment(attachment) | |
attachment_activity.conversation = ConversationAccount(id=str(uuid.uuid4())) | |
attachment_activity.from_property = ChannelAccount(id="blah", name="name") | |
return attachment_activity | |
def _create_mock_skill_client( | |
self, | |
callback: Callable, | |
return_status: Union[Callable, int] = 200, | |
expected_replies: List[Activity] = None, | |
) -> BotFrameworkClient: | |
mock_client = Mock() | |
activity_list = ExpectedReplies( | |
activities=expected_replies or [MessageFactory.text("dummy activity")] | |
) | |
async def mock_post_activity( | |
from_bot_id: str, | |
to_bot_id: str, | |
to_url: str, | |
service_url: str, | |
conversation_id: str, | |
activity: Activity, | |
): | |
nonlocal callback, return_status | |
if callback: | |
await callback( | |
from_bot_id, | |
to_bot_id, | |
to_url, | |
service_url, | |
conversation_id, | |
activity, | |
) | |
if isinstance(return_status, Callable): | |
return await return_status() | |
return InvokeResponse(status=return_status, body=activity_list) | |
mock_client.post_activity.side_effect = mock_post_activity | |
return mock_client | |