cohit's picture
Upload folder using huggingface_hub
0827183 verified
"""Implements a CosmosDB based storage provider using partitioning for a bot.
"""
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Dict, List
from threading import Lock
import json
from azure.cosmos import documents, http_constants
from jsonpickle.pickler import Pickler
from jsonpickle.unpickler import Unpickler
import azure.cosmos.cosmos_client as cosmos_client # pylint: disable=no-name-in-module,import-error
import azure.cosmos.errors as cosmos_errors # pylint: disable=no-name-in-module,import-error
from botbuilder.core.storage import Storage
from botbuilder.azure import CosmosDbKeyEscape
class CosmosDbPartitionedConfig:
"""The class for partitioned CosmosDB configuration for the Azure Bot Framework."""
def __init__(
self,
cosmos_db_endpoint: str = None,
auth_key: str = None,
database_id: str = None,
container_id: str = None,
cosmos_client_options: dict = None,
container_throughput: int = 400,
key_suffix: str = "",
compatibility_mode: bool = False,
**kwargs,
):
"""Create the Config object.
:param cosmos_db_endpoint: The CosmosDB endpoint.
:param auth_key: The authentication key for Cosmos DB.
:param database_id: The database identifier for Cosmos DB instance.
:param container_id: The container identifier.
:param cosmos_client_options: The options for the CosmosClient. Currently only supports connection_policy and
consistency_level
:param container_throughput: The throughput set when creating the Container. Defaults to 400.
:param key_suffix: The suffix to be added to every key. The keySuffix must contain only valid ComosDb
key characters. (e.g. not: '\\', '?', '/', '#', '*')
:param compatibility_mode: True if keys should be truncated in order to support previous CosmosDb
max key length of 255.
:return CosmosDbPartitionedConfig:
"""
self.__config_file = kwargs.get("filename")
if self.__config_file:
kwargs = json.load(open(self.__config_file))
self.cosmos_db_endpoint = cosmos_db_endpoint or kwargs.get("cosmos_db_endpoint")
self.auth_key = auth_key or kwargs.get("auth_key")
self.database_id = database_id or kwargs.get("database_id")
self.container_id = container_id or kwargs.get("container_id")
self.cosmos_client_options = cosmos_client_options or kwargs.get(
"cosmos_client_options", {}
)
self.container_throughput = container_throughput or kwargs.get(
"container_throughput"
)
self.key_suffix = key_suffix or kwargs.get("key_suffix")
self.compatibility_mode = compatibility_mode or kwargs.get("compatibility_mode")
class CosmosDbPartitionedStorage(Storage):
"""A CosmosDB based storage provider using partitioning for a bot."""
def __init__(self, config: CosmosDbPartitionedConfig):
"""Create the storage object.
:param config:
"""
super(CosmosDbPartitionedStorage, self).__init__()
self.config = config
self.client = None
self.database = None
self.container = None
self.compatability_mode_partition_key = False
# Lock used for synchronizing container creation
self.__lock = Lock()
if config.key_suffix is None:
config.key_suffix = ""
if not config.key_suffix.__eq__(""):
if config.compatibility_mode:
raise Exception(
"compatibilityMode cannot be true while using a keySuffix."
)
suffix_escaped = CosmosDbKeyEscape.sanitize_key(config.key_suffix)
if not suffix_escaped.__eq__(config.key_suffix):
raise Exception(
f"Cannot use invalid Row Key characters: {config.key_suffix} in keySuffix."
)
async def read(self, keys: List[str]) -> Dict[str, object]:
"""Read storeitems from storage.
:param keys:
:return dict:
"""
if not keys:
raise Exception("Keys are required when reading")
await self.initialize()
store_items = {}
for key in keys:
try:
escaped_key = CosmosDbKeyEscape.sanitize_key(
key, self.config.key_suffix, self.config.compatibility_mode
)
read_item_response = self.client.ReadItem(
self.__item_link(escaped_key), self.__get_partition_key(escaped_key)
)
document_store_item = read_item_response
if document_store_item:
store_items[document_store_item["realId"]] = self.__create_si(
document_store_item
)
# When an item is not found a CosmosException is thrown, but we want to
# return an empty collection so in this instance we catch and do not rethrow.
# Throw for any other exception.
except cosmos_errors.HTTPFailure as err:
if (
err.status_code
== cosmos_errors.http_constants.StatusCodes.NOT_FOUND
):
continue
raise err
except Exception as err:
raise err
return store_items
async def write(self, changes: Dict[str, object]):
"""Save storeitems to storage.
:param changes:
:return:
"""
if changes is None:
raise Exception("Changes are required when writing")
if not changes:
return
await self.initialize()
for key, change in changes.items():
e_tag = None
if isinstance(change, dict):
e_tag = change.get("e_tag", None)
elif hasattr(change, "e_tag"):
e_tag = change.e_tag
doc = {
"id": CosmosDbKeyEscape.sanitize_key(
key, self.config.key_suffix, self.config.compatibility_mode
),
"realId": key,
"document": self.__create_dict(change),
}
if e_tag == "":
raise Exception("cosmosdb_storage.write(): etag missing")
access_condition = {
"accessCondition": {"type": "IfMatch", "condition": e_tag}
}
options = (
access_condition if e_tag != "*" and e_tag and e_tag != "" else None
)
try:
self.client.UpsertItem(
database_or_Container_link=self.__container_link,
document=doc,
options=options,
)
except cosmos_errors.HTTPFailure as err:
raise err
except Exception as err:
raise err
async def delete(self, keys: List[str]):
"""Remove storeitems from storage.
:param keys:
:return:
"""
await self.initialize()
for key in keys:
escaped_key = CosmosDbKeyEscape.sanitize_key(
key, self.config.key_suffix, self.config.compatibility_mode
)
try:
self.client.DeleteItem(
document_link=self.__item_link(escaped_key),
options=self.__get_partition_key(escaped_key),
)
except cosmos_errors.HTTPFailure as err:
if (
err.status_code
== cosmos_errors.http_constants.StatusCodes.NOT_FOUND
):
continue
raise err
except Exception as err:
raise err
async def initialize(self):
if not self.container:
if not self.client:
self.client = cosmos_client.CosmosClient(
self.config.cosmos_db_endpoint,
{"masterKey": self.config.auth_key},
self.config.cosmos_client_options.get("connection_policy", None),
self.config.cosmos_client_options.get("consistency_level", None),
)
if not self.database:
with self.__lock:
try:
if not self.database:
self.database = self.client.CreateDatabase(
{"id": self.config.database_id}
)
except cosmos_errors.HTTPFailure:
self.database = self.client.ReadDatabase(
"dbs/" + self.config.database_id
)
self.__get_or_create_container()
def __get_or_create_container(self):
with self.__lock:
container_def = {
"id": self.config.container_id,
"partitionKey": {
"paths": ["/id"],
"kind": documents.PartitionKind.Hash,
},
}
try:
if not self.container:
self.container = self.client.CreateContainer(
"dbs/" + self.database["id"],
container_def,
{"offerThroughput": self.config.container_throughput},
)
except cosmos_errors.HTTPFailure as err:
if err.status_code == http_constants.StatusCodes.CONFLICT:
self.container = self.client.ReadContainer(
"dbs/" + self.database["id"] + "/colls/" + container_def["id"]
)
if "partitionKey" not in self.container:
self.compatability_mode_partition_key = True
else:
paths = self.container["partitionKey"]["paths"]
if "/partitionKey" in paths:
self.compatability_mode_partition_key = True
elif "/id" not in paths:
raise Exception(
f"Custom Partition Key Paths are not supported. {self.config.container_id} "
"has a custom Partition Key Path of {paths[0]}."
)
else:
raise err
def __get_partition_key(self, key: str) -> str:
return None if self.compatability_mode_partition_key else {"partitionKey": key}
@staticmethod
def __create_si(result) -> object:
"""Create an object from a result out of CosmosDB.
:param result:
:return object:
"""
# get the document item from the result and turn into a dict
doc = result.get("document")
# read the e_tag from Cosmos
if result.get("_etag"):
doc["e_tag"] = result["_etag"]
result_obj = Unpickler().restore(doc)
# create and return the object
return result_obj
@staticmethod
def __create_dict(store_item: object) -> Dict:
"""Return the dict of an object.
This eliminates non_magic attributes and the e_tag.
:param store_item:
:return dict:
"""
# read the content
json_dict = Pickler().flatten(store_item)
if "e_tag" in json_dict:
del json_dict["e_tag"]
# loop through attributes and write and return a dict
return json_dict
def __item_link(self, identifier) -> str:
"""Return the item link of a item in the container.
:param identifier:
:return str:
"""
return self.__container_link + "/docs/" + identifier
@property
def __container_link(self) -> str:
"""Return the container link in the database.
:param:
:return str:
"""
return self.__database_link + "/colls/" + self.config.container_id
@property
def __database_link(self) -> str:
"""Return the database link.
:return str:
"""
return "dbs/" + self.config.database_id