Skip to content

Commit 11d001d

Browse files
authored
Asyncio Module VectorCollection Support [5/6] (#751)
Ported VectorCollection to asyncio client
1 parent bbd5a21 commit 11d001d

File tree

6 files changed

+640
-3
lines changed

6 files changed

+640
-3
lines changed

hazelcast/asyncio/client.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55

66
from hazelcast.internal.asyncio_cluster import ClusterService, _InternalClusterService
77
from hazelcast.internal.asyncio_compact import CompactSchemaService
8-
from hazelcast.config import Config
8+
from hazelcast.config import Config, IndexConfig
99
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAddressProvider
1010
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
1111
from hazelcast.cp import CPSubsystem, ProxySessionManager
1212
from hazelcast.discovery import HazelcastCloudAddressProvider
1313
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
1414
from hazelcast.internal.asyncio_invocation import InvocationService, Invocation
15+
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
1516
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
1617
from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService
1718
from hazelcast.near_cache import NearCacheManager
@@ -20,10 +21,12 @@
2021
client_add_distributed_object_listener_codec,
2122
client_get_distributed_objects_codec,
2223
client_remove_distributed_object_listener_codec,
24+
dynamic_config_add_vector_collection_config_codec,
2325
)
2426
from hazelcast.internal.asyncio_proxy.manager import (
2527
MAP_SERVICE,
2628
ProxyManager,
29+
VECTOR_SERVICE,
2730
)
2831
from hazelcast.internal.asyncio_proxy.base import Proxy
2932
from hazelcast.internal.asyncio_proxy.map import Map
@@ -185,6 +188,37 @@ async def _start(self):
185188
async def get_map(self, name: str) -> Map[KeyType, ValueType]:
186189
return await self._proxy_manager.get_or_create(MAP_SERVICE, name)
187190

191+
async def create_vector_collection_config(
192+
self,
193+
name: str,
194+
indexes: typing.List[IndexConfig],
195+
backup_count: int = 1,
196+
async_backup_count: int = 0,
197+
split_brain_protection_name: typing.Optional[str] = None,
198+
merge_policy: str = "PutIfAbsentMergePolicy",
199+
merge_batch_size: int = 100,
200+
) -> None:
201+
# check that indexes have different names
202+
if indexes:
203+
index_names = set(index.name for index in indexes)
204+
if len(index_names) != len(indexes):
205+
raise AssertionError("index names must be unique")
206+
207+
request = dynamic_config_add_vector_collection_config_codec.encode_request(
208+
name,
209+
indexes,
210+
backup_count,
211+
async_backup_count,
212+
split_brain_protection_name,
213+
merge_policy,
214+
merge_batch_size,
215+
)
216+
invocation = Invocation(request, response_handler=lambda m: m)
217+
await self._invocation_service.ainvoke(invocation)
218+
219+
async def get_vector_collection(self, name: str) -> VectorCollection:
220+
return await self._proxy_manager.get_or_create(VECTOR_SERVICE, name)
221+
188222
async def add_distributed_object_listener(
189223
self, listener_func: typing.Callable[[DistributedObjectEvent], None]
190224
) -> str:

