Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions docs/client_connection_strategy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ starting and reconnecting modes.
Configuring Client Connection Retry
-----------------------------------

When the client is disconnected from the cluster, it searches for new
connections to reconnect. You can configure the frequency of the
reconnection attempts and client shutdown behavior using the argumentes
below.
The client searches for new connections when it is trying to connect
to the cluster. Both the frequency of connection attempts and the
client shutdown behavior can be configured using the arguments below.

.. code:: python

Expand All @@ -69,13 +68,13 @@ The following are configuration element descriptions:
- ``retry_max_backoff``: Specifies the upper limit for the backoff in
seconds. Its default value is ``30``. It must be non-negative.
- ``retry_multiplier``: Factor to multiply the backoff after a failed
retry. Its default value is ``1``. It must be greater than or equal
retry. Its default value is ``1.05``. It must be greater than or equal
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened to the old connection_attempt_limit ?

Is there no longer an option to control the number of connection attempts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, this configuration option is removed from the client in the 4.0 release with a more enhanced exponential backoff feature. You can get the almost same behavior by configuring the cluster_connect_timeout parameter.

Copy link

@Kilo59 Kilo59 Feb 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my use case, I just want to either attempt 1 or 2 connection attempts and then stop (as part of an integration test).

The exponential backoff is a nice feature (and it will be useful in production), but it looks like the logic for limiting the connection attempts is now somewhat convoluted.

Unless I just don't understand.

https://hazelcast.readthedocs.io/en/stable/client.html#hazelcast.client.HazelcastClient

cluster_connect_timeout seems to relate to a single connection attempt, but I'm trying to shorten the total time (or total attempts) spent attempting to reconnect.

Copy link
Contributor Author

@mdumandag mdumandag Feb 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can agree on some of your points but let me clarify the cluster_connect_timeout a little bit.

It is actually an absolute timeout for whole connection attempts.

Let say that you specified 3 possible member addresses in your configuration. Client first will store the time it started trying to connect the cluster. (Let say that it is start_time)

Then, client tries to connect at least one of the three. If it cannot connect any of them, then client tries to sleep a little bit. But before sleeping, it will check the the time passed since the first connection attempt(time.time() - start_time). If it is more than the cluster_connect_timeout, it will throw an exception saying that client could not connect to any of the members. If it is less than that, it will sleep and try to connect possible addresses again. This loop continues until the client reaches the cluster_connect_timeout or connects one of the members.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same logic applies to reconnection process

Copy link

@Kilo59 Kilo59 Feb 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdumandag thanks for the quick response and clarification, that's very helpful.

Timeout value in seconds for the client to give up a connection attempt to the cluster. Must be non-negative. By default, set to 120.0.

Should this description be adjusted? The "a connection attempt" is what threw me off.
I'd be happy to change it.

Sorry for hijacking the PR 😅 .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem at all, happy to help you. Let me rephrase that part tomorrow

to ``1``.
- ``retry_jitter``: Specifies by how much to randomize backoffs. Its
default value is ``0``. It must be in range ``0`` to ``1``.
- ``cluster_connect_timeout``: Timeout value in seconds for the client
to give up to connect to the current cluster. Its default value is
``120``.
to give up connecting to the cluster. Its default value is
``-1``. The client will continuously try to connect by default.

A pseudo-code is as follows:

