Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -627,13 +627,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE)
&& ctx.channel().attr(REBIND_ATTRIBUTE).get() != null
&& ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED);
if (debugEnabled && rebindInProgress) {
logger.debug("{} Processing command while rebind is in progress, stack has {} more to process", logPrefix(),
stack.size());
}

if (pristine) {

Expand Down Expand Up @@ -720,9 +713,17 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
}
}

if (rebindInProgress && stack.isEmpty()) {
logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now());
ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
final boolean rebindInProgress = ctx.channel().hasAttr(REBIND_ATTRIBUTE)
&& ctx.channel().attr(REBIND_ATTRIBUTE).get() != null
&& ctx.channel().attr(REBIND_ATTRIBUTE).get().equals(RebindState.STARTED);

if (rebindInProgress) {
if (stack.isEmpty()) {
logger.info("{} Rebind completed at {}", logPrefix(), LocalTime.now());
ctx.channel().attr(REBIND_ATTRIBUTE).set(RebindState.COMPLETED);
} else {
logger.debug("{} Rebind in progress, {} commands remaining in the stack", logPrefix(), stack.size());
}
}

decodeBufferPolicy.afterDecoding(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,13 @@ private static final class State {
this.rebindAddress = rebindAddress;
}

public String toString() {
StringBuilder sb = new StringBuilder();

return sb.append("State [cutoff=").append(cutoff).append(", rebindAddress=").append(rebindAddress).append("]")
.toString();
}

}

private final AtomicReference<State> state = new AtomicReference<>();
Expand Down Expand Up @@ -401,11 +408,17 @@ public void rebind(Duration duration, SocketAddress rebindAddress) {
public Mono<SocketAddress> wrappedSupplier(Mono<SocketAddress> original) {
return Mono.defer(() -> {
State current = state.get();
logger.debug("RebindAwareAddressSupplier rebind state: {}", state.get());
if (current != null && current.rebindAddress != null && clock.instant().isBefore(current.cutoff)) {
return Mono.just(current.rebindAddress);
logger.debug("RebindAwareAddressSupplier using rebind address: {}", state.get());
return Mono.just(current.rebindAddress)
.doOnSubscribe(s -> logger.debug("RebindAwareAddressSupplier subscribed to rebind address"))
.doOnNext(address -> logger.debug("RebindAwareAddressSupplier rebind address: {}", address));
} else {
logger.debug("RebindAwareAddressSupplier falling back to original.");
state.compareAndSet(current, null);
return original;
return original.doOnSubscribe(s -> logger.debug("RebindAwareAddressSupplier original to rebind address"))
.doOnNext(address -> logger.debug("RebindAwareAddressSupplier original address: {}", address));
}
});
}
Expand Down