Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-dialogs
/botbuilder
/dialogs
/prompts
/oauth_prompt.py
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
import re | |
from datetime import datetime, timedelta | |
from http import HTTPStatus | |
from typing import Union, Awaitable, Callable | |
from botframework.connector import Channels | |
from botframework.connector.auth import ( | |
ClaimsIdentity, | |
SkillValidation, | |
JwtTokenValidation, | |
) | |
from botbuilder.core import ( | |
CardFactory, | |
MessageFactory, | |
InvokeResponse, | |
TurnContext, | |
BotAdapter, | |
) | |
from botbuilder.core.bot_framework_adapter import TokenExchangeRequest | |
from botbuilder.dialogs import Dialog, DialogContext, DialogTurnResult | |
from botbuilder.schema import ( | |
Activity, | |
ActivityTypes, | |
ActionTypes, | |
CardAction, | |
InputHints, | |
SigninCard, | |
SignInConstants, | |
OAuthCard, | |
TokenResponse, | |
TokenExchangeInvokeRequest, | |
TokenExchangeInvokeResponse, | |
) | |
from .prompt_options import PromptOptions | |
from .oauth_prompt_settings import OAuthPromptSettings | |
from .prompt_validator_context import PromptValidatorContext | |
from .prompt_recognizer_result import PromptRecognizerResult | |
from .._user_token_access import _UserTokenAccess | |
class CallerInfo: | |
def __init__(self, caller_service_url: str = None, scope: str = None): | |
self.caller_service_url = caller_service_url | |
self.scope = scope | |
class OAuthPrompt(Dialog): | |
PERSISTED_OPTIONS = "options" | |
PERSISTED_STATE = "state" | |
PERSISTED_EXPIRES = "expires" | |
PERSISTED_CALLER = "caller" | |
""" | |
Creates a new prompt that asks the user to sign in, using the Bot Framework Single Sign On (SSO) service. | |
.. remarks:: | |
The prompt will attempt to retrieve the users current token and if the user isn't signed in, it | |
will send them an `OAuthCard` containing a button they can press to sign in. Depending on the channel, | |
the user will be sent through one of two possible sign-in flows: | |
- The automatic sign-in flow where once the user signs in, the SSO service will forward | |
the bot the users access token using either an `event` or `invoke` activity. | |
- The "magic code" flow where once the user signs in, they will be prompted by the SSO service | |
to send the bot a six digit code confirming their identity. This code will be sent as a | |
standard `message` activity. | |
Both flows are automatically supported by the `OAuthPrompt` and they only thing you need to be careful of | |
is that you don't block the `event` and `invoke` activities that the prompt might be waiting on. | |
You should avoid persisting the access token with your bots other state. The Bot Frameworks SSO service | |
will securely store the token on your behalf. If you store it in your bots state, | |
it could expire or be revoked in between turns. | |
When calling the prompt from within a waterfall step, you should use the token within the step | |
following the prompt and then let the token go out of scope at the end of your function. | |
When used with your bots :class:`DialogSet`, you can simply add a new instance of the prompt as a named | |
dialog using :meth`DialogSet.add()`. | |
You can then start the prompt from a waterfall step using either :meth:`DialogContext.begin()` or | |
:meth:`DialogContext.prompt()`. | |
The user will be prompted to sign in as needed and their access token will be passed as an argument to | |
the callers next waterfall step. | |
""" | |
def __init__( | |
self, | |
dialog_id: str, | |
settings: OAuthPromptSettings, | |
validator: Callable[[PromptValidatorContext], Awaitable[bool]] = None, | |
): | |
""" | |
Creates a new instance of the :class:`OAuthPrompt` class. | |
:param dialog_id: The Id to assign to this prompt. | |
:type dialog_id: str | |
:param settings: Additional authentication settings to use with this instance of the prompt | |
:type settings: :class:`OAuthPromptSettings` | |
:param validator: Optional, contains additional, custom validation for this prompt | |
:type validator: :class:`PromptValidatorContext` | |
.. remarks:: | |
The value of :param dialogId: must be unique within the :class:`DialogSet`or :class:`ComponentDialog` | |
to which the prompt is added. | |
""" | |
super().__init__(dialog_id) | |
self._validator = validator | |
if not settings: | |
raise TypeError( | |
"OAuthPrompt.__init__(): OAuthPrompt requires OAuthPromptSettings." | |
) | |
self._settings = settings | |
self._validator = validator | |
async def begin_dialog( | |
self, dialog_context: DialogContext, options: PromptOptions = None | |
) -> DialogTurnResult: | |
""" | |
Starts an authentication prompt dialog. Called when an authentication prompt dialog is pushed onto the | |
dialog stack and is being activated. | |
:param dialog_context: The dialog context for the current turn of the conversation | |
:type dialog_context: :class:`DialogContext` | |
:param options: Optional, additional information to pass to the prompt being started | |
:type options: :class:`PromptOptions` | |
:return: Dialog turn result | |
:rtype: :class`:`DialogTurnResult` | |
.. remarks:: | |
If the task is successful, the result indicates whether the prompt is still active after the turn | |
has been processed. | |
""" | |
if dialog_context is None: | |
raise TypeError( | |
f"OAuthPrompt.begin_dialog(): Expected DialogContext but got NoneType instead" | |
) | |
options = options or PromptOptions() | |
# Ensure prompts have input hint set | |
if options.prompt and not options.prompt.input_hint: | |
options.prompt.input_hint = InputHints.accepting_input | |
if options.retry_prompt and not options.retry_prompt.input_hint: | |
options.retry_prompt.input_hint = InputHints.accepting_input | |
# Initialize prompt state | |
timeout = ( | |
self._settings.timeout | |
if isinstance(self._settings.timeout, int) | |
else 900000 | |
) | |
state = dialog_context.active_dialog.state | |
state[OAuthPrompt.PERSISTED_STATE] = {} | |
state[OAuthPrompt.PERSISTED_OPTIONS] = options | |
state[OAuthPrompt.PERSISTED_EXPIRES] = datetime.now() + timedelta( | |
seconds=timeout / 1000 | |
) | |
state[OAuthPrompt.PERSISTED_CALLER] = OAuthPrompt.__create_caller_info( | |
dialog_context.context | |
) | |
output = await _UserTokenAccess.get_user_token( | |
dialog_context.context, self._settings, None | |
) | |
if output is not None: | |
# Return token | |
return await dialog_context.end_dialog(output) | |
await self._send_oauth_card(dialog_context.context, options.prompt) | |
return Dialog.end_of_turn | |
async def continue_dialog(self, dialog_context: DialogContext) -> DialogTurnResult: | |
""" | |
Continues a dialog. Called when a prompt dialog is the active dialog and the user replied with a new activity. | |
:param dialog_context: The dialog context for the current turn of the conversation | |
:type dialog_context: :class:`DialogContext` | |
:return: Dialog turn result | |
:rtype: :class:`DialogTurnResult` | |
.. remarks:: | |
If the task is successful, the result indicates whether the dialog is still | |
active after the turn has been processed by the dialog. | |
The prompt generally continues to receive the user's replies until it accepts the | |
user's reply as valid input for the prompt. | |
""" | |
# Check for timeout | |
state = dialog_context.active_dialog.state | |
is_message = dialog_context.context.activity.type == ActivityTypes.message | |
is_timeout_activity_type = ( | |
is_message | |
or OAuthPrompt._is_token_response_event(dialog_context.context) | |
or OAuthPrompt._is_teams_verification_invoke(dialog_context.context) | |
or OAuthPrompt._is_token_exchange_request_invoke(dialog_context.context) | |
) | |
has_timed_out = is_timeout_activity_type and ( | |
datetime.now() > state[OAuthPrompt.PERSISTED_EXPIRES] | |
) | |
if has_timed_out: | |
return await dialog_context.end_dialog(None) | |
if state["state"].get("attemptCount") is None: | |
state["state"]["attemptCount"] = 1 | |
else: | |
state["state"]["attemptCount"] += 1 | |
# Recognize token | |
recognized = await self._recognize_token(dialog_context) | |
# Validate the return value | |
is_valid = False | |
if self._validator is not None: | |
is_valid = await self._validator( | |
PromptValidatorContext( | |
dialog_context.context, | |
recognized, | |
state[OAuthPrompt.PERSISTED_STATE], | |
state[OAuthPrompt.PERSISTED_OPTIONS], | |
) | |
) | |
elif recognized.succeeded: | |
is_valid = True | |
# Return recognized value or re-prompt | |
if is_valid: | |
return await dialog_context.end_dialog(recognized.value) | |
if is_message and self._settings.end_on_invalid_message: | |
# If EndOnInvalidMessage is set, complete the prompt with no result. | |
return await dialog_context.end_dialog(None) | |
# Send retry prompt | |
if ( | |
not dialog_context.context.responded | |
and is_message | |
and state[OAuthPrompt.PERSISTED_OPTIONS].retry_prompt is not None | |
): | |
await dialog_context.context.send_activity( | |
state[OAuthPrompt.PERSISTED_OPTIONS].retry_prompt | |
) | |
return Dialog.end_of_turn | |
async def get_user_token( | |
self, context: TurnContext, code: str = None | |
) -> TokenResponse: | |
""" | |
Gets the user's tokeN. | |
:param context: Context for the current turn of conversation with the user | |
:type context: :class:`TurnContext` | |
:param code: (Optional) Optional user entered code to validate. | |
:type code: str | |
:return: A response that includes the user's token | |
:rtype: :class:`TokenResponse` | |
.. remarks:: | |
If the task is successful and the user already has a token or the user successfully signs in, | |
the result contains the user's token. | |
""" | |
return await _UserTokenAccess.get_user_token(context, self._settings, code) | |
async def sign_out_user(self, context: TurnContext): | |
""" | |
Signs out the user | |
:param context: Context for the current turn of conversation with the user | |
:type context: :class:`TurnContext` | |
:return: A task representing the work queued to execute | |
.. remarks:: | |
If the task is successful and the user already has a token or the user successfully signs in, | |
the result contains the user's token. | |
""" | |
return await _UserTokenAccess.sign_out_user(context, self._settings) | |
def __create_caller_info(context: TurnContext) -> CallerInfo: | |
bot_identity = context.turn_state.get(BotAdapter.BOT_IDENTITY_KEY) | |
if bot_identity and SkillValidation.is_skill_claim(bot_identity.claims): | |
return CallerInfo( | |
caller_service_url=context.activity.service_url, | |
scope=JwtTokenValidation.get_app_id_from_claims(bot_identity.claims), | |
) | |
return None | |
async def _send_oauth_card( | |
self, context: TurnContext, prompt: Union[Activity, str] = None | |
): | |
if not isinstance(prompt, Activity): | |
prompt = MessageFactory.text(prompt or "", None, InputHints.accepting_input) | |
else: | |
prompt.input_hint = prompt.input_hint or InputHints.accepting_input | |
prompt.attachments = prompt.attachments or [] | |
if OAuthPrompt._channel_suppports_oauth_card(context.activity.channel_id): | |
if not any( | |
att.content_type == CardFactory.content_types.oauth_card | |
for att in prompt.attachments | |
): | |
card_action_type = ActionTypes.signin | |
sign_in_resource = await _UserTokenAccess.get_sign_in_resource( | |
context, self._settings | |
) | |
link = sign_in_resource.sign_in_link | |
bot_identity: ClaimsIdentity = context.turn_state.get( | |
BotAdapter.BOT_IDENTITY_KEY | |
) | |
# use the SignInLink when in speech channel or bot is a skill or | |
# an extra OAuthAppCredentials is being passed in | |
if ( | |
( | |
bot_identity | |
and SkillValidation.is_skill_claim(bot_identity.claims) | |
) | |
or not context.activity.service_url.startswith("http") | |
or self._settings.oath_app_credentials | |
): | |
if context.activity.channel_id == Channels.emulator: | |
card_action_type = ActionTypes.open_url | |
elif not OAuthPrompt._channel_requires_sign_in_link( | |
context.activity.channel_id | |
): | |
link = None | |
json_token_ex_resource = ( | |
sign_in_resource.token_exchange_resource.as_dict() | |
if sign_in_resource.token_exchange_resource | |
else None | |
) | |
prompt.attachments.append( | |
CardFactory.oauth_card( | |
OAuthCard( | |
text=self._settings.text, | |
connection_name=self._settings.connection_name, | |
buttons=[ | |
CardAction( | |
title=self._settings.title, | |
text=self._settings.text, | |
type=card_action_type, | |
value=link, | |
) | |
], | |
token_exchange_resource=json_token_ex_resource, | |
) | |
) | |
) | |
else: | |
if not any( | |
att.content_type == CardFactory.content_types.signin_card | |
for att in prompt.attachments | |
): | |
if not hasattr(context.adapter, "get_oauth_sign_in_link"): | |
raise Exception( | |
"OAuthPrompt._send_oauth_card(): get_oauth_sign_in_link() not supported by the current adapter" | |
) | |
link = await context.adapter.get_oauth_sign_in_link( | |
context, | |
self._settings.connection_name, | |
None, | |
self._settings.oath_app_credentials, | |
) | |
prompt.attachments.append( | |
CardFactory.signin_card( | |
SigninCard( | |
text=self._settings.text, | |
buttons=[ | |
CardAction( | |
title=self._settings.title, | |
value=link, | |
type=ActionTypes.signin, | |
) | |
], | |
) | |
) | |
) | |
# Send prompt | |
await context.send_activity(prompt) | |
async def _recognize_token( | |
self, dialog_context: DialogContext | |
) -> PromptRecognizerResult: | |
context = dialog_context.context | |
token = None | |
if OAuthPrompt._is_token_response_event(context): | |
token = context.activity.value | |
# fixup the turnContext's state context if this was received from a skill host caller | |
state: CallerInfo = dialog_context.active_dialog.state[ | |
OAuthPrompt.PERSISTED_CALLER | |
] | |
if state: | |
# set the ServiceUrl to the skill host's Url | |
dialog_context.context.activity.service_url = state.caller_service_url | |
claims_identity = context.turn_state.get(BotAdapter.BOT_IDENTITY_KEY) | |
connector_client = await _UserTokenAccess.create_connector_client( | |
context, | |
dialog_context.context.activity.service_url, | |
claims_identity, | |
state.scope, | |
) | |
context.turn_state[BotAdapter.BOT_CONNECTOR_CLIENT_KEY] = ( | |
connector_client | |
) | |
elif OAuthPrompt._is_teams_verification_invoke(context): | |
code = context.activity.value["state"] | |
try: | |
token = await _UserTokenAccess.get_user_token( | |
context, self._settings, code | |
) | |
if token is not None: | |
await context.send_activity( | |
Activity( | |
type="invokeResponse", | |
value=InvokeResponse(status=HTTPStatus.OK), | |
) | |
) | |
else: | |
await context.send_activity( | |
Activity( | |
type="invokeResponse", | |
value=InvokeResponse(status=HTTPStatus.NOT_FOUND), | |
) | |
) | |
except Exception: | |
await context.send_activity( | |
Activity( | |
type="invokeResponse", | |
value=InvokeResponse(status=HTTPStatus.INTERNAL_SERVER_ERROR), | |
) | |
) | |
elif self._is_token_exchange_request_invoke(context): | |
if isinstance(context.activity.value, dict): | |
context.activity.value = TokenExchangeInvokeRequest().from_dict( | |
context.activity.value | |
) | |
if not ( | |
context.activity.value | |
and self._is_token_exchange_request(context.activity.value) | |
): | |
# Received activity is not a token exchange request. | |
await context.send_activity( | |
self._get_token_exchange_invoke_response( | |
int(HTTPStatus.BAD_REQUEST), | |
"The bot received an InvokeActivity that is missing a TokenExchangeInvokeRequest value." | |
" This is required to be sent with the InvokeActivity.", | |
) | |
) | |
elif ( | |
context.activity.value.connection_name != self._settings.connection_name | |
): | |
# Connection name on activity does not match that of setting. | |
await context.send_activity( | |
self._get_token_exchange_invoke_response( | |
int(HTTPStatus.BAD_REQUEST), | |
"The bot received an InvokeActivity with a TokenExchangeInvokeRequest containing a" | |
" ConnectionName that does not match the ConnectionName expected by the bots active" | |
" OAuthPrompt. Ensure these names match when sending the InvokeActivityInvalid" | |
" ConnectionName in the TokenExchangeInvokeRequest", | |
) | |
) | |
elif not getattr(context.adapter, "exchange_token"): | |
# Token Exchange not supported in the adapter. | |
await context.send_activity( | |
self._get_token_exchange_invoke_response( | |
int(HTTPStatus.BAD_GATEWAY), | |
"The bot's BotAdapter does not support token exchange operations." | |
" Ensure the bot's Adapter supports the ExtendedUserTokenProvider interface.", | |
) | |
) | |
raise AttributeError( | |
"OAuthPrompt._recognize_token(): not supported by the current adapter." | |
) | |
else: | |
# No errors. Proceed with token exchange. | |
token_exchange_response = None | |
try: | |
token_exchange_response = await _UserTokenAccess.exchange_token( | |
context, | |
self._settings, | |
TokenExchangeRequest(token=context.activity.value.token), | |
) | |
except: | |
# Ignore Exceptions | |
# If token exchange failed for any reason, tokenExchangeResponse above stays null, and | |
# hence we send back a failure invoke response to the caller. | |
pass | |
if not token_exchange_response or not token_exchange_response.token: | |
await context.send_activity( | |
self._get_token_exchange_invoke_response( | |
int(HTTPStatus.PRECONDITION_FAILED), | |
"The bot is unable to exchange token. Proceed with regular login.", | |
) | |
) | |
else: | |
await context.send_activity( | |
self._get_token_exchange_invoke_response( | |
int(HTTPStatus.OK), None, context.activity.value.id | |
) | |
) | |
token = TokenResponse( | |
channel_id=token_exchange_response.channel_id, | |
connection_name=token_exchange_response.connection_name, | |
token=token_exchange_response.token, | |
expiration=None, | |
) | |
elif context.activity.type == ActivityTypes.message and context.activity.text: | |
match = re.match(r"(?<!\d)\d{6}(?!\d)", context.activity.text) | |
if match: | |
token = await _UserTokenAccess.get_user_token( | |
context, self._settings, match[0] | |
) | |
return ( | |
PromptRecognizerResult(True, token) | |
if token is not None | |
else PromptRecognizerResult() | |
) | |
def _get_token_exchange_invoke_response( | |
self, status: int, failure_detail: str, identifier: str = None | |
) -> Activity: | |
return Activity( | |
type=ActivityTypes.invoke_response, | |
value=InvokeResponse( | |
status=status, | |
body=TokenExchangeInvokeResponse( | |
id=identifier, | |
connection_name=self._settings.connection_name, | |
failure_detail=failure_detail, | |
), | |
), | |
) | |
def _is_token_response_event(context: TurnContext) -> bool: | |
activity = context.activity | |
return ( | |
activity.type == ActivityTypes.event | |
and activity.name == SignInConstants.token_response_event_name | |
) | |
def _is_teams_verification_invoke(context: TurnContext) -> bool: | |
activity = context.activity | |
return ( | |
activity.type == ActivityTypes.invoke | |
and activity.name == SignInConstants.verify_state_operation_name | |
) | |
def _channel_suppports_oauth_card(channel_id: str) -> bool: | |
if channel_id in [ | |
Channels.cortana, | |
Channels.skype, | |
Channels.skype_for_business, | |
]: | |
return False | |
return True | |
def _channel_requires_sign_in_link(channel_id: str) -> bool: | |
if channel_id in [Channels.ms_teams]: | |
return True | |
return False | |
def _is_token_exchange_request_invoke(context: TurnContext) -> bool: | |
activity = context.activity | |
return ( | |
activity.type == ActivityTypes.invoke | |
and activity.name == SignInConstants.token_exchange_operation_name | |
) | |
def _is_token_exchange_request(obj: TokenExchangeInvokeRequest) -> bool: | |
return bool(obj.connection_name) and bool(obj.token) | |