Expand All @@ -86,6 +85,7 @@ A pseudo-code is as follows:
while (try_connect(connection_timeout)) != SUCCESS) {
if (get_current_time() - begin_time >= CLUSTER_CONNECT_TIMEOUT) {
// Give up to connecting to the current cluster and switch to another if exists.
// CLUSTER_CONNECT_TIMEOUT is infinite by default.
}
sleep(current_backoff + uniform_random(-JITTER * current_backoff, JITTER * current_backoff))
current_backoff = min(current_backoff * MULTIPLIER, MAX_BACKOFF)
Expand Down
11 changes: 6 additions & 5 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import sys
import threading

from hazelcast import six
from hazelcast.cluster import ClusterService, _InternalClusterService
from hazelcast.config import _Config
from hazelcast.connection import ConnectionManager, DefaultAddressProvider
Expand Down Expand Up @@ -121,10 +121,11 @@ class HazelcastClient(object):
in range ``[0.0, 1.0]``. By default, set to ``0.0`` (no randomization).
retry_multiplier (float): The factor with which to multiply backoff after a
failed retry. Must be greater than or equal to ``1``. By default,
set to ``1.0``.
set to ``1.05``.
cluster_connect_timeout (float): Timeout value in seconds for the client to
give up a connection attempt to the cluster. Must be non-negative.
By default, set to `120.0`.
give up connecting to the cluster. Must be non-negative or
equal to `-1`. By default, set to `-1`. `-1` means that the client
will not stop trying to the target cluster. (infinite timeout)
portable_version (int): Default value for the portable version if the
class does not have the :func:`get_portable_version` method. Portable
versions are used to differentiate two versions of the
Expand Down Expand Up @@ -677,7 +678,7 @@ def _create_client_name(self, client_id):
@staticmethod
def _get_connection_timeout(config):
timeout = config.connection_timeout
return six.MAXSIZE if timeout == 0 else timeout
return sys.maxsize if timeout == 0 else timeout

@staticmethod
def _init_load_balancer(config):
Expand Down
51 changes: 35 additions & 16 deletions hazelcast/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,22 @@ def _index_type_to_name(index_type):
raise ValueError("Unsupported index type %s" % index_type)


_DEFAULT_CLUSTER_NAME = "dev"
_DEFAULT_CONNECTION_TIMEOUT = 5.0
_DEFAULT_RETRY_INITIAL_BACKOFF = 1.0
_DEFAULT_RETRY_MAX_BACKOFF = 30.0
_DEFAULT_RETRY_JITTER = 0.0
_DEFAULT_RETRY_MULTIPLIER = 1.05
_DEFAULT_CLUSTER_CONNECT_TIMEOUT = -1
_DEFAULT_PORTABLE_VERSION = 0
_DEFAULT_HEARTBEAT_INTERVAL = 5.0
_DEFAULT_HEARTBEAT_TIMEOUT = 60.0
_DEFAULT_INVOCATION_TIMEOUT = 120.0
_DEFAULT_INVOCATION_RETRY_PAUSE = 1.0
_DEFAULT_STATISTICS_PERIOD = 3.0
_DEFAULT_OPERATION_BACKUP_TIMEOUT = 5.0


class _Config(object):
__slots__ = (
"_cluster_members",
Expand Down Expand Up @@ -512,9 +528,9 @@ class _Config(object):

def __init__(self):
self._cluster_members = []
self._cluster_name = "dev"
self._cluster_name = _DEFAULT_CLUSTER_NAME
self._client_name = None
self._connection_timeout = 5.0
self._connection_timeout = _DEFAULT_CONNECTION_TIMEOUT
self._socket_options = []
self._redo_operation = False
self._smart_routing = True
Expand All @@ -528,12 +544,12 @@ def __init__(self):
self._cloud_discovery_token = None
self._async_start = False
self._reconnect_mode = ReconnectMode.ON
self._retry_initial_backoff = 1.0
self._retry_max_backoff = 30.0
self._retry_jitter = 0.0
self._retry_multiplier = 1.0
self._cluster_connect_timeout = 120.0
self._portable_version = 0
self._retry_initial_backoff = _DEFAULT_RETRY_INITIAL_BACKOFF
self._retry_max_backoff = _DEFAULT_RETRY_MAX_BACKOFF
self._retry_jitter = _DEFAULT_RETRY_JITTER
self._retry_multiplier = _DEFAULT_RETRY_MULTIPLIER
self._cluster_connect_timeout = _DEFAULT_CLUSTER_CONNECT_TIMEOUT
self._portable_version = _DEFAULT_PORTABLE_VERSION
self._data_serializable_factories = {}
self._portable_factories = {}
self._class_definitions = []
Expand All @@ -548,15 +564,15 @@ def __init__(self):
self._lifecycle_listeners = []
self._flake_id_generators = {}
self._labels = []
self._heartbeat_interval = 5.0
self._heartbeat_timeout = 60.0
self._invocation_timeout = 120.0
self._invocation_retry_pause = 1.0
self._heartbeat_interval = _DEFAULT_HEARTBEAT_INTERVAL
self._heartbeat_timeout = _DEFAULT_HEARTBEAT_TIMEOUT
self._invocation_timeout = _DEFAULT_INVOCATION_TIMEOUT
self._invocation_retry_pause = _DEFAULT_INVOCATION_RETRY_PAUSE
self._statistics_enabled = False
self._statistics_period = 3.0
self._statistics_period = _DEFAULT_STATISTICS_PERIOD
self._shuffle_member_list = True
self._backup_ack_to_client_enabled = True
self._operation_backup_timeout = 5.0
self._operation_backup_timeout = _DEFAULT_OPERATION_BACKUP_TIMEOUT
self._fail_on_indeterminate_operation_state = False

@property
Expand Down Expand Up @@ -812,8 +828,11 @@ def cluster_connect_timeout(self):
@cluster_connect_timeout.setter
def cluster_connect_timeout(self, value):
if isinstance(value, number_types):
if value < 0:
raise ValueError("cluster_connect_timeout must be non-negative")
if value < 0 and value != _DEFAULT_CLUSTER_CONNECT_TIMEOUT:
raise ValueError(
"cluster_connect_timeout must be non-negative or equal to %s"
% _DEFAULT_CLUSTER_CONNECT_TIMEOUT
)
self._cluster_connect_timeout = value
else:
raise TypeError("cluster_connect_timeout must be a number")
Expand Down
13 changes: 10 additions & 3 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def sleep(self):
time_passed = time.time() - self._cluster_connect_attempt_begin
if time_passed > self._cluster_connect_timeout:
_logger.warning(
"Unable to get live cluster connection, cluster connect timeout (%d) is reached. "
"Unable to get live cluster connection, cluster connect timeout (%ds) is reached. "
"Attempt %d.",
self._cluster_connect_timeout,
self._attempt,
Expand All @@ -69,7 +69,7 @@ def sleep(self):
)
sleep_time = min(sleep_time, self._cluster_connect_timeout - time_passed)
_logger.warning(
"Unable to get live cluster connection, retry in %ds, attempt: %d, "
"Unable to get live cluster connection, retry in %.2fs, attempt: %d, "
"cluster connect timeout: %ds, max backoff: %ds",
sleep_time,
self._attempt,
Expand Down Expand Up @@ -279,11 +279,18 @@ def _trigger_cluster_reconnection(self):
self._start_connect_to_cluster_thread()

def _init_wait_strategy(self, config):
cluster_connect_timeout = config.cluster_connect_timeout
if cluster_connect_timeout == -1:
# If the no timeout is specified by the
# user, or set to -1 explicitly, set
# the timeout to infinite.
cluster_connect_timeout = sys.maxsize

return _WaitStrategy(
config.retry_initial_backoff,
config.retry_max_backoff,
config.retry_multiplier,
config.cluster_connect_timeout,
cluster_connect_timeout,
config.retry_jitter,
)

Expand Down
4 changes: 2 additions & 2 deletions hazelcast/cp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sys
import time
from threading import RLock, Lock

from hazelcast import six
from hazelcast.errors import (
SessionExpiredError,
CPGroupDestroyedError,
Expand Down Expand Up @@ -277,7 +277,7 @@ def is_in_use(self):
def _is_expired(self, timestamp):
expiration_time = self.creation_time + self.ttl
if expiration_time < 0:
expiration_time = six.MAXSIZE
expiration_time = sys.maxsize
return timestamp > expiration_time

def __eq__(self, other):
Expand Down
3 changes: 2 additions & 1 deletion hazelcast/listener.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import sys
import threading
from uuid import uuid4

Expand Down Expand Up @@ -100,7 +101,7 @@ def deregister_listener(self, user_registration_id):
continue

invocation = Invocation(
deregister_request, connection=connection, timeout=six.MAXSIZE, urgent=True
deregister_request, connection=connection, timeout=sys.maxsize, urgent=True
)
self._invocation_service.invoke(invocation)

Expand Down
9 changes: 6 additions & 3 deletions tests/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def test_retry_jitter(self):

def test_retry_multiplier(self):
config = self.config
self.assertEqual(1, config.retry_multiplier)
self.assertEqual(1.05, config.retry_multiplier)

with self.assertRaises(ValueError):
config.retry_multiplier = 0.5
Expand All @@ -306,10 +306,13 @@ def test_retry_multiplier(self):

def test_cluster_connect_timeout(self):
config = self.config
self.assertEqual(120, config.cluster_connect_timeout)
self.assertEqual(-1, config.cluster_connect_timeout)

config.cluster_connect_timeout = -1
self.assertEqual(-1, config.cluster_connect_timeout)

with self.assertRaises(ValueError):
config.cluster_connect_timeout = -1
config.cluster_connect_timeout = -2

with self.assertRaises(TypeError):
config.cluster_connect_timeout = ""
Expand Down
2 changes: 0 additions & 2 deletions tests/connection_strategy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def on_state_change(event):
cluster_members=["localhost:5701"],
cluster_name=self.cluster.id,
reconnect_mode=ReconnectMode.OFF,
cluster_connect_timeout=six.MAXSIZE,
lifecycle_listeners=[event_collector],
)
m = self.client.get_map(random_string()).blocking()
Expand Down Expand Up @@ -119,7 +118,6 @@ def on_state_change(event):
cluster_members=["localhost:5701"],
cluster_name=self.cluster.id,
reconnect_mode=ReconnectMode.ASYNC,
cluster_connect_timeout=six.MAXSIZE,
lifecycle_listeners=[disconnected_collector],
)
m = self.client.get_map(random_string()).blocking()
Expand Down