Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-testing
/botbuilder
/testing
/storage_base_tests.py
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
""" | |
Base tests that all storage providers should implement in their own tests. | |
They handle the storage-based assertions, internally. | |
All tests return true if assertions pass to indicate that the code ran to completion, passing internal assertions. | |
Therefore, all tests using theses static tests should strictly check that the method returns true. | |
Note: Python cannot have dicts with properties with a None value like other SDKs can have properties with null values. | |
Because of this, StoreItem tests have "e_tag: *" where the tests in the other SDKs do not. | |
This has also caused us to comment out some parts of these tests where we assert that "e_tag" | |
is None for the same reason. A null e_tag should work just like a * e_tag when writing, | |
as far as the storage adapters are concerened, so this shouldn't cause issues. | |
:Example: | |
async def test_handle_null_keys_when_reading(self): | |
await reset() | |
test_ran = await StorageBaseTests.handle_null_keys_when_reading(get_storage()) | |
assert test_ran | |
""" | |
import pytest | |
from botbuilder.azure import CosmosDbStorage | |
from botbuilder.core import ( | |
ConversationState, | |
TurnContext, | |
MessageFactory, | |
MemoryStorage, | |
) | |
from botbuilder.core.adapters import TestAdapter | |
from botbuilder.dialogs import ( | |
DialogSet, | |
DialogTurnStatus, | |
TextPrompt, | |
PromptValidatorContext, | |
WaterfallStepContext, | |
Dialog, | |
WaterfallDialog, | |
PromptOptions, | |
) | |
class StorageBaseTests: | |
# pylint: disable=pointless-string-statement | |
async def return_empty_object_when_reading_unknown_key(storage) -> bool: | |
result = await storage.read(["unknown"]) | |
assert result is not None | |
assert len(result) == 0 | |
return True | |
async def handle_null_keys_when_reading(storage) -> bool: | |
if isinstance(storage, (CosmosDbStorage, MemoryStorage)): | |
result = await storage.read(None) | |
assert len(result.keys()) == 0 | |
# Catch-all | |
else: | |
with pytest.raises(Exception) as err: | |
await storage.read(None) | |
assert err.value.args[0] == "Keys are required when reading" | |
return True | |
async def handle_null_keys_when_writing(storage) -> bool: | |
with pytest.raises(Exception) as err: | |
await storage.write(None) | |
assert err.value.args[0] == "Changes are required when writing" | |
return True | |
async def does_not_raise_when_writing_no_items(storage) -> bool: | |
# noinspection PyBroadException | |
try: | |
await storage.write([]) | |
except: | |
pytest.fail("Should not raise") | |
return True | |
async def create_object(storage) -> bool: | |
store_items = { | |
"createPoco": {"id": 1}, | |
"createPocoStoreItem": {"id": 2, "e_tag": "*"}, | |
} | |
await storage.write(store_items) | |
read_store_items = await storage.read(store_items.keys()) | |
assert store_items["createPoco"]["id"] == read_store_items["createPoco"]["id"] | |
assert ( | |
store_items["createPocoStoreItem"]["id"] | |
== read_store_items["createPocoStoreItem"]["id"] | |
) | |
# If decided to validate e_tag integrity again, uncomment this code | |
# assert read_store_items["createPoco"]["e_tag"] is not None | |
assert read_store_items["createPocoStoreItem"]["e_tag"] is not None | |
return True | |
async def handle_crazy_keys(storage) -> bool: | |
key = '!@#$%^&*()_+??><":QASD~`' | |
store_item = {"id": 1} | |
store_items = {key: store_item} | |
await storage.write(store_items) | |
read_store_items = await storage.read(store_items.keys()) | |
assert read_store_items[key] is not None | |
assert read_store_items[key]["id"] == 1 | |
return True | |
async def update_object(storage) -> bool: | |
original_store_items = { | |
"pocoItem": {"id": 1, "count": 1}, | |
"pocoStoreItem": {"id": 1, "count": 1, "e_tag": "*"}, | |
} | |
# 1st write should work | |
await storage.write(original_store_items) | |
loaded_store_items = await storage.read(["pocoItem", "pocoStoreItem"]) | |
update_poco_item = loaded_store_items["pocoItem"] | |
update_poco_item["e_tag"] = None | |
update_poco_store_item = loaded_store_items["pocoStoreItem"] | |
assert update_poco_store_item["e_tag"] is not None | |
# 2nd write should work | |
update_poco_item["count"] += 1 | |
update_poco_store_item["count"] += 1 | |
await storage.write(loaded_store_items) | |
reloaded_store_items = await storage.read(loaded_store_items.keys()) | |
reloaded_update_poco_item = reloaded_store_items["pocoItem"] | |
reloaded_update_poco_store_item = reloaded_store_items["pocoStoreItem"] | |
assert reloaded_update_poco_item["count"] == 2 | |
assert reloaded_update_poco_store_item["count"] == 2 | |
# Write with old e_tag should succeed for non-storeItem | |
update_poco_item["count"] = 123 | |
await storage.write({"pocoItem": update_poco_item}) | |
# Write with old eTag should FAIL for storeItem | |
update_poco_store_item["count"] = 123 | |
""" | |
This assert exists in the other SDKs but can't in python, currently | |
due to using "e_tag: *" above (see comment near the top of this file for details). | |
with pytest.raises(Exception) as err: | |
await storage.write({"pocoStoreItem": update_poco_store_item}) | |
assert err.value is not None | |
""" | |
reloaded_store_items2 = await storage.read(["pocoItem", "pocoStoreItem"]) | |
reloaded_poco_item2 = reloaded_store_items2["pocoItem"] | |
reloaded_poco_item2["e_tag"] = None | |
reloaded_poco_store_item2 = reloaded_store_items2["pocoStoreItem"] | |
assert reloaded_poco_item2["count"] == 123 | |
assert reloaded_poco_store_item2["count"] == 2 | |
# write with wildcard etag should work | |
reloaded_poco_item2["count"] = 100 | |
reloaded_poco_store_item2["count"] = 100 | |
reloaded_poco_store_item2["e_tag"] = "*" | |
wildcard_etag_dict = { | |
"pocoItem": reloaded_poco_item2, | |
"pocoStoreItem": reloaded_poco_store_item2, | |
} | |
await storage.write(wildcard_etag_dict) | |
reloaded_store_items3 = await storage.read(["pocoItem", "pocoStoreItem"]) | |
assert reloaded_store_items3["pocoItem"]["count"] == 100 | |
assert reloaded_store_items3["pocoStoreItem"]["count"] == 100 | |
# Write with empty etag should not work | |
reloaded_store_items4 = await storage.read(["pocoStoreItem"]) | |
reloaded_store_item4 = reloaded_store_items4["pocoStoreItem"] | |
assert reloaded_store_item4 is not None | |
reloaded_store_item4["e_tag"] = "" | |
dict2 = {"pocoStoreItem": reloaded_store_item4} | |
with pytest.raises(Exception) as err: | |
await storage.write(dict2) | |
assert err.value is not None | |
final_store_items = await storage.read(["pocoItem", "pocoStoreItem"]) | |
assert final_store_items["pocoItem"]["count"] == 100 | |
assert final_store_items["pocoStoreItem"]["count"] == 100 | |
return True | |
async def delete_object(storage) -> bool: | |
store_items = {"delete1": {"id": 1, "count": 1, "e_tag": "*"}} | |
await storage.write(store_items) | |
read_store_items = await storage.read(["delete1"]) | |
assert read_store_items["delete1"]["e_tag"] | |
assert read_store_items["delete1"]["count"] == 1 | |
await storage.delete(["delete1"]) | |
reloaded_store_items = await storage.read(["delete1"]) | |
assert reloaded_store_items.get("delete1", None) is None | |
return True | |
async def delete_unknown_object(storage) -> bool: | |
# noinspection PyBroadException | |
try: | |
await storage.delete(["unknown_key"]) | |
except: | |
pytest.fail("Should not raise") | |
return True | |
async def perform_batch_operations(storage) -> bool: | |
await storage.write( | |
{ | |
"batch1": {"count": 10}, | |
"batch2": {"count": 20}, | |
"batch3": {"count": 30}, | |
} | |
) | |
result = await storage.read(["batch1", "batch2", "batch3"]) | |
assert result.get("batch1", None) is not None | |
assert result.get("batch2", None) is not None | |
assert result.get("batch3", None) is not None | |
assert result["batch1"]["count"] == 10 | |
assert result["batch2"]["count"] == 20 | |
assert result["batch3"]["count"] == 30 | |
""" | |
If decided to validate e_tag integrity aagain, uncomment this code | |
assert result["batch1"].get("e_tag", None) is not None | |
assert result["batch2"].get("e_tag", None) is not None | |
assert result["batch3"].get("e_tag", None) is not None | |
""" | |
await storage.delete(["batch1", "batch2", "batch3"]) | |
result = await storage.read(["batch1", "batch2", "batch3"]) | |
assert result.get("batch1", None) is None | |
assert result.get("batch2", None) is None | |
assert result.get("batch3", None) is None | |
return True | |
async def proceeds_through_waterfall(storage) -> bool: | |
convo_state = ConversationState(storage) | |
dialog_state = convo_state.create_property("dialogState") | |
dialogs = DialogSet(dialog_state) | |
async def exec_test(turn_context: TurnContext) -> None: | |
dialog_context = await dialogs.create_context(turn_context) | |
await dialog_context.continue_dialog() | |
if not turn_context.responded: | |
await dialog_context.begin_dialog(WaterfallDialog.__name__) | |
await convo_state.save_changes(turn_context) | |
adapter = TestAdapter(exec_test) | |
async def prompt_validator(prompt_context: PromptValidatorContext): | |
result = prompt_context.recognized.value | |
if len(result) > 3: | |
succeeded_message = MessageFactory.text( | |
f"You got it at the {prompt_context.options.number_of_attempts}rd try!" | |
) | |
await prompt_context.context.send_activity(succeeded_message) | |
return True | |
reply = MessageFactory.text( | |
f"Please send a name that is longer than 3 characters. {prompt_context.options.number_of_attempts}" | |
) | |
await prompt_context.context.send_activity(reply) | |
return False | |
async def step_1(step_context: WaterfallStepContext) -> DialogTurnStatus: | |
assert isinstance(step_context.active_dialog.state["stepIndex"], int) | |
await step_context.context.send_activity("step1") | |
return Dialog.end_of_turn | |
async def step_2(step_context: WaterfallStepContext) -> None: | |
assert isinstance(step_context.active_dialog.state["stepIndex"], int) | |
await step_context.prompt( | |
TextPrompt.__name__, | |
PromptOptions(prompt=MessageFactory.text("Please type your name")), | |
) | |
async def step_3(step_context: WaterfallStepContext) -> DialogTurnStatus: | |
assert isinstance(step_context.active_dialog.state["stepIndex"], int) | |
await step_context.context.send_activity("step3") | |
return Dialog.end_of_turn | |
steps = [step_1, step_2, step_3] | |
dialogs.add(WaterfallDialog(WaterfallDialog.__name__, steps)) | |
dialogs.add(TextPrompt(TextPrompt.__name__, prompt_validator)) | |
step1 = await adapter.send("hello") | |
step2 = await step1.assert_reply("step1") | |
step3 = await step2.send("hello") | |
step4 = await step3.assert_reply("Please type your name") # None | |
step5 = await step4.send("hi") | |
step6 = await step5.assert_reply( | |
"Please send a name that is longer than 3 characters. 0" | |
) | |
step7 = await step6.send("hi") | |
step8 = await step7.assert_reply( | |
"Please send a name that is longer than 3 characters. 1" | |
) | |
step9 = await step8.send("hi") | |
step10 = await step9.assert_reply( | |
"Please send a name that is longer than 3 characters. 2" | |
) | |
step11 = await step10.send("Kyle") | |
step12 = await step11.assert_reply("You got it at the 3rd try!") | |
await step12.assert_reply("step3") | |
return True | |