# Spaces: Validify-testbot-1 (status when captured: Build error)
# Path: botbuilder-python/libraries/botbuilder-azure/botbuilder/azure/cosmosdb_storage.py
"""Implements a CosmosDB based storage provider. | |
""" | |
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
from hashlib import sha256 | |
from typing import Dict, List | |
from threading import Semaphore | |
import json | |
import warnings | |
from jsonpickle.pickler import Pickler | |
from jsonpickle.unpickler import Unpickler | |
import azure.cosmos.cosmos_client as cosmos_client # pylint: disable=no-name-in-module,import-error | |
import azure.cosmos.errors as cosmos_errors # pylint: disable=no-name-in-module,import-error | |
from botbuilder.core.storage import Storage | |
class CosmosDbConfig:
    """The class for CosmosDB configuration for the Azure Bot Framework."""

    def __init__(
        self,
        endpoint: str = None,
        masterkey: str = None,
        database: str = None,
        container: str = None,
        partition_key: str = None,
        database_creation_options: dict = None,
        container_creation_options: dict = None,
        **kwargs,
    ):
        """Create the Config object.

        Every setting is resolved as "explicit argument if truthy, otherwise
        the value found in the keyword/file settings".

        :param endpoint: Value stored as ``self.endpoint``.
        :param masterkey: Value stored as ``self.masterkey``.
        :param database: Database id; falls back to ``"bot_db"``.
        :param container: Container id; falls back to ``"bot_container"``.
        :param partition_key: Value stored as ``self.partition_key``.
        :param database_creation_options: Stored as-is for later use.
        :param container_creation_options: Stored as-is for later use.
        :param filename: (keyword only, via ``**kwargs``) path of a JSON file.
            When given, the parsed file content *replaces* all other extra
            keyword arguments as the fallback source of settings.
        :return CosmosDbConfig:
        """
        self.__config_file = kwargs.get("filename")
        if self.__config_file:
            # Use a context manager so the file handle is closed
            # deterministically instead of being left to the garbage collector.
            with open(self.__config_file) as config_file:
                kwargs = json.load(config_file)

        self.endpoint = endpoint or kwargs.get("endpoint")
        self.masterkey = masterkey or kwargs.get("masterkey")
        self.database = database or kwargs.get("database", "bot_db")
        self.container = container or kwargs.get("container", "bot_container")
        self.partition_key = partition_key or kwargs.get("partition_key")
        self.database_creation_options = database_creation_options or kwargs.get(
            "database_creation_options"
        )
        self.container_creation_options = container_creation_options or kwargs.get(
            "container_creation_options"
        )
class CosmosDbKeyEscape:
    """Helpers that turn arbitrary storage keys into ids usable in Cosmos."""

    @staticmethod
    def sanitize_key(
        key: str, key_suffix: str = "", compatibility_mode: bool = True
    ) -> str:
        """Return the sanitized key.

        Replace characters that are not allowed in keys in Cosmos.

        :param key: The provided key to be escaped.
        :param key_suffix: The string to add at the end of all RowKeys.
            ``None`` is treated like the empty string.
        :param compatibility_mode: True if keys should be truncated in order to
            support previous CosmosDb max key length of 255. This behavior can
            be overridden by setting
            cosmosdb_partitioned_config.compatibility_mode to False.
        :return str:
        """
        # forbidden characters
        bad_chars = ["\\", "?", "/", "#", "\t", "\n", "\r", "*"]

        # Each forbidden character becomes '*' followed by its Unicode code
        # point. '*' itself is escaped too, so the escape marker stays
        # unambiguous.
        escapes = {ord(char): f"*{ord(char)}" for char in bad_chars}
        key = key.translate(escapes)

        if key_suffix is None:
            key_suffix = ""

        return CosmosDbKeyEscape.truncate_key(f"{key}{key_suffix}", compatibility_mode)

    @staticmethod
    def truncate_key(key: str, compatibility_mode: bool = True) -> str:
        """Shorten ``key`` to at most 255 characters when in compatibility mode.

        Keys longer than the limit keep their leading characters and have the
        SHA-256 hex digest of the full key appended, so the result is exactly
        255 characters long.

        :param key: The (already sanitized) key.
        :param compatibility_mode: When False the key is returned unchanged.
        :return str:
        """
        max_key_len = 255

        if not compatibility_mode:
            return key

        if len(key) > max_key_len:
            aux_hex = sha256(key.encode("utf-8")).hexdigest()
            # Leave room for the digest so the total length is max_key_len.
            key = key[0 : max_key_len - len(aux_hex)] + aux_hex

        return key
