Skip to content

Commit 7fd5d34

Browse files
k-jamroz and yuce authored
Add support for vector collection split brain configuration [AI-146] (hazelcast#713)
Co-authored-by: Yüce Tekol <yuce.tekol@hazelcast.com>
Co-authored-by: Yüce Tekol <yucetekol@gmail.com>
1 parent e667147 commit 7fd5d34

File tree

3 files changed

+54
-4
lines changed

3 files changed

+54
-4
lines changed

hazelcast/client.py

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -384,6 +384,9 @@ def create_vector_collection_config(
384384
indexes: typing.List[IndexConfig],
385385
backup_count: int = 1,
386386
async_backup_count: int = 0,
387+
split_brain_protection_name: typing.Optional[str] = None,
388+
merge_policy: str = "PutIfAbsentMergePolicy",
389+
merge_batch_size: int = 100,
387390
) -> None:
388391
# check that indexes have different names
389392
if indexes:
@@ -392,7 +395,13 @@ def create_vector_collection_config(
392395
raise AssertionError("index names must be unique")
393396

394397
request = dynamic_config_add_vector_collection_config_codec.encode_request(
395-
name, indexes, backup_count, async_backup_count
398+
name,
399+
indexes,
400+
backup_count,
401+
async_backup_count,
402+
split_brain_protection_name,
403+
merge_policy,
404+
merge_batch_size,
396405
)
397406
invocation = Invocation(request, response_handler=lambda m: m)
398407
self._invocation_service.invoke(invocation)

hazelcast/protocol/codec/dynamic_config_add_vector_collection_config_codec.py

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
from hazelcast.protocol.builtin import StringCodec
55
from hazelcast.protocol.builtin import ListMultiFrameCodec
66
from hazelcast.protocol.codec.custom.vector_index_config_codec import VectorIndexConfigCodec
7+
from hazelcast.protocol.builtin import CodecUtil
78

89
# hex: 0x1B1400
910
_REQUEST_MESSAGE_TYPE = 1774592
@@ -12,13 +13,17 @@
1213

1314
_REQUEST_BACKUP_COUNT_OFFSET = REQUEST_HEADER_SIZE
1415
_REQUEST_ASYNC_BACKUP_COUNT_OFFSET = _REQUEST_BACKUP_COUNT_OFFSET + INT_SIZE_IN_BYTES
15-
_REQUEST_INITIAL_FRAME_SIZE = _REQUEST_ASYNC_BACKUP_COUNT_OFFSET + INT_SIZE_IN_BYTES
16+
_REQUEST_MERGE_BATCH_SIZE_OFFSET = _REQUEST_ASYNC_BACKUP_COUNT_OFFSET + INT_SIZE_IN_BYTES
17+
_REQUEST_INITIAL_FRAME_SIZE = _REQUEST_MERGE_BATCH_SIZE_OFFSET + INT_SIZE_IN_BYTES
1618

1719

18-
def encode_request(name, index_configs, backup_count, async_backup_count):
20+
def encode_request(name, index_configs, backup_count, async_backup_count, split_brain_protection_name, merge_policy, merge_batch_size):
1921
buf = create_initial_buffer(_REQUEST_INITIAL_FRAME_SIZE, _REQUEST_MESSAGE_TYPE)
2022
FixSizedTypesCodec.encode_int(buf, _REQUEST_BACKUP_COUNT_OFFSET, backup_count)
2123
FixSizedTypesCodec.encode_int(buf, _REQUEST_ASYNC_BACKUP_COUNT_OFFSET, async_backup_count)
24+
FixSizedTypesCodec.encode_int(buf, _REQUEST_MERGE_BATCH_SIZE_OFFSET, merge_batch_size)
2225
StringCodec.encode(buf, name)
23-
ListMultiFrameCodec.encode(buf, index_configs, VectorIndexConfigCodec.encode, True)
26+
ListMultiFrameCodec.encode(buf, index_configs, VectorIndexConfigCodec.encode)
27+
CodecUtil.encode_nullable(buf, split_brain_protection_name, StringCodec.encode)
28+
StringCodec.encode(buf, merge_policy, True)
2429
return OutboundMessage(buf, False)

tests/integration/backward_compatible/proxy/vector_collection_test.py

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -272,6 +272,42 @@ def test_sync_and_async_backup_count_more_than_max_value_fail(self):
272272
async_backup_count=3,
273273
)
274274

275+
def test_merge_policy_can_be_sent(self):
276+
skip_if_client_version_older_than(self, "6.0")
277+
name = random_string()
278+
self.client.create_vector_collection_config(
279+
name,
280+
[IndexConfig("vector", Metric.COSINE, 3)],
281+
merge_policy="DiscardMergePolicy",
282+
merge_batch_size=1000,
283+
)
284+
# validation happens when the collection proxy is created
285+
self.client.get_vector_collection(name)
286+
287+
def test_wrong_merge_policy_fails(self):
288+
skip_if_client_version_older_than(self, "6.0")
289+
skip_if_server_version_older_than(self, self.client, "6.0")
290+
name = random_string()
291+
with self.assertRaises(hazelcast.errors.InvalidConfigurationError):
292+
self.client.create_vector_collection_config(
293+
name, [IndexConfig("vector", Metric.COSINE, 3)], merge_policy="non-existent"
294+
)
295+
# validation happens when the collection proxy is created
296+
self.client.get_vector_collection(name)
297+
298+
def test_split_brain_name_can_be_sent(self):
299+
skip_if_client_version_older_than(self, "6.0")
300+
name = random_string()
301+
self.client.create_vector_collection_config(
302+
name,
303+
[IndexConfig("vector", Metric.COSINE, 3)],
304+
# wrong name will be ignored
305+
split_brain_protection_name="non-existent",
306+
)
307+
col = self.client.get_vector_collection(name)
308+
doc = Document("v1", Vector("vector", Type.DENSE, [0.1, 0.2, 0.3]))
309+
col.set("k1", doc)
310+
275311
def assert_document_equal(self, doc1, doc2) -> None:
276312
self.assertEqual(doc1.value, doc2.value)
277313
self.assertEqual(len(doc1.vectors), len(doc2.vectors))

0 commit comments

Comments
 (0)