Skip to content

Commit e189211

Browse files
mdrichardsonaxelsrz
authored andcommitted
Cosmos partitioned - Parity with C# and Node | Also storage base tests (#459)
* - added cosmosdb_partitioned_storage - added storage_base_tests - added test_cosmos_partitioned_storage * - added cosmosdb_partitioned_storage - added storage_base_tests - added test_cosmos_partitioned_storage * fixed bot_state so dialog tests pass * removed commented code * cosmos tests pass with storage_base_tests * blob storage uses storage_base_tests * memory_storage uses storage_base_tests * attempt to fix storage_base_tests import * moved storage_base_tests * black compliance * pylint compliance
1 parent d166d4d commit e189211

File tree

11 files changed

+1187
-198
lines changed

11 files changed

+1187
-198
lines changed

libraries/botbuilder-azure/botbuilder/azure/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77

88
from .about import __version__
99
from .cosmosdb_storage import CosmosDbStorage, CosmosDbConfig, CosmosDbKeyEscape
10+
from .cosmosdb_partitioned_storage import (
11+
CosmosDbPartitionedStorage,
12+
CosmosDbPartitionedConfig,
13+
)
1014
from .blob_storage import BlobStorage, BlobStorageSettings
1115

1216
__all__ = [
@@ -15,5 +19,7 @@
1519
"CosmosDbStorage",
1620
"CosmosDbConfig",
1721
"CosmosDbKeyEscape",
22+
"CosmosDbPartitionedStorage",
23+
"CosmosDbPartitionedConfig",
1824
"__version__",
1925
]

libraries/botbuilder-azure/botbuilder/azure/blob_storage.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
from jsonpickle import encode
55
from jsonpickle.unpickler import Unpickler
6-
76
from azure.storage.blob import BlockBlobService, Blob, PublicAccess
87
from botbuilder.core import Storage
98

@@ -42,7 +41,7 @@ def __init__(self, settings: BlobStorageSettings):
4241

4342
async def read(self, keys: List[str]) -> Dict[str, object]:
4443
if not keys:
45-
raise Exception("Please provide at least one key to read from storage.")
44+
raise Exception("Keys are required when reading")
4645

