Skip to content

Commit 19d9b82

Browse files
committed
Do not invoke urgent invocations that contain serialized data immediately
Although we believe it was a mistake, urgent invocations on the client side are used for some user invocations, such as listener registrations, in addition to the actual urgent invocations such as heartbeats, authentication, etc. When the client reconnects to a cluster, it sends its local state in an executor, so there might be some delay before the local state is actually sent. During that time, however, urgent invocations are still allowed to go through from the client. That might violate our assumption that the schema is received by the cluster before the data. To solve this, we will not invoke urgent invocations that contain serialized data if the client is not initialized on the cluster and some Compact schemas were sent to the previous clusters. Such invocations are only related to user invocations, so we will not delay invoking actual urgent invocations like heartbeats.
1 parent 7923483 commit 19d9b82

File tree

143 files changed

+275
-150
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

143 files changed

+275
-150
lines changed

hazelcast/compact.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def __init__(
4141
self._cluster_service = cluster_service
4242
self._reactor = reactor
4343
self._invocation_retry_pause = config.invocation_retry_pause
44+
self._has_replicated_schemas = False
4445

4546
def fetch_schema(self, schema_id: int) -> Future:
4647
_logger.debug(
@@ -68,6 +69,7 @@ def send_schema_and_retry(
6869
request = client_send_schema_codec.encode_request(schema)
6970

7071
def callback():
72+
self._has_replicated_schemas = True
7173
self._compact_serializer.register_schema_to_type(schema, clazz)
7274
return func(*args, **kwargs)
7375

@@ -150,3 +152,10 @@ def register_fetched_schema(self, schema_id: int, schema: typing.Optional["Schem
150152
)
151153

152154
self._compact_serializer.register_schema_to_id(schema)
155+
156+
def has_replicated_schemas(self):
157+
"""
158+
Returns ``True`` if the client has replicated
159+
any Compact schemas to the cluster.
160+
"""
161+
return self._has_replicated_schemas

hazelcast/connection.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,13 @@ def check_invocation_allowed(self):
366366
else:
367367
raise IOError("No connection found to cluster")
368368

369+
def initialized_on_cluster(self) -> bool:
370+
"""
371+
Returns ``True`` if the client is initialized on the cluster, by
372+
sending its local state, if necessary.
373+
"""
374+
return self._client_state == _ClientState.INITIALIZED_ON_CLUSTER
375+
369376
def _get_or_connect_to_address(self, address):
370377
for connection in list(self.active_connections.values()):
371378
if connection.remote_address == address:

hazelcast/errors.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,22 @@ class NotLeaderError(HazelcastError):
436436
pass
437437

438438

439+
class InvocationMightContainCompactDataError(HazelcastError):
440+
"""
441+
Signals that the invocation might contain Compact serialized data,
442+
and it would not be safe to send that invocation now to make sure
443+
that the invariant regarding not sending the data before the schemas
444+
holds while the client reconnects or retries urgent invocations.
445+
"""
446+
447+
def __init__(self):
448+
super(InvocationMightContainCompactDataError, self).__init__(
449+
"The invocation might contain Compact serialized "
450+
"data and it is not safe to invoke it when the client "
451+
"is not yet initialized on the cluster."
452+
)
453+
454+
439455
# Error Codes
440456
_UNDEFINED = 0
441457
_ARRAY_INDEX_OUT_OF_BOUNDS = 1

hazelcast/invocation.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
EXCEPTION_MESSAGE_TYPE,
1313
IndeterminateOperationStateError,
1414
OperationTimeoutError,
15+
InvocationMightContainCompactDataError,
1516
)
1617
from hazelcast.future import Future
1718
from hazelcast.protocol.client_message import InboundMessage
@@ -175,7 +176,9 @@ def _invoke_on_random_connection(self, invocation):
175176

176177
def _invoke_smart(self, invocation):
177178
try:
178-
if not invocation.urgent:
179+
if invocation.urgent:
180+
self._check_urgent_invocation_allowed(invocation)
181+
else:
179182
self._check_invocation_allowed_fn()
180183

181184
connection = invocation.connection
@@ -204,7 +207,9 @@ def _invoke_smart(self, invocation):
204207

205208
def _invoke_non_smart(self, invocation):
206209
try:
207-
if not invocation.urgent:
210+
if invocation.urgent:
211+
self._check_urgent_invocation_allowed(invocation)
212+
else:
208213
self._check_invocation_allowed_fn()
209214

210215
connection = invocation.connection
@@ -309,6 +314,9 @@ def _retry_if_not_done(self, invocation):
309314
self._do_invoke(invocation)
310315

311316
def _should_retry(self, invocation, error):
317+
if isinstance(error, InvocationMightContainCompactDataError):
318+
return True
319+
312320
if invocation.connection and isinstance(error, (IOError, TargetDisconnectedError)):
313321
return False
314322

@@ -325,6 +333,32 @@ def _should_retry(self, invocation, error):
325333

326334
return False
327335

336+
def _check_urgent_invocation_allowed(self, invocation: Invocation):
337+
if self._connection_manager.initialized_on_cluster():
338+
# If the client is initialized on the cluster, that means we
339+
# have sent all the schemas to the cluster, even if we are
340+
# reconnected to it
341+
return
342+
343+
if not self._compact_schema_service.has_replicated_schemas():
344+
# If there were no Compact schemas to begin with, we don't need
345+
# to perform the check below. If the client didn't send a Compact
346+
# schema up until this point, the retries or listener registrations
347+
could not send a schema, because if they had, we wouldn't hit
348+
# this line.
349+
return
350+
351+
# We are not yet initialized on cluster, so the Compact schemas might
352+
# not be sent yet. This message contains some serialized data,
353+
# and it is possible that it can also contain Compact serialized data.
354+
# In that case, allowing this invocation to go through now could
355+
# violate the invariant that the schema must come to cluster before
356+
# the data. We will retry this invocation and wait until the client
357+
# is initialized on the cluster, which means schemas are replicated
358+
# in the cluster.
359+
if invocation.request.contains_data:
360+
raise InvocationMightContainCompactDataError()
361+
328362
def _register_backup_listener(self):
329363
codec = client_local_backup_listener_codec
330364
request = codec.encode_request()

hazelcast/protocol/client_message.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,12 @@ def create_initial_buffer_custom(size, is_begin_frame=False):
6464

6565

6666
class OutboundMessage:
67-
__slots__ = ("buf", "retryable")
67+
__slots__ = ("buf", "retryable", "contains_data")
6868

69-
def __init__(self, buf, retryable):
69+
def __init__(self, buf, retryable, contains_data=False):
7070
self.buf = buf
7171
self.retryable = retryable
72+
self.contains_data = contains_data
7273

7374
def set_correlation_id(self, correlation_id):
7475
LE_LONG.pack_into(self.buf, _OUTBOUND_MESSAGE_CORRELATION_ID_OFFSET, correlation_id)
@@ -80,7 +81,7 @@ def set_partition_id(self, partition_id):
8081
LE_INT.pack_into(self.buf, _OUTBOUND_MESSAGE_PARTITION_ID_OFFSET, partition_id)
8182

8283
def copy(self):
83-
return OutboundMessage(bytearray(self.buf), self.retryable)
84+
return OutboundMessage(bytearray(self.buf), self.retryable, self.contains_data)
8485

8586
def set_backup_aware_flag(self):
8687
flags = LE_UINT16.unpack_from(self.buf, INT_SIZE_IN_BYTES)[0]

hazelcast/protocol/codec/atomic_long_alter_codec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def encode_request(group_id, name, function, return_value_type):
2121
RaftGroupIdCodec.encode(buf, group_id)
2222
StringCodec.encode(buf, name)
2323
DataCodec.encode(buf, function, True)
24-
return OutboundMessage(buf, False)
24+
return OutboundMessage(buf, False, True)
2525

2626

2727
def decode_response(msg):

hazelcast/protocol/codec/atomic_long_apply_codec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def encode_request(group_id, name, function):
1717
RaftGroupIdCodec.encode(buf, group_id)
1818
StringCodec.encode(buf, name)
1919
DataCodec.encode(buf, function, True)
20-
return OutboundMessage(buf, False)
20+
return OutboundMessage(buf, False, True)
2121

2222

2323
def decode_response(msg):

hazelcast/protocol/codec/atomic_ref_apply_codec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def encode_request(group_id, name, function, return_value_type, alter):
2323
RaftGroupIdCodec.encode(buf, group_id)
2424
StringCodec.encode(buf, name)
2525
DataCodec.encode(buf, function, True)
26-
return OutboundMessage(buf, False)
26+
return OutboundMessage(buf, False, True)
2727

2828

2929
def decode_response(msg):

hazelcast/protocol/codec/atomic_ref_compare_and_set_codec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def encode_request(group_id, name, old_value, new_value):
2020
StringCodec.encode(buf, name)
2121
CodecUtil.encode_nullable(buf, old_value, DataCodec.encode)
2222
CodecUtil.encode_nullable(buf, new_value, DataCodec.encode, True)
23-
return OutboundMessage(buf, False)
23+
return OutboundMessage(buf, False, True)
2424

2525

2626
def decode_response(msg):

hazelcast/protocol/codec/atomic_ref_contains_codec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def encode_request(group_id, name, value):
1919
RaftGroupIdCodec.encode(buf, group_id)
2020
StringCodec.encode(buf, name)
2121
CodecUtil.encode_nullable(buf, value, DataCodec.encode, True)
22-
return OutboundMessage(buf, True)
22+
return OutboundMessage(buf, True, True)
2323

2424

2525
def decode_response(msg):

0 commit comments

Comments
 (0)