Skip to content

Commit 80119dd

Browse files
authored
Asyncio Module Compact Serialization [6/6] (#752)
1 parent 11d001d commit 80119dd

File tree

14 files changed

+1822
-122
lines changed

14 files changed

+1822
-122
lines changed

hazelcast/asyncio/client.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@
88
from hazelcast.config import Config, IndexConfig
99
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAddressProvider
1010
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
11-
from hazelcast.cp import CPSubsystem, ProxySessionManager
1211
from hazelcast.discovery import HazelcastCloudAddressProvider
1312
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
1413
from hazelcast.internal.asyncio_invocation import InvocationService, Invocation
1514
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
1615
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
1716
from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService
1817
from hazelcast.near_cache import NearCacheManager
19-
from hazelcast.partition import PartitionService, _InternalPartitionService
18+
from hazelcast.internal.asyncio_partition import PartitionService, InternalPartitionService
2019
from hazelcast.protocol.codec import (
2120
client_add_distributed_object_listener_codec,
2221
client_get_distributed_objects_codec,
@@ -34,7 +33,7 @@
3433
from hazelcast.serialization import SerializationServiceV1
3534
from hazelcast.sql import SqlService, _InternalSqlService
3635
from hazelcast.internal.asyncio_statistics import Statistics
37-
from hazelcast.types import KeyType, ValueType, ItemType, MessageType
36+
from hazelcast.types import KeyType, ValueType
3837
from hazelcast.util import AtomicInteger, RoundRobinLB
3938

4039
__all__ = ("HazelcastClient",)
@@ -84,7 +83,7 @@ def __init__(self, config: Config | None = None, **kwargs):
8483
self._config,
8584
)
8685
self._address_provider = self._create_address_provider()
87-
self._internal_partition_service = _InternalPartitionService(self)
86+
self._internal_partition_service = InternalPartitionService(self)
8887
self._partition_service = PartitionService(
8988
self._internal_partition_service,
9089
self._serialization_service,
@@ -111,8 +110,6 @@ def __init__(self, config: Config | None = None, **kwargs):
111110
self._compact_schema_service,
112111
)
113112
self._proxy_manager = ProxyManager(self._context)
114-
self._cp_subsystem = CPSubsystem(self._context)
115-
self._proxy_session_manager = ProxySessionManager(self._context)
116113
self._lock_reference_id_generator = AtomicInteger(1)
117114
self._statistics = Statistics(
118115
self,
@@ -159,15 +156,14 @@ def _init_context(self):
159156
self._near_cache_manager,
160157
self._lock_reference_id_generator,
161158
self._name,
162-
self._proxy_session_manager,
163159
self._reactor,
164160
self._compact_schema_service,
165161
)
166162

167163
async def _start(self):
168164
try:
169165
self._internal_lifecycle_service.start()
170-
self._invocation_service.start()
166+
await self._invocation_service.start()
171167
membership_listeners = self._config.membership_listeners
172168
self._internal_cluster_service.start(self._connection_manager, membership_listeners)
173169
self._cluster_view_listener.start()
@@ -278,7 +274,6 @@ async def shutdown(self) -> None:
278274
if self._internal_lifecycle_service.running:
279275
self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN)
280276
self._internal_lifecycle_service.shutdown()
281-
self._proxy_session_manager.shutdown().result()
282277
self._near_cache_manager.destroy_near_caches()
283278
await self._connection_manager.shutdown()
284279
self._invocation_service.shutdown()
@@ -301,10 +296,6 @@ def partition_service(self) -> PartitionService:
301296
def cluster_service(self) -> ClusterService:
302297
return self._cluster_service
303298

304-
@property
305-
def cp_subsystem(self) -> CPSubsystem:
306-
return self._cp_subsystem
307-
308299
def _create_address_provider(self):
309300
config = self._config
310301
cluster_members = config.cluster_members
@@ -360,7 +351,6 @@ def __init__(self):
360351
self.near_cache_manager = None
361352
self.lock_reference_id_generator = None
362353
self.name = None
363-
self.proxy_session_manager = None
364354
self.reactor = None
365355
self.compact_schema_service = None
366356