class CosmosDbStorage(Storage):
    """A CosmosDB based storage provider for a bot.

    Items are stored as documents whose ``id`` is the sanitized key, with the
    original key kept in ``realId`` and the flattened item in ``document``.
    The database and container are looked up (or created) on first use.
    """

    def __init__(
        self, config: CosmosDbConfig, client: cosmos_client.CosmosClient = None
    ):
        """Create the storage object.

        :param config: Connection and naming settings.
        :param client: Optional pre-built client; when omitted one is created
            from ``config.endpoint`` and ``config.masterkey``.
        """
        super().__init__()
        warnings.warn(
            "CosmosDbStorage is obsolete. Use CosmosDbPartitionedStorage instead."
        )
        self.config = config
        self.client = client or cosmos_client.CosmosClient(
            self.config.endpoint, {"masterKey": self.config.masterkey}
        )
        # these are set by the functions that check
        # the presence of the database and container or creates them
        self.database = None
        self.container = None
        self._database_creation_options = config.database_creation_options
        self._container_creation_options = config.container_creation_options
        self.__semaphore = Semaphore()

    async def read(self, keys: List[str]) -> Dict[str, object]:
        """Read storeitems from storage.

        :param keys: The original (unsanitized) keys to look up.
        :return dict: Maps each found item's ``realId`` to the restored
            object; empty when ``keys`` is empty or None.
        """
        # check if the database and container exists and if not create
        if not self.__container_exists:
            self.__create_db_and_container()

        if not keys:
            # No keys passed in, no result to return.
            return {}

        # one query parameter per key: @id0, @id1, ...
        parameters = [
            {"name": f"@id{i}", "value": CosmosDbKeyEscape.sanitize_key(key)}
            for i, key in enumerate(keys)
        ]
        parameter_sequence = ",".join(param["name"] for param in parameters)
        query = {
            "query": f"SELECT c.id, c.realId, c.document, c._etag FROM c WHERE c.id in ({parameter_sequence})",
            "parameters": parameters,
        }

        if self.config.partition_key:
            options = {"partitionKey": self.config.partition_key}
        else:
            options = {"enableCrossPartitionQuery": True}

        # run the query and store the results as a list
        results = list(self.client.QueryItems(self.__container_link, query, options))
        # return a dict with a key and an object
        return {r.get("realId"): self.__create_si(r) for r in results}

    async def write(self, changes: Dict[str, object]):
        """Save storeitems to storage.

        Items whose e_tag is missing/None or ``"*"`` are upserted; items with
        any other e_tag are replaced using an ``IfMatch`` access condition.

        :param changes: Maps original keys to the items to store.
        :raises Exception: If ``changes`` is None, or an item carries an
            empty-string e_tag.
        :return:
        """
        if changes is None:
            raise Exception("Changes are required when writing")
        if not changes:
            return

        # check if the database and container exists and if not create
        if not self.__container_exists:
            self.__create_db_and_container()

        for key, change in changes.items():
            # pick up the e_tag from either a dict or an object
            e_tag = None
            if isinstance(change, dict):
                e_tag = change.get("e_tag", None)
            elif hasattr(change, "e_tag"):
                e_tag = change.e_tag

            # create the new document
            doc = {
                "id": CosmosDbKeyEscape.sanitize_key(key),
                "realId": key,
                "document": self.__create_dict(change),
            }

            if e_tag == "":
                raise Exception("cosmosdb_storage.write(): etag missing")

            if e_tag == "*" or not e_tag:
                # the e_tag will be * for new docs so do an insert
                self.client.UpsertItem(
                    database_or_Container_link=self.__container_link,
                    document=doc,
                    options={"disableAutomaticIdGeneration": True},
                )
            else:
                # if we have an etag, do opt. concurrency replace
                access_condition = {"type": "IfMatch", "condition": e_tag}
                self.client.ReplaceItem(
                    document_link=self.__item_link(
                        CosmosDbKeyEscape.sanitize_key(key)
                    ),
                    new_document=doc,
                    options={"accessCondition": access_condition},
                )

    async def delete(self, keys: List[str]):
        """Remove storeitems from storage.

        A 404 response for a key is ignored so that the remaining keys are
        still deleted; any other HTTP failure is re-raised.

        :param keys: The original (unsanitized) keys to delete.
        :return:
        """
        # check if the database and container exists and if not create
        if not self.__container_exists:
            self.__create_db_and_container()

        options = {}
        if self.config.partition_key:
            options["partitionKey"] = self.config.partition_key

        for key in keys:
            try:
                self.client.DeleteItem(
                    document_link=self.__item_link(CosmosDbKeyEscape.sanitize_key(key)),
                    options=options,
                )
            except cosmos_errors.HTTPFailure as http_failure:
                # Handled per key: one missing item must not abort the loop.
                if http_failure.status_code != 404:
                    raise

    def __create_si(self, result) -> object:
        """Create an object from a result out of CosmosDB.

        :param result: A query result holding ``document`` and, optionally,
            ``_etag``.
        :return object: The object restored by ``Unpickler``.
        """
        # get the document item from the result and turn into a dict
        doc = result.get("document")
        # carry the Cosmos _etag over as the item's e_tag
        if result.get("_etag"):
            doc["e_tag"] = result["_etag"]
        return Unpickler().restore(doc)

    def __create_dict(self, store_item: object) -> Dict:
        """Return the flattened form of an object without its e_tag.

        :param store_item: The item to flatten with ``Pickler``.
        :return dict:
        """
        json_dict = Pickler().flatten(store_item)
        if "e_tag" in json_dict:
            del json_dict["e_tag"]
        return json_dict

    def __item_link(self, identifier) -> str:
        """Return the item link of a item in the container.

        :param identifier: The (sanitized) document id.
        :return str:
        """
        return self.__container_link + "/docs/" + identifier

    @property
    def __container_link(self) -> str:
        """Return the container link in the database.

        :return str:
        """
        return self.__database_link + "/colls/" + self.container

    @property
    def __database_link(self) -> str:
        """Return the database link.

        :return str:
        """
        return "dbs/" + self.database

    @property
    def __container_exists(self) -> bool:
        """Return whether the database and container have been created.

        :return bool:
        """
        return bool(self.database and self.container)

    def __create_db_and_container(self):
        """Call the get or create methods."""
        with self.__semaphore:
            db_id = self.config.database
            container_name = self.config.container
            # The database must be set first: the container lookup builds its
            # link from self.database.
            self.database = self._get_or_create_database(self.client, db_id)
            self.container = self._get_or_create_container(self.client, container_name)

    def _get_or_create_database(  # pylint: disable=invalid-name
        self, doc_client, id
    ) -> str:
        """Return the database id.

        Check if the database exists or create the database.

        :param doc_client: Client used for the query and the creation.
        :param id: The database id to look up.
        :return str:
        """
        # query CosmosDB for a database with that name/id
        dbs = list(
            doc_client.QueryDatabases(
                {
                    "query": "SELECT * FROM r WHERE r.id=@id",
                    "parameters": [{"name": "@id", "value": id}],
                }
            )
        )
        # if there are results, return the first (database names are unique)
        if dbs:
            return dbs[0]["id"]

        # create the database if it didn't exist
        res = doc_client.CreateDatabase({"id": id}, self._database_creation_options)
        return res["id"]

    def _get_or_create_container(self, doc_client, container) -> str:
        """Return the container id.

        Check if the container exists or create the container.

        :param doc_client: Client used for the query and the creation.
        :param container: The container id to look up.
        :return str:
        """
        # query CosmosDB for a container in the database with that name
        containers = list(
            doc_client.QueryContainers(
                self.__database_link,
                {
                    "query": "SELECT * FROM r WHERE r.id=@id",
                    "parameters": [{"name": "@id", "value": container}],
                },
            )
        )
        # if there are results, return the first (container names are unique)
        if containers:
            return containers[0]["id"]

        # Create a container if it didn't exist
        res = doc_client.CreateContainer(
            self.__database_link, {"id": container}, self._container_creation_options
        )
        return res["id"]