Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions hazelcast/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def __init__(self, client, reactor):
config = client.config
smart_routing = config.smart_routing
if smart_routing:
self.invoke = self._invoke_smart
self._do_invoke = self._invoke_smart
else:
self.invoke = self._invoke_non_smart
self._do_invoke = self._invoke_non_smart

self._client = client
self._reactor = reactor
Expand Down Expand Up @@ -109,6 +109,16 @@ def handle_client_message(self, message):

self._notify(invocation, message)

def invoke(self, invocation):
if not invocation.timeout:
invocation.timeout = self._invocation_timeout + time.time()

correlation_id = self._next_correlation_id.get_and_increment()
request = invocation.request
request.set_correlation_id(correlation_id)
request.set_partition_id(invocation.partition_id)
self._do_invoke(invocation)

def shutdown(self):
if self._shutdown:
return
Expand Down Expand Up @@ -141,9 +151,6 @@ def _invoke_on_random_connection(self, invocation):
return self._send(invocation, connection)

def _invoke_smart(self, invocation):
if not invocation.timeout:
invocation.timeout = self._invocation_timeout + time.time()

try:
if not invocation.urgent:
self._check_invocation_allowed_fn()
Expand Down Expand Up @@ -171,9 +178,6 @@ def _invoke_smart(self, invocation):
self._notify_error(invocation, e)

def _invoke_non_smart(self, invocation):
if not invocation.timeout:
invocation.timeout = self._invocation_timeout + time.time()

try:
if not invocation.urgent:
self._check_invocation_allowed_fn()
Expand All @@ -197,10 +201,8 @@ def _send(self, invocation, connection):
if self._backup_ack_to_client_enabled:
invocation.request.set_backup_aware_flag()

correlation_id = self._next_correlation_id.get_and_increment()
message = invocation.request
message.set_correlation_id(correlation_id)
message.set_partition_id(invocation.partition_id)
correlation_id = message.get_correlation_id()
self._pending[correlation_id] = invocation

if invocation.event_handler:
Expand Down Expand Up @@ -232,7 +234,7 @@ def _notify_error(self, invocation, error):
self._complete_with_error(invocation, error)
return

invoke_func = functools.partial(self.invoke, invocation)
invoke_func = functools.partial(self._do_invoke, invocation)
self._reactor.add_timer(self._invocation_retry_pause, invoke_func)

def _should_retry(self, invocation, error):
Expand Down