7
7
# Copyright (c) Microsoft Corporation. All rights reserved.
8
8
# Licensed under the MIT License.
9
9
from typing import Dict , List
10
- from threading import Semaphore
10
+ from threading import Lock
11
11
import json
12
12
13
13
from azure .cosmos import documents , http_constants
@@ -29,7 +29,9 @@ def __init__(
29
29
database_id : str = None ,
30
30
container_id : str = None ,
31
31
cosmos_client_options : dict = None ,
32
- container_throughput : int = None ,
32
+ container_throughput : int = 400 ,
33
+ key_suffix : str = "" ,
34
+ compatibility_mode : bool = False ,
33
35
** kwargs ,
34
36
):
35
37
"""Create the Config object.
@@ -41,6 +43,10 @@ def __init__(
41
43
:param cosmos_client_options: The options for the CosmosClient. Currently only supports connection_policy and
42
44
consistency_level
43
45
:param container_throughput: The throughput set when creating the Container. Defaults to 400.
46
+ :param key_suffix: The suffix to be added to every key. The keySuffix must contain only valid ComosDb
47
+ key characters. (e.g. not: '\\ ', '?', '/', '#', '*')
48
+ :param compatibility_mode: True if keys should be truncated in order to support previous CosmosDb
49
+ max key length of 255.
44
50
:return CosmosDbPartitionedConfig:
45
51
"""
46
52
self .__config_file = kwargs .get ("filename" )
@@ -56,6 +62,8 @@ def __init__(
56
62
self .container_throughput = container_throughput or kwargs .get (
57
63
"container_throughput"
58
64
)
65
+ self .key_suffix = key_suffix or kwargs .get ("key_suffix" )
66
+ self .compatibility_mode = compatibility_mode or kwargs .get ("compatibility_mode" )
59
67
60
68
61
69
class CosmosDbPartitionedStorage (Storage ):
@@ -71,7 +79,21 @@ def __init__(self, config: CosmosDbPartitionedConfig):
71
79
self .client = None
72
80
self .database = None
73
81
self .container = None
74
- self .__semaphore = Semaphore ()
82
+ self .compatability_mode_partition_key = False
83
+ # Lock used for synchronizing container creation
84
+ self .__lock = Lock ()
85
+ if config .key_suffix is None :
86
+ config .key_suffix = ""
87
+ if not config .key_suffix .__eq__ ("" ):
88
+ if config .compatibility_mode :
89
+ raise Exception (
90
+ "compatibilityMode cannot be true while using a keySuffix."
91
+ )
92
+ suffix_escaped = CosmosDbKeyEscape .sanitize_key (config .key_suffix )
93
+ if not suffix_escaped .__eq__ (config .key_suffix ):
94
+ raise Exception (
95
+ f"Cannot use invalid Row Key characters: { config .key_suffix } in keySuffix."
96
+ )
75
97
76
98
async def read (self , keys : List [str ]) -> Dict [str , object ]:
77
99
"""Read storeitems from storage.
@@ -88,10 +110,12 @@ async def read(self, keys: List[str]) -> Dict[str, object]:
88
110
89
111
for key in keys :
90
112
try :
91
- escaped_key = CosmosDbKeyEscape .sanitize_key (key )
113
+ escaped_key = CosmosDbKeyEscape .sanitize_key (
114
+ key , self .config .key_suffix , self .config .compatibility_mode
115
+ )
92
116
93
117
read_item_response = self .client .ReadItem (
94
- self .__item_link (escaped_key ), { "partitionKey" : escaped_key }
118
+ self .__item_link (escaped_key ), self . __get_partition_key ( escaped_key )
95
119
)
96
120
document_store_item = read_item_response
97
121
if document_store_item :
@@ -128,7 +152,9 @@ async def write(self, changes: Dict[str, object]):
128
152
for (key , change ) in changes .items ():
129
153
e_tag = change .get ("e_tag" , None )
130
154
doc = {
131
- "id" : CosmosDbKeyEscape .sanitize_key (key ),
155
+ "id" : CosmosDbKeyEscape .sanitize_key (
156
+ key , self .config .key_suffix , self .config .compatibility_mode
157
+ ),
132
158
"realId" : key ,
133
159
"document" : self .__create_dict (change ),
134
160
}
@@ -161,11 +187,13 @@ async def delete(self, keys: List[str]):
161
187
await self .initialize ()
162
188
163
189
for key in keys :
164
- escaped_key = CosmosDbKeyEscape .sanitize_key (key )
190
+ escaped_key = CosmosDbKeyEscape .sanitize_key (
191
+ key , self .config .key_suffix , self .config .compatibility_mode
192
+ )
165
193
try :
166
194
self .client .DeleteItem (
167
195
document_link = self .__item_link (escaped_key ),
168
- options = { "partitionKey" : escaped_key } ,
196
+ options = self . __get_partition_key ( escaped_key ) ,
169
197
)
170
198
except cosmos_errors .HTTPFailure as err :
171
199
if (
@@ -188,41 +216,57 @@ async def initialize(self):
188
216
)
189
217
190
218
if not self .database :
191
- with self .__semaphore :
219
+ with self .__lock :
192
220
try :
193
- self .database = self .client .CreateDatabase (
194
- {"id" : self .config .database_id }
195
- )
221
+ if not self .database :
222
+ self .database = self .client .CreateDatabase (
223
+ {"id" : self .config .database_id }
224
+ )
196
225
except cosmos_errors .HTTPFailure :
197
226
self .database = self .client .ReadDatabase (
198
227
"dbs/" + self .config .database_id
199
228
)
200
229
201
- if not self .container :
202
- with self .__semaphore :
203
- container_def = {
204
- "id" : self .config .container_id ,
205
- "partitionKey" : {
206
- "paths" : ["/id" ],
207
- "kind" : documents .PartitionKind .Hash ,
208
- },
209
- }
210
- try :
211
- self .container = self .client .CreateContainer (
212
- "dbs/" + self .database ["id" ],
213
- container_def ,
214
- {"offerThroughput" : 400 },
215
- )
216
- except cosmos_errors .HTTPFailure as err :
217
- if err .status_code == http_constants .StatusCodes .CONFLICT :
218
- self .container = self .client .ReadContainer (
219
- "dbs/"
220
- + self .database ["id" ]
221
- + "/colls/"
222
- + container_def ["id" ]
230
+ self .__get_or_create_container ()
231
+
232
+ def __get_or_create_container (self ):
233
+ with self .__lock :
234
+ container_def = {
235
+ "id" : self .config .container_id ,
236
+ "partitionKey" : {
237
+ "paths" : ["/id" ],
238
+ "kind" : documents .PartitionKind .Hash ,
239
+ },
240
+ }
241
+ try :
242
+ if not self .container :
243
+ self .container = self .client .CreateContainer (
244
+ "dbs/" + self .database ["id" ],
245
+ container_def ,
246
+ {"offerThroughput" : self .config .container_throughput },
247
+ )
248
+ except cosmos_errors .HTTPFailure as err :
249
+ if err .status_code == http_constants .StatusCodes .CONFLICT :
250
+ self .container = self .client .ReadContainer (
251
+ "dbs/" + self .database ["id" ] + "/colls/" + container_def ["id" ]
252
+ )
253
+ if "partitionKey" not in self .container :
254
+ self .compatability_mode_partition_key = True
255
+ else :
256
+ paths = self .container ["partitionKey" ]["paths" ]
257
+ if "/partitionKey" in paths :
258
+ self .compatability_mode_partition_key = True
259
+ elif "/id" not in paths :
260
+ raise Exception (
261
+ f"Custom Partition Key Paths are not supported. { self .config .container_id } "
262
+ "has a custom Partition Key Path of {paths[0]}."
223
263
)
224
- else :
225
- raise err
264
+
265
+ else :
266
+ raise err
267
+
268
+ def __get_partition_key (self , key : str ) -> str :
269
+ return None if self .compatability_mode_partition_key else {"partitionKey" : key }
226
270
227
271
@staticmethod
228
272
def __create_si (result ) -> object :
0 commit comments