Skip to content

Commit 4ea8a19

Browse files
authored
[API-701] v5 SQL: Use different logic for getting query connections (#457)
* v5 SQL: Use different logic for getting query connections In v5, the logic behind selecting a query connection is changed a little bit. Formerly, we were relying on the load balancers to fetch the next data member. Now, we are selecting a random connection from the largest same-version (ignoring the patch version) member group. This way, SQL will be available on the client-side during rolling upgrade scenarios. * remove unused method * correctly override _mark_minimum_server_version * address review comments * json -> json-flat * make the name of version markers more clear * update black version
1 parent 3e9bda8 commit 4ea8a19

File tree

15 files changed

+340
-141
lines changed

15 files changed

+340
-141
lines changed

hazelcast/connection.py

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,17 @@
3232
client_authentication_custom_codec,
3333
client_ping_codec,
3434
)
35-
from hazelcast.util import AtomicInteger, calculate_version, UNKNOWN_VERSION
35+
from hazelcast.util import (
36+
AtomicInteger,
37+
calculate_version,
38+
UNKNOWN_VERSION,
39+
member_of_larger_same_version_group,
40+
)
3641

3742
_logger = logging.getLogger(__name__)
3843

3944
_INF = float("inf")
45+
_SQL_CONNECTION_RANDOM_ATTEMPTS = 10
4046

4147

4248
class _WaitStrategy(object):
@@ -156,26 +162,66 @@ def add_listener(self, on_connection_opened=None, on_connection_closed=None):
156162
def get_connection(self, member_uuid):
157163
return self.active_connections.get(member_uuid, None)
158164

159-
def get_random_connection(self, should_get_data_member=False):
165+
def get_random_connection(self):
166+
# Try getting the connection from the load balancer, if smart routing is enabled
160167
if self._smart_routing_enabled:
161-
connection = self._get_connection_from_load_balancer(should_get_data_member)
162-
if connection:
163-
return connection
164-
165-
# We should not get to this point under normal circumstances
166-
# for the smart client. For uni-socket client, there would be
167-
# a single connection in the dict. Therefore, copying the list
168-
# should be acceptable.
169-
for member_uuid, connection in list(six.iteritems(self.active_connections)):
170-
if should_get_data_member:
171-
member = self._cluster_service.get_member(member_uuid)
172-
if not member or member.lite_member:
173-
continue
168+
member = self._load_balancer.next()
169+
if member:
170+
connection = self.get_connection(member.uuid)
171+
if connection:
172+
return connection
174173

174+
# Otherwise iterate over connections and return the first one
175+
for connection in list(six.itervalues(self.active_connections)):
175176
return connection
176177

178+
# Failed to get a connection
177179
return None
178180

181+
def get_random_connection_for_sql(self):
182+
"""Returns a random connection for SQL.
183+
184+
The connection is tried to be selected in the following order.
185+
186+
- Random connection to a data member from the larger same-version
187+
group.
188+
- Random connection to a data member.
189+
- Any random connection
190+
- ``None``, if there is no connection.
191+
192+
Returns:
193+
Connection: A random connection for SQL.
194+
"""
195+
if self._smart_routing_enabled:
196+
# There might be a race - the chosen member might be just connected or disconnected.
197+
# Try a couple of times, the member_of_larger_same_version_group returns a random
198+
# connection, we might be lucky...
199+
for _ in range(_SQL_CONNECTION_RANDOM_ATTEMPTS):
200+
members = self._cluster_service.get_members()
201+
member = member_of_larger_same_version_group(members)
202+
if not member:
203+
break
204+
205+
connection = self.get_connection(member.uuid)
206+
if connection:
207+
return connection
208+
209+
# Otherwise iterate over connections and return the first one
210+
# that's not to a lite member.
211+
first_connection = None
212+
for member_uuid, connection in list(six.iteritems(self.active_connections)):
213+
if not first_connection:
214+
first_connection = connection
215+
216+
member = self._cluster_service.get_member(member_uuid)
217+
if not member or member.lite_member:
218+
continue
219+
220+
return connection
221+
222+
# Failed to get a connection to a data member.
223+
return first_connection
224+
179225
def start(self, load_balancer):
180226
if self.live:
181227
return
@@ -271,20 +317,6 @@ def check_invocation_allowed(self):
271317
else:
272318
raise IOError("No connection found to cluster")
273319

