Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-azure
/botbuilder
/azure
/azure_queue_storage.py
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
from azure.core.exceptions import ResourceExistsError | |
from azure.storage.queue.aio import QueueClient | |
from jsonpickle import encode | |
from botbuilder.core import QueueStorage | |
from botbuilder.schema import Activity | |
class AzureQueueStorage(QueueStorage): | |
def __init__(self, queues_storage_connection_string: str, queue_name: str): | |
if not queues_storage_connection_string: | |
raise Exception("queues_storage_connection_string cannot be empty.") | |
if not queue_name: | |
raise Exception("queue_name cannot be empty.") | |
self.__queue_client = QueueClient.from_connection_string( | |
queues_storage_connection_string, queue_name | |
) | |
self.__initialized = False | |
async def _initialize(self): | |
if self.__initialized is False: | |
# This should only happen once - assuming this is a singleton. | |
# There is no `create_queue_if_exists` or `exists` method, so we need to catch the ResourceExistsError. | |
try: | |
await self.__queue_client.create_queue() | |
except ResourceExistsError: | |
pass | |
self.__initialized = True | |
return self.__initialized | |
async def queue_activity( | |
self, | |
activity: Activity, | |
visibility_timeout: int = None, | |
time_to_live: int = None, | |
) -> str: | |
""" | |
Enqueues an Activity for later processing. The visibility timeout specifies how long the message should be | |
visible to Dequeue and Peek operations. | |
:param activity: The activity to be queued for later processing. | |
:type activity: :class:`botbuilder.schema.Activity` | |
:param visibility_timeout: Visibility timeout in seconds. Optional with a default value of 0. | |
Cannot be larger than 7 days. | |
:type visibility_timeout: int | |
:param time_to_live: Specifies the time-to-live interval for the message in seconds. | |
:type time_to_live: int | |
:returns: QueueMessage as a JSON string. | |
:rtype: :class:`azure.storage.queue.QueueMessage` | |
""" | |
await self._initialize() | |
# Encode the activity as a JSON string. | |
message = encode(activity) | |
receipt = await self.__queue_client.send_message( | |
message, visibility_timeout=visibility_timeout, time_to_live=time_to_live | |
) | |
# Encode the QueueMessage receipt as a JSON string. | |
return encode(receipt) | |