Skip to content

Commit 5d335cb

Browse files
author
Zhen Li
committed
Driver should suppress subsequent connection errors into existing errors if any.
When a server is shutting down, it is possible that a transaction is killed in the middle, followed by the termination of connection. Before the connection is shutting down, the server might send an error message if there is at least one pending message sent by client. However the server will also ignore any further messages that are queued in the server inbound message queue. Thus when the client trying to send or reading more with the closed connection, the client will receive a connection terminated error. This PR makes sure when the above situation happens, the client will throw the original error first.
1 parent 71ff948 commit 5d335cb

File tree

7 files changed

+142
-18
lines changed

7 files changed

+142
-18
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,15 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
8888
else
8989
{
9090
failed = true;
91-
log.error( "Fatal error occurred in the pipeline", error );
91+
log.warn( "Fatal error occurred in the pipeline", error );
9292
fail( ctx, error );
9393
}
9494
}
9595

9696
private void fail( ChannelHandlerContext ctx, Throwable error )
9797
{
9898
Throwable cause = transformError( error );
99-
messageDispatcher.handleFatalError( cause );
99+
messageDispatcher.handleChannelError( cause );
100100
log.debug( "Closing channel because of a failure '%s'", error );
101101
ctx.close();
102102
}

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,17 @@ public void handleIgnoredMessage()
141141
handler.onFailure( error );
142142
}
143143

144-
public void handleFatalError( Throwable error )
144+
public void handleChannelError( Throwable error )
145145
{
146-
currentError = error;
146+
if ( currentError != null )
147+
{
148+
// we already have an error, this new error probably is caused by the existing one, thus we chain the new error on this current error
149+
currentError.addSuppressed( error );
150+
}
151+
else
152+
{
153+
currentError = error;
154+
}
147155
fatalErrorOccurred = true;
148156

149157
while ( !handlers.isEmpty() )

driver/src/test/java/org/neo4j/driver/internal/DirectDriverBoltKitTest.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,7 @@ void shouldSendReadAccessModeInStatementMetadata() throws Exception
149149
{
150150
StubServer server = StubServer.start( "hello_run_exit_read.script", 9001 );
151151

152-
Config config = Config.builder()
153-
.withoutEncryption()
154-
.build();
155-
156-
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config );
152+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
157153
Session session = driver.session( AccessMode.READ ) )
158154
{
159155
List<String> names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() );
@@ -170,11 +166,7 @@ void shouldNotSendWriteAccessModeInStatementMetadata() throws Exception
170166
{
171167
StubServer server = StubServer.start( "hello_run_exit.script", 9001 );
172168

173-
Config config = Config.builder()
174-
.withoutEncryption()
175-
.build();
176-
177-
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config );
169+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
178170
Session session = driver.session( AccessMode.WRITE ) )
179171
{
180172
List<String> names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() );
@@ -240,6 +232,50 @@ void shouldThrowRollbackErrorWhenTransactionClosed() throws Exception
240232
testTxCloseErrorPropagation( "rollback_error.script", false, "Unable to rollback" );
241233
}
242234

235+
@Test
236+
void shouldThrowCorrectErrorOnRunFailure() throws Throwable
237+
{
238+
StubServer server = StubServer.start( "database_shutdown.script", 9001 );
239+
240+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
241+
Session session = driver.session( "neo4j:bookmark:v1:tx0" );
242+
// has to enforce to flush BEGIN to have tx started.
243+
Transaction transaction = session.beginTransaction() )
244+
{
245+
TransientException error = assertThrows( TransientException.class, () -> {
246+
StatementResult result = transaction.run( "RETURN 1" );
247+
result.consume();
248+
} );
249+
assertThat( error.code(), equalTo( "Neo.TransientError.General.DatabaseUnavailable" ) );
250+
}
251+
finally
252+
{
253+
assertEquals( 0, server.exitStatus() );
254+
}
255+
}
256+
257+
@Test
258+
void shouldThrowCorrectErrorOnCommitFailure() throws Throwable
259+
{
260+
StubServer server = StubServer.start( "database_shutdown_at_commit.script", 9001 );
261+
262+
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
263+
Session session = driver.session() )
264+
{
265+
Transaction transaction = session.beginTransaction();
266+
StatementResult result = transaction.run( "CREATE (n {name:'Bob'})" );
267+
result.consume();
268+
transaction.success();
269+
270+
TransientException error = assertThrows( TransientException.class, transaction::close );
271+
assertThat( error.code(), equalTo( "Neo.TransientError.General.DatabaseUnavailable" ) );
272+
}
273+
finally
274+
{
275+
assertEquals( 0, server.exitStatus() );
276+
}
277+
}
278+
243279
private static void testTransactionCloseErrorPropagationWhenSessionClosed( String script, boolean commit,
244280
String expectedErrorMessage ) throws Exception
245281
{

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.neo4j.driver.v1.TransactionWork;
4949
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
5050
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
51+
import org.neo4j.driver.v1.exceptions.TransientException;
5152
import org.neo4j.driver.v1.net.ServerAddress;
5253
import org.neo4j.driver.v1.net.ServerAddressResolver;
5354
import org.neo4j.driver.v1.util.StubServer;
@@ -764,6 +765,41 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exc
764765
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
765766
}
766767

