Skip to content

Commit 6d39cd7

Browse files
committed
Fix Bolt handshake write handling and timeout management
1 parent 100762f commit 6d39cd7

File tree

3 files changed

+34
-3
lines changed

3 files changed

+34
-3
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectedListener.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.neo4j.driver.internal.async.connection;
1818

1919
import static java.lang.String.format;
20-
import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.handshakeBuf;
2120
import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.handshakeString;
2221

2322
import io.netty.channel.ChannelFuture;
@@ -57,7 +56,12 @@ public void operationComplete(ChannelFuture future) {
5756
var pipeline = channel.pipeline();
5857
pipeline.addLast(new HandshakeHandler(pipelineBuilder, handshakeCompletedPromise, logging));
5958
log.debug("C: [Bolt Handshake] %s", handshakeString());
60-
channel.writeAndFlush(handshakeBuf(), channel.voidPromise());
59+
channel.writeAndFlush(BoltProtocolUtil.handshakeBuf()).addListener(f -> {
60+
if (!f.isSuccess()) {
61+
this.handshakeCompletedPromise.setFailure(new ServiceUnavailableException(
62+
String.format("Unable to write Bolt handshake to %s.", this.address), f.cause()));
63+
}
64+
});
6165
} else {
6266
handshakeCompletedPromise.setFailure(databaseUnavailableError(address, future.cause()));
6367
}

driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ private void installHandshakeCompletedListeners(
142142

143143
// remove timeout handler from the pipeline once TLS and Bolt handshakes are completed. regular protocol
144144
// messages will flow next and we do not want to have read timeout for them
145-
handshakeCompleted.addListener(future -> pipeline.remove(ConnectTimeoutHandler.class));
145+
handshakeCompleted.addListener(future -> {
146+
if (future.isSuccess()) {
147+
pipeline.remove(ConnectTimeoutHandler.class);
148+
}
149+
});
146150

147151
// add listener that sends an INIT message. connection is now fully established. channel pipeline if fully
148152
// set to send/receive messages for a selected protocol version

driver/src/test/java/org/neo4j/driver/internal/async/connection/ChannelConnectedListenerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.neo4j.driver.internal.async.connection;
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
21+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
2022
import static org.junit.jupiter.api.Assertions.assertNotNull;
2123
import static org.junit.jupiter.api.Assertions.assertThrows;
2224
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,7 +29,9 @@
2729

2830
import io.netty.channel.ChannelPromise;
2931
import io.netty.channel.embedded.EmbeddedChannel;
32+
import io.netty.util.concurrent.Future;
3033
import java.io.IOException;
34+
import java.util.concurrent.CompletableFuture;
3135
import org.junit.jupiter.api.AfterEach;
3236
import org.junit.jupiter.api.Test;
3337
import org.neo4j.driver.exceptions.ServiceUnavailableException;
@@ -70,6 +74,25 @@ void shouldWriteHandshakeWhenChannelConnected() {
7074
assertEquals(handshakeBuf(), channel.readOutbound());
7175
}
7276

77+
@Test
78+
void shouldCompleteHandshakePromiseExceptionallyOnWriteFailure() {
79+
var handshakeCompletedPromise = channel.newPromise();
80+
var listener = newListener(handshakeCompletedPromise);
81+
var channelConnectedPromise = channel.newPromise();
82+
channelConnectedPromise.setSuccess();
83+
channel.close();
84+
85+
listener.operationComplete(channelConnectedPromise);
86+
87+
assertTrue(handshakeCompletedPromise.isDone());
88+
var future = new CompletableFuture<Future<?>>();
89+
handshakeCompletedPromise.addListener(future::complete);
90+
var handshakeFuture = future.join();
91+
assertTrue(handshakeFuture.isDone());
92+
assertFalse(handshakeFuture.isSuccess());
93+
assertInstanceOf(ServiceUnavailableException.class, handshakeFuture.cause());
94+
}
95+
7396
private static ChannelConnectedListener newListener(ChannelPromise handshakeCompletedPromise) {
7497
return new ChannelConnectedListener(
7598
LOCAL_DEFAULT, new ChannelPipelineBuilderImpl(), handshakeCompletedPromise, DEV_NULL_LOGGING);

0 commit comments

Comments
 (0)