Spaces:
Build error
Build error
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import aiounittest | |
from botbuilder.dialogs.prompts import OAuthPromptSettings | |
from botbuilder.schema import ( | |
Activity, | |
ActivityTypes, | |
ChannelAccount, | |
ConversationAccount, | |
InputHints, | |
SignInConstants, | |
TokenResponse, | |
) | |
from botbuilder.core import CardFactory, ConversationState, MemoryStorage, TurnContext | |
from botbuilder.core.adapters import TestAdapter | |
from botbuilder.dialogs import DialogSet, DialogTurnStatus, PromptOptions | |
from botbuilder.dialogs.prompts import OAuthPrompt | |
def create_reply(activity):
    """Build a message activity that replies to ``activity``.

    The reply's sender is the original recipient and its recipient is the
    original sender. The original activity's id becomes ``reply_to_id``, and
    the service URL, channel id and conversation details are copied over.

    :param activity: The activity being replied to.
    :return: A new ``Activity`` of type ``ActivityTypes.message``.
    """
    original_recipient = activity.recipient
    original_sender = activity.from_property
    original_conversation = activity.conversation

    # The roles are swapped: whoever received the original sends the reply.
    reply_sender = ChannelAccount(
        id=original_recipient.id, name=original_recipient.name
    )
    reply_recipient = ChannelAccount(
        id=original_sender.id, name=original_sender.name
    )
    reply_conversation = ConversationAccount(
        is_group=original_conversation.is_group,
        id=original_conversation.id,
        name=original_conversation.name,
    )

    return Activity(
        type=ActivityTypes.message,
        from_property=reply_sender,
        recipient=reply_recipient,
        reply_to_id=activity.id,
        service_url=activity.service_url,
        channel_id=activity.channel_id,
        conversation=reply_conversation,
    )