hazelcast/internal/asyncio_proxy/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@ async def destroy(self) -> bool:
4141
``True`` if this proxy is destroyed successfully, ``False``
4242
otherwise.
4343
"""
44+
destroyed = False
4445
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
4546
tg.create_task(self._on_destroy())
46-
return await tg.create_task(
47+
destroyed = await tg.create_task(
4748
self._context.proxy_manager.destroy_proxy(self.service_name, self.name)
4849
)
50+
return destroyed
4951

5052
async def _on_destroy(self):
5153
pass

hazelcast/internal/asyncio_proxy/manager.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
import typing
22

3+
from hazelcast.internal.asyncio_proxy.vector_collection import (
4+
VectorCollection,
5+
create_vector_collection_proxy,
6+
)
37
from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec
48
from hazelcast.internal.asyncio_invocation import Invocation
59
from hazelcast.internal.asyncio_proxy.base import Proxy
610
from hazelcast.internal.asyncio_proxy.map import create_map_proxy
711
from hazelcast.util import to_list
812

913
MAP_SERVICE = "hz:impl:mapService"
14+
VECTOR_SERVICE = "hz:service:vector"
1015

1116
_proxy_init: typing.Dict[
1217
str,
1318
typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
1419
] = {
1520
MAP_SERVICE: create_map_proxy,
21+
VECTOR_SERVICE: create_vector_collection_proxy,
1622
}
1723

1824

hazelcast/internal/asyncio_proxy/map.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from hazelcast.aggregator import Aggregator
66
from hazelcast.config import IndexUtil, IndexType, IndexConfig
77
from hazelcast.core import SimpleEntryView
8-
from hazelcast.errors import InvalidConfigurationError
98
from hazelcast.projection import Projection
109
from hazelcast.protocol import PagingPredicateHolder
1110
from hazelcast.protocol.codec import (
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
import asyncio
2+
import copy
3+
import typing
4+
import uuid
5+
from typing import Any, Dict, List, Optional, Tuple
6+
7+
from hazelcast.protocol.codec import (
8+
vector_collection_set_codec,
9+
vector_collection_get_codec,
10+
vector_collection_search_near_vector_codec,
11+
vector_collection_delete_codec,
12+
vector_collection_put_codec,
13+
vector_collection_put_if_absent_codec,
14+
vector_collection_remove_codec,
15+
vector_collection_put_all_codec,
16+
vector_collection_clear_codec,
17+
vector_collection_optimize_codec,
18+
vector_collection_size_codec,
19+
)
20+
from hazelcast.internal.asyncio_proxy.base import Proxy
21+
from hazelcast.serialization.compact import SchemaNotReplicatedError
22+
from hazelcast.serialization.data import Data
23+
from hazelcast.types import KeyType, ValueType
24+
from hazelcast.util import check_not_none
25+
from hazelcast.vector import (
26+
Document,
27+
SearchResult,
28+
Vector,
29+
VectorType,
30+
VectorSearchOptions,
31+
)
32+
33+
34+
class VectorCollection(Proxy, typing.Generic[KeyType, ValueType]):
35+
def __init__(self, service_name, name, context):
36+
super(VectorCollection, self).__init__(service_name, name, context)
37+
38+
async def get(self, key: Any) -> Document | None:
39+
check_not_none(key, "key can't be None")
40+
return await self._get_internal(key)
41+
42+
async def set(self, key: Any, document: Document) -> None:
43+
check_not_none(key, "key can't be None")
44+
check_not_none(document, "document can't be None")
45+
check_not_none(document.value, "document value can't be None")
46+
return await self._set_internal(key, document)
47+
48+
async def put(self, key: Any, document: Document) -> Document | None:
49+
check_not_none(key, "key can't be None")
50+
check_not_none(document, "document can't be None")
51+
check_not_none(document.value, "document value can't be None")
52+
return await self._put_internal(key, document)
53+
54+
async def put_all(self, map: Dict[Any, Document]) -> None:
55+
check_not_none(map, "map can't be None")
56+
if not map:
57+
return None
58+
partition_service = self._context.partition_service
59+
partition_map: Dict[int, List[Tuple[Data, Document]]] = {}
60+
for key, doc in map.items():
61+
check_not_none(key, "key can't be None")
62+
check_not_none(doc, "value can't be None")
63+
doc = copy.copy(doc)
64+
try:
65+
entry = (self._to_data(key), doc)
66+
doc.value = self._to_data(doc.value)
67+
except SchemaNotReplicatedError as e:
68+
return await self._send_schema_and_retry(e, self.put_all, map)
69+
70+
partition_id = partition_service.get_partition_id(entry[0])
71+
partition_map.setdefault(partition_id, []).append(entry)
72+
73+
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
74+
for partition_id, entry_list in partition_map.items():
75+
request = vector_collection_put_all_codec.encode_request(self.name, entry_list)
76+
tg.create_task(self._ainvoke_on_partition(request, partition_id))
77+
78+
return None
79+
80+
async def put_if_absent(self, key: Any, document: Document) -> Document | None:
81+
check_not_none(key, "key can't be None")
82+
check_not_none(document, "document can't be None")
83+
check_not_none(document.value, "document value can't be None")
84+
return await self._put_if_absent_internal(key, document)
85+
86+
async def search_near_vector(
87+
self,
88+
vector: Vector,
89+
*,
90+
include_value: bool = False,
91+
include_vectors: bool = False,
92+
limit: int = 10,
93+
hints: Dict[str, str] = None
94+
) -> List[SearchResult]:
95+
check_not_none(vector, "vector can't be None")
96+
if limit <= 0:
97+
raise AssertionError("limit must be positive")
98+
return await self._search_near_vector_internal(
99+
vector,
100+
include_value=include_value,
101+
include_vectors=include_vectors,
102+
limit=limit,
103+
hints=hints,
104+
)
105+
106+
async def remove(self, key: Any) -> Document | None:
107+
check_not_none(key, "key can't be None")
108+
return await self._remove_internal(key)
109+
110+
async def delete(self, key: Any) -> None:
111+
check_not_none(key, "key can't be None")
112+
return await self._delete_internal(key)
113+
114+
async def optimize(self, index_name: str = None) -> None:
115+
request = vector_collection_optimize_codec.encode_request(
116+
self.name, index_name, uuid.uuid4()
117+
)
118+
return await self._invoke(request)
119+
120+
async def clear(self) -> None:
121+
request = vector_collection_clear_codec.encode_request(self.name)
122+
return await self._invoke(request)
123+
124+
async def size(self) -> int:
125+
request = vector_collection_size_codec.encode_request(self.name)
126+
return await self._invoke(request, vector_collection_size_codec.decode_response)
127+
128+
def _set_internal(self, key: Any, document: Document) -> asyncio.Future[None]:
129+
try:
130+
key_data = self._to_data(key)
131+
value_data = self._to_data(document.value)
132+
except SchemaNotReplicatedError as e:
133+
return self._send_schema_and_retry(e, self.set, key, document)
134+
document = copy.copy(document)
135+
document.value = value_data
136+
request = vector_collection_set_codec.encode_request(
137+
self.name,
138+
key_data,
139+
document,
140+
)
141+
return self._invoke_on_key(request, key_data)
142+
143+
def _get_internal(self, key: Any) -> asyncio.Future[Any]:
144+
def handler(message):
145+
doc = vector_collection_get_codec.decode_response(message)
146+
return self._transform_document(doc)
147+
148+
try:
149+
key_data = self._to_data(key)
150+
except SchemaNotReplicatedError as e:
151+
return self._send_schema_and_retry(e, self.get, key)
152+
request = vector_collection_get_codec.encode_request(
153+
self.name,
154+
key_data,
155+
)
156+
return self._invoke_on_key(request, key_data, response_handler=handler)
157+
158+
def _search_near_vector_internal(
159+
self,
160+
vector: Vector,
161+
*,
162+
include_value: bool = False,
163+
include_vectors: bool = False,
164+
limit: int = 10,
165+
hints: Dict[str, str] = None
166+
) -> asyncio.Future[List[SearchResult]]:
167+
def handler(message):
168+
results: List[
169+
SearchResult
170+
] = vector_collection_search_near_vector_codec.decode_response(message)
171+
for result in results:
172+
if result.key is not None:
173+
result.key = self._to_object(result.key)
174+
if result.value is not None:
175+
result.value = self._to_object(result.value)
176+
if result.vectors:
177+
for vec in result.vectors:
178+
vec.type = VectorType(vec.type)
179+
return results
180+
181+
options = VectorSearchOptions(
182+
include_value=include_value,
183+
include_vectors=include_vectors,
184+
limit=limit,
185+
hints=hints or {},
186+
)
187+
request = vector_collection_search_near_vector_codec.encode_request(
188+
self.name,
189+
[vector],
190+
options,
191+
)
192+
return self._invoke(request, response_handler=handler)
193+
194+
def _delete_internal(self, key: Any) -> asyncio.Future[None]:
195+
key_data = self._to_data(key)
196+
request = vector_collection_delete_codec.encode_request(self.name, key_data)
197+
return self._invoke_on_key(request, key_data)
198+
199+
def _remove_internal(self, key: Any) -> asyncio.Future[Document | None]:
200+
def handler(message):
201+
doc = vector_collection_remove_codec.decode_response(message)
202+
return self._transform_document(doc)
203+
204+
key_data = self._to_data(key)
205+
request = vector_collection_remove_codec.encode_request(self.name, key_data)
206+
return self._invoke_on_key(request, key_data, response_handler=handler)
207+
208+
def _put_internal(self, key: Any, document: Document) -> asyncio.Future[Document | None]:
209+
def handler(message):
210+
doc = vector_collection_put_codec.decode_response(message)
211+
return self._transform_document(doc)
212+
213+
try:
214+
key_data = self._to_data(key)
215+
value_data = self._to_data(document.value)
216+
except SchemaNotReplicatedError as e:
217+
return self._send_schema_and_retry(e, self.set, key, document)
218+
document = copy.copy(document)
219+
document.value = value_data
220+
request = vector_collection_put_codec.encode_request(
221+
self.name,
222+
key_data,
223+
document,
224+
)
225+
return self._invoke_on_key(request, key_data, response_handler=handler)
226+
227+
def _put_if_absent_internal(
228+
self, key: Any, document: Document
229+
) -> asyncio.Future[Document | None]:
230+
def handler(message):
231+
doc = vector_collection_put_if_absent_codec.decode_response(message)
232+
return self._transform_document(doc)
233+
234+
try:
235+
key_data = self._to_data(key)
236+
value_data = self._to_data(document.value)
237+
except SchemaNotReplicatedError as e:
238+
return self._send_schema_and_retry(e, self.set, key, document)
239+
document.value = value_data
240+
request = vector_collection_put_if_absent_codec.encode_request(
241+
self.name,
242+
key_data,
243+
document,
244+
)
245+
return self._invoke_on_key(request, key_data, response_handler=handler)
246+
247+
def _transform_document(self, doc: Optional[Document]) -> Optional[Document]:
248+
if doc is not None:
249+
if doc.value is not None:
250+
doc.value = self._to_object(doc.value)
251+
for vec in doc.vectors:
252+
vec.type = VectorType(vec.type)
253+
return doc
254+
255+
256+
async def create_vector_collection_proxy(service_name, name, context):
257+
return VectorCollection(service_name, name, context)

0 commit comments

Comments
 (0)