4746
self.client.create_container(self.settings.container_name)
4847
self.client.set_container_acl(
@@ -63,24 +62,31 @@ async def read(self, keys: List[str]) -> Dict[str, object]:
6362
return items
6463

6564
async def write(self, changes: Dict[str, object]):
65+
if changes is None:
66+
raise Exception("Changes are required when writing")
67+
if not changes:
68+
return
69+
6670
self.client.create_container(self.settings.container_name)
6771
self.client.set_container_acl(
6872
self.settings.container_name, public_access=PublicAccess.Container
6973
)
7074

71-
for name, item in changes.items():
72-
e_tag = (
73-
None if not hasattr(item, "e_tag") or item.e_tag == "*" else item.e_tag
74-
)
75-
if e_tag:
76-
item.e_tag = e_tag.replace('"', '\\"')
75+
for (name, item) in changes.items():
76+
e_tag = item.e_tag if hasattr(item, "e_tag") else item.get("e_tag", None)
77+
e_tag = None if e_tag == "*" else e_tag
78+
if e_tag == "":
79+
raise Exception("blob_storage.write(): etag missing")
7780
item_str = self._store_item_to_str(item)
78-
self.client.create_blob_from_text(
79-
container_name=self.settings.container_name,
80-
blob_name=name,
81-
text=item_str,
82-
if_match=e_tag,
83-
)
81+
try:
82+
self.client.create_blob_from_text(
83+
container_name=self.settings.container_name,
84+
blob_name=name,
85+
text=item_str,
86+
if_match=e_tag,
87+
)
88+
except Exception as error:
89+
raise error
8490

8591
async def delete(self, keys: List[str]):
8692
if keys is None:
@@ -102,7 +108,6 @@ async def delete(self, keys: List[str]):
102108
def _blob_to_store_item(self, blob: Blob) -> object:
103109
item = json.loads(blob.content)
104110
item["e_tag"] = blob.properties.etag
105-
item["id"] = blob.name
106111
result = Unpickler().restore(item)
107112
return result
108113

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
"""CosmosDB Middleware for Python Bot Framework.
2+
3+
This is middleware to store items in CosmosDB.
4+
Part of the Azure Bot Framework in Python.
5+
"""
6+
7+
# Copyright (c) Microsoft Corporation. All rights reserved.
8+
# Licensed under the MIT License.
9+
from typing import Dict, List
10+
from threading import Semaphore
11+
import json
12+
13+
from azure.cosmos import documents, http_constants
14+
from jsonpickle.pickler import Pickler
15+
from jsonpickle.unpickler import Unpickler
16+
import azure.cosmos.cosmos_client as cosmos_client # pylint: disable=no-name-in-module,import-error
17+
import azure.cosmos.errors as cosmos_errors # pylint: disable=no-name-in-module,import-error
18+
from botbuilder.core.storage import Storage
19+
from botbuilder.azure import CosmosDbKeyEscape
20+
21+
22+
class CosmosDbPartitionedConfig:
23+
"""The class for partitioned CosmosDB configuration for the Azure Bot Framework."""
24+
25+
def __init__(
26+
self,
27+
cosmos_db_endpoint: str = None,
28+
auth_key: str = None,
29+
database_id: str = None,
30+
container_id: str = None,
31+
cosmos_client_options: dict = None,
32+
container_throughput: int = None,
33+
**kwargs,
34+
):
35+
"""Create the Config object.
36+
37+
:param cosmos_db_endpoint: The CosmosDB endpoint.
38+
:param auth_key: The authentication key for Cosmos DB.
39+
:param database_id: The database identifier for Cosmos DB instance.
40+
:param container_id: The container identifier.
41+
:param cosmos_client_options: The options for the CosmosClient. Currently only supports connection_policy and
42+
consistency_level
43+
:param container_throughput: The throughput set when creating the Container. Defaults to 400.
44+
:return CosmosDbPartitionedConfig:
45+
"""
46+
self.__config_file = kwargs.get("filename")
47+
if self.__config_file:
48+
kwargs = json.load(open(self.__config_file))
49+
self.cosmos_db_endpoint = cosmos_db_endpoint or kwargs.get("cosmos_db_endpoint")
50+
self.auth_key = auth_key or kwargs.get("auth_key")
51+
self.database_id = database_id or kwargs.get("database_id")
52+
self.container_id = container_id or kwargs.get("container_id")
53+
self.cosmos_client_options = cosmos_client_options or kwargs.get(
54+
"cosmos_client_options", {}
55+
)
56+
self.container_throughput = container_throughput or kwargs.get(
57+
"container_throughput"
58+
)
59+
60+
61+
class CosmosDbPartitionedStorage(Storage):
62+
"""The class for partitioned CosmosDB middleware for the Azure Bot Framework."""
63+
64+
def __init__(self, config: CosmosDbPartitionedConfig):
65+
"""Create the storage object.
66+
67+
:param config:
68+
"""
69+
super(CosmosDbPartitionedStorage, self).__init__()
70+
self.config = config
71+
self.client = None
72+
self.database = None
73+
self.container = None
74+
self.__semaphore = Semaphore()
75+
76+
async def read(self, keys: List[str]) -> Dict[str, object]:
77+
"""Read storeitems from storage.
78+
79+
:param keys:
80+
:return dict:
81+
"""
82+
if not keys:
83+
raise Exception("Keys are required when reading")
84+
85+
await self.initialize()
86+
87+
store_items = {}
88+
89+
for key in keys:
90+
try:
91+
escaped_key = CosmosDbKeyEscape.sanitize_key(key)
92+
93+
read_item_response = self.client.ReadItem(
94+
self.__item_link(escaped_key), {"partitionKey": escaped_key}
95+
)
96+
document_store_item = read_item_response
97+
if document_store_item:
98+
store_items[document_store_item["realId"]] = self.__create_si(
99+
document_store_item
100+
)
101+
# When an item is not found a CosmosException is thrown, but we want to
102+
# return an empty collection so in this instance we catch and do not rethrow.
103+
# Throw for any other exception.
104+
except cosmos_errors.HTTPFailure as err:
105+
if (
106+
err.status_code
107+
== cosmos_errors.http_constants.StatusCodes.NOT_FOUND
108+
):
109+
continue
110+
raise err
111+
except Exception as err:
112+
raise err
113+
return store_items
114+
115+
async def write(self, changes: Dict[str, object]):
116+
"""Save storeitems to storage.
117+
118+
:param changes:
119+
:return:
120+
"""
121+
if changes is None:
122+
raise Exception("Changes are required when writing")
123+
if not changes:
124+
return
125+
126+
await self.initialize()
127+
128+
for (key, change) in changes.items():
129+
e_tag = change.get("e_tag", None)
130+
doc = {
131+
"id": CosmosDbKeyEscape.sanitize_key(key),
132+
"realId": key,
133+
"document": self.__create_dict(change),
134+
}
135+
if e_tag == "":
136+
raise Exception("cosmosdb_storage.write(): etag missing")
137+
138+
access_condition = {
139+
"accessCondition": {"type": "IfMatch", "condition": e_tag}
140+
}
141+
options = (
142+
access_condition if e_tag != "*" and e_tag and e_tag != "" else None
143+
)
144+
try:
145+
self.client.UpsertItem(
146+
database_or_Container_link=self.__container_link,
147+
document=doc,
148+
options=options,
149+
)
150+
except cosmos_errors.HTTPFailure as err:
151+
raise err
152+
except Exception as err:
153+
raise err
154+
155+
async def delete(self, keys: List[str]):
156+
"""Remove storeitems from storage.
157+
158+
:param keys:
159+
:return:
160+
"""
161+
await self.initialize()
162+
163+
for key in keys:
164+
escaped_key = CosmosDbKeyEscape.sanitize_key(key)
165+
try:
166+
self.client.DeleteItem(
167+
document_link=self.__item_link(escaped_key),
168+
options={"partitionKey": escaped_key},
169+
)
170+
except cosmos_errors.HTTPFailure as err:
171+
if (
172+
err.status_code
173+
== cosmos_errors.http_constants.StatusCodes.NOT_FOUND
174+
):
175+
continue
176+
raise err
177+
except Exception as err:
178+
raise err
179+
180+
async def initialize(self):
181+
if not self.container:
182+
if not self.client:
183+
self.client = cosmos_client.CosmosClient(
184+
self.config.cosmos_db_endpoint,
185+
{"masterKey": self.config.auth_key},
186+
self.config.cosmos_client_options.get("connection_policy", None),
187+
self.config.cosmos_client_options.get("consistency_level", None),
188+
)
189+
190+
if not self.database:
191+
with self.__semaphore:
192+
try:
193+
self.database = self.client.CreateDatabase(
194+
{"id": self.config.database_id}
195+
)
196+
except cosmos_errors.HTTPFailure:
197+
self.database = self.client.ReadDatabase(
198+
"dbs/" + self.config.database_id
199+
)
200+
201+
if not self.container:
202+
with self.__semaphore:
203+
container_def = {
204+
"id": self.config.container_id,
205+
"partitionKey": {
206+
"paths": ["/id"],
207+
"kind": documents.PartitionKind.Hash,
208+
},
209+
}
210+
try:
211+
self.container = self.client.CreateContainer(
212+
"dbs/" + self.database["id"],
213+
container_def,
214+
{"offerThroughput": 400},
215+
)
216+
except cosmos_errors.HTTPFailure as err:
217+
if err.status_code == http_constants.StatusCodes.CONFLICT:
218+
self.container = self.client.ReadContainer(
219+
"dbs/"
220+
+ self.database["id"]
221+
+ "/colls/"
222+
+ container_def["id"]
223+
)
224+
else:
225+
raise err
226+
227+
@staticmethod
228+
def __create_si(result) -> object:
229+
"""Create an object from a result out of CosmosDB.
230+
231+
:param result:
232+
:return object:
233+
"""
234+
# get the document item from the result and turn into a dict
235+
doc = result.get("document")
236+
# read the e_tag from Cosmos
237+
if result.get("_etag"):
238+
doc["e_tag"] = result["_etag"]
239+
240+
result_obj = Unpickler().restore(doc)
241+
242+
# create and return the object
243+
return result_obj
244+
245+
@staticmethod
246+
def __create_dict(store_item: object) -> Dict:
247+
"""Return the dict of an object.
248+
249+
This eliminates non_magic attributes and the e_tag.
250+
251+
:param store_item:
252+
:return dict:
253+
"""
254+
# read the content
255+
json_dict = Pickler().flatten(store_item)
256+
if "e_tag" in json_dict:
257+
del json_dict["e_tag"]
258+
259+
# loop through attributes and write and return a dict
260+
return json_dict
261+
262+
def __item_link(self, identifier) -> str:
263+
"""Return the item link of a item in the container.
264+
265+
:param identifier:
266+
:return str:
267+
"""
268+
return self.__container_link + "/docs/" + identifier
269+
270+
@property
271+
def __container_link(self) -> str:
272+
"""Return the container link in the database.
273+
274+
:param:
275+
:return str:
276+
"""
277+
return self.__database_link + "/colls/" + self.config.container_id
278+
279+
@property
280+
def __database_link(self) -> str:
281+
"""Return the database link.
282+
283+
:return str:
284+
"""
285+
return "dbs/" + self.config.database_id

0 commit comments

Comments
 (0)