Skip to content

Commit 788c9c3

Browse files
authored
Use the same correlation id on invocation retries (hazelcast#269)
We were using a new correlation id on invocation retries, which can cause problems because the invocation is modified in-place. We may have pending invocation entries with the key of correlation id of X while the correlation id we would get from the client message of the invocation is Y (Y > X). It is changed so that the invocation is retried but now uses the same correlation id.
1 parent 9b3a221 commit 788c9c3

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

hazelcast/invocation.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ def __init__(self, client, reactor):
5858
config = client.config
5959
smart_routing = config.smart_routing
6060
if smart_routing:
61-
self.invoke = self._invoke_smart
61+
self._do_invoke = self._invoke_smart
6262
else:
63-
self.invoke = self._invoke_non_smart
63+
self._do_invoke = self._invoke_non_smart
6464

6565
self._client = client
6666
self._reactor = reactor
@@ -109,6 +109,16 @@ def handle_client_message(self, message):
109109

110110
self._notify(invocation, message)
111111

112+
def invoke(self, invocation):
113+
if not invocation.timeout:
114+
invocation.timeout = self._invocation_timeout + time.time()
115+
116+
correlation_id = self._next_correlation_id.get_and_increment()
117+
request = invocation.request
118+
request.set_correlation_id(correlation_id)
119+
request.set_partition_id(invocation.partition_id)
120+
self._do_invoke(invocation)
121+
112122
def shutdown(self):
113123
if self._shutdown:
114124
return
@@ -141,9 +151,6 @@ def _invoke_on_random_connection(self, invocation):
141151
return self._send(invocation, connection)
142152

143153
def _invoke_smart(self, invocation):
144-
if not invocation.timeout:
145-
invocation.timeout = self._invocation_timeout + time.time()
146-
147154
try:
148155
if not invocation.urgent:
149156
self._check_invocation_allowed_fn()
@@ -171,9 +178,6 @@ def _invoke_smart(self, invocation):
171178
self._notify_error(invocation, e)
172179

173180
def _invoke_non_smart(self, invocation):
174-
if not invocation.timeout:
175-
invocation.timeout = self._invocation_timeout + time.time()
176-
177181
try:
178182
if not invocation.urgent:
179183
self._check_invocation_allowed_fn()
@@ -197,10 +201,8 @@ def _send(self, invocation, connection):
197201
if self._backup_ack_to_client_enabled:
198202
invocation.request.set_backup_aware_flag()
199203

200-
correlation_id = self._next_correlation_id.get_and_increment()
201204
message = invocation.request
202-
message.set_correlation_id(correlation_id)
203-
message.set_partition_id(invocation.partition_id)
205+
correlation_id = message.get_correlation_id()
204206
self._pending[correlation_id] = invocation
205207

206208
if invocation.event_handler:
@@ -232,7 +234,7 @@ def _notify_error(self, invocation, error):
232234
self._complete_with_error(invocation, error)
233235
return
234236

235-
invoke_func = functools.partial(self.invoke, invocation)
237+
invoke_func = functools.partial(self._do_invoke, invocation)
236238
self._reactor.add_timer(self._invocation_retry_pause, invoke_func)
237239

238240
def _should_retry(self, invocation, error):

0 commit comments

Comments
 (0)