@@ -378,7 +368,6 @@ def init_context(
378368
near_cache_manager,
379369
lock_reference_id_generator,
380370
name,
381-
proxy_session_manager,
382371
reactor,
383372
compact_schema_service,
384373
):
@@ -394,6 +383,5 @@ def init_context(
394383
self.near_cache_manager = near_cache_manager
395384
self.lock_reference_id_generator = lock_reference_id_generator
396385
self.name = name
397-
self.proxy_session_manager = proxy_session_manager
398386
self.reactor = reactor
399387
self.compact_schema_service = compact_schema_service

hazelcast/internal/asyncio_compact.py

Lines changed: 29 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -57,77 +57,64 @@ def fetch_schema(self, schema_id: int) -> asyncio.Future:
5757
self._invocation_service.invoke(fetch_schema_invocation)
5858
return fetch_schema_invocation.future
5959

60-
def send_schema_and_retry(
60+
async def send_schema_and_retry(
6161
self,
6262
error: "SchemaNotReplicatedError",
6363
func: typing.Callable[..., asyncio.Future],
6464
*args: typing.Any,
6565
**kwargs: typing.Any,
66-
) -> asyncio.Future:
66+
) -> None:
6767
schema = error.schema
6868
clazz = error.clazz
6969
request = client_send_schema_codec.encode_request(schema)
7070

71-
def callback():
71+
async def callback():
7272
self._has_replicated_schemas = True
7373
self._compact_serializer.register_schema_to_type(schema, clazz)
74-
return func(*args, **kwargs)
74+
maybe_coro = func(*args, **kwargs)
75+
# maybe_coro maybe a coroutine or None
76+
if maybe_coro:
77+
return await maybe_coro
7578

76-
return self._replicate_schema(
77-
schema, request, CompactSchemaService._SEND_SCHEMA_RETRY_COUNT, callback
79+
return await self._replicate_schema(
80+
schema, request, CompactSchemaService._SEND_SCHEMA_RETRY_COUNT, callback()
7881
)
7982

80-
def _replicate_schema(
83+
async def _replicate_schema(
8184
self,
8285
schema: "Schema",
8386
request: "OutboundMessage",
8487
remaining_retries: int,
85-
callback: typing.Callable[..., asyncio.Future],
86-
) -> asyncio.Future:
87-
def continuation(future: asyncio.Future):
88-
replicated_members = future.result()
88+
callback: typing.Coroutine[typing.Any, typing.Any, typing.Any],
89+
) -> None:
90+
while remaining_retries >= 2:
91+
replicated_members = await self._send_schema_replication_request(request)
8992
members = self._cluster_service.get_members()
9093
for member in members:
9194
if member.uuid not in replicated_members:
9295
break
9396
else:
9497
# Loop completed normally.
9598
# All members in our member list all known to have the schema
96-
return callback()
99+
return await callback
97100

98101
# There is a member in our member list that the schema
99102
# is not known to be replicated yet. We should retry
100103
# sending it in a random member.
101-
if remaining_retries <= 1:
102-
# We tried to send it a couple of times, but the member list
103-
# in our local and the member list returned by the initiator
104-
# nodes did not match.
105-
raise IllegalStateError(
106-
f"The schema {schema} cannot be replicated in the cluster, "
107-
f"after {CompactSchemaService._SEND_SCHEMA_RETRY_COUNT} retries. "
108-
f"It might be the case that the client is connected to the two "
109-
f"halves of the cluster that is experiencing a split-brain, "
110-
f"and continue putting the data associated with that schema "
111-
f"might result in data loss. It might be possible to replicate "
112-
f"the schema after some time, when the cluster is healed."
113-
)
114-
115-
delayed_future: asyncio.Future = asyncio.get_running_loop().create_future()
116-
self._reactor.add_timer(
117-
self._invocation_retry_pause,
118-
lambda: delayed_future.set_result(None),
119-
)
120-
121-
def retry(_):
122-
return self._replicate_schema(
123-
schema, request.copy(), remaining_retries - 1, callback
124-
)
125-
126-
return delayed_future.add_done_callback(retry)
127-
128-
fut = self._send_schema_replication_request(request)
129-
fut.add_done_callback(continuation)
130-
return fut
104+
await asyncio.sleep(self._invocation_retry_pause)
105+
106+
# We tried to send it a couple of times, but the member list
107+
# in our local and the member list returned by the initiator
108+
# nodes did not match.
109+
raise IllegalStateError(
110+
f"The schema {schema} cannot be replicated in the cluster, "
111+
f"after {CompactSchemaService._SEND_SCHEMA_RETRY_COUNT} retries. "
112+
f"It might be the case that the client is connected to the two "
113+
f"halves of the cluster that is experiencing a split-brain, "
114+
f"and continue putting the data associated with that schema "
115+
f"might result in data loss. It might be possible to replicate "
116+
f"the schema after some time, when the cluster is healed."
117+
)
131118

132119
def _send_schema_replication_request(self, request: "OutboundMessage") -> asyncio.Future:
133120
invocation = Invocation(request, response_handler=client_send_schema_codec.decode_response)

hazelcast/internal/asyncio_invocation.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def __init__(self, client, config, reactor):
9696
self._backup_ack_to_client_enabled = smart_routing and config.backup_ack_to_client_enabled
9797
self._fail_on_indeterminate_state = config.fail_on_indeterminate_operation_state
9898
self._backup_timeout = config.operation_backup_timeout
99-
self._clean_resources_timer = None
99+
self._clean_resources_task = None
100100
self._shutdown = False
101101
self._compact_schema_service = None
102102

@@ -107,8 +107,8 @@ def init(self, partition_service, connection_manager, listener_service, compact_
107107
self._check_invocation_allowed_fn = connection_manager.check_invocation_allowed
108108
self._compact_schema_service = compact_schema_service
109109

110-
def start(self):
111-
self._start_clean_resources_timer()
110+
async def start(self):
111+
await self._start_clean_resources_timer()
112112

113113
async def add_backup_listener(self):
114114
if self._backup_ack_to_client_enabled:
@@ -152,8 +152,8 @@ def shutdown(self):
152152
return
153153

154154
self._shutdown = True
155-
if self._clean_resources_timer:
156-
self._clean_resources_timer.cancel()
155+
if self._clean_resources_task:
156+
self._clean_resources_task.cancel()
157157
for invocation in list(self._pending.values()):
158158
self._notify_error(invocation, HazelcastClientNotActiveError())
159159

@@ -400,8 +400,9 @@ def _notify_backup_complete(self, invocation):
400400

401401
self._complete(invocation, invocation.pending_response)
402402

403-
def _start_clean_resources_timer(self):
404-
def run():
403+
async def _start_clean_resources_timer(self):
404+
async def run():
405+
await asyncio.sleep(self._CLEAN_RESOURCES_PERIOD)
405406
if self._shutdown:
406407
return
407408

@@ -419,9 +420,9 @@ def run():
419420
if self._backup_ack_to_client_enabled:
420421
self._detect_and_handle_backup_timeout(invocation, now)
421422

422-
self._clean_resources_timer = self._reactor.add_timer(self._CLEAN_RESOURCES_PERIOD, run)
423+
self._clean_resources_task = asyncio.create_task(run())
423424

424-
self._clean_resources_timer = self._reactor.add_timer(self._CLEAN_RESOURCES_PERIOD, run)
425+
self._clean_resources_task = asyncio.create_task(run())
425426

426427
def _detect_and_handle_backup_timeout(self, invocation, now):
427428
if not invocation.pending_response:

0 commit comments

Comments
 (0)