Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package redis.clients.jedis;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -25,8 +22,7 @@ public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
abstract Jedis getConnectionFromSlot(int slot);

public Jedis getConnectionFromNode(HostAndPort node) {
cache.setNodeIfNotExist(node);
return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource();
return cache.setupNodeIfNotExist(node).getResource();
}

public Map<String, JedisPool> getNodes() {
Expand All @@ -50,39 +46,15 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPool
}

public void renewSlotCache() {
for (JedisPool jp : getShuffledNodesPool()) {
Jedis jedis = null;
try {
jedis = jp.getResource();
cache.discoverClusterSlots(jedis);
break;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}
}
cache.renewClusterSlots(null);
}

public void renewSlotCache(Jedis jedis) {
try {
cache.discoverClusterSlots(jedis);
} catch (JedisConnectionException e) {
renewSlotCache();
}
cache.renewClusterSlots(jedis);
}

@Override
public void close() {
cache.reset();
}

protected List<JedisPool> getShuffledNodesPool() {
List<JedisPool> pools = new ArrayList<JedisPool>();
pools.addAll(cache.getNodes().values());
Collections.shuffle(pools);
return pools;
}
}
124 changes: 77 additions & 47 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis.clients.jedis;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -13,6 +14,8 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.SafeEncoder;

public class JedisClusterInfoCache {
Expand Down Expand Up @@ -66,7 +69,7 @@ public void discoverClusterNodesAndSlots(Jedis jedis) {
}

HostAndPort targetNode = generateHostAndPort(hostInfos);
setNodeIfNotExist(targetNode);
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
}
Expand All @@ -77,37 +80,34 @@ public void discoverClusterNodesAndSlots(Jedis jedis) {
}
}

public void discoverClusterSlots(Jedis jedis) {
public void renewClusterSlots(Jedis jedis) {
//If rediscovering is already in process - no need to start one more same rediscovering, just return
if (!rediscovering) {
w.lock();
rediscovering = true;

try {
this.slots.clear();

List<Object> slots = jedis.clusterSlots();

for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;

if (slotInfo.size() <= 2) {
continue;
w.lock();
rediscovering = true;

if (jedis != null) {
try {
discoverClusterSlots(jedis);
return;
} catch (JedisException e) {
//try nodes from all pools
}
}

List<Integer> slotNums = getAssignedSlotArray(slotInfo);

// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
if (hostInfos.isEmpty()) {
continue;
for (JedisPool jp : getShuffledNodesPool()) {
try {
jedis = jp.getResource();
discoverClusterSlots(jedis);
return;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}

// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);

setNodeIfNotExist(targetNode);
assignSlotsToNode(slotNums, targetNode);
}
} finally {
rediscovering = false;
Expand All @@ -116,50 +116,81 @@ public void discoverClusterSlots(Jedis jedis) {
}
}

private void discoverClusterSlots(Jedis jedis) {
List<Object> slots = jedis.clusterSlots();
this.slots.clear();

for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;

if (slotInfo.size() <= MASTER_NODE_INDEX) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we doing this? Masters could not have replicas, right?

Copy link
Contributor Author

@Spikhalskiy Spikhalskiy Jul 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's old code if (slotInfo.size() <= 2) {, which is absolutely not related to PR. You can ask original author.

continue;
}

List<Integer> slotNums = getAssignedSlotArray(slotInfo);

// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
if (hostInfos.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is possible for redis clusterSlots command not to return the node IP and port, it would be a redis bug

Copy link
Contributor Author

@Spikhalskiy Spikhalskiy Jul 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's refactored part of old code if (hostInfos.isEmpty()) { continue; which is not related to PR, I just made it more readable.

continue;
}

// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);
assignSlotsToNode(slotNums, targetNode);
}
}

private HostAndPort generateHostAndPort(List<Object> hostInfos) {
return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
((Long) hostInfos.get(1)).intValue());
}

public void setNodeIfNotExist(HostAndPort node) {
public JedisPool setupNodeIfNotExist(HostAndPort node) {
w.lock();
try {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey)) return;
JedisPool existingPool = nodes.get(nodeKey);
if (existingPool != null) return existingPool;

JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
connectionTimeout, soTimeout, null, 0, null, false, null, null, null);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
w.unlock();
}
}

public void setNodeIfNotExist(HostAndPort node, boolean ssl) {
public JedisPool setupNodeIfNotExist(HostAndPort node, boolean ssl) {
w.lock();
try {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey)) return;
JedisPool existingPool = nodes.get(nodeKey);
if (existingPool != null) return existingPool;

JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
connectionTimeout, soTimeout, null, 0, null, ssl, null, null, null);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
w.unlock();
}
}

public void setNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory sslSocketFactory,
SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
public JedisPool setupNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory sslSocketFactory,
SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
w.lock();
try {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey)) return;
JedisPool existingPool = nodes.get(nodeKey);
if (existingPool != null) return existingPool;

JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
connectionTimeout, soTimeout, null, 0, null, ssl, sslSocketFactory, sslParameters,
hostnameVerifier);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
w.unlock();
}
Expand All @@ -168,12 +199,7 @@ public void setNodeIfNotExist(HostAndPort node, boolean ssl, SSLSocketFactory ss
public void assignSlotToNode(int slot, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));

if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
JedisPool targetPool = setupNodeIfNotExist(targetNode);
slots.put(slot, targetPool);
} finally {
w.unlock();
Expand All @@ -183,13 +209,7 @@ public void assignSlotToNode(int slot, HostAndPort targetNode) {
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));

if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}

JedisPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
Expand Down Expand Up @@ -225,6 +245,17 @@ public Map<String, JedisPool> getNodes() {
}
}

public List<JedisPool> getShuffledNodesPool() {
r.lock();
try {
List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());
Collections.shuffle(pools);
return pools;
} finally {
r.unlock();
}
}

/**
* Clear discovered nodes collections and gently release allocated resources
*/
Expand Down Expand Up @@ -267,5 +298,4 @@ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
}
return slotNums;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Jedis getConnection() {
// ping-pong)
// or exception if all connections are invalid

List<JedisPool> pools = getShuffledNodesPool();
List<JedisPool> pools = cache.getShuffledNodesPool();

for (JedisPool pool : pools) {
Jedis jedis = null;
Expand Down Expand Up @@ -61,7 +61,14 @@ public Jedis getConnectionFromSlot(int slot) {
// assignment
return connectionPool.getResource();
} else {
return getConnection();
renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems nice cause renewSlotCache() should succeed to renew slot cache if cluster is not down. Great improvement.

connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
return connectionPool.getResource();
} else {
//no choice, fallback to new connection to random node
return getConnection();
}
}
}
}