-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Issue #1252: Fix creating a lot of new Jedis instances on unstable cluster, fix slots clearing without filling #1253
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package redis.clients.jedis; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
@@ -13,6 +14,8 @@ | |
|
|
||
| import org.apache.commons.pool2.impl.GenericObjectPoolConfig; | ||
|
|
||
| import redis.clients.jedis.exceptions.JedisConnectionException; | ||
| import redis.clients.jedis.exceptions.JedisException; | ||
| import redis.clients.util.SafeEncoder; | ||
|
|
||
| public class JedisClusterInfoCache { | ||
|
|
@@ -66,7 +69,7 @@ public void discoverClusterNodesAndSlots(Jedis jedis) { | |
| } | ||
|
|
||
| HostAndPort targetNode = generateHostAndPort(hostInfos); | ||
| setNodeIfNotExist(targetNode); | ||
| setupNodeIfNotExist(targetNode); | ||
| if (i == MASTER_NODE_INDEX) { | ||
| assignSlotsToNode(slotNums, targetNode); | ||
| } | ||
|
|
@@ -77,37 +80,34 @@ public void discoverClusterNodesAndSlots(Jedis jedis) { | |
| } | ||
| } | ||
|
|
||
| public void discoverClusterSlots(Jedis jedis) { | ||
| public void renewClusterSlots(Jedis jedis) { | ||
| //If rediscovering is already in process - no need to start one more same rediscovering, just return | ||
| if (!rediscovering) { | ||
| w.lock(); | ||
| rediscovering = true; | ||
|
|
||
| try { | ||
| this.slots.clear(); | ||
|
|
||
| List<Object> slots = jedis.clusterSlots(); | ||
|
|
||
| for (Object slotInfoObj : slots) { | ||
| List<Object> slotInfo = (List<Object>) slotInfoObj; | ||
|
|
||
| if (slotInfo.size() <= 2) { | ||
| continue; | ||
| w.lock(); | ||
| rediscovering = true; | ||
|
|
||
| if (jedis != null) { | ||
| try { | ||
| discoverClusterSlots(jedis); | ||
| return; | ||
| } catch (JedisException e) { | ||
| //try nodes from all pools | ||
| } | ||
| } | ||
|
|
||
| List<Integer> slotNums = getAssignedSlotArray(slotInfo); | ||
|
|
||
| // hostInfos | ||
| List<Object> hostInfos = (List<Object>) slotInfo.get(2); | ||
| if (hostInfos.isEmpty()) { | ||
| continue; | ||
| for (JedisPool jp : getShuffledNodesPool()) { | ||
| try { | ||
| jedis = jp.getResource(); | ||
| discoverClusterSlots(jedis); | ||
| return; | ||
| } catch (JedisConnectionException e) { | ||
| // try next nodes | ||
| } finally { | ||
| if (jedis != null) { | ||
| jedis.close(); | ||
| } | ||
| } | ||
|
|
||
| // at this time, we just use master, discard slave information | ||
| HostAndPort targetNode = generateHostAndPort(hostInfos); | ||
|
|
||
| setNodeIfNotExist(targetNode); | ||
| assignSlotsToNode(slotNums, targetNode); | ||
| } | ||
| } finally { | ||
| rediscovering = false; | ||
|
|
@@ -116,50 +116,81 @@ public void discoverClusterSlots(Jedis jedis) { | |
| } | ||
| } | ||
|
|
||
| private void discoverClusterSlots(Jedis jedis) { | ||
| List<Object> slots = jedis.clusterSlots(); | ||
| this.slots.clear(); | ||
|
|
||
| for (Object slotInfoObj : slots) { | ||
| List<Object> slotInfo = (List<Object>) slotInfoObj; | ||
|
|
||
| if (slotInfo.size() <= MASTER_NODE_INDEX) { | ||
| continue; | ||
| } | ||
|
|
||
| List<Integer> slotNums = getAssignedSlotArray(slotInfo); | ||
|
|
||
| // hostInfos | ||
| List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX); | ||
| if (hostInfos.isEmpty()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it is possible for redis clusterSlots command not to return the node IP and port, it would be a redis bug
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's refactored part of old code |
||
| continue; | ||
| } | ||
|
|
||
| // at this time, we just use master, discard slave information | ||
| HostAndPort targetNode = generateHostAndPort(hostInfos); | ||
| assignSlotsToNode(slotNums, targetNode); | ||
| } | ||
| } | ||
|
|
||
| private HostAndPort generateHostAndPort(List<Object> hostInfos) { | ||
| return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)), | ||
| ((Long) hostInfos.get(1)).intValue()); | ||
| } | ||
|
|
||
| public void setNodeIfNotExist(HostAndPort node) { | ||
| public JedisPool setupNodeIfNotExist(HostAndPort node) { | ||
| w.lock(); | ||
| try { | ||
| String nodeKey = getNodeKey(node); | ||
| if (nodes.containsKey(nodeKey)) return; | ||
| JedisPool existingPool = nodes.get(nodeKey); | ||
| if (existingPool != null) return existingPool; | ||
|
|
||
| JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), | ||
| connectionTimeout, soTimeout, null, 0, null, false, null, null, null); | ||
| nodes.put(nodeKey, nodePool); | ||
| return nodePool; | ||
| } finally { | ||
| w.unlock(); | ||
| } | ||
| } | ||
|
|
||
| public void setNodeIfNotExist(HostAndPort node, boolean ssl) { | ||
| public JedisPool setupNodeIfNotExist(HostAndPort node, boolean ssl) { | ||
| w.lock(); | ||
| try { | ||
| String nodeKey = getNodeKey(node); | ||
| if (nodes.containsKey(nodeKey)) return; | ||
| JedisPool existingPool = nodes.get(nodeKey); | ||
| if (existingPool != null) return existingPool; | ||
|
|
||
| JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), | ||
| connectionTimeout, soTimeout, null, 0, null, ssl, null, null, null); | ||
| nodes.put(nodeKey, nodePool); | ||
| return nodePool; | ||
| } finally { | ||
| w.unlock(); | ||
| } | ||
| } | ||
|
|
||
| public void setNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory sslSocketFactory, | ||
| SSLParameters sslParameters, HostnameVerifier hostnameVerifier) { | ||
| public JedisPool setupNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory sslSocketFactory, | ||
| SSLParameters sslParameters, HostnameVerifier hostnameVerifier) { | ||
| w.lock(); | ||
| try { | ||
| String nodeKey = getNodeKey(node); | ||
| if (nodes.containsKey(nodeKey)) return; | ||
| JedisPool existingPool = nodes.get(nodeKey); | ||
| if (existingPool != null) return existingPool; | ||
|
|
||
| JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), | ||
| connectionTimeout, soTimeout, null, 0, null, ssl, sslSocketFactory, sslParameters, | ||
| hostnameVerifier); | ||
| nodes.put(nodeKey, nodePool); | ||
| return nodePool; | ||
| } finally { | ||
| w.unlock(); | ||
| } | ||
|
|
@@ -168,12 +199,7 @@ public void setNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory ss | |
| public void assignSlotToNode(int slot, HostAndPort targetNode) { | ||
| w.lock(); | ||
| try { | ||
| JedisPool targetPool = nodes.get(getNodeKey(targetNode)); | ||
|
|
||
| if (targetPool == null) { | ||
| setNodeIfNotExist(targetNode); | ||
| targetPool = nodes.get(getNodeKey(targetNode)); | ||
| } | ||
| JedisPool targetPool = setupNodeIfNotExist(targetNode); | ||
| slots.put(slot, targetPool); | ||
| } finally { | ||
| w.unlock(); | ||
|
|
@@ -183,13 +209,7 @@ public void assignSlotToNode(int slot, HostAndPort targetNode) { | |
| public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { | ||
| w.lock(); | ||
| try { | ||
| JedisPool targetPool = nodes.get(getNodeKey(targetNode)); | ||
|
|
||
| if (targetPool == null) { | ||
| setNodeIfNotExist(targetNode); | ||
| targetPool = nodes.get(getNodeKey(targetNode)); | ||
| } | ||
|
|
||
| JedisPool targetPool = setupNodeIfNotExist(targetNode); | ||
| for (Integer slot : targetSlots) { | ||
| slots.put(slot, targetPool); | ||
| } | ||
|
|
@@ -225,6 +245,17 @@ public Map<String, JedisPool> getNodes() { | |
| } | ||
| } | ||
|
|
||
| public List<JedisPool> getShuffledNodesPool() { | ||
| r.lock(); | ||
| try { | ||
| List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values()); | ||
| Collections.shuffle(pools); | ||
| return pools; | ||
| } finally { | ||
| r.unlock(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Clear discovered nodes collections and gently release allocated resources | ||
| */ | ||
|
|
@@ -267,5 +298,4 @@ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) { | |
| } | ||
| return slotNums; | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ public Jedis getConnection() { | |
| // ping-pong) | ||
| // or exception if all connections are invalid | ||
|
|
||
| List<JedisPool> pools = getShuffledNodesPool(); | ||
| List<JedisPool> pools = cache.getShuffledNodesPool(); | ||
|
|
||
| for (JedisPool pool : pools) { | ||
| Jedis jedis = null; | ||
|
|
@@ -61,7 +61,14 @@ public Jedis getConnectionFromSlot(int slot) { | |
| // assignment | ||
| return connectionPool.getResource(); | ||
| } else { | ||
| return getConnection(); | ||
| renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state | ||
|
||
| connectionPool = cache.getSlotPool(slot); | ||
| if (connectionPool != null) { | ||
| return connectionPool.getResource(); | ||
| } else { | ||
| //no choice, fallback to new connection to random node | ||
| return getConnection(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we doing this? Masters could not have replicas, right?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's old code
if (slotInfo.size() <= 2) {, which is absolutely not related to PR. You can ask original author.