Skip to content

Commit 9ed5a46

Browse files
authored
server should not close clients when client only listens (#109)
* server should not close clients when client only listens * add heartbeat tests * fix heartbeat resume condition * make client_test and heartbeat_test compatible with client properties class
1 parent dcfce9b commit 9ed5a46

File tree

5 files changed

+153
-5
lines changed

5 files changed

+153
-5
lines changed

hazelcast/connection.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,19 +219,20 @@ def _heartbeat(self):
219219
now = time.time()
220220
for connection in list(self._client.connection_manager.connections.values()):
221221
time_since_last_read = now - connection.last_read
222+
time_since_last_write = now - connection.last_write
222223
if time_since_last_read > self._heartbeat_timeout:
223224
if connection.heartbeating:
224225
self.logger.warning(
225226
"Heartbeat: Did not hear back after %ss from %s" % (time_since_last_read, connection))
226227
self._on_heartbeat_stopped(connection)
227-
228-
if time_since_last_read > self._heartbeat_interval:
229-
request = client_ping_codec.encode_request()
230-
self._client.invoker.invoke_on_connection(request, connection, ignore_heartbeat=True)
231228
else:
232229
if not connection.heartbeating:
233230
self._on_heartbeat_restored(connection)
234231

232+
if time_since_last_write > self._heartbeat_interval:
233+
request = client_ping_codec.encode_request()
234+
self._client.invoker.invoke_on_connection(request, connection, ignore_heartbeat=True)
235+
235236
def _on_heartbeat_restored(self, connection):
236237
self.logger.info("Heartbeat: Heartbeat restored for connection %s" % connection)
237238
connection.heartbeating = True
@@ -264,6 +265,7 @@ def __init__(self, address, connection_closed_callback, message_callback):
264265
self._builder = ClientMessageBuilder(message_callback)
265266
self._read_buffer = b""
266267
self.last_read = time.time()
268+
self.last_write = 0
267269

268270
def live(self):
269271
"""

hazelcast/reactor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ def handle_write(self):
150150
except IndexError:
151151
return
152152
sent = self.send(data)
153+
self.last_write = time.time()
153154
self.sent_protocol_bytes = True
154155
if sent < len(data):
155156
self._write_queue.appendleft(data[sent:])
@@ -175,6 +176,7 @@ def write(self, data):
175176
if len(self._write_queue) == 0 and self._write_lock.acquire(False):
176177
try:
177178
sent = self.send(data)
179+
self.last_write = time.time()
178180
if sent < len(data):
179181
self.logger.info("adding to queue")
180182
self._write_queue.appendleft(data[sent:])

tests/client_test.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import time
2+
3+
from tests.base import HazelcastTestCase
4+
from hazelcast.config import ClientConfig, ClientProperties
5+
from hazelcast.client import HazelcastClient
6+
from hazelcast.lifecycle import LIFECYCLE_STATE_DISCONNECTED
7+
8+
9+
class ClientTest(HazelcastTestCase):
10+
def test_client_only_listens(self):
11+
rc = self.create_rc()
12+
client_heartbeat_seconds = 8
13+
14+
cluster_config = """<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.10.xsd"
15+
xmlns="http://www.hazelcast.com/schema/config"
16+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
17+
<properties>
18+
<property name="hazelcast.client.max.no.heartbeat.seconds">{}</property>
19+
</properties>
20+
</hazelcast>""".format(client_heartbeat_seconds)
21+
cluster = self.create_cluster(rc, cluster_config)
22+
member = cluster.start_member()
23+
24+
client_config = ClientConfig()
25+
client_config.set_property(ClientProperties.HEARTBEAT_INTERVAL.name, 1000)
26+
27+
client1 = HazelcastClient(client_config)
28+
29+
def lifecycle_event_collector():
30+
events = []
31+
32+
def event_collector(e):
33+
if e == LIFECYCLE_STATE_DISCONNECTED:
34+
events.append(e)
35+
36+
event_collector.events = events
37+
return event_collector
38+
39+
collector = lifecycle_event_collector()
40+
client1.lifecycle.add_listener(collector)
41+
client2 = HazelcastClient()
42+
43+
key = "topic-name"
44+
topic = client1.get_topic(key)
45+
46+
def message_listener(e):
47+
pass
48+
49+
topic.add_listener(message_listener)
50+
51+
client2topic = client2.get_topic(key)
52+
begin = time.time()
53+
54+
while (time.time() - begin) < 2 * client_heartbeat_seconds:
55+
client2topic.publish("message")
56+
time.sleep(0.5)
57+
58+
self.assertEqual(0, len(collector.events))
59+
client1.shutdown()
60+
client2.shutdown()
61+
rc.exit()

tests/heartbeat_test.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
from hazelcast import HazelcastClient
2+
from hazelcast.core import Address
3+
from tests.base import HazelcastTestCase
4+
from hazelcast.config import ClientConfig, ClientProperties
5+
from tests.util import configure_logging
6+
7+
8+
class HeartbeatTest(HazelcastTestCase):
9+
@classmethod
10+
def setUpClass(cls):
11+
configure_logging()
12+
cls.rc = cls.create_rc()
13+
14+
@classmethod
15+
def tearDownClass(cls):
16+
cls.rc.exit()
17+
18+
def setUp(self):
19+
self.cluster = self.create_cluster(self.rc)
20+
self.member = self.rc.startMember(self.cluster.id)
21+
self.config = ClientConfig()
22+
23+
self.config.set_property(ClientProperties.HEARTBEAT_INTERVAL.name, 500)
24+
self.config.set_property(ClientProperties.HEARTBEAT_TIMEOUT.name, 2000)
25+
26+
self.client = HazelcastClient(self.config)
27+
28+
def tearDown(self):
29+
self.client.shutdown()
30+
self.rc.shutdownCluster(self.cluster.id)
31+
32+
def test_heartbeat_stopped(self):
33+
34+
def member_added_func(m):
35+
def connection_callback(f):
36+
conn = f.result()
37+
self.simulate_heartbeat_lost(self.client, Address(conn._address[0], conn._address[1]), 2)
38+
39+
self.client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback)
40+
41+
self.client.cluster.add_listener(member_added=member_added_func)
42+
43+
def heartbeat_stopped_collector():
44+
connections = []
45+
46+
def connection_collector(c):
47+
connections.append(c)
48+
49+
connection_collector.connections = connections
50+
return connection_collector
51+
52+
def heartbeat_restored_collector():
53+
connections = []
54+
55+
def connection_collector(c):
56+
connections.append(c)
57+
58+
connection_collector.connections = connections
59+
return connection_collector
60+
61+
stopped_collector = heartbeat_stopped_collector()
62+
restored_collector = heartbeat_restored_collector()
63+
64+
self.client.heartbeat.add_listener(on_heartbeat_stopped=stopped_collector,
65+
on_heartbeat_restored=restored_collector)
66+
67+
member2 = self.rc.startMember(self.cluster.id)
68+
69+
def assert_heartbeat_stopped_and_restored():
70+
self.assertEqual(1, len(stopped_collector.connections))
71+
self.assertEqual(1, len(restored_collector.connections))
72+
connection_stopped = stopped_collector.connections[0]
73+
connection_restored = restored_collector.connections[0]
74+
self.assertEqual(connection_stopped._address, (member2.host, member2.port))
75+
self.assertEqual(connection_restored._address, (member2.host, member2.port))
76+
77+
self.assertTrueEventually(assert_heartbeat_stopped_and_restored)
78+
79+
@staticmethod
80+
def simulate_heartbeat_lost(client, address, timeout):
81+
client.connection_manager.connections[address].last_read -= timeout

tests/reconnect_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ def test_start_client_with_no_member(self):
3232
self.create_client(config)
3333

3434
def test_start_client_before_member(self):
35-
Thread(target=self.cluster.start_member).start()
35+
t = Thread(target=self.cluster.start_member)
36+
t.start()
3637
config = ClientConfig()
3738
config.network_config.connection_attempt_limit = 10
3839
self.create_client(config)
40+
t.join()
3941

4042
def test_restart_member(self):
4143
member = self.cluster.start_member()

0 commit comments

Comments
 (0)