768+
@Test
769+
void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemovedV3() throws Exception
770+
{
771+
// This test simulates a router in a cluster when a leader is removed.
772+
// The router first returns a RT with a writer inside.
773+
// However this writer is killed while the driver is running a tx with it.
774+
// Then at the second time the router returns the same RT with the killed writer inside.
775+
// At the third round, the router removes the the writer server from RT reply.
776+
// Finally, the router returns a RT with a reachable writer.
777+
StubServer router = StubServer.start( "acquire_endpoints_v3_leader_killed.script", 9001 );
778+
StubServer brokenWriter = StubServer.start( "database_shutdown_at_commit.script", 9004 );
779+
StubServer writer = StubServer.start( "write_server.script", 9008 );
780+
781+
Logger logger = mock( Logger.class );
782+
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
783+
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
784+
Session session = driver.session() )
785+
{
786+
AtomicInteger invocations = new AtomicInteger();
787+
List<Record> records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) );
788+
789+
assertEquals( 0, records.size() );
790+
assertEquals( 2, invocations.get() );
791+
}
792+
finally
793+
{
794+
assertEquals( 0, router.exitStatus() );
795+
assertEquals( 0, brokenWriter.exitStatus() );
796+
assertEquals( 0, writer.exitStatus() );
797+
}
798+
verify( logger, times( 1 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( TransientException.class ) );
799+
verify( logger, times( 2 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
800+
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
801+
}
802+
767803
@Test
768804
void shouldRetryReadTransactionUntilFailure() throws Exception
769805
{

driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.junit.jupiter.api.Assertions.assertThrows;
4242
import static org.mockito.ArgumentMatchers.any;
4343
import static org.mockito.ArgumentMatchers.anyBoolean;
44+
import static org.mockito.ArgumentMatchers.argThat;
4445
import static org.mockito.ArgumentMatchers.eq;
4546
import static org.mockito.Mockito.inOrder;
4647
import static org.mockito.Mockito.mock;
@@ -157,7 +158,7 @@ void shouldPeekHandlerOnRecord()
157158
}
158159

159160
@Test
160-
void shouldFailAllHandlersOnFatalError()
161+
void shouldFailAllHandlersOnChannelError()
161162
{
162163
InboundMessageDispatcher dispatcher = newDispatcher();
163164

@@ -170,7 +171,7 @@ void shouldFailAllHandlersOnFatalError()
170171
dispatcher.enqueue( handler3 );
171172

172173
RuntimeException fatalError = new RuntimeException( "Fatal!" );
173-
dispatcher.handleFatalError( fatalError );
174+
dispatcher.handleChannelError( fatalError );
174175

175176
InOrder inOrder = inOrder( handler1, handler2, handler3 );
176177
inOrder.verify( handler1 ).onFailure( fatalError );
@@ -179,19 +180,36 @@ void shouldFailAllHandlersOnFatalError()
179180
}
180181

181182
@Test
182-
void shouldFailNewHandlerAfterFatalError()
183+
void shouldFailNewHandlerAfterChannelError()
183184
{
184185
InboundMessageDispatcher dispatcher = newDispatcher();
185186

186187
RuntimeException fatalError = new RuntimeException( "Fatal!" );
187-
dispatcher.handleFatalError( fatalError );
188+
dispatcher.handleChannelError( fatalError );
188189

189190
ResponseHandler handler = mock( ResponseHandler.class );
190191
dispatcher.enqueue( handler );
191192

192193
verify( handler ).onFailure( fatalError );
193194
}
194195

196+
@Test
197+
void shouldAttachChannelErrorOnExistingError()
198+
{
199+
InboundMessageDispatcher dispatcher = newDispatcher();
200+
201+
ResponseHandler handler = mock( ResponseHandler.class );
202+
dispatcher.enqueue( handler );
203+
204+
dispatcher.handleFailureMessage( "Neo.ClientError", "First error!" );
205+
RuntimeException fatalError = new RuntimeException( "Second Error!" );
206+
dispatcher.handleChannelError( fatalError );
207+
208+
verify( handler ).onFailure( argThat(
209+
error -> error instanceof ClientException && error.getMessage().equals( "First error!" ) &&
210+
error.getSuppressed().length == 1 && error.getSuppressed()[0].getMessage().equals( "Second Error!" ) ) );
211+
}
212+
195213
@Test
196214
void shouldDequeHandlerOnIgnored()
197215
{
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
!: BOLT 3
2+
3+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
4+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
5+
C: RESET
6+
S: SUCCESS {}
7+
C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx0"]}
8+
S: SUCCESS {}
9+
C: RUN "RETURN 1" {} {}
10+
PULL_ALL
11+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database shut down."}
12+
S: <EXIT>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
!: BOLT 3
2+
!: AUTO RESET
3+
4+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
5+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
6+
C: BEGIN {}
7+
RUN "CREATE (n {name:'Bob'})" {} {}
8+
PULL_ALL
9+
S: SUCCESS {}
10+
SUCCESS {}
11+
SUCCESS {}
12+
C: COMMIT
13+
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database shut down."}
14+
S: <EXIT>

0 commit comments

Comments
 (0)