Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-dialogs
/botbuilder
/dialogs
/waterfall_dialog.py
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
import uuid | |
from typing import Coroutine | |
from botbuilder.core import TurnContext | |
from botbuilder.schema import ActivityTypes | |
from .dialog_reason import DialogReason | |
from .dialog import Dialog | |
from .dialog_turn_result import DialogTurnResult | |
from .dialog_context import DialogContext | |
from .dialog_instance import DialogInstance | |
from .waterfall_step_context import WaterfallStepContext | |
class WaterfallDialog(Dialog): | |
PersistedOptions = "options" | |
StepIndex = "stepIndex" | |
PersistedValues = "values" | |
PersistedInstanceId = "instanceId" | |
def __init__(self, dialog_id: str, steps: [Coroutine] = None): | |
super(WaterfallDialog, self).__init__(dialog_id) | |
if not steps: | |
self._steps = [] | |
else: | |
if not isinstance(steps, list): | |
raise TypeError("WaterfallDialog(): steps must be list of steps") | |
self._steps = steps | |
def add_step(self, step): | |
""" | |
Adds a new step to the waterfall. | |
:param step: Step to add | |
:return: Waterfall dialog for fluent calls to `add_step()`. | |
""" | |
if not step: | |
raise TypeError("WaterfallDialog.add_step(): step cannot be None.") | |
self._steps.append(step) | |
return self | |
async def begin_dialog( | |
self, dialog_context: DialogContext, options: object = None | |
) -> DialogTurnResult: | |
if not dialog_context: | |
raise TypeError("WaterfallDialog.begin_dialog(): dc cannot be None.") | |
# Initialize waterfall state | |
state = dialog_context.active_dialog.state | |
instance_id = uuid.uuid1().__str__() | |
state[self.PersistedOptions] = options | |
state[self.PersistedValues] = {} | |
state[self.PersistedInstanceId] = instance_id | |
properties = {} | |
properties["DialogId"] = self.id | |
properties["InstanceId"] = instance_id | |
self.telemetry_client.track_event("WaterfallStart", properties) | |
# Run first stepkinds | |
return await self.run_step(dialog_context, 0, DialogReason.BeginCalled, None) | |
async def continue_dialog( # pylint: disable=unused-argument,arguments-differ | |
self, | |
dialog_context: DialogContext = None, | |
reason: DialogReason = None, | |
result: object = NotImplementedError(), | |
) -> DialogTurnResult: | |
if not dialog_context: | |
raise TypeError("WaterfallDialog.continue_dialog(): dc cannot be None.") | |
if dialog_context.context.activity.type != ActivityTypes.message: | |
return Dialog.end_of_turn | |
return await self.resume_dialog( | |
dialog_context, | |
DialogReason.ContinueCalled, | |
dialog_context.context.activity.text, | |
) | |
async def resume_dialog( | |
self, dialog_context: DialogContext, reason: DialogReason, result: object | |
): | |
if dialog_context is None: | |
raise TypeError("WaterfallDialog.resume_dialog(): dc cannot be None.") | |
# Increment step index and run step | |
state = dialog_context.active_dialog.state | |
# Future Me: | |
# If issues with CosmosDB, see https://github.com/Microsoft/botbuilder-dotnet/issues/871 | |
# for hints. | |
return await self.run_step( | |
dialog_context, state[self.StepIndex] + 1, reason, result | |
) | |
async def end_dialog( # pylint: disable=unused-argument | |
self, context: TurnContext, instance: DialogInstance, reason: DialogReason | |
) -> None: | |
if reason is DialogReason.CancelCalled: | |
index = instance.state[self.StepIndex] | |
step_name = self.get_step_name(index) | |
instance_id = str(instance.state[self.PersistedInstanceId]) | |
properties = { | |
"DialogId": self.id, | |
"StepName": step_name, | |
"InstanceId": instance_id, | |
} | |
self.telemetry_client.track_event("WaterfallCancel", properties) | |
else: | |
if reason is DialogReason.EndCalled: | |
instance_id = str(instance.state[self.PersistedInstanceId]) | |
properties = {"DialogId": self.id, "InstanceId": instance_id} | |
self.telemetry_client.track_event("WaterfallComplete", properties) | |
return | |
async def on_step(self, step_context: WaterfallStepContext) -> DialogTurnResult: | |
step_name = self.get_step_name(step_context.index) | |
instance_id = str(step_context.active_dialog.state[self.PersistedInstanceId]) | |
properties = { | |
"DialogId": self.id, | |
"StepName": step_name, | |
"InstanceId": instance_id, | |
} | |
self.telemetry_client.track_event("WaterfallStep", properties) | |
return await self._steps[step_context.index](step_context) | |
async def run_step( | |
self, | |
dialog_context: DialogContext, | |
index: int, | |
reason: DialogReason, | |
result: object, | |
) -> DialogTurnResult: | |
if not dialog_context: | |
raise TypeError( | |
"WaterfallDialog.run_steps(): dialog_context cannot be None." | |
) | |
if index < len(self._steps): | |
# Update persisted step index | |
state = dialog_context.active_dialog.state | |
state[self.StepIndex] = index | |
# Create step context | |
options = state[self.PersistedOptions] | |
values = state[self.PersistedValues] | |
step_context = WaterfallStepContext( | |
self, dialog_context, options, values, index, reason, result | |
) | |
return await self.on_step(step_context) | |
# End of waterfall so just return any result to parent | |
return await dialog_context.end_dialog(result) | |
def get_step_name(self, index: int) -> str: | |
""" | |
Give the waterfall step a unique name | |
""" | |
step_name = self._steps[index].__qualname__ | |
if not step_name or step_name.endswith("<lambda>"): | |
step_name = f"Step{index + 1}of{len(self._steps)}" | |
return step_name | |