Skip to content
This repository was archived by the owner on Jan 5, 2026. It is now read-only.

Commit f4994af

Browse files
committed
sanitize key tests
1 parent c8d87e9 commit f4994af

File tree

3 files changed

+129
-24
lines changed

3 files changed

+129
-24
lines changed

libraries/botbuilder-azure/botbuilder/azure/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
# --------------------------------------------------------------------------
77

88
from .about import __version__
9-
from .cosmosdb_storage import CosmosDbStorage, CosmosDbConfig
9+
from .cosmosdb_storage import CosmosDbStorage, CosmosDbConfig, CosmosDbKeyEscape
1010

1111
__all__ = ['CosmosDbStorage',
1212
'CosmosDbConfig',
13+
'CosmosDbKeyEscape',
1314
'__version__']

libraries/botbuilder-azure/botbuilder/azure/cosmosdb_storage.py

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
# Copyright (c) Microsoft Corporation. All rights reserved.
88
# Licensed under the MIT License.
9-
9+
from hashlib import sha256
1010
from typing import Dict, List
1111
from threading import Semaphore
1212
import json
@@ -42,6 +42,42 @@ def __init__(self, endpoint: str = None, masterkey: str = None, database: str =
4242
self.container_creation_options = container_creation_options or kwargs.get('container_creation_options')
4343

4444

45+
class CosmosDbKeyEscape:
    """Helpers that turn storage keys into strings usable as Cosmos keys."""

    # Longest key that truncate_key will return.
    MAX_KEY_LEN = 255

    # Forbidden characters mapped to their replacement: '*' followed by the
    # decimal Unicode code point of the character. '*' is escaped as well
    # because it is the escape marker itself.
    # NOTE(review): the escape has no terminator, so two different keys can
    # sanitize to the same string (a tab followed by '2' and a single
    # backslash both become '*92'). Changing the format would change the ids
    # of already-stored documents, so it is left as is.
    _ESCAPES = {
        char: '*' + str(ord(char))
        for char in ('\\', '?', '/', '#', '\t', '\n', '\r', '*')
    }

    @staticmethod
    def sanitize_key(key: str) -> str:
        """Return the sanitized key.

        Replace characters that are not allowed in keys in Cosmos with '*'
        followed by the decimal Unicode code point of the character, then
        shorten the result with truncate_key if it is too long.

        :param key: the key to sanitize
        :return str: the escaped (and possibly truncated) key
        """
        escapes = CosmosDbKeyEscape._ESCAPES
        escaped = ''.join(escapes.get(char, char) for char in key)
        return CosmosDbKeyEscape.truncate_key(escaped)

    @staticmethod
    def truncate_key(key: str) -> str:
        """Return the key shortened to at most MAX_KEY_LEN characters.

        Keys that already fit are returned unchanged. Longer keys keep their
        leading characters and have their tail replaced by the SHA-256 hex
        digest of the full key, so long keys that differ only past the cut
        still produce different results.

        :param key: the key to shorten
        :return str: a key of at most MAX_KEY_LEN characters
        """
        max_len = CosmosDbKeyEscape.MAX_KEY_LEN
        if len(key) <= max_len:
            return key

        digest = sha256(key.encode('utf-8')).hexdigest()
        # Reserve room for the digest so the result is exactly max_len long.
        return key[:max_len - len(digest)] + digest
79+
80+
4581
class CosmosDbStorage(Storage):
4682
"""The class for CosmosDB middleware for the Azure Bot Framework."""
4783

@@ -77,7 +113,7 @@ async def read(self, keys: List[str]) -> Dict[str, object]:
77113
if keys:
78114
# create the parameters object
79115
parameters = [
80-
{'name': f'@id{i}', 'value': f'{self.__sanitize_key(key)}'}
116+
{'name': f'@id{i}', 'value': f'{CosmosDbKeyEscape.sanitize_key(key)}'}
81117
for i, key in enumerate(keys)
82118
]
83119
# get the names of the params
@@ -125,7 +161,7 @@ async def write(self, changes: Dict[str, StoreItem]):
125161
# store the e_tag
126162
e_tag = change.e_tag
127163
# create the new document
128-
doc = {'id': self.__sanitize_key(key),
164+
doc = {'id': CosmosDbKeyEscape.sanitize_key(key),
129165
'realId': key,
130166
'document': self.__create_dict(change)
131167
}
@@ -141,7 +177,7 @@ async def write(self, changes: Dict[str, StoreItem]):
141177
access_condition = {'type': 'IfMatch', 'condition': e_tag}
142178
self.client.ReplaceItem(
143179
document_link=self.__item_link(
144-
self.__sanitize_key(key)),
180+
CosmosDbKeyEscape.sanitize_key(key)),
145181
new_document=doc,
146182
options={'accessCondition': access_condition}
147183
)
@@ -169,7 +205,7 @@ async def delete(self, keys: List[str]):
169205
# call the function for each key
170206
for k in keys:
171207
self.client.DeleteItem(
172-
document_link=self.__item_link(self.__sanitize_key(k)),
208+
document_link=self.__item_link(CosmosDbKeyEscape.sanitize_key(k)),
173209
options=options
174210
)
175211
# print(res)
@@ -209,24 +245,6 @@ def __create_dict(self, si: StoreItem) -> Dict:
209245
return ({attr: getattr(si, attr)
210246
for attr in non_magic_attr})
211247

212-
def __sanitize_key(self, key) -> str:
213-
"""Return the sanitized key.
214-
215-
Replace characters that are not allowed in keys in Cosmos.
216-
217-
:param key:
218-
:return str:
219-
"""
220-
# forbidden characters
221-
bad_chars = ['\\', '?', '/', '#', '\t', '\n', '\r']
222-
# replace those with with '*' and the
223-
# Unicode code point of the character and return the new string
224-
return ''.join(
225-
map(
226-
lambda x: '*' + str(ord(x)) if x in bad_chars else x, key
227-
)
228-
)
229-
230248
def __item_link(self, id) -> str:
231249
"""Return the item link of a item in the container.
232250
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
from unittest import TestCase
2+
from botbuilder.azure import CosmosDbKeyEscape
3+
4+
5+
class TestKeyValidation(TestCase):
    """Unit tests for CosmosDbKeyEscape.sanitize_key.

    Escaped characters are expected as '*' followed by the decimal code
    point of the character (for example '?' becomes '*63').
    """

    def test_should_not_change_a_valid_key(self):
        valid_key = 'Abc12345'
        sanitized_key = CosmosDbKeyEscape.sanitize_key(valid_key)
        self.assertEqual(
            valid_key, sanitized_key,
            f'{valid_key} should be equal to {sanitized_key}'
        )

    def test_should_escape_illegal_characters_case_1(self):
        # Code point of "?" is 63
        sanitized_key = CosmosDbKeyEscape.sanitize_key('?test?')
        self.assertEqual(sanitized_key, '*63test*63')

    def test_should_escape_illegal_characters_case_2(self):
        # Code point of "/" is 47
        sanitized_key = CosmosDbKeyEscape.sanitize_key('/test/')
        self.assertEqual(sanitized_key, '*47test*47')

    def test_should_escape_illegal_characters_case_3(self):
        # Code point of "\" is 92
        sanitized_key = CosmosDbKeyEscape.sanitize_key('\\test\\')
        self.assertEqual(sanitized_key, '*92test*92')

    def test_should_escape_illegal_characters_case_4(self):
        # Code point of "#" is 35
        sanitized_key = CosmosDbKeyEscape.sanitize_key('#test#')
        self.assertEqual(sanitized_key, '*35test*35')

    def test_should_escape_illegal_characters_case_5(self):
        # Code point of "*" is 42
        sanitized_key = CosmosDbKeyEscape.sanitize_key('*test*')
        self.assertEqual(sanitized_key, '*42test*42')

    def test_should_escape_illegal_characters_compound_key(self):
        # Check a compound key: every character must be escaped in order.
        compound_sanitized_key = CosmosDbKeyEscape.sanitize_key('?#/')
        self.assertEqual(compound_sanitized_key, '*63*35*47')

    def test_should_handle_possible_collisions(self):
        # A key that already looks escaped must not collide with a key
        # that contains the raw character.
        valid_key1 = '*2atest*2a'
        valid_key2 = '*test*'

        escaped1 = CosmosDbKeyEscape.sanitize_key(valid_key1)
        escaped2 = CosmosDbKeyEscape.sanitize_key(valid_key2)

        self.assertNotEqual(
            escaped1, escaped2,
            f'{escaped1} should be different than {escaped2}'
        )

    def test_should_truncate_longer_keys(self):
        # create an extra long key
        # limit is 255
        long_key = 'x' * 300
        fixed = CosmosDbKeyEscape.sanitize_key(long_key)

        self.assertLessEqual(
            len(fixed), 255, 'long key was not properly truncated'
        )

    def test_should_not_truncate_short_key(self):
        # create a short key
        short_key = 'x' * 16
        fixed2 = CosmosDbKeyEscape.sanitize_key(short_key)

        self.assertEqual(
            len(fixed2), 16, 'short key was truncated improperly'
        )

    def test_should_create_sufficiently_different_truncated_keys_of_similar_origin(self):
        # create 2 very similar extra long keys where the difference will
        # definitely be trimmed off by the truncate function
        long_key = 'x' * 300 + "1"
        long_key2 = 'x' * 300 + "2"

        fixed = CosmosDbKeyEscape.sanitize_key(long_key)
        fixed2 = CosmosDbKeyEscape.sanitize_key(long_key2)

        # Compare the keys themselves; comparing a length with a string
        # would always pass.
        self.assertNotEqual(
            fixed, fixed2, 'key truncation failed to create unique key'
        )

    def test_should_properly_truncate_keys_with_special_chars(self):
        # create a long key made only of characters that get escaped
        long_key = '*' * 300
        fixed = CosmosDbKeyEscape.sanitize_key(long_key)

        self.assertLessEqual(
            len(fixed), 255,
            'long key with special char was truncated improperly'
        )

        # create a short key made only of characters that get escaped;
        # it grows to 48 characters and must be left untruncated
        short_key = '#' * 16
        fixed2 = CosmosDbKeyEscape.sanitize_key(short_key)

        self.assertEqual(
            fixed2, '*35' * 16,
            'short key with special char was truncated improperly'
        )

0 commit comments

Comments
 (0)