Skip to content

DATAREDIS-976 - Allow extension of Lettuce Connection and Subscription classes. #457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
DATAREDIS-976 - Allow extension of Lettuce Connection and Subscriptio…
…n classes.

LettuceConnection, LettuceClusterConnection, and LettuceSubscription can now be properly subclassed so they can be extended and created by LettuceConnectionFactory.

LettuceConnectionFactory provides template methods doCreateLettuceConnection and doCreateLettuceClusterConnection.

Original pull request: #450.
  • Loading branch information
mp911de committed Jun 18, 2019
commit cfc22593b02a4745d2cfd6446d93c793d9f31477
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
Expand Down Expand Up @@ -66,7 +67,6 @@ public class LettuceClusterConnection extends LettuceConnection implements Defau
new LettuceExceptionConverter());

private final Log log = LogFactory.getLog(getClass());
private final RedisClusterClient clusterClient;

private ClusterCommandExecutor clusterCommandExecutor;
private ClusterTopologyProvider topologyProvider;
Expand Down Expand Up @@ -121,8 +121,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider) {
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
"LettuceConnectionProvider must be a ClusterConnectionProvider.");

this.clusterClient = getClient();
this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
this.clusterCommandExecutor = new ClusterCommandExecutor(this.topologyProvider,
new LettuceClusterNodeResourceProvider(getConnectionProvider()), exceptionConverter);
this.disposeClusterCommandExecutorOnClose = true;
Expand Down Expand Up @@ -158,8 +157,7 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
Assert.isTrue(connectionProvider instanceof ClusterConnectionProvider,
"LettuceConnectionProvider must be a ClusterConnectionProvider.");

this.clusterClient = getClient();
this.topologyProvider = new LettuceClusterTopologyProvider(this.clusterClient);
this.topologyProvider = new LettuceClusterTopologyProvider(getClient());
this.clusterCommandExecutor = executor;
this.disposeClusterCommandExecutorOnClose = false;
}
Expand All @@ -170,22 +168,20 @@ public LettuceClusterConnection(LettuceConnectionProvider connectionProvider, Cl
*
* @param sharedConnection may be {@literal null} if no shared connection used.
* @param connectionProvider must not be {@literal null}.
* @param clusterClient must not be {@literal null}.
* @param clusterTopologyProvider must not be {@literal null}.
* @param executor must not be {@literal null}.
* @param timeout must not be {@literal null}.
* @since 2.1
*/
LettuceClusterConnection(@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, RedisClusterClient clusterClient, ClusterCommandExecutor executor,
Duration timeout) {
protected LettuceClusterConnection(@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, ClusterTopologyProvider clusterTopologyProvider,
ClusterCommandExecutor executor, Duration timeout) {

super(sharedConnection, connectionProvider, timeout.toMillis(), 0);

Assert.notNull(executor, "ClusterCommandExecutor must not be null.");
Assert.notNull(clusterClient, "RedisClusterClient must not be null.");

this.clusterClient = clusterClient;
this.topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
this.topologyProvider = clusterTopologyProvider;
this.clusterCommandExecutor = executor;
this.disposeClusterCommandExecutorOnClose = false;
}
Expand All @@ -205,13 +201,6 @@ private RedisClusterClient getClient() {
connectionProvider.getClass().getName()));
}

