Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,21 @@ pid = /tmp/stunnel.pid
[redis]
accept = 127.0.0.1:6390
connect = 127.0.0.1:6379
[redis_cluster_1]
accept = 127.0.0.1:8379
connect = 127.0.0.1:7379
[redis_cluster_2]
accept = 127.0.0.1:8380
connect = 127.0.001:7380
[redis_cluster_3]
accept = 127.0.0.1:8381
connect = 127.0.001:7381
[redis_cluster_4]
accept = 127.0.0.1:8382
connect = 127.0.0.1:7382
[redis_cluster_5]
accept = 127.0.0.1:8383
connect = 127.0.0.1:7383
endef

export REDIS1_CONF
Expand Down
17 changes: 16 additions & 1 deletion src/main/java/redis/clients/jedis/BinaryJedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.util.JedisClusterHashTagUtil;
import redis.clients.jedis.util.KeyMergeUtil;
import redis.clients.jedis.util.SafeEncoder;

Expand All @@ -15,9 +16,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.util.JedisClusterHashTagUtil;

public class BinaryJedisCluster implements BinaryJedisClusterCommands,
MultiKeyBinaryJedisClusterCommands, JedisClusterBinaryScriptingCommands, Closeable {
Expand Down Expand Up @@ -64,6 +67,18 @@ public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeo
this.maxAttempts = maxAttempts;
}

public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, String clientName, GenericObjectPoolConfig poolConfig,
boolean ssl) {
this(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig, ssl, null, null, null, null);
}

public BinaryJedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, String clientName, GenericObjectPoolConfig poolConfig,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
this.connectionHandler = new JedisSlotBasedConnectionHandler(jedisClusterNode, poolConfig,
connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
this.maxAttempts = maxAttempts;
}

@Override
public void close() {
if (connectionHandler != null) {
Expand Down
38 changes: 34 additions & 4 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package redis.clients.jedis;

import redis.clients.jedis.params.GeoRadiusParam;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.params.ZAddParams;
import redis.clients.jedis.params.ZIncrByParams;
import redis.clients.jedis.commands.JedisClusterCommands;
import redis.clients.jedis.commands.JedisClusterScriptingCommands;
import redis.clients.jedis.commands.MultiKeyJedisClusterCommands;
import redis.clients.jedis.util.JedisClusterHashTagUtil;
import redis.clients.jedis.util.KeyMergeUtil;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.util.JedisClusterHashTagUtil;

public class JedisCluster extends BinaryJedisCluster implements JedisClusterCommands,
MultiKeyJedisClusterCommands, JedisClusterScriptingCommands {

Expand Down Expand Up @@ -62,6 +64,20 @@ public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout,
this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig);
}

public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
boolean ssl) {
this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig, ssl);
}

public JedisCluster(HostAndPort node, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
this(Collections.singleton(node), connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig,
ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
}

public JedisCluster(Set<HostAndPort> nodes) {
this(nodes, DEFAULT_TIMEOUT);
}
Expand Down Expand Up @@ -100,7 +116,21 @@ public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, in
public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig) {
super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig);
}
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
boolean ssl) {
super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig, ssl);
}

public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout,
int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig,
ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMap);
}

@Override
public String set(final String key, final String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import java.io.Closeable;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

Expand All @@ -12,15 +15,23 @@ public abstract class JedisClusterConnectionHandler implements Closeable {
protected final JedisClusterInfoCache cache;

public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
this(nodes, poolConfig, connectionTimeout, soTimeout, password, null);
}

public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName);
initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName);
}
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {
this(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName, false, null, null, null, null);
}

public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName,
ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
}

abstract Jedis getConnection();

Expand All @@ -35,11 +46,12 @@ public Map<String, JedisPool> getNodes() {
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig,
int connectionTimeout, int soTimeout, String password, String clientName) {
int connectionTimeout, int soTimeout, String password, String clientName,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = null;
try {
jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);
jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
if (password != null) {
jedis.auth(password);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package redis.clients.jedis;

public interface JedisClusterHostAndPortMap {
HostAndPort getSSLHostAndPort(String host, int port);
}
35 changes: 32 additions & 3 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

Expand All @@ -29,6 +32,12 @@ public class JedisClusterInfoCache {
private String password;
private String clientName;

private boolean ssl;
private SSLSocketFactory sslSocketFactory;
private SSLParameters sslParameters;
private HostnameVerifier hostnameVerifier;
private JedisClusterHostAndPortMap hostAndPortMap;

private static final int MASTER_NODE_INDEX = 2;

public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) {
Expand All @@ -37,11 +46,23 @@ public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeo

public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,
final int connectionTimeout, final int soTimeout, final String password, final String clientName) {
this(poolConfig, connectionTimeout, soTimeout, password, clientName, false, null, null, null, null);
}

public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig,
final int connectionTimeout, final int soTimeout, final String password, final String clientName,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap hostAndPortMap) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.clientName = clientName;
this.ssl = ssl;
this.sslSocketFactory = sslSocketFactory;
this.sslParameters = sslParameters;
this.hostnameVerifier = hostnameVerifier;
this.hostAndPortMap = hostAndPortMap;
}

public void discoverClusterNodesAndSlots(Jedis jedis) {
Expand Down Expand Up @@ -143,8 +164,15 @@ private void discoverClusterSlots(Jedis jedis) {
}

private HostAndPort generateHostAndPort(List<Object> hostInfos) {
return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
((Long) hostInfos.get(1)).intValue());
String host = SafeEncoder.encode((byte[]) hostInfos.get(0));
int port = ((Long) hostInfos.get(1)).intValue();
if (ssl && hostAndPortMap != null) {
HostAndPort hostAndPort = hostAndPortMap.getSSLHostAndPort(host, port);
if (hostAndPort != null) {
return hostAndPort;
}
}
return new HostAndPort(host, port);
}

public JedisPool setupNodeIfNotExist(HostAndPort node) {
Expand All @@ -155,7 +183,8 @@ public JedisPool setupNodeIfNotExist(HostAndPort node) {
if (existingPool != null) return existingPool;

JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
connectionTimeout, soTimeout, password, 0, clientName, false, null, null, null);
connectionTimeout, soTimeout, password, 0, clientName,
ssl, sslSocketFactory, sslParameters, hostnameVerifier);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.util.List;
import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

Expand All @@ -28,6 +31,11 @@ public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPool
super(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName);
}

public JedisSlotBasedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
super(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
}

@Override
public Jedis getConnection() {
// In antirez's redis-rb-cluster implementation,
Expand Down
Loading