274-
def _get_connection_from_load_balancer(self, should_get_data_member):
275-
load_balancer = self._load_balancer
276-
member = None
277-
if should_get_data_member:
278-
if load_balancer.can_get_next_data_member():
279-
member = load_balancer.next_data_member()
280-
else:
281-
member = load_balancer.next()
282-
283-
if not member:
284-
return None
285-
286-
return self.get_connection(member.uuid)
287-
288320
def _get_or_connect_to_address(self, address):
289321
for connection in list(six.itervalues(self.active_connections)):
290322
if connection.remote_address == address:

hazelcast/sql.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,23 +1313,11 @@ def execute_statement(self, statement):
13131313
Returns:
13141314
SqlResult: The execution result.
13151315
"""
1316-
1317-
# Get a random Data member (non-lite member)
1318-
connection = self._connection_manager.get_random_connection(True)
1319-
if not connection:
1320-
# Either the client is not connected to the cluster, or
1321-
# there are no data members in the cluster.
1322-
raise HazelcastSqlError(
1323-
self.get_client_id(),
1324-
_SqlErrorCode.CONNECTION_PROBLEM,
1325-
"Client is not currently connected to the cluster.",
1326-
None,
1327-
)
1316+
connection = self._get_query_connection()
1317+
# Create a new, unique query id.
1318+
query_id = _SqlQueryId.from_uuid(connection.remote_uuid)
13281319

13291320
try:
1330-
# Create a new, unique query id.
1331-
query_id = _SqlQueryId.from_uuid(connection.remote_uuid)
1332-
13331321
# Serialize the passed parameters.
13341322
serialized_params = [
13351323
self._serialization_service.to_data(param) for param in statement.parameters
@@ -1397,15 +1385,15 @@ def re_raise(self, error, connection):
13971385
13981386
Args:
13991387
error (Exception): The error to reraise.
1400-
connection (hazelcast.connection.Connection): Connection
1388+
connection (hazelcast.connection.Connection|None): Connection
14011389
that the query requests are routed to. If it is not
14021390
live, we will inform the user about the possible
14031391
cluster topology change.
14041392
14051393
Returns:
14061394
HazelcastSqlError: The reraised error.
14071395
"""
1408-
if not connection.live:
1396+
if connection and not connection.live:
14091397
return HazelcastSqlError(
14101398
self.get_client_id(),
14111399
_SqlErrorCode.CONNECTION_PROBLEM,
@@ -1438,6 +1426,22 @@ def close(self, connection, query_id):
14381426
self._invocation_service.invoke(invocation)
14391427
return invocation.future
14401428

1429+
def _get_query_connection(self):
    """Return the connection that the SQL query will be routed to.

    Returns:
        hazelcast.connection.Connection: The connection to use.

    Raises:
        HazelcastSqlError: If selecting the connection fails, or if
            the connection manager has no connection to offer.
    """
    try:
        chosen = self._connection_manager.get_random_connection_for_sql()
    except Exception as cause:
        # There is no connection to inspect yet, hence ``None``.
        raise self.re_raise(cause, None)

    if chosen:
        return chosen

    raise HazelcastSqlError(
        self.get_client_id(),
        _SqlErrorCode.CONNECTION_PROBLEM,
        "Client is not connected",
        None,
    )
1444+
14411445

14421446
class SqlExpectedResultType(object):
14431447
"""The expected statement result type."""

hazelcast/util.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,3 +454,71 @@ def try_to_get_error_message(error):
454454
elif len(error.args) > 0:
455455
return error.args[0]
456456
return None
457+
458+
459+
def _is_same_version(v1, v2):
    """Return ``True`` if ``v1`` and ``v2`` have the same major and minor
    numbers. The patch version is ignored.
    """
    return v1.major == v2.major and v1.minor == v2.minor


def _is_newer_version(v1, v2):
    """Return ``True`` if ``v1`` is newer than ``v2``, comparing only the
    major and minor numbers. The patch version is ignored.
    """
    return v1.major > v2.major or (v1.major == v2.major and v1.minor > v2.minor)


def member_of_larger_same_version_group(members):
    """Finds the larger same-version group of data members from a collection
    of members and returns a random member from that group.

    Lite members are skipped. Versions are compared ignoring the patch
    version. If both groups have the same size, a member of the group with
    the newer version is returned.

    Args:
        members (list[hazelcast.core.MemberInfo]): List of all members.

    Returns:
        hazelcast.core.MemberInfo: The chosen member or ``None``, if no data
        member is found.

    Raises:
        ValueError: If the data members have more than 2 distinct versions
            (ignoring the patch version).
    """
    # The members should have at most 2 different versions (ignoring the
    # patch version), so two buckets are enough.
    version0 = None
    version1 = None
    group0 = []
    group1 = []

    for member in members:
        if member.lite_member:
            continue

        v = member.version
        if version0 is None or _is_same_version(version0, v):
            version0 = v
            group0.append(member)
        elif version1 is None or _is_same_version(version1, v):
            version1 = v
            group1.append(member)
        else:
            raise ValueError(
                "More than 2 distinct member versions found: %s, %s, %s" % (version0, version1, v)
            )

    # no data members
    if not group0:
        return None

    # group1 is non-empty whenever the sizes are equal here, so version1 is
    # always set by the time it is compared.
    size0 = len(group0)
    size1 = len(group1)
    if size0 > size1 or (size0 == size1 and _is_newer_version(version0, version1)):
        larger_group = group0
    else:
        larger_group = group1

    # Pick uniformly from the chosen group itself, so that the selection can
    # never be sized by the other group's member count.
    return random.choice(larger_group)

requirements-dev.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
-r requirements-test.txt
2-
black==20.8b1; python_version >= "3.6"
2+
black==21.7b0; python_version >= "3.6"
33
Sphinx==3.4.2; python_version >= "3.5"
44
sphinx-rtd-theme==0.5.1; python_version >= "3.5"

tests/integration/backward_compatible/authentication_tests/authentication_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from hazelcast.errors import HazelcastError
55
from tests.base import HazelcastTestCase
6-
from tests.util import get_abs_path, set_attr, is_client_version_older_than
6+
from tests.util import get_abs_path, set_attr, compare_client_version
77
from hazelcast.client import HazelcastClient
88

99
try:
@@ -14,7 +14,7 @@
1414

1515
@set_attr(enterprise=True)
1616
@unittest.skipIf(
17-
is_client_version_older_than("4.2.1"), "Tests the features added in 4.2.1 version of the client"
17+
compare_client_version("4.2.1") < 0, "Tests the features added in 4.2.1 version of the client"
1818
)
1919
class AuthenticationTest(HazelcastTestCase):
2020
current_directory = os.path.dirname(__file__)

tests/integration/backward_compatible/cluster_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from hazelcast import HazelcastClient, six
55
from hazelcast.util import RandomLB, RoundRobinLB
66
from tests.base import HazelcastTestCase
7-
from tests.util import set_attr, random_string, event_collector, mark_client_version_at_least
7+
from tests.util import set_attr, random_string, event_collector, skip_if_client_version_older_than
88

99

1010
class ClusterTest(HazelcastTestCase):
@@ -228,7 +228,7 @@ def assertion():
228228
self.assertEqual(member.uuid, str(members[0].uuid))
229229

230230
def test_when_member_started_with_the_same_address(self):
231-
mark_client_version_at_least(self, "4.2")
231+
skip_if_client_version_older_than(self, "4.2")
232232

233233
old_member = self.cluster.start_member()
234234
self.client = HazelcastClient(cluster_name=self.cluster.id)

tests/integration/backward_compatible/proxy/cp/atomic_long_test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from hazelcast.errors import DistributedObjectDestroyedError
22
from hazelcast.serialization.api import IdentifiedDataSerializable
33
from tests.integration.backward_compatible.proxy.cp import CPTestCase
4-
from tests.util import mark_server_version_at_least
4+
from tests.util import skip_if_server_version_older_than
55

66

77
class Multiplication(IdentifiedDataSerializable):
@@ -108,28 +108,28 @@ def test_set(self):
108108

109109
def test_alter(self):
110110
# the class is defined in the 4.1 JAR
111-
mark_server_version_at_least(self, self.client, "4.1")
111+
skip_if_server_version_older_than(self, self.client, "4.1")
112112
self.atomic_long.set(2)
113113
self.assertIsNone(self.atomic_long.alter(Multiplication(5)))
114114
self.assertEqual(10, self.atomic_long.get())
115115

116116
def test_alter_and_get(self):
117117
# the class is defined in the 4.1 JAR
118-
mark_server_version_at_least(self, self.client, "4.1")
118+
skip_if_server_version_older_than(self, self.client, "4.1")
119119
self.atomic_long.set(-3)
120120
self.assertEqual(-9, self.atomic_long.alter_and_get(Multiplication(3)))
121121
self.assertEqual(-9, self.atomic_long.get())
122122

123123
def test_get_and_alter(self):
124124
# the class is defined in the 4.1 JAR
125-
mark_server_version_at_least(self, self.client, "4.1")
125+
skip_if_server_version_older_than(self, self.client, "4.1")
126126
self.atomic_long.set(123)
127127
self.assertEqual(123, self.atomic_long.get_and_alter(Multiplication(-1)))
128128
self.assertEqual(-123, self.atomic_long.get())
129129

130130
def test_apply(self):
131131
# the class is defined in the 4.1 JAR
132-
mark_server_version_at_least(self, self.client, "4.1")
132+
skip_if_server_version_older_than(self, self.client, "4.1")
133133
self.atomic_long.set(42)
134134
self.assertEqual(84, self.atomic_long.apply(Multiplication(2)))
135135
self.assertEqual(42, self.atomic_long.get())

tests/integration/backward_compatible/proxy/cp/atomic_reference_test.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from tests.integration.backward_compatible.util import write_string_to_output
55
from tests.integration.backward_compatible.proxy.cp import CPTestCase
6-
from tests.util import mark_server_version_at_least
6+
from tests.util import skip_if_server_version_older_than
77

88

99
class AppendString(IdentifiedDataSerializable):
@@ -110,35 +110,35 @@ def test_contains(self):
110110

111111
def test_alter(self):
112112
# the class is defined in the 4.1 JAR
113-
mark_server_version_at_least(self, self.client, "4.1")
113+
skip_if_server_version_older_than(self, self.client, "4.1")
114114
self.ref.set("hey")
115115
self.assertIsNone(self.ref.alter(AppendString("123")))
116116
self.assertEqual("hey123", self.ref.get())
117117

118118
def test_alter_with_incompatible_types(self):
119119
# the class is defined in the 4.1 JAR
120-
mark_server_version_at_least(self, self.client, "4.1")
120+
skip_if_server_version_older_than(self, self.client, "4.1")
121121
self.ref.set(42)
122122
with self.assertRaises(ClassCastError):
123123
self.ref.alter(AppendString("."))
124124

125125
def test_alter_and_get(self):
126126
# the class is defined in the 4.1 JAR
127-
mark_server_version_at_least(self, self.client, "4.1")
127+
skip_if_server_version_older_than(self, self.client, "4.1")
128128
self.ref.set("123")
129129
self.assertEqual("123...", self.ref.alter_and_get(AppendString("...")))
130130
self.assertEqual("123...", self.ref.get())
131131

132132
def test_get_and_alter(self):
133133
# the class is defined in the 4.1 JAR
134-
mark_server_version_at_least(self, self.client, "4.1")
134+
skip_if_server_version_older_than(self, self.client, "4.1")
135135
self.ref.set("hell")
136136
self.assertEqual("hell", self.ref.get_and_alter(AppendString("o")))
137137
self.assertEqual("hello", self.ref.get())
138138

139139
def test_apply(self):
140140
# the class is defined in the 4.1 JAR
141-
mark_server_version_at_least(self, self.client, "4.1")
141+
skip_if_server_version_older_than(self, self.client, "4.1")
142142
self.ref.set("hell")
143143
self.assertEqual("hello", self.ref.apply(AppendString("o")))
144144
self.assertEqual("hell", self.ref.get())

0 commit comments

Comments
 (0)