Skip to content

Commit 69d4080

Browse files
Spikhalskiymarcosnils
authored andcommitted
Issue #1252: Fix creating a lot of new Jedis instances on unstable cluster, fix slots clearing without filling (#1253)
* Issue #1252: Fix creating lot of new Jedis instances on unstable cluster, fix slots clearing without filling * Issue #1252: Acquire one long lock for trying all nodes when rediscover cluster
1 parent 3a637f5 commit 69d4080

File tree

3 files changed

+89
-80
lines changed

3 files changed

+89
-80
lines changed

src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package redis.clients.jedis;
22

33
import java.io.Closeable;
4-
import java.util.ArrayList;
5-
import java.util.Collections;
6-
import java.util.List;
74
import java.util.Map;
85
import java.util.Set;
96

@@ -25,8 +22,7 @@ public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
2522
abstract Jedis getConnectionFromSlot(int slot);
2623

2724
public Jedis getConnectionFromNode(HostAndPort node) {
28-
cache.setNodeIfNotExist(node);
29-
return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource();
25+
return cache.setupNodeIfNotExist(node).getResource();
3026
}
3127

3228
public Map<String, JedisPool> getNodes() {
@@ -53,39 +49,15 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPool
5349
}
5450

5551
public void renewSlotCache() {
56-
for (JedisPool jp : getShuffledNodesPool()) {
57-
Jedis jedis = null;
58-
try {
59-
jedis = jp.getResource();
60-
cache.discoverClusterSlots(jedis);
61-
break;
62-
} catch (JedisConnectionException e) {
63-
// try next nodes
64-
} finally {
65-
if (jedis != null) {
66-
jedis.close();
67-
}
68-
}
69-
}
52+
cache.renewClusterSlots(null);
7053
}
7154

7255
public void renewSlotCache(Jedis jedis) {
73-
try {
74-
cache.discoverClusterSlots(jedis);
75-
} catch (JedisConnectionException e) {
76-
renewSlotCache();
77-
}
56+
cache.renewClusterSlots(jedis);
7857
}
7958

8059
@Override
8160
public void close() {
8261
cache.reset();
8362
}
84-
85-
protected List<JedisPool> getShuffledNodesPool() {
86-
List<JedisPool> pools = new ArrayList<JedisPool>();
87-
pools.addAll(cache.getNodes().values());
88-
Collections.shuffle(pools);
89-
return pools;
90-
}
9163
}

src/main/java/redis/clients/jedis/JedisClusterInfoCache.java

Lines changed: 77 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package redis.clients.jedis;
22

33
import java.util.ArrayList;
4+
import java.util.Collections;
45
import java.util.HashMap;
56
import java.util.List;
67
import java.util.Map;
@@ -13,6 +14,8 @@
1314

1415
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
1516

17+
import redis.clients.jedis.exceptions.JedisConnectionException;
18+
import redis.clients.jedis.exceptions.JedisException;
1619
import redis.clients.util.SafeEncoder;
1720

1821
public class JedisClusterInfoCache {
@@ -68,7 +71,7 @@ public void discoverClusterNodesAndSlots(Jedis jedis) {
6871
}
6972

7073
HostAndPort targetNode = generateHostAndPort(hostInfos);
71-
setNodeIfNotExist(targetNode);
74+
setupNodeIfNotExist(targetNode);
7275
if (i == MASTER_NODE_INDEX) {
7376
assignSlotsToNode(slotNums, targetNode);
7477
}
@@ -79,37 +82,34 @@ public void discoverClusterNodesAndSlots(Jedis jedis) {
7982
}
8083
}
8184

