|
| 1 | +import time |
| 2 | +import itertools |
| 3 | + |
1 | 4 | from hazelcast import HazelcastClient |
2 | 5 | from hazelcast.core import Address |
3 | 6 | from tests.base import HazelcastTestCase |
@@ -32,45 +35,45 @@ def tearDown(self): |
32 | 35 | def test_heartbeat_stopped(self): |
33 | 36 |
|
34 | 37 | def member_added_func(m): |
| 38 | + retry_count = itertools.count(3, -1) |
| 39 | + |
35 | 40 | def connection_callback(f): |
36 | | - conn = f.result() |
37 | | - self.simulate_heartbeat_lost(self.client, Address(conn._address[0], conn._address[1]), 2) |
| 41 | + try: |
| 42 | + conn = f.result() |
| 43 | + self.simulate_heartbeat_lost(self.client, Address(conn._address[0], conn._address[1]), 2) |
| 44 | + except: |
| 45 | + if next(retry_count) > 0: |
| 46 | + time.sleep(1) |
| 47 | + self.client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) |
| 48 | + else: |
| 49 | + self.fail("Couldn't connect to address {}".format(m.address)) |
38 | 50 |
|
39 | 51 | self.client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) |
40 | 52 |
|
41 | 53 | self.client.cluster.add_listener(member_added=member_added_func) |
42 | 54 |
|
43 | | - def heartbeat_stopped_collector(): |
44 | | - connections = [] |
45 | | - |
46 | | - def connection_collector(c): |
47 | | - connections.append(c) |
48 | | - |
49 | | - connection_collector.connections = connections |
50 | | - return connection_collector |
51 | | - |
52 | | - def heartbeat_restored_collector(): |
| 55 | + def connection_collector(): |
53 | 56 | connections = [] |
54 | 57 |
|
55 | | - def connection_collector(c): |
| 58 | + def collector(c): |
56 | 59 | connections.append(c) |
57 | 60 |
|
58 | | - connection_collector.connections = connections |
59 | | - return connection_collector |
| 61 | + collector.connections = connections |
| 62 | + return collector |
60 | 63 |
|
61 | | - stopped_collector = heartbeat_stopped_collector() |
62 | | - restored_collector = heartbeat_restored_collector() |
| 64 | + heartbeat_stopped_collector = connection_collector() |
| 65 | + heartbeat_restored_collector = connection_collector() |
63 | 66 |
|
64 | | - self.client.heartbeat.add_listener(on_heartbeat_stopped=stopped_collector, |
65 | | - on_heartbeat_restored=restored_collector) |
| 67 | + self.client.heartbeat.add_listener(on_heartbeat_stopped=heartbeat_stopped_collector, |
| 68 | + on_heartbeat_restored=heartbeat_restored_collector) |
66 | 69 |
|
67 | 70 | member2 = self.rc.startMember(self.cluster.id) |
68 | 71 |
|
69 | 72 | def assert_heartbeat_stopped_and_restored(): |
70 | | - self.assertEqual(1, len(stopped_collector.connections)) |
71 | | - self.assertEqual(1, len(restored_collector.connections)) |
72 | | - connection_stopped = stopped_collector.connections[0] |
73 | | - connection_restored = restored_collector.connections[0] |
| 73 | + self.assertEqual(1, len(heartbeat_stopped_collector.connections)) |
| 74 | + self.assertEqual(1, len(heartbeat_restored_collector.connections)) |
| 75 | + connection_stopped = heartbeat_stopped_collector.connections[0] |
| 76 | + connection_restored = heartbeat_restored_collector.connections[0] |
74 | 77 | self.assertEqual(connection_stopped._address, (member2.host, member2.port)) |
75 | 78 | self.assertEqual(connection_restored._address, (member2.host, member2.port)) |
76 | 79 |
|
|
0 commit comments