Skip to content

Add keysuffix and compat mode to cosmosdbpartitionedstorage #487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Dict, List
from threading import Semaphore
from threading import Lock
import json

from azure.cosmos import documents, http_constants
Expand All @@ -29,7 +29,9 @@ def __init__(
database_id: str = None,
container_id: str = None,
cosmos_client_options: dict = None,
container_throughput: int = None,
container_throughput: int = 400,
key_suffix: str = "",
compatibility_mode: bool = False,
**kwargs,
):
"""Create the Config object.
Expand All @@ -41,6 +43,10 @@ def __init__(
:param cosmos_client_options: The options for the CosmosClient. Currently only supports connection_policy and
consistency_level
:param container_throughput: The throughput set when creating the Container. Defaults to 400.
:param key_suffix: The suffix to be added to every key. The keySuffix must contain only valid ComosDb
key characters. (e.g. not: '\\', '?', '/', '#', '*')
:param compatibility_mode: True if keys should be truncated in order to support previous CosmosDb
max key length of 255.
:return CosmosDbPartitionedConfig:
"""
self.__config_file = kwargs.get("filename")
Expand All @@ -56,6 +62,8 @@ def __init__(
self.container_throughput = container_throughput or kwargs.get(
"container_throughput"
)
self.key_suffix = key_suffix or kwargs.get("key_suffix")
self.compatibility_mode = compatibility_mode or kwargs.get("compatibility_mode")


class CosmosDbPartitionedStorage(Storage):
Expand All @@ -71,7 +79,21 @@ def __init__(self, config: CosmosDbPartitionedConfig):
self.client = None
self.database = None
self.container = None
self.__semaphore = Semaphore()
self.compatability_mode_partition_key = False
# Lock used for synchronizing container creation
self.__lock = Lock()
if config.key_suffix is None:
config.key_suffix = ""
if not config.key_suffix.__eq__(""):
if config.compatibility_mode:
raise Exception(
"compatibilityMode cannot be true while using a keySuffix."
)
suffix_escaped = CosmosDbKeyEscape.sanitize_key(config.key_suffix)
if not suffix_escaped.__eq__(config.key_suffix):
raise Exception(
f"Cannot use invalid Row Key characters: {config.key_suffix} in keySuffix."
)

async def read(self, keys: List[str]) -> Dict[str, object]:
"""Read storeitems from storage.
Expand All @@ -88,10 +110,12 @@ async def read(self, keys: List[str]) -> Dict[str, object]:

for key in keys:
try:
escaped_key = CosmosDbKeyEscape.sanitize_key(key)
escaped_key = CosmosDbKeyEscape.sanitize_key(
key, self.config.key_suffix, self.config.compatibility_mode
)

read_item_response = self.client.ReadItem(
self.__item_link(escaped_key), {"partitionKey": escaped_key}
self.__item_link(escaped_key), self.__get_partition_key(escaped_key)
)
document_store_item = read_item_response
if document_store_item:
Expand Down Expand Up @@ -128,7 +152,9 @@ async def write(self, changes: Dict[str, object]):
for (key, change) in changes.items():
e_tag = change.get("e_tag", None)
doc = {
"id": CosmosDbKeyEscape.sanitize_key(key),
"id": CosmosDbKeyEscape.sanitize_key(
key, self.config.key_suffix, self.config.compatibility_mode
),
"realId": key,
"document": self.__create_dict(change),
}
Expand Down Expand Up @@ -161,11 +187,13 @@ async def delete(self, keys: List[str]):
await self.initialize()

for key in keys:
escaped_key = CosmosDbKeyEscape.sanitize_key(key)
escaped_key = CosmosDbKeyEscape.sanitize_key(
key, self.config.key_suffix, self.config.compatibility_mode
)
try:
self.client.DeleteItem(
document_link=self.__item_link(escaped_key),
options={"partitionKey": escaped_key},
options=self.__get_partition_key(escaped_key),
)
except cosmos_errors.HTTPFailure as err:
if (
Expand All @@ -188,41 +216,57 @@ async def initialize(self):
)

if not self.database:
with self.__semaphore:
with self.__lock:
try:
self.database = self.client.CreateDatabase(
{"id": self.config.database_id}
)
if not self.database:
self.database = self.client.CreateDatabase(
{"id": self.config.database_id}
)
except cosmos_errors.HTTPFailure:
self.database = self.client.ReadDatabase(
"dbs/" + self.config.database_id
)

if not self.container:
with self.__semaphore:
container_def = {
"id": self.config.container_id,
"partitionKey": {
"paths": ["/id"],
"kind": documents.PartitionKind.Hash,
},
}
try:
self.container = self.client.CreateContainer(
"dbs/" + self.database["id"],
container_def,
{"offerThroughput": 400},
)
except cosmos_errors.HTTPFailure as err:
if err.status_code == http_constants.StatusCodes.CONFLICT:
self.container = self.client.ReadContainer(
"dbs/"
+ self.database["id"]
+ "/colls/"
+ container_def["id"]
self.__get_or_create_container()

def __get_or_create_container(self):
with self.__lock:
container_def = {
"id": self.config.container_id,
"partitionKey": {
"paths": ["/id"],
"kind": documents.PartitionKind.Hash,
},
}
try:
if not self.container:
self.container = self.client.CreateContainer(
"dbs/" + self.database["id"],
container_def,
{"offerThroughput": self.config.container_throughput},
)
except cosmos_errors.HTTPFailure as err:
if err.status_code == http_constants.StatusCodes.CONFLICT:
self.container = self.client.ReadContainer(
"dbs/" + self.database["id"] + "/colls/" + container_def["id"]
)
if "partitionKey" not in self.container:
self.compatability_mode_partition_key = True
else:
paths = self.container["partitionKey"]["paths"]
if "/partitionKey" in paths:
self.compatability_mode_partition_key = True
elif "/id" not in paths:
raise Exception(
f"Custom Partition Key Paths are not supported. {self.config.container_id} "
"has a custom Partition Key Path of {paths[0]}."
)
else:
raise err

else:
raise err

def __get_partition_key(self, key: str) -> str:
return None if self.compatability_mode_partition_key else {"partitionKey": key}

@staticmethod
def __create_si(result) -> object:
Expand Down
20 changes: 16 additions & 4 deletions libraries/botbuilder-azure/botbuilder/azure/cosmosdb_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,18 @@ def __init__(

class CosmosDbKeyEscape:
@staticmethod
def sanitize_key(key) -> str:
def sanitize_key(
key: str, key_suffix: str = "", compatibility_mode: bool = True
) -> str:
"""Return the sanitized key.

Replace characters that are not allowed in keys in Cosmos.

:param key:
:param key: The provided key to be escaped.
:param key_suffix: The string to add a the end of all RowKeys.
:param compatibility_mode: True if keys should be truncated in order to support previous CosmosDb
max key length of 255. This behavior can be overridden by setting
cosmosdb_partitioned_config.compatibility_mode to False.
:return str:
"""
# forbidden characters
Expand All @@ -72,12 +78,18 @@ def sanitize_key(key) -> str:
# Unicode code point of the character and return the new string
key = "".join(map(lambda x: "*" + str(ord(x)) if x in bad_chars else x, key))

return CosmosDbKeyEscape.truncate_key(key)
if key_suffix is None:
key_suffix = ""

return CosmosDbKeyEscape.truncate_key(f"{key}{key_suffix}", compatibility_mode)

@staticmethod
def truncate_key(key: str) -> str:
def truncate_key(key: str, compatibility_mode: bool = True) -> str:
max_key_len = 255

if not compatibility_mode:
return key

if len(key) > max_key_len:
aux_hash = sha256(key.encode("utf-8"))
aux_hex = aux_hash.hexdigest()
Expand Down