Skip to content

Add metrics for tracking total disconnected time and reconnection attempts #3220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 59 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
d743896
v0.1
tishun Mar 12, 2025
0d0e385
Simple reconnect now working
tishun Mar 12, 2025
5f498f2
Bind address from message is now considered
tishun Mar 12, 2025
843a566
Self-register the handler
tishun Mar 13, 2025
9294a67
Format code
tishun Mar 13, 2025
426ef53
Filter push messages in a more stable way
tishun Mar 13, 2025
c4c35c7
(very hacky) Relax comand expire timers globbaly
tishun Mar 13, 2025
843db59
Configure if timeout relaxing should be applied
tishun Mar 13, 2025
a07d88e
Proper way to close channel
tishun Mar 18, 2025
1cd0708
Configure the timneout relaxing
tishun Mar 27, 2025
734d475
Sequential handover implemented
uglide Mar 26, 2024
c1508c9
Did not address formatting
tishun Apr 14, 2025
fd5bf10
Add metric for tracking total disccnnected time on reconnect
ggivo Mar 18, 2025
5604f31
Integration test
ggivo Mar 18, 2025
380e395
formatting
ggivo Mar 18, 2025
21d70f0
clean up & test
ggivo Mar 18, 2025
8acb0d0
rename metric lettuce.reconnection.attempts
ggivo Mar 18, 2025
446b8f2
[Experimental] publish DefaultEndpoint/CommandHandler internal queue …
ggivo Mar 20, 2025
9b644d0
Expose Metric lettuce.endpoint.command.queue tracking endpoint QUEUE_…
ggivo Mar 20, 2025
40c5fee
Revert partially 'clean up & test' since it introduces a bug
ggivo Mar 25, 2025
d8a820d
Fix : do not reset attempt count on subsequent channelInactive
ggivo Apr 2, 2025
9380da3
Trying to troubleshoot stale connection after redis cluster upgrade
ggivo Apr 14, 2025
bdb66b4
Merge proactive connection handover
ggivo Apr 14, 2025
0041294
Fix NPE when using standard connections
ggivo Apr 15, 2025
a0fce76
Prolong the rebind windwow for relaxed tiemouts
tishun Apr 16, 2025
3502380
Prolong the rebind windwow for relaxed tiemouts
tishun Apr 16, 2025
f62bf25
PubSub no longer required; CommandExpiryWriter is now channel aware; …
tishun Apr 28, 2025
07a0559
Use the new MOVING push message from the RE server
tishun May 8, 2025
178c051
PubSub no longer required; CommandExpiryWriter is now channel aware; …
tishun Apr 28, 2025
3669c74
Use the new MOVING push message from the RE server
tishun May 8, 2025
1d72d1e
Unit test was not chaining delgates in the same way that the RedisCli…
tishun May 9, 2025
76e3785
Merge changes from tishun/feature/proactive-watchdog
ggivo May 9, 2025
03bf922
Fix REBIND message validation
ggivo May 12, 2025
baa5183
Merge remote-tracking branch 'tishun/feature/proactive-watchdog' into…
ggivo May 12, 2025
501af13
Fix syntax error after merge
ggivo May 12, 2025
1938b57
v0.1
tishun Mar 12, 2025
f7723c8
Simple reconnect now working
tishun Mar 12, 2025
97761fe
Bind address from message is now considered
tishun Mar 12, 2025
73f3834
Self-register the handler
tishun Mar 13, 2025
cad3370
Format code
tishun Mar 13, 2025
e53dbf6
Filter push messages in a more stable way
tishun Mar 13, 2025
04112c2
(very hacky) Relax comand expire timers globbaly
tishun Mar 13, 2025
fc47155
Configure if timeout relaxing should be applied
tishun Mar 13, 2025
bd85245
Proper way to close channel
tishun Mar 18, 2025
c397419
Configure the timneout relaxing
tishun Mar 27, 2025
465e249
Sequential handover implemented
uglide Mar 26, 2024
b74a168
Did not address formatting
tishun Apr 14, 2025
bb1dd67
Prolong the rebind windwow for relaxed tiemouts
tishun Apr 16, 2025
b5b2118
PubSub no longer required; CommandExpiryWriter is now channel aware; …
tishun Apr 28, 2025
81f783f
Use the new MOVING push message from the RE server
tishun May 8, 2025
b86dfec
Unit test was not chaining delgates in the same way that the RedisCli…
tishun May 9, 2025
b09a297
Fix REBIND message validation
ggivo May 12, 2025
f94cff0
Fixed the expiry mechanism
tishun May 19, 2025
2bb04e8
Polishing
tishun May 19, 2025
43abe71
Merge branch 'feature/proactive-watchdog' into lettuce-observability
ggivo May 19, 2025
30f94ce
formatting
ggivo May 19, 2025
27094c4
Fix NPE.
ggivo May 19, 2025
c7522bb
Merge branch 'feature/proactive-watchdog' into lettuce-observability
ggivo May 19, 2025
5c1d980
Fix error after merrge
ggivo May 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class ClientOptions implements Serializable {

public static final boolean DEFAULT_AUTO_RECONNECT = true;

public static final boolean DEFAULT_PROACTIVE_REBIND = false;

public static final Predicate<RedisCommand<?, ?, ?>> DEFAULT_REPLAY_FILTER = (cmd) -> false;

public static final int DEFAULT_BUFFER_USAGE_RATIO = 3;
Expand Down Expand Up @@ -96,6 +98,8 @@ public class ClientOptions implements Serializable {

private final boolean autoReconnect;

private final boolean proactiveRebind;

private final Predicate<RedisCommand<?, ?, ?>> replayFilter;

private final boolean cancelCommandsOnReconnectFailure;
Expand Down Expand Up @@ -132,6 +136,7 @@ public class ClientOptions implements Serializable {

protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.proactiveRebind = builder.proactiveRebind;
this.replayFilter = builder.replayFilter;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
Expand All @@ -153,6 +158,7 @@ protected ClientOptions(Builder builder) {

protected ClientOptions(ClientOptions original) {
this.autoReconnect = original.isAutoReconnect();
this.proactiveRebind = original.isProactiveRebindEnabled();
this.replayFilter = original.getReplayFilter();
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
Expand Down Expand Up @@ -207,6 +213,8 @@ public static class Builder {

private boolean autoReconnect = DEFAULT_AUTO_RECONNECT;

private boolean proactiveRebind = DEFAULT_PROACTIVE_REBIND;

private Predicate<RedisCommand<?, ?, ?>> replayFilter = DEFAULT_REPLAY_FILTER;

private boolean cancelCommandsOnReconnectFailure = DEFAULT_CANCEL_CMD_RECONNECT_FAIL;
Expand Down Expand Up @@ -256,6 +264,20 @@ public Builder autoReconnect(boolean autoReconnect) {
return this;
}

/**
* Configure whether the driver should listen for server events that indicate the current endpoint is being re-bound.
* When enabled, the proactive re-bind will help with the connection handover and reduce the number of failed commands.
* This feature requires the server to support proactive re-binds. Defaults to {@code false}. See
* {@link #DEFAULT_PROACTIVE_REBIND}.
*
* @param proactiveRebind true/false
* @return {@code this}
*/
public Builder proactiveRebind(boolean proactiveRebind) {
this.proactiveRebind = proactiveRebind;
return this;
}

/**
* When {@link #autoReconnect(boolean)} is set to true, this {@link Predicate} is used to filter commands to replay when
* the connection is reestablished after a disconnect. Returning <code>false</code> means the command will not be
Expand Down Expand Up @@ -551,14 +573,15 @@ public ClientOptions build() {
public ClientOptions.Builder mutate() {
Builder builder = new Builder();

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.replayFilter(getReplayFilter()).decodeBufferPolicy(getDecodeBufferPolicy())
.disconnectedBehavior(getDisconnectedBehavior()).reauthenticateBehavior(getReauthenticateBehaviour())
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
builder.autoReconnect(isAutoReconnect()).proactiveRebind(isProactiveRebindEnabled())
.cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()).replayFilter(getReplayFilter())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
.timeoutOptions(getTimeoutOptions());

return builder;
}
Expand All @@ -576,6 +599,19 @@ public boolean isAutoReconnect() {
return autoReconnect;
}

/**
* Controls auto-reconnect behavior on connections. If auto-reconnect is {@code true} (default), it is enabled. As soon as a
* connection gets closed/reset without the intention to close it, the client will try to reconnect and re-issue any queued
* commands.
*
* This flag has also the effect that disconnected connections will refuse commands and cancel these with an exception.
*
* @return {@code true} if auto-reconnect is enabled.
*/
public boolean isProactiveRebindEnabled() {
return proactiveRebind;
}

/**
* Controls which {@link RedisCommand} will be replayed after a re-connect. The {@link Predicate} returns <code>true</code>
* if command should be filtered out and not replayed. Defaults to {@link #DEFAULT_REPLAY_FILTER}.
Expand Down
14 changes: 11 additions & 3 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import io.lettuce.core.protocol.RebindAwareConnectionWatchdog;
import jdk.net.ExtendedSocketOptions;
import reactor.core.publisher.Mono;
import io.lettuce.core.internal.LettuceAssert;
Expand Down Expand Up @@ -153,9 +154,16 @@ protected ConnectionWatchdog createConnectionWatchdog() {
LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true");
LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true");

ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint);
ConnectionWatchdog watchdog;
if (clientOptions.isProactiveRebindEnabled()) {
watchdog = new RebindAwareConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint, clientResources.connectionMonitor());
} else {
watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap,
clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
connection, clientResources.eventBus(), endpoint, clientResources.connectionMonitor());
}

endpoint.registerConnectionWatchdog(watchdog);

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getOptions())) {
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -413,7 +413,7 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getOptions())) {
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -580,7 +580,7 @@ private <K, V> ConnectionFuture<StatefulRedisSentinelConnection<K, V>> doConnect
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getOptions())) {
writer = new CommandExpiryWriter(writer, getOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down
40 changes: 38 additions & 2 deletions src/main/java/io/lettuce/core/TimeoutOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@
@SuppressWarnings("serial")
public class TimeoutOptions implements Serializable {

public static final Duration DISABLED_TIMEOUT = Duration.ZERO.minusSeconds(1);

public static final boolean DEFAULT_TIMEOUT_COMMANDS = false;

public static final Duration DEFAULT_RELAXED_TIMEOUT = DISABLED_TIMEOUT;

private final boolean timeoutCommands;

private final boolean applyConnectionTimeout;

private final Duration relaxedTimeout;

private final TimeoutSource source;

private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source) {
private TimeoutOptions(boolean timeoutCommands, boolean applyConnectionTimeout, TimeoutSource source,
Duration relaxedTimeout) {

this.timeoutCommands = timeoutCommands;
this.applyConnectionTimeout = applyConnectionTimeout;
this.relaxedTimeout = relaxedTimeout;
this.source = source;
}

Expand Down Expand Up @@ -84,6 +92,8 @@ public static class Builder {

private boolean applyConnectionTimeout = false;

private Duration relaxedTimeout = DEFAULT_RELAXED_TIMEOUT;

private TimeoutSource source;

/**
Expand All @@ -107,6 +117,25 @@ public Builder timeoutCommands(boolean enabled) {
return this;
}

/**
* Enable proactive timeout relaxing. Disabled by default, see {@link #DEFAULT_RELAXED_TIMEOUT}.
* <p/>
* If the Redis server supports this, and the client is set up to use it by the
* {@link ClientOptions#isProactiveRebindEnabled()} option, the client would listen to notifications that the current
* endpoint is about to go down (as part of some maintenance activity, for example). In such cases, the driver could
* extend the existing timeout settings for newly issued commands, or such that are in flight, to make sure they do not
* time out during this process. These commands could be either a part of the offline buffer or waiting for a reply.
*
* @param duration {@link Duration} to relax timeouts proactively, must not be {@code null}.
* @return {@code this}
*/
public Builder proactiveTimeoutsRelaxing(Duration duration) {
LettuceAssert.notNull(duration, "Duration must not be null");

this.relaxedTimeout = duration;
return this;
}

/**
* Set a fixed timeout for all commands.
*
Expand Down Expand Up @@ -158,7 +187,7 @@ public TimeoutOptions build() {
}
}

return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source);
return new TimeoutOptions(timeoutCommands, applyConnectionTimeout, source, relaxedTimeout);
}

}
Expand All @@ -177,6 +206,13 @@ public boolean isApplyConnectionTimeout() {
return applyConnectionTimeout;
}

/**
* @return the {@link Duration} to relax timeouts proactively, {@link #DISABLED_TIMEOUT} if disabled.
*/
public Duration getRelaxedTimeout() {
return relaxedTimeout;
}

/**
* @return the timeout source to determine the timeout for a {@link RedisCommand}. Can be {@code null} if
* {@link #isTimeoutCommands()} is {@code false}.
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -634,7 +634,7 @@ <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubToNode
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -680,7 +680,7 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -798,7 +798,7 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con
RedisChannelWriter writer = endpoint;

if (CommandExpiryWriter.isSupported(getClusterClientOptions())) {
writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources());
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getClusterClientOptions(), getResources());
}

if (CommandListenerWriter.isSupported(getCommandListeners())) {
Expand Down Expand Up @@ -1091,6 +1091,7 @@ private CompletionStage<Partitions> fetchPartitions(Iterable<RedisURI> topologyR
getClusterClientOptions().getSocketOptions().getConnectTimeout(), useDynamicRefreshSources());

return topology.thenApply(partitions -> {
logger.debug("Topology Refresh Views: {} ", partitions);

if (partitions.isEmpty()) {
throw new RedisException(String.format("Cannot retrieve initial cluster partitions from initial URIs %s",
Expand All @@ -1108,7 +1109,7 @@ private CompletionStage<Partitions> fetchPartitions(Iterable<RedisURI> topologyR
}

topologyRefreshScheduler.activateTopologyRefreshIfNeeded();

logger.debug("Topology Refresh loadedPartitions: {}", loadedPartitions);
return loadedPartitions;
});
}
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/io/lettuce/core/metrics/ConnectionMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.lettuce.core.metrics;

public interface ConnectionMonitor {

static ConnectionMonitor disabled() {

return new ConnectionMonitor() {

@Override
public void recordDisconnectedTime(String epid, long time) {

}

@Override
public void incrementReconnectionAttempts(String epid) {

}

@Override
public boolean isEnabled() {
return false;
}

};
}

void recordDisconnectedTime(String epid, long time);

void incrementReconnectionAttempts(String epid);

boolean isEnabled();

}
Loading