Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-dialogs
/botbuilder
/dialogs
/dialog_context.py
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
from typing import List, Optional | |
from botbuilder.core.turn_context import TurnContext | |
from botbuilder.dialogs.memory import DialogStateManager | |
from .dialog_event import DialogEvent | |
from .dialog_events import DialogEvents | |
from .dialog_set import DialogSet | |
from .dialog_state import DialogState | |
from .dialog_turn_status import DialogTurnStatus | |
from .dialog_turn_result import DialogTurnResult | |
from .dialog_reason import DialogReason | |
from .dialog_instance import DialogInstance | |
from .dialog import Dialog | |
class DialogContext: | |
def __init__( | |
self, dialog_set: DialogSet, turn_context: TurnContext, state: DialogState | |
): | |
if dialog_set is None: | |
raise TypeError("DialogContext(): dialog_set cannot be None.") | |
# TODO: Circular dependency with dialog_set: Check type. | |
if turn_context is None: | |
raise TypeError("DialogContext(): turn_context cannot be None.") | |
self._turn_context = turn_context | |
self._dialogs = dialog_set | |
self._stack = state.dialog_stack | |
self.services = {} | |
self.parent: DialogContext = None | |
self.state = DialogStateManager(self) | |
def dialogs(self) -> DialogSet: | |
"""Gets the set of dialogs that can be called from this context. | |
:param: | |
:return DialogSet: | |
""" | |
return self._dialogs | |
def context(self) -> TurnContext: | |
"""Gets the context for the current turn of conversation. | |
:param: | |
:return TurnContext: | |
""" | |
return self._turn_context | |
def stack(self) -> List: | |
"""Gets the current dialog stack. | |
:param: | |
:return list: | |
""" | |
return self._stack | |
def active_dialog(self): | |
"""Return the container link in the database. | |
:param: | |
:return: | |
""" | |
if self._stack: | |
return self._stack[0] | |
return None | |
def child(self) -> Optional["DialogContext"]: | |
"""Return the container link in the database. | |
:param: | |
:return DialogContext: | |
""" | |
# pylint: disable=import-outside-toplevel | |
instance = self.active_dialog | |
if instance: | |
dialog = self.find_dialog_sync(instance.id) | |
# This import prevents circular dependency issues | |
from .dialog_container import DialogContainer | |
if isinstance(dialog, DialogContainer): | |
return dialog.create_child_context(self) | |
return None | |
async def begin_dialog(self, dialog_id: str, options: object = None): | |
""" | |
Pushes a new dialog onto the dialog stack. | |
:param dialog_id: ID of the dialog to start | |
:param options: (Optional) additional argument(s) to pass to the dialog being started. | |
""" | |
try: | |
if not dialog_id: | |
raise TypeError("Dialog(): dialog_id cannot be None.") | |
# Look up dialog | |
dialog = await self.find_dialog(dialog_id) | |
if dialog is None: | |
raise Exception( | |
"'DialogContext.begin_dialog(): A dialog with an id of '%s' wasn't found." | |
" The dialog must be included in the current or parent DialogSet." | |
" For example, if subclassing a ComponentDialog you can call add_dialog() within your constructor." | |
% dialog_id | |
) | |
# Push new instance onto stack | |
instance = DialogInstance() | |
instance.id = dialog_id | |
instance.state = {} | |
self._stack.insert(0, (instance)) | |
# Call dialog's begin_dialog() method | |
return await dialog.begin_dialog(self, options) | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
# TODO: Fix options: PromptOptions instead of object | |
async def prompt(self, dialog_id: str, options) -> DialogTurnResult: | |
""" | |
Helper function to simplify formatting the options for calling a prompt dialog. This helper will | |
take a `PromptOptions` argument and then call. | |
:param dialog_id: ID of the prompt to start. | |
:param options: Contains a Prompt, potentially a RetryPrompt and if using ChoicePrompt, Choices. | |
:return: | |
""" | |
try: | |
if not dialog_id: | |
raise TypeError("DialogContext.prompt(): dialogId cannot be None.") | |
if not options: | |
raise TypeError("DialogContext.prompt(): options cannot be None.") | |
return await self.begin_dialog(dialog_id, options) | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
async def continue_dialog(self): | |
""" | |
Continues execution of the active dialog, if there is one, by passing the context object to | |
its `Dialog.continue_dialog()` method. You can check `turn_context.responded` after the call completes | |
to determine if a dialog was run and a reply was sent to the user. | |
:return: | |
""" | |
try: | |
# Check for a dialog on the stack | |
if self.active_dialog is not None: | |
# Look up dialog | |
dialog = await self.find_dialog(self.active_dialog.id) | |
if not dialog: | |
raise Exception( | |
"DialogContext.continue_dialog(): Can't continue dialog. " | |
"A dialog with an id of '%s' wasn't found." | |
% self.active_dialog.id | |
) | |
# Continue execution of dialog | |
return await dialog.continue_dialog(self) | |
return DialogTurnResult(DialogTurnStatus.Empty) | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
# TODO: instance is DialogInstance | |
async def end_dialog(self, result: object = None): | |
""" | |
Ends a dialog by popping it off the stack and returns an optional result to the dialog's | |
parent. The parent dialog is the dialog that started the dialog being ended via a call to | |
either "begin_dialog" or "prompt". | |
The parent dialog will have its `Dialog.resume_dialog()` method invoked with any returned | |
result. If the parent dialog hasn't implemented a `resume_dialog()` method then it will be | |
automatically ended as well and the result passed to its parent. If there are no more | |
parent dialogs on the stack then processing of the turn will end. | |
:param result: (Optional) result to pass to the parent dialogs. | |
:return: | |
""" | |
try: | |
await self.end_active_dialog(DialogReason.EndCalled) | |
# Resume previous dialog | |
if self.active_dialog is not None: | |
# Look up dialog | |
dialog = await self.find_dialog(self.active_dialog.id) | |
if not dialog: | |
raise Exception( | |
"DialogContext.EndDialogAsync(): Can't resume previous dialog." | |
" A dialog with an id of '%s' wasn't found." | |
% self.active_dialog.id | |
) | |
# Return result to previous dialog | |
return await dialog.resume_dialog(self, DialogReason.EndCalled, result) | |
return DialogTurnResult(DialogTurnStatus.Complete, result) | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
async def cancel_all_dialogs( | |
self, | |
cancel_parents: bool = None, | |
event_name: str = None, | |
event_value: object = None, | |
): | |
""" | |
Deletes any existing dialog stack thus cancelling all dialogs on the stack. | |
:param cancel_parents: | |
:param event_name: | |
:param event_value: | |
:return: | |
""" | |
try: | |
event_name = event_name or DialogEvents.cancel_dialog | |
if self.stack or self.parent: | |
# Cancel all local and parent dialogs while checking for interception | |
notify = False | |
dialog_context = self | |
while dialog_context: | |
if dialog_context.stack: | |
# Check to see if the dialog wants to handle the event | |
if notify: | |
event_handled = await dialog_context.emit_event( | |
event_name, | |
event_value, | |
bubble=False, | |
from_leaf=False, | |
) | |
if event_handled: | |
break | |
# End the active dialog | |
await dialog_context.end_active_dialog( | |
DialogReason.CancelCalled | |
) | |
else: | |
dialog_context = ( | |
dialog_context.parent if cancel_parents else None | |
) | |
notify = True | |
return DialogTurnResult(DialogTurnStatus.Cancelled) | |
# Stack was empty and no parent | |
return DialogTurnResult(DialogTurnStatus.Empty) | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
async def find_dialog(self, dialog_id: str) -> Dialog: | |
""" | |
If the dialog cannot be found within the current `DialogSet`, the parent `DialogContext` | |
will be searched if there is one. | |
:param dialog_id: ID of the dialog to search for. | |
:return: | |
""" | |
try: | |
dialog = await self.dialogs.find(dialog_id) | |
if dialog is None and self.parent is not None: | |
dialog = await self.parent.find_dialog(dialog_id) | |
return dialog | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
def find_dialog_sync(self, dialog_id: str) -> Dialog: | |
""" | |
If the dialog cannot be found within the current `DialogSet`, the parent `DialogContext` | |
will be searched if there is one. | |
:param dialog_id: ID of the dialog to search for. | |
:return: | |
""" | |
dialog = self.dialogs.find_dialog(dialog_id) | |
if dialog is None and self.parent is not None: | |
dialog = self.parent.find_dialog_sync(dialog_id) | |
return dialog | |
async def replace_dialog( | |
self, dialog_id: str, options: object = None | |
) -> DialogTurnResult: | |
""" | |
Ends the active dialog and starts a new dialog in its place. This is particularly useful | |
for creating loops or redirecting to another dialog. | |
:param dialog_id: ID of the dialog to search for. | |
:param options: (Optional) additional argument(s) to pass to the new dialog. | |
:return: | |
""" | |
try: | |
# End the current dialog and giving the reason. | |
await self.end_active_dialog(DialogReason.ReplaceCalled) | |
# Start replacement dialog | |
return await self.begin_dialog(dialog_id, options) | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
async def reprompt_dialog(self): | |
""" | |
Calls reprompt on the currently active dialog, if there is one. Used with Prompts that have a reprompt behavior. | |
:return: | |
""" | |
try: | |
# Check for a dialog on the stack | |
if self.active_dialog is not None: | |
# Look up dialog | |
dialog = await self.find_dialog(self.active_dialog.id) | |
if not dialog: | |
raise Exception( | |
"DialogSet.reprompt_dialog(): Can't find A dialog with an id of '%s'." | |
% self.active_dialog.id | |
) | |
# Ask dialog to re-prompt if supported | |
await dialog.reprompt_dialog(self.context, self.active_dialog) | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
async def end_active_dialog(self, reason: DialogReason): | |
instance = self.active_dialog | |
if instance is not None: | |
# Look up dialog | |
dialog = await self.find_dialog(instance.id) | |
if dialog is not None: | |
# Notify dialog of end | |
await dialog.end_dialog(self.context, instance, reason) | |
# Pop dialog off stack | |
self._stack.pop(0) | |
async def emit_event( | |
self, | |
name: str, | |
value: object = None, | |
bubble: bool = True, | |
from_leaf: bool = False, | |
) -> bool: | |
""" | |
Searches for a dialog with a given ID. | |
Emits a named event for the current dialog, or someone who started it, to handle. | |
:param name: Name of the event to raise. | |
:param value: Value to send along with the event. | |
:param bubble: Flag to control whether the event should be bubbled to its parent if not handled locally. | |
Defaults to a value of `True`. | |
:param from_leaf: Whether the event is emitted from a leaf node. | |
:param cancellationToken: The cancellation token. | |
:return: True if the event was handled. | |
""" | |
try: | |
# Initialize event | |
dialog_event = DialogEvent( | |
bubble=bubble, | |
name=name, | |
value=value, | |
) | |
dialog_context = self | |
# Find starting dialog | |
if from_leaf: | |
while True: | |
child_dc = dialog_context.child | |
if child_dc: | |
dialog_context = child_dc | |
else: | |
break | |
# Dispatch to active dialog first | |
instance = dialog_context.active_dialog | |
if instance: | |
dialog = await dialog_context.find_dialog(instance.id) | |
if dialog: | |
return await dialog.on_dialog_event(dialog_context, dialog_event) | |
return False | |
except Exception as err: | |
self.__set_exception_context_data(err) | |
raise | |
def __set_exception_context_data(self, exception: Exception): | |
if not hasattr(exception, "data"): | |
exception.data = {} | |
if not type(self).__name__ in exception.data: | |
stack = [] | |
current_dc = self | |
while current_dc is not None: | |
stack = stack + [x.id for x in current_dc.stack] | |
current_dc = current_dc.parent | |
exception.data[type(self).__name__] = { | |
"active_dialog": ( | |
None if self.active_dialog is None else self.active_dialog.id | |
), | |
"parent": None if self.parent is None else self.parent.active_dialog.id, | |
"stack": self.stack, | |
} | |