Skip to content

Fixed error when connection killed during transaction #454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.neo4j.driver.internal.async.QueryRunner;
Expand Down Expand Up @@ -69,7 +68,8 @@ private enum State
MARKED_FAILED( true ),

/**
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
* This transaction has been terminated either because of explicit {@link Session#reset()} or because of a
* fatal connection error.
*/
TERMINATED( false ),

Expand Down Expand Up @@ -158,7 +158,7 @@ CompletionStage<Void> closeAsync()
{
return commitAsync();
}
else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED )
else if ( state != State.COMMITTED && state != State.ROLLED_BACK )
{
return rollbackAsync();
}
Expand All @@ -181,14 +181,13 @@ else if ( state == State.ROLLED_BACK )
}
else if ( state == State.TERMINATED )
{
return failedFuture(
new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) );
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
}
else
{
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
.whenComplete( transactionClosed( State.COMMITTED ) );
.whenComplete( ( ignore, error ) -> transactionClosed( State.COMMITTED ) );
}
}

Expand All @@ -205,15 +204,15 @@ else if ( state == State.ROLLED_BACK )
}
else if ( state == State.TERMINATED )
{
// transaction has been terminated by RESET and should be rolled back by the database
state = State.ROLLED_BACK;
// no need for explicit rollback, transaction should've been rolled back by the database
transactionClosed( State.ROLLED_BACK );
return completedWithNull();
}
else
{
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
.whenComplete( ( ignore, error ) -> transactionClosed( State.ROLLED_BACK ) );
}
}

Expand Down Expand Up @@ -314,8 +313,7 @@ else if ( state == State.MARKED_FAILED )
}
else if ( state == State.TERMINATED )
{
throw new ClientException(
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
throw new ClientException( "Cannot run more statements in this transaction, it has been terminated" );
}
}

Expand Down Expand Up @@ -394,14 +392,11 @@ else if ( commitOrRollbackError != null )
};
}

private BiConsumer<Object,Throwable> transactionClosed( State newState )
private void transactionClosed( State newState )
{
return ( ignore, error ) ->
{
state = newState;
connection.release(); // release in background
session.setBookmark( bookmark );
};
state = newState;
connection.release(); // release in background
session.setBookmark( bookmark );
}

private void terminateConnectionOnThreadInterrupt( String reason )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.v1.Statement;

import static java.util.Objects.requireNonNull;
Expand All @@ -43,6 +44,13 @@ protected void afterSuccess()
@Override
protected void afterFailure( Throwable error )
{
tx.failure();
if ( ErrorUtil.isFatal( error ) )
{
tx.markTerminated();
}
else
{
tx.failure();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,18 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds()
verify( connection, never() ).release();
}

@Test
public void shouldReleaseConnectionWhenTerminatedAndRolledBack()
{
Connection connection = connectionMock();
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );

tx.markTerminated();
await( tx.rollbackAsync() );

verify( connection ).release();
}

private static ExplicitTransaction beginTx( Connection connection )
{
return beginTx( connection, Bookmark.empty() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@

import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
import org.neo4j.driver.v1.exceptions.TransientException;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -35,7 +40,26 @@
public class TransactionPullAllResponseHandlerTest
{
@Test
public void shouldMarkTransactionAsFailedOnFailure()
public void shouldMarkTransactionAsFailedOnNonFatalFailures()
{
testErrorHandling( new ClientException( "Neo.ClientError.Cluster.NotALeader", "" ), false );
testErrorHandling( new ClientException( "Neo.ClientError.Procedure.ProcedureCallFailed", "" ), false );
testErrorHandling( new TransientException( "Neo.TransientError.Transaction.Terminated", "" ), false );
testErrorHandling( new TransientException( "Neo.TransientError.General.DatabaseUnavailable", "" ), false );
}

@Test
public void shouldMarkTransactionAsTerminatedOnFatalFailures()
{
testErrorHandling( new RuntimeException(), true );
testErrorHandling( new IOException(), true );
testErrorHandling( new ServiceUnavailableException( "" ), true );
testErrorHandling( new SessionExpiredException( "" ), true );
testErrorHandling( new SessionExpiredException( "" ), true );
testErrorHandling( new ClientException( "Neo.ClientError.Request.Invalid" ), true );
}

private static void testErrorHandling( Throwable error, boolean fatal )
{
Connection connection = mock( Connection.class );
when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT );
Expand All @@ -44,8 +68,15 @@ public void shouldMarkTransactionAsFailedOnFailure()
TransactionPullAllResponseHandler handler = new TransactionPullAllResponseHandler( new Statement( "RETURN 1" ),
new RunResponseHandler( new CompletableFuture<>() ), connection, tx );

handler.onFailure( new RuntimeException() );
handler.onFailure( error );

verify( tx ).failure();
if ( fatal )
{
verify( tx ).markTerminated();
}
else
{
verify( tx ).failure();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ConnectionSettings;
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.ChannelConnector;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
Expand All @@ -36,11 +37,24 @@
public class ChannelTrackingDriverFactory extends DriverFactoryWithClock
{
private final List<Channel> channels = new CopyOnWriteArrayList<>();
private final int eventLoopThreads;
private ConnectionPool pool;

public ChannelTrackingDriverFactory( Clock clock )
{
this( 0, clock );
}

public ChannelTrackingDriverFactory( int eventLoopThreads, Clock clock )
{
super( clock );
this.eventLoopThreads = eventLoopThreads;
}

@Override
protected Bootstrap createBootstrap()
{
return eventLoopThreads == 0 ? super.createBootstrap() : BootstrapFactory.newBootstrap( eventLoopThreads );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable
assertThat( tx2, notNullValue() );

exception.expect( ClientException.class ); // errors differ depending of neo4j version
exception.expectMessage(
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" );

tx1.run( "RETURN 1" );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,15 +653,16 @@ public void shouldFailWhenListTransformationFunctionFails()
}

@Test
public void shouldFailWhenServerIsRestarted()
public void shouldFailToCommitWhenServerIsRestarted()
{
Transaction tx = await( session.beginTransactionAsync() );

await( tx.runAsync( "CREATE ()" ) );

neo4j.killDb();

try
{
await( tx.runAsync( "CREATE ()" ) );
await( tx.commitAsync() );
fail( "Exception expected" );
}
Expand Down Expand Up @@ -806,7 +807,7 @@ public void shouldFailToCommitAfterTermination()
}
catch ( ClientException e )
{
assertEquals( "Can't commit, transaction has been terminated by `Session#reset()`", e.getMessage() );
assertEquals( "Can't commit, transaction has been terminated", e.getMessage() );
}
assertFalse( tx.isOpen() );
}
Expand Down Expand Up @@ -924,8 +925,7 @@ public void shouldFailToRunQueryWhenTerminated()
}
catch ( ClientException e )
{
assertEquals( "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`",
e.getMessage() );
assertEquals( "Cannot run more statements in this transaction, it has been terminated", e.getMessage() );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.v1.integration;

import io.netty.channel.Channel;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -27,6 +28,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
Expand All @@ -39,12 +45,14 @@

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.retry.RetrySettings.DEFAULT;

public class TransactionIT
{
Expand Down Expand Up @@ -249,7 +257,7 @@ public void shouldHandleResetBeforeRun() throws Throwable
{
// Expect
exception.expect( ClientException.class );
exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated by" );
exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" );
// When
Transaction tx = session.beginTransaction();
session.reset();
Expand Down Expand Up @@ -391,7 +399,7 @@ public void shouldPropagateFailureFromSummary()
@Test
public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exception
{
try ( Session otherSession = this.session.driver().session() )
try ( Session otherSession = session.driver().session() )
{
session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume();

Expand Down Expand Up @@ -425,7 +433,7 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exc
@Test
public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exception
{
try ( Session otherSession = this.session.driver().session() )
try ( Session otherSession = session.driver().session() )
{
session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume();

Expand Down Expand Up @@ -458,4 +466,52 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exc
}
}
}

@Test
public void shouldThrowWhenConnectionKilledDuringTransaction()
{
testFailWhenConnectionKilledDuringTransaction( false );
}

@Test
public void shouldThrowWhenConnectionKilledDuringTransactionMarkedForSuccess()
{
testFailWhenConnectionKilledDuringTransaction( true );
}

private void testFailWhenConnectionKilledDuringTransaction( boolean markForSuccess )
{
ChannelTrackingDriverFactory factory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM );
RoutingSettings instance = new RoutingSettings( 1, 0 );
Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig();

try ( Driver driver = factory.newInstance( session.uri(), session.authToken(), instance, DEFAULT, config ) )
{
try ( Session session = driver.session();
Transaction tx = session.beginTransaction() )
{
tx.run( "CREATE (:MyNode {id: 1})" ).consume();

if ( markForSuccess )
{
tx.success();
}

// kill all network channels
for ( Channel channel : factory.channels() )
{
channel.close().syncUninterruptibly();
}

tx.run( "CREATE (:MyNode {id: 1})" ).consume();
fail( "Exception expected" );
}
catch ( ServiceUnavailableException e )
{
assertThat( e.getMessage(), containsString( "Connection to the database terminated" ) );
}
}

assertEquals( 0, session.run( "MATCH (n:MyNode {id: 1}) RETURN count(n)" ).single().get( 0 ).asInt() );
}
}