Skip to content

Propagate not consumed failures when closing session and transaction #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import org.neo4j.driver.internal.async.InternalStatementResultCursor;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.async.ResultCursorsHolder;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
Expand All @@ -43,6 +48,7 @@

import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.internal.util.Futures.getBlocking;
import static org.neo4j.driver.v1.Values.value;
Expand All @@ -56,28 +62,36 @@ public class ExplicitTransaction implements Transaction
private enum State
{
/** The transaction is running with no explicit success or failure marked */
ACTIVE,
ACTIVE( true ),

/** Running, user marked for success, meaning it'll value committed */
MARKED_SUCCESS,
MARKED_SUCCESS( true ),

/** User marked as failed, meaning it'll be rolled back. */
MARKED_FAILED,
MARKED_FAILED( true ),

/**
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
*/
TERMINATED,
TERMINATED( false ),

/** This transaction has successfully committed */
COMMITTED,
COMMITTED( false ),

/** This transaction has been rolled back */
ROLLED_BACK
ROLLED_BACK( false );

final boolean txOpen;

State( boolean txOpen )
{
this.txOpen = txOpen;
}
}

private final Connection connection;
private final NetworkSession session;
private final ResultCursorsHolder resultCursors;

private volatile Bookmark bookmark = Bookmark.empty();
private volatile State state = State.ACTIVE;
Expand All @@ -86,6 +100,7 @@ public ExplicitTransaction( Connection connection, NetworkSession session )
{
this.connection = connection;
this.session = session;
this.resultCursors = new ResultCursorsHolder();
}

public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
Expand Down Expand Up @@ -162,7 +177,9 @@ else if ( state == State.TERMINATED )
}
else
{
return doCommitAsync().whenComplete( transactionClosed( State.COMMITTED ) );
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
.whenComplete( transactionClosed( State.COMMITTED ) );
}
}

Expand All @@ -185,38 +202,12 @@ else if ( state == State.TERMINATED )
}
else
{
return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) );
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
}
}

private BiConsumer<Void,Throwable> transactionClosed( State newState )
{
return ( ignore, error ) ->
{
state = newState;
connection.releaseInBackground();
session.setBookmark( bookmark );
};
}

private CompletionStage<Void> doCommitAsync()
{
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
new CommitTxResponseHandler( commitFuture, this ) );

return commitFuture.thenRun( () -> state = State.COMMITTED );
}

private CompletionStage<Void> doRollbackAsync()
{
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
new RollbackTxResponseHandler( rollbackFuture ) );

return rollbackFuture.thenRun( () -> state = State.ROLLED_BACK );
}

@Override
public StatementResult run( String statementText, Value statementParameters )
{
Expand Down Expand Up @@ -273,23 +264,31 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
@Override
public StatementResult run( Statement statement )
{
ensureCanRunQueries();
StatementResultCursor cursor = getBlocking( QueryRunner.runAsBlocking( connection, statement, this ) );
StatementResultCursor cursor = getBlocking( run( statement, false ) );
return new InternalStatementResult( cursor );
}

@Override
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
{
ensureCanRunQueries();
//noinspection unchecked
return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this );
return (CompletionStage) run( statement, true );
}

@Override
public boolean isOpen()
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean asAsync )
{
return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED;
ensureCanRunQueries();
CompletionStage<InternalStatementResultCursor> cursorStage;
if ( asAsync )
{
cursorStage = QueryRunner.runAsAsync( connection, statement, this );
}
else
{
cursorStage = QueryRunner.runAsBlocking( connection, statement, this );
}
resultCursors.add( cursorStage );
return cursorStage;
}

private void ensureCanRunQueries()
Expand Down Expand Up @@ -317,6 +316,12 @@ else if ( state == State.TERMINATED )
}
}

@Override
public boolean isOpen()
{
return state.txOpen;
}

@Override
public TypeSystem typeSystem()
{
Expand All @@ -340,4 +345,49 @@ public void setBookmark( Bookmark bookmark )
this.bookmark = bookmark;
}
}

private CompletionStage<Void> doCommitAsync()
{
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this );
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
return commitFuture;
}

private CompletionStage<Void> doRollbackAsync()
{
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture );
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
return rollbackFuture;
}

private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursorFailure )
{
return ( ignore, commitOrRollbackError ) ->
{
if ( cursorFailure != null )
{
throw new CompletionException( completionErrorCause( cursorFailure ) );
}
else if ( commitOrRollbackError != null )
{
throw new CompletionException( completionErrorCause( commitOrRollbackError ) );
}
else
{
return null;
}
};
}

