Skip to content

Suppress subsequent connection errors into existing error #594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.IdentityResolver;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
Expand Down Expand Up @@ -66,6 +65,7 @@
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.internal.metrics.spi.Metrics.isMetricsEnabled;
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;

public class DriverFactory
{
Expand Down Expand Up @@ -371,10 +371,7 @@ private static void closeConnectionPoolAndSuppressError( ConnectionPool connecti
}
catch ( Throwable closeError )
{
if ( mainError != closeError )
{
mainError.addSuppressed( closeError );
}
addSuppressed( mainError, closeError );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

Expand Down Expand Up @@ -403,7 +404,7 @@ private <T> void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, C
{
if ( rollbackError != null )
{
error.addSuppressed( rollbackError );
addSuppressed( error, rollbackError );
}
resultFuture.completeExceptionally( error );
} );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error )
else
{
failed = true;
log.error( "Fatal error occurred in the pipeline", error );
log.warn( "Fatal error occurred in the pipeline", error );
fail( ctx, error );
}
}

private void fail( ChannelHandlerContext ctx, Throwable error )
{
Throwable cause = transformError( error );
messageDispatcher.handleFatalError( cause );
messageDispatcher.handleChannelError( cause );
log.debug( "Closing channel because of a failure '%s'", error );
ctx.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.messaging.request.ResetMessage.RESET;
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;

public class InboundMessageDispatcher implements ResponseMessageHandler
{
Expand Down Expand Up @@ -141,9 +142,17 @@ public void handleIgnoredMessage()
handler.onFailure( error );
}

public void handleFatalError( Throwable error )
public void handleChannelError( Throwable error )
{
currentError = error;
if ( currentError != null )
{
// we already have an error, this new error probably is caused by the existing one, thus we chain the new error on this current error
addSuppressed( currentError, error );
}
else
{
currentError = error;
}
fatalErrorOccurred = true;

while ( !handlers.isEmpty() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channe
@Override
public void channelReleased( Channel channel )
{
log.debug( "Channel [%s] released back to the pool", channel.id() );
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
decrementInUse( channel );
incrementIdle( channel );
channel.closeFuture().addListener( closeListener );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ private static String extractClassification( String code )
return parts[1];
}

public static void addSuppressed( Throwable mainError, Throwable error )
{
if ( mainError != error )
{
mainError.addSuppressed( error );
}
}

/**
* Exception which is merely a holder of an async stacktrace, which is not the primary stacktrace users are interested in.
* Used for blocking API calls that block on async API calls.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.driver.internal.async.EventLoopGroupFactory;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.ErrorUtil.addSuppressed;

public final class Futures
{
Expand Down Expand Up @@ -184,10 +185,7 @@ public static CompletionException combineErrors( Throwable error1, Throwable err
{
Throwable cause1 = completionExceptionCause( error1 );
Throwable cause2 = completionExceptionCause( error2 );
if ( cause1 != cause2 )
{
cause1.addSuppressed( cause2 );
}
addSuppressed( cause1, cause2 );
return asCompletionException( cause1 );
}
else if ( error1 != null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,7 @@ void shouldSendReadAccessModeInStatementMetadata() throws Exception
{
StubServer server = StubServer.start( "hello_run_exit_read.script", 9001 );

Config config = Config.builder()
.withoutEncryption()
.build();

try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config );
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
Session session = driver.session( AccessMode.READ ) )
{
List<String> names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() );
Expand All @@ -170,11 +166,7 @@ void shouldNotSendWriteAccessModeInStatementMetadata() throws Exception
{
StubServer server = StubServer.start( "hello_run_exit.script", 9001 );

Config config = Config.builder()
.withoutEncryption()
.build();

try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", config );
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
Session session = driver.session( AccessMode.WRITE ) )
{
List<String> names = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() );
Expand Down Expand Up @@ -240,6 +232,50 @@ void shouldThrowRollbackErrorWhenTransactionClosed() throws Exception
testTxCloseErrorPropagation( "rollback_error.script", false, "Unable to rollback" );
}

@Test
void shouldThrowCorrectErrorOnRunFailure() throws Throwable
{
StubServer server = StubServer.start( "database_shutdown.script", 9001 );

try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
Session session = driver.session( "neo4j:bookmark:v1:tx0" );
// has to enforce to flush BEGIN to have tx started.
Transaction transaction = session.beginTransaction() )
{
TransientException error = assertThrows( TransientException.class, () -> {
StatementResult result = transaction.run( "RETURN 1" );
result.consume();
} );
assertThat( error.code(), equalTo( "Neo.TransientError.General.DatabaseUnavailable" ) );
}
finally
{
assertEquals( 0, server.exitStatus() );
}
}

@Test
void shouldThrowCorrectErrorOnCommitFailure() throws Throwable
{
StubServer server = StubServer.start( "database_shutdown_at_commit.script", 9001 );

try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", INSECURE_CONFIG );
Session session = driver.session() )
{
Transaction transaction = session.beginTransaction();
StatementResult result = transaction.run( "CREATE (n {name:'Bob'})" );
result.consume();
transaction.success();

TransientException error = assertThrows( TransientException.class, transaction::close );
assertThat( error.code(), equalTo( "Neo.TransientError.General.DatabaseUnavailable" ) );
}
finally
{
assertEquals( 0, server.exitStatus() );
}
}

private static void testTransactionCloseErrorPropagationWhenSessionClosed( String script, boolean commit,
String expectedErrorMessage ) throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.neo4j.driver.v1.TransactionWork;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
import org.neo4j.driver.v1.exceptions.TransientException;
import org.neo4j.driver.v1.net.ServerAddress;
import org.neo4j.driver.v1.net.ServerAddressResolver;
import org.neo4j.driver.v1.util.StubServer;
Expand Down Expand Up @@ -764,6 +765,41 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exc
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
}

@Test
void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemovedV3() throws Exception
{
// This test simulates a router in a cluster when a leader is removed.
// The router first returns a RT with a writer inside.
// However this writer is killed while the driver is running a tx with it.
// Then at the second time the router returns the same RT with the killed writer inside.
// At the third round, the router removes the the writer server from RT reply.
// Finally, the router returns a RT with a reachable writer.
StubServer router = StubServer.start( "acquire_endpoints_v3_leader_killed.script", 9001 );
StubServer brokenWriter = StubServer.start( "database_shutdown_at_commit.script", 9004 );
StubServer writer = StubServer.start( "write_server.script", 9008 );

Logger logger = mock( Logger.class );
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
Session session = driver.session() )
{
AtomicInteger invocations = new AtomicInteger();
List<Record> records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) );

assertEquals( 0, records.size() );
assertEquals( 2, invocations.get() );
}
finally
{
assertEquals( 0, router.exitStatus() );
assertEquals( 0, brokenWriter.exitStatus() );
assertEquals( 0, writer.exitStatus() );
}
verify( logger, times( 1 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( TransientException.class ) );
verify( logger, times( 2 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
}

@Test
void shouldRetryReadTransactionUntilFailure() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -157,7 +158,7 @@ void shouldPeekHandlerOnRecord()
}

@Test
void shouldFailAllHandlersOnFatalError()
void shouldFailAllHandlersOnChannelError()
{
InboundMessageDispatcher dispatcher = newDispatcher();

Expand All @@ -170,7 +171,7 @@ void shouldFailAllHandlersOnFatalError()
dispatcher.enqueue( handler3 );

RuntimeException fatalError = new RuntimeException( "Fatal!" );
dispatcher.handleFatalError( fatalError );
dispatcher.handleChannelError( fatalError );

InOrder inOrder = inOrder( handler1, handler2, handler3 );
inOrder.verify( handler1 ).onFailure( fatalError );
Expand All @@ -179,19 +180,36 @@ void shouldFailAllHandlersOnFatalError()
}

@Test
void shouldFailNewHandlerAfterFatalError()
void shouldFailNewHandlerAfterChannelError()
{
InboundMessageDispatcher dispatcher = newDispatcher();

RuntimeException fatalError = new RuntimeException( "Fatal!" );
dispatcher.handleFatalError( fatalError );
dispatcher.handleChannelError( fatalError );

ResponseHandler handler = mock( ResponseHandler.class );
dispatcher.enqueue( handler );

verify( handler ).onFailure( fatalError );
}

@Test
void shouldAttachChannelErrorOnExistingError()
{
InboundMessageDispatcher dispatcher = newDispatcher();

ResponseHandler handler = mock( ResponseHandler.class );
dispatcher.enqueue( handler );

dispatcher.handleFailureMessage( "Neo.ClientError", "First error!" );
RuntimeException fatalError = new RuntimeException( "Second Error!" );
dispatcher.handleChannelError( fatalError );

verify( handler ).onFailure( argThat(
error -> error instanceof ClientException && error.getMessage().equals( "First error!" ) &&
error.getSuppressed().length == 1 && error.getSuppressed()[0].getMessage().equals( "Second Error!" ) ) );
}

@Test
void shouldDequeHandlerOnIgnored()
{
Expand Down
12 changes: 12 additions & 0 deletions driver/src/test/resources/database_shutdown.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
!: BOLT 3

C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
C: RESET
S: SUCCESS {}
C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx0"]}
S: SUCCESS {}
C: RUN "RETURN 1" {} {}
PULL_ALL
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database shut down."}
S: <EXIT>
14 changes: 14 additions & 0 deletions driver/src/test/resources/database_shutdown_at_commit.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
!: BOLT 3
!: AUTO RESET

C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
C: BEGIN {}
RUN "CREATE (n {name:'Bob'})" {} {}
PULL_ALL
S: SUCCESS {}
SUCCESS {}
SUCCESS {}
C: COMMIT
S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Database shut down."}
S: <EXIT>