82-
public void discoverClusterSlots(Jedis jedis) {
85+
public void renewClusterSlots(Jedis jedis) {
8386
//If rediscovering is already in process - no need to start one more same rediscovering, just return
8487
if (!rediscovering) {
85-
w.lock();
86-
rediscovering = true;
87-
8888
try {
89-
this.slots.clear();
90-
91-
List<Object> slots = jedis.clusterSlots();
92-
93-
for (Object slotInfoObj : slots) {
94-
List<Object> slotInfo = (List<Object>) slotInfoObj;
95-
96-
if (slotInfo.size() <= 2) {
97-
continue;
89+
w.lock();
90+
rediscovering = true;
91+
92+
if (jedis != null) {
93+
try {
94+
discoverClusterSlots(jedis);
95+
return;
96+
} catch (JedisException e) {
97+
//try nodes from all pools
9898
}
99+
}
99100

100-
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
101-
102-
// hostInfos
103-
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
104-
if (hostInfos.isEmpty()) {
105-
continue;
101+
for (JedisPool jp : getShuffledNodesPool()) {
102+
try {
103+
jedis = jp.getResource();
104+
discoverClusterSlots(jedis);
105+
return;
106+
} catch (JedisConnectionException e) {
107+
// try next nodes
108+
} finally {
109+
if (jedis != null) {
110+
jedis.close();
111+
}
106112
}
107-
108-
// at this time, we just use master, discard slave information
109-
HostAndPort targetNode = generateHostAndPort(hostInfos);
110-
111-
setNodeIfNotExist(targetNode);
112-
assignSlotsToNode(slotNums, targetNode);
113113
}
114114
} finally {
115115
rediscovering = false;
@@ -118,50 +118,81 @@ public void discoverClusterSlots(Jedis jedis) {
118118
}
119119
}
120120

121+
private void discoverClusterSlots(Jedis jedis) {
122+
List<Object> slots = jedis.clusterSlots();
123+
this.slots.clear();
124+
125+
for (Object slotInfoObj : slots) {
126+
List<Object> slotInfo = (List<Object>) slotInfoObj;
127+
128+
if (slotInfo.size() <= MASTER_NODE_INDEX) {
129+
continue;
130+
}
131+
132+
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
133+
134+
// hostInfos
135+
List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
136+
if (hostInfos.isEmpty()) {
137+
continue;
138+
}
139+
140+
// at this time, we just use master, discard slave information
141+
HostAndPort targetNode = generateHostAndPort(hostInfos);
142+
assignSlotsToNode(slotNums, targetNode);
143+
}
144+
}
145+
121146
private HostAndPort generateHostAndPort(List<Object> hostInfos) {
122147
return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
123148
((Long) hostInfos.get(1)).intValue());
124149
}
125150

126-
public void setNodeIfNotExist(HostAndPort node) {
151+
public JedisPool setupNodeIfNotExist(HostAndPort node) {
127152
w.lock();
128153
try {
129154
String nodeKey = getNodeKey(node);
130-
if (nodes.containsKey(nodeKey)) return;
155+
JedisPool existingPool = nodes.get(nodeKey);
156+
if (existingPool != null) return existingPool;
131157

132158
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
133159
connectionTimeout, soTimeout, password, 0, null, false, null, null, null);
134160
nodes.put(nodeKey, nodePool);
161+
return nodePool;
135162
} finally {
136163
w.unlock();
137164
}
138165
}
139166

140-
public void setNodeIfNotExist(HostAndPort node, boolean ssl) {
167+
public JedisPool setupNodeIfNotExist(HostAndPort node, boolean ssl) {
141168
w.lock();
142169
try {
143170
String nodeKey = getNodeKey(node);
144-
if (nodes.containsKey(nodeKey)) return;
171+
JedisPool existingPool = nodes.get(nodeKey);
172+
if (existingPool != null) return existingPool;
145173

146174
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
147175
connectionTimeout, soTimeout, password, 0, null, ssl, null, null, null);
148176
nodes.put(nodeKey, nodePool);
177+
return nodePool;
149178
} finally {
150179
w.unlock();
151180
}
152181
}
153182

154-
public void setNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory sslSocketFactory,
155-
SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
183+
public JedisPool setupNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory sslSocketFactory,
184+
SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
156185
w.lock();
157186
try {
158187
String nodeKey = getNodeKey(node);
159-
if (nodes.containsKey(nodeKey)) return;
188+
JedisPool existingPool = nodes.get(nodeKey);
189+
if (existingPool != null) return existingPool;
160190

161191
JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
162192
connectionTimeout, soTimeout, password, 0, null, ssl, sslSocketFactory, sslParameters,
163193
hostnameVerifier);
164194
nodes.put(nodeKey, nodePool);
195+
return nodePool;
165196
} finally {
166197
w.unlock();
167198
}
@@ -170,12 +201,7 @@ public void setNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory ss
170201
public void assignSlotToNode(int slot, HostAndPort targetNode) {
171202
w.lock();
172203
try {
173-
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
174-
175-
if (targetPool == null) {
176-
setNodeIfNotExist(targetNode);
177-
targetPool = nodes.get(getNodeKey(targetNode));
178-
}
204+
JedisPool targetPool = setupNodeIfNotExist(targetNode);
179205
slots.put(slot, targetPool);
180206
} finally {
181207
w.unlock();
@@ -185,13 +211,7 @@ public void assignSlotToNode(int slot, HostAndPort targetNode) {
185211
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
186212
w.lock();
187213
try {
188-
JedisPool targetPool = nodes.get(getNodeKey(targetNode));
189-
190-
if (targetPool == null) {
191-
setNodeIfNotExist(targetNode);
192-
targetPool = nodes.get(getNodeKey(targetNode));
193-
}
194-
214+
JedisPool targetPool = setupNodeIfNotExist(targetNode);
195215
for (Integer slot : targetSlots) {
196216
slots.put(slot, targetPool);
197217
}
@@ -227,6 +247,17 @@ public Map<String, JedisPool> getNodes() {
227247
}
228248
}
229249

250+
public List<JedisPool> getShuffledNodesPool() {
251+
r.lock();
252+
try {
253+
List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());
254+
Collections.shuffle(pools);
255+
return pools;
256+
} finally {
257+
r.unlock();
258+
}
259+
}
260+
230261
/**
231262
* Clear discovered nodes collections and gently release allocated resources
232263
*/
@@ -269,5 +300,4 @@ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
269300
}
270301
return slotNums;
271302
}
272-
273303
}

src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public Jedis getConnection() {
3131
// ping-pong)
3232
// or exception if all connections are invalid
3333

34-
List<JedisPool> pools = getShuffledNodesPool();
34+
List<JedisPool> pools = cache.getShuffledNodesPool();
3535

3636
for (JedisPool pool : pools) {
3737
Jedis jedis = null;
@@ -65,7 +65,14 @@ public Jedis getConnectionFromSlot(int slot) {
6565
// assignment
6666
return connectionPool.getResource();
6767
} else {
68-
return getConnection();
68+
renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
69+
connectionPool = cache.getSlotPool(slot);
70+
if (connectionPool != null) {
71+
return connectionPool.getResource();
72+
} else {
73+
//no choice, fallback to new connection to random node
74+
return getConnection();
75+
}
6976
}
7077
}
7178
}

0 commit comments

Comments
 (0)