class OAuthPromptTests(aiounittest.AsyncTestCase):
    """Exercise ``OAuthPrompt`` through conversations driven by a ``TestAdapter``."""

    @staticmethod
    def _create_dialogs(settings, state_property_name="dialog_state"):
        """Create conversation state and a dialog set holding one ``OAuthPrompt``.

        :param settings: ``OAuthPromptSettings`` for the prompt, which is
            registered under the dialog id ``"prompt"``.
        :param state_property_name: Name of the conversation-state property
            used to store the dialog state.
        :return: A ``(convo_state, dialogs)`` tuple.
        """
        # State lives in an in-memory store that is private to the calling test.
        convo_state = ConversationState(MemoryStorage())
        dialog_state = convo_state.create_property(state_property_name)
        dialogs = DialogSet(dialog_state)
        dialogs.add(OAuthPrompt("prompt", settings))
        return convo_state, dialogs

    def _assert_oauth_card(self, activity):
        """Assert that ``activity`` carries exactly one OAuth card attachment."""
        self.assertEqual(len(activity.attachments), 1)
        self.assertEqual(
            activity.attachments[0].content_type,
            CardFactory.content_types.oauth_card,
        )

    async def test_should_call_oauth_prompt(self):
        """The prompt sends an OAuth card and completes on a token-response event."""
        connection_name = "myConnection"
        token = "abc123"

        convo_state, dialogs = self._create_dialogs(
            OAuthPromptSettings(connection_name, "Login", None, 300000),
            "dialogState",
        )

        async def callback_handler(turn_context: TurnContext):
            dialog_context = await dialogs.create_context(turn_context)

            results = await dialog_context.continue_dialog()
            if results.status == DialogTurnStatus.Empty:
                await dialog_context.prompt("prompt", PromptOptions())
            elif results.status == DialogTurnStatus.Complete:
                if results.result.token:
                    await turn_context.send_activity("Logged in.")
                else:
                    await turn_context.send_activity("Failed")

            await convo_state.save_changes(turn_context)

        adapter = TestAdapter(callback_handler)

        async def inspector(
            activity: Activity, description: str = None
        ):  # pylint: disable=unused-argument
            self._assert_oauth_card(activity)

            # Send a mock event activity back to the bot with the token.
            adapter.add_user_token(
                connection_name, activity.channel_id, activity.recipient.id, token
            )

            event_activity = create_reply(activity)
            event_activity.type = ActivityTypes.event
            # create_reply swapped sender and recipient; swap them back so the
            # event has the same sender/recipient as the inspected activity.
            event_activity.from_property, event_activity.recipient = (
                event_activity.recipient,
                event_activity.from_property,
            )
            event_activity.name = "tokens/response"
            event_activity.value = TokenResponse(
                connection_name=connection_name, token=token
            )

            context = TurnContext(adapter, event_activity)
            await callback_handler(context)

        step1 = await adapter.send("Hello")
        step2 = await step1.assert_reply(inspector)
        await step2.assert_reply("Logged in.")

    async def test_should_call_oauth_prompt_with_code(self):
        """The prompt completes when the user replies with the magic code."""
        connection_name = "myConnection"
        token = "abc123"
        magic_code = "888999"

        convo_state, dialogs = self._create_dialogs(
            OAuthPromptSettings(connection_name, "Login", None, 300000)
        )

        async def exec_test(turn_context: TurnContext):
            dialog_context = await dialogs.create_context(turn_context)

            results = await dialog_context.continue_dialog()
            if results.status == DialogTurnStatus.Empty:
                await dialog_context.prompt("prompt", PromptOptions())
            elif results.status == DialogTurnStatus.Complete:
                if results.result.token:
                    await turn_context.send_activity("Logged in.")
                else:
                    await turn_context.send_activity("Failed")

            await convo_state.save_changes(turn_context)

        adapter = TestAdapter(exec_test)

        def inspector(
            activity: Activity, description: str = None
        ):  # pylint: disable=unused-argument
            self._assert_oauth_card(activity)

            # Register the token together with the magic code the user will send.
            adapter.add_user_token(
                connection_name,
                activity.channel_id,
                activity.recipient.id,
                token,
                magic_code,
            )

        step1 = await adapter.send("Hello")
        step2 = await step1.assert_reply(inspector)
        step3 = await step2.send(magic_code)
        await step3.assert_reply("Logged in.")

    async def test_oauth_prompt_doesnt_detect_code_in_begin_dialog(self):
        """Starting the prompt with a magic-code message must not yield a token."""
        connection_name = "myConnection"
        token = "abc123"
        magic_code = "888999"

        convo_state, dialogs = self._create_dialogs(
            OAuthPromptSettings(connection_name, "Login", None, 300000)
        )

        async def exec_test(turn_context: TurnContext):
            # Add a magic code to the adapter preemptively so that we can test
            # whether the message that starts the dialog triggers magic code
            # detection.
            adapter.add_user_token(
                connection_name,
                turn_context.activity.channel_id,
                turn_context.activity.from_property.id,
                token,
                magic_code,
            )

            dialog_context = await dialogs.create_context(turn_context)

            results = await dialog_context.continue_dialog()
            if results.status == DialogTurnStatus.Empty:
                # If the magic code were detected while prompting, the dialog
                # would end and return the token in token_result.
                token_result = await dialog_context.prompt("prompt", PromptOptions())
                if isinstance(token_result.result, TokenResponse):
                    self.fail("Magic code must not be detected when the prompt begins")

            await convo_state.save_changes(turn_context)

        adapter = TestAdapter(exec_test)

        def inspector(
            activity: Activity, description: str = None
        ):  # pylint: disable=unused-argument
            self._assert_oauth_card(activity)

        step1 = await adapter.send(magic_code)
        await step1.assert_reply(inspector)

    async def test_should_add_accepting_input_hint_oauth_prompt(self):
        """Prompt and retry prompt stored in dialog state get ``accepting_input``."""
        connection_name = "myConnection"
        called = False

        convo_state, dialogs = self._create_dialogs(
            OAuthPromptSettings(connection_name, "Login", None, 300000),
            "dialogState",
        )

        async def callback_handler(turn_context: TurnContext):
            nonlocal called
            dialog_context = await dialogs.create_context(turn_context)

            await dialog_context.continue_dialog()
            await dialog_context.prompt(
                "prompt", PromptOptions(prompt=Activity(), retry_prompt=Activity())
            )

            stored_options = dialog_context.active_dialog.state["options"]
            self.assertEqual(
                stored_options.prompt.input_hint, InputHints.accepting_input
            )
            self.assertEqual(
                stored_options.retry_prompt.input_hint, InputHints.accepting_input
            )

            await convo_state.save_changes(turn_context)
            called = True

        adapter = TestAdapter(callback_handler)

        await adapter.send("Hello")
        # Guards against the handler (and its assertions) never having run.
        self.assertTrue(called)

    async def test_should_end_oauth_prompt_on_invalid_message_when_end_on_invalid_message(
        self,
    ):
        """A message that is not the magic code ends the prompt without a token."""
        connection_name = "myConnection"
        token = "abc123"
        magic_code = "888999"

        # The trailing ``None, True`` are extra positional settings;
        # NOTE(review): True is presumably end_on_invalid_message - confirm.
        convo_state, dialogs = self._create_dialogs(
            OAuthPromptSettings(connection_name, "Login", None, 300000, None, True)
        )

        async def exec_test(turn_context: TurnContext):
            dialog_context = await dialogs.create_context(turn_context)

            results = await dialog_context.continue_dialog()
            if results.status == DialogTurnStatus.Empty:
                await dialog_context.prompt("prompt", PromptOptions())
            elif results.status == DialogTurnStatus.Complete:
                # A token here means the invalid message was wrongly accepted.
                if results.result and results.result.token:
                    await turn_context.send_activity("Failed")
                else:
                    await turn_context.send_activity("Ended")

            await convo_state.save_changes(turn_context)

        adapter = TestAdapter(exec_test)

        def inspector(
            activity: Activity, description: str = None
        ):  # pylint: disable=unused-argument
            self._assert_oauth_card(activity)

            # Register the token together with a magic code the user never sends.
            adapter.add_user_token(
                connection_name,
                activity.channel_id,
                activity.recipient.id,
                token,
                magic_code,
            )

        step1 = await adapter.send("Hello")
        step2 = await step1.assert_reply(inspector)
        step3 = await step2.send("test invalid message")
        await step3.assert_reply("Ended")

    async def test_should_timeout_oauth_prompt_with_message_activity(
        self,
    ):
        """Run the timeout scenario with a plain message activity."""
        activity = Activity(type=ActivityTypes.message, text="any")
        await self.run_timeout_test(activity)

    async def test_should_timeout_oauth_prompt_with_token_response_event_activity(
        self,
    ):
        """Run the timeout scenario with a token-response event activity."""
        activity = Activity(
            type=ActivityTypes.event, name=SignInConstants.token_response_event_name
        )
        await self.run_timeout_test(activity)

    async def test_should_timeout_oauth_prompt_with_verify_state_operation_activity(
        self,
    ):
        """Run the timeout scenario with a verify-state invoke activity."""
        activity = Activity(
            type=ActivityTypes.invoke, name=SignInConstants.verify_state_operation_name
        )
        await self.run_timeout_test(activity)

    async def test_should_not_timeout_oauth_prompt_with_custom_event_activity(
        self,
    ):
        """Run the timeout scenario with a custom event and swapped reply texts."""
        activity = Activity(type=ActivityTypes.event, name="custom event name")
        await self.run_timeout_test(activity, False, "Ended", "Failed")

    async def run_timeout_test(
        self,
        activity: Activity,
        should_succeed: bool = True,
        token_response: str = "Failed",
        no_token_resonse: str = "Ended",
    ):
        """Start the prompt, send ``activity`` and assert on the bot's reply.

        The test passes when the bot answers ``activity`` with
        ``no_token_resonse``.

        :param activity: Activity sent to the bot after the OAuth card is shown.
        :param should_succeed: When False, a turn that leaves the dialog in the
            ``Waiting`` state also sends a reply, not only a ``Complete`` turn.
        :param token_response: Reply text sent when the dialog result holds a token.
        :param no_token_resonse: Reply text sent when it does not. The
            misspelled name is kept so existing keyword callers keep working.
        """
        connection_name = "myConnection"
        token = "abc123"
        magic_code = "888999"

        # NOTE(review): the fourth setting (300000 in the other tests) is
        # presumably the timeout in milliseconds - confirm against the
        # OAuthPromptSettings signature.
        convo_state, dialogs = self._create_dialogs(
            OAuthPromptSettings(connection_name, "Login", None, 1)
        )

        async def exec_test(turn_context: TurnContext):
            dialog_context = await dialogs.create_context(turn_context)

            results = await dialog_context.continue_dialog()
            if results.status == DialogTurnStatus.Empty:
                await dialog_context.prompt("prompt", PromptOptions())
            elif results.status == DialogTurnStatus.Complete or (
                results.status == DialogTurnStatus.Waiting and not should_succeed
            ):
                if results.result and results.result.token:
                    await turn_context.send_activity(token_response)
                else:
                    await turn_context.send_activity(no_token_resonse)

            await convo_state.save_changes(turn_context)

        adapter = TestAdapter(exec_test)

        def inspector(
            activity: Activity, description: str = None
        ):  # pylint: disable=unused-argument
            self._assert_oauth_card(activity)

            # Register the token together with the magic code.
            adapter.add_user_token(
                connection_name,
                activity.channel_id,
                activity.recipient.id,
                token,
                magic_code,
            )

        step1 = await adapter.send("Hello")
        step2 = await step1.assert_reply(inspector)
        step3 = await step2.send(activity)
        await step3.assert_reply(no_token_resonse)