/**
* @return access to {@link RedisClusterClient} for non-connection access.
*/
private Partitions getPartitions() {
return clusterClient.getPartitions();
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.lettuce.LettuceConnection#geoCommands()
Expand Down Expand Up @@ -330,7 +319,11 @@ public Integer clusterGetSlotForKey(byte[] key) {
@Override
public RedisClusterNode clusterGetNodeForSlot(int slot) {

return LettuceConverters.toRedisClusterNode(getPartitions().getPartitionBySlot(slot));
Set<RedisClusterNode> nodes = topologyProvider.getTopology().getSlotServingNodes(slot);
if (nodes.isEmpty()) {
return null;
}
return nodes.iterator().next();
}

/*
Expand Down Expand Up @@ -574,7 +567,7 @@ public void select(int dbIndex) {
*/
@Override
public List<RedisClusterNode> clusterGetNodes() {
return LettuceConverters.partitionsToClusterNodes(getPartitions());
return new ArrayList<>(topologyProvider.getTopology().getNodes());
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,21 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {
}

private LettuceSubscription initSubscription(MessageListener listener) {
return new LettuceSubscription(listener, switchToPubSub(), connectionProvider);
return doCreateSubscription(listener, switchToPubSub(), connectionProvider);
}

/**
* Customization hook to create a {@link LettuceSubscription}.
*
* @param listener the {@link MessageListener} to notify.
* @param connection Pub/Sub connection.
* @param connectionProvider the {@link LettuceConnectionProvider} for connection release.
* @return a {@link LettuceSubscription}.
* @since 2.2
*/
protected LettuceSubscription doCreateSubscription(MessageListener listener,
StatefulRedisPubSubConnection<byte[], byte[]> connection, LettuceConnectionProvider connectionProvider) {
return new LettuceSubscription(listener, connection, connectionProvider);
}

void pipeline(LettuceResult result) {
Expand Down Expand Up @@ -1250,7 +1264,7 @@ public CommandOutput getTypeHint(CommandType type, CommandOutput defaultType) {
}

@RequiredArgsConstructor
private class LettucePoolConnectionProvider implements LettuceConnectionProvider {
static class LettucePoolConnectionProvider implements LettuceConnectionProvider {

private final LettucePool pool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.redis.connection.lettuce;

import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
Expand All @@ -40,6 +42,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
Expand Down Expand Up @@ -273,7 +276,7 @@ public void afterPropertiesSet() {

this.client = createClient();

this.connectionProvider = createConnectionProvider(client, LettuceConnection.CODEC);
this.connectionProvider = createConnectionProvider(client, CODEC);
this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);

if (isClusterAware()) {
Expand Down Expand Up @@ -341,13 +344,7 @@ public RedisConnection getConnection() {
}

LettuceConnection connection;

if (pool != null) {
connection = new LettuceConnection(getSharedConnection(), getTimeout(), null, pool, getDatabase());
} else {
connection = new LettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
}

connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), getDatabase());
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return connection;
}
Expand All @@ -365,12 +362,51 @@ public RedisClusterConnection getClusterConnection() {

RedisClusterClient clusterClient = (RedisClusterClient) client;

return getShareNativeConnection()
? new LettuceClusterConnection(
(StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection(),
connectionProvider, clusterClient, clusterCommandExecutor, clientConfiguration.getCommandTimeout())
: new LettuceClusterConnection(null, connectionProvider, clusterClient, clusterCommandExecutor,
clientConfiguration.getCommandTimeout());
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection = getShareNativeConnection()
? (StatefulRedisClusterConnection<byte[], byte[]>) getOrCreateSharedConnection().getConnection()
: null;

LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider,
clusterCommandExecutor, clientConfiguration.getCommandTimeout());
}

/**
* Customization hook for {@link LettuceConnection} creation.
*
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
* {@literal true}; {@literal null} otherwise.
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
* @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
* @param database database index to operate on.
* @return the {@link LettuceConnection}.
* @since 2.2
*/
protected LettuceConnection doCreateLettuceConnection(StatefulRedisConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, long timeout, int database) {

return new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
}

/**
* Customization hook for {@link LettuceClusterConnection} creation.
*
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
* {@literal true}; {@literal null} otherwise.
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
* @param topologyProvider the {@link ClusterTopologyProvider}.
* @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
* @param commandTimeout command timeout {@link Duration}.
* @return the {@link LettuceConnection}.
* @since 2.2
*/
protected LettuceClusterConnection doCreateLettuceClusterConnection(
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider,
ClusterTopologyProvider topologyProvider, ClusterCommandExecutor clusterCommandExecutor,
Duration commandTimeout) {

return new LettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider, clusterCommandExecutor,
commandTimeout);
}

/*
Expand Down Expand Up @@ -909,6 +945,10 @@ protected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection

private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {

if (this.pool != null) {
return new LettucePoolConnectionProvider(this.pool);
}

LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);

if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,23 @@
* @author Mark Paluch
* @author Christoph Strobl
*/
class LettuceSubscription extends AbstractSubscription {
public class LettuceSubscription extends AbstractSubscription {

private final StatefulRedisPubSubConnection<byte[], byte[]> connection;
private final LettuceMessageListener listener;
private final LettuceConnectionProvider connectionProvider;
private final RedisPubSubCommands<byte[], byte[]> pubsub;

LettuceSubscription(MessageListener listener, StatefulRedisPubSubConnection<byte[], byte[]> pubsubConnection,
LettuceConnectionProvider connectionProvider) {
/**
* Creates a new {@link LettuceSubscription} given {@link MessageListener}, {@link StatefulRedisPubSubConnection}, and
* {@link LettuceConnectionProvider}.
*
* @param listener the listener to notify, must not be {@literal null}.
* @param pubsubConnection must not be {@literal null}.
* @param connectionProvider must not be {@literal null}.
*/
protected LettuceSubscription(MessageListener listener,
StatefulRedisPubSubConnection<byte[], byte[]> pubsubConnection, LettuceConnectionProvider connectionProvider) {

super(listener);

Expand All @@ -52,33 +60,33 @@ protected StatefulRedisPubSubConnection<byte[], byte[]> getNativeConnection() {
return connection;
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doClose()
*/
protected void doClose() {

if (!getChannels().isEmpty()) {
pubsub.unsubscribe(new byte[0]);
doUnsubscribe(true, new byte[0]);
}

if (!getPatterns().isEmpty()) {
pubsub.punsubscribe(new byte[0]);
doPUnsubscribe(true, new byte[0]);
}

connection.removeListener(this.listener);
connectionProvider.release(connection);
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doPsubscribe(byte[][])
*/
protected void doPsubscribe(byte[]... patterns) {
pubsub.psubscribe(patterns);
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doPUnsubscribe(boolean, byte[][])
*/
Expand All @@ -88,15 +96,15 @@ protected void doPUnsubscribe(boolean all, byte[]... patterns) {
pubsub.punsubscribe(patterns);
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doSubscribe(byte[][])
*/
protected void doSubscribe(byte[]... channels) {
pubsub.subscribe(channels);
}

/*
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.util.AbstractSubscription#doUnsubscribe(boolean, byte[][])
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.data.redis.connection.ClusterCommandExecutor;
import org.springframework.data.redis.connection.ClusterNodeResourceProvider;
import org.springframework.data.redis.connection.ClusterTopologyProvider;
import org.springframework.data.redis.connection.RedisClusterCommands.AddSlots;
import org.springframework.data.redis.connection.RedisClusterNode;

Expand All @@ -67,6 +68,7 @@ public class LettuceClusterConnectionUnitTests {
static final byte[] KEY_3_BYTES = KEY_3.getBytes();

@Mock RedisClusterClient clusterMock;
@Mock ClusterTopologyProvider topologyProviderMock;

@Mock LettuceConnectionProvider connectionProviderMock;
@Mock ClusterCommandExecutor executorMock;
Expand Down Expand Up @@ -363,7 +365,7 @@ public void shouldExecuteOnSharedConnection() {
when(sharedConnectionMock.sync()).thenReturn(sync);

LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock,
clusterMock, executorMock, Duration.ZERO);
topologyProviderMock, executorMock, Duration.ZERO);

connection.keyCommands().del(KEY_1_BYTES);

Expand All @@ -381,7 +383,7 @@ public void shouldExecuteOnDedicatedConnection() {
when(dedicatedConnection.sync()).thenReturn(sync);

LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnectionMock, connectionProviderMock,
clusterMock, executorMock, Duration.ZERO);
topologyProviderMock, executorMock, Duration.ZERO);

connection.listCommands().bLPop(1, KEY_1_BYTES);

Expand Down
Loading