private BiConsumer<Object,Throwable> transactionClosed( State newState )
{
return ( ignore, error ) ->
{
state = newState;
connection.releaseInBackground();
session.setBookmark( bookmark );
};
}
}
56 changes: 24 additions & 32 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.neo4j.driver.internal.async.InternalStatementResultCursor;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.async.ResultCursorsHolder;
import org.neo4j.driver.internal.logging.DelegatingLogger;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
Expand Down Expand Up @@ -59,12 +60,12 @@ public class NetworkSession implements Session
private final ConnectionProvider connectionProvider;
private final AccessMode mode;
private final RetryLogic retryLogic;
private final ResultCursorsHolder resultCursors;
protected final Logger logger;

private volatile Bookmark bookmark = Bookmark.empty();
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedFuture( null );
private volatile CompletionStage<Connection> connectionStage = completedFuture( null );
private volatile CompletionStage<InternalStatementResultCursor> lastResultStage = completedFuture( null );

private final AtomicBoolean open = new AtomicBoolean( true );

Expand All @@ -74,6 +75,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
this.resultCursors = new ResultCursorsHolder();
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
}

Expand Down Expand Up @@ -153,45 +155,34 @@ public boolean isOpen()
@Override
public void close()
{
if ( open.compareAndSet( true, false ) )
{
// todo: should closeAsync() also do this waiting for buffered result?
// todo: unit test result buffering?
getBlocking( lastResultStage
.exceptionally( error -> null )
.thenCompose( this::ensureBuffered )
.thenCompose( error -> releaseResources().thenApply( ignore ->
{
if ( error != null )
{
throw new CompletionException( error );
}
return null;
} ) ) );
}
getBlocking( closeAsync() );
}

@Override
public CompletionStage<Void> closeAsync()
{
// todo: wait for buffered result?
if ( open.compareAndSet( true, false ) )
{
return releaseResources();
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> releaseResources().thenApply( ignore ->
{
Throwable queryError = Futures.completionErrorCause( error );
if ( queryError != null )
{
// connection has been acquired and there is an unconsumed error in result cursor
throw new CompletionException( queryError );
}
else
{
// either connection acquisition failed or
// there are no unconsumed errors in the result cursor
return null;
}
} ) );
}
return completedFuture( null );
}

// todo: test this method
CompletionStage<Throwable> ensureBuffered( InternalStatementResultCursor cursor )
{
if ( cursor == null )
{
return completedFuture( null );
}
return cursor.resultBuffered();
}

@Override
public Transaction beginTransaction()
{
Expand Down Expand Up @@ -421,7 +412,7 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
{
ensureSessionIsOpen();

lastResultStage = ensureNoOpenTxBeforeRunningQuery()
CompletionStage<InternalStatementResultCursor> cursorStage = ensureNoOpenTxBeforeRunningQuery()
.thenCompose( ignore -> acquireConnection( mode ) )
.thenCompose( connection ->
{
Expand All @@ -435,7 +426,8 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
}
} );

return lastResultStage;
resultCursors.add( cursorStage );
return cursorStage;
}

private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
Expand Down Expand Up @@ -496,7 +488,7 @@ private CompletionStage<Void> rollbackTransaction()
} ).exceptionally( error ->
{
Throwable cause = Futures.completionErrorCause( error );
logger.error( "Failed to rollback active transaction", cause );
logger.warn( "Active transaction rolled back with an error", cause );
return null;
} );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.driver.v1.util.Function;
import org.neo4j.driver.v1.util.Functions;

// todo: unit tests
public class InternalStatementResultCursor implements StatementResultCursor
{
// todo: maybe smth better than these two string constants?
Expand Down Expand Up @@ -142,10 +143,9 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
return resultFuture;
}

// todo: test this method and give it better name
public CompletionStage<Throwable> resultBuffered()
public CompletionStage<Throwable> failureAsync()
{
return pullAllHandler.resultBuffered();
return pullAllHandler.failureAsync();
}

private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Conn
}

public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Connection connection,
Statement statement,
ExplicitTransaction tx )
Statement statement, ExplicitTransaction tx )
{
return runAsAsync( connection, statement, tx, false );
}
Expand All @@ -61,8 +60,7 @@ public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connect
}

public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
Statement statement,
ExplicitTransaction tx )
Statement statement, ExplicitTransaction tx )
{
return runAsAsync( connection, statement, tx, true );
}
Expand Down
Loading