Skip to content

Use AccessMode correctly in NetworkSession #426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
Expand Down Expand Up @@ -115,7 +116,17 @@ public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
connection.runAndFlush( BEGIN_QUERY, initialBookmark.asBeginTransactionParameters(),
NoOpResponseHandler.INSTANCE, new BeginTxResponseHandler<>( beginFuture, this ) );
return beginFuture;

return beginFuture.handle( ( tx, beginError ) ->
{
if ( beginError != null )
{
// release connection if begin failed, transaction can't be started
connection.releaseNow();
throw new CompletionException( Futures.completionErrorCause( beginError ) );
}
return tx;
} );
}
}

Expand Down
109 changes: 66 additions & 43 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.neo4j.driver.internal.async.InternalStatementResultCursor;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.async.ResultCursorsHolder;
import org.neo4j.driver.internal.logging.DelegatingLogger;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
Expand Down Expand Up @@ -60,12 +59,12 @@ public class NetworkSession implements Session
private final ConnectionProvider connectionProvider;
private final AccessMode mode;
private final RetryLogic retryLogic;
private final ResultCursorsHolder resultCursors;
protected final Logger logger;

private volatile Bookmark bookmark = Bookmark.empty();
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedFuture( null );
private volatile CompletionStage<Connection> connectionStage = completedFuture( null );
private volatile CompletionStage<InternalStatementResultCursor> resultCursorStage = completedFuture( null );

private final AtomicBoolean open = new AtomicBoolean( true );

Expand All @@ -75,7 +74,6 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
this.resultCursors = new ResultCursorsHolder();
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
}

Expand Down Expand Up @@ -163,22 +161,28 @@ public CompletionStage<Void> closeAsync()
{
if ( open.compareAndSet( true, false ) )
{
return resultCursors.retrieveNotConsumedError()
.thenCompose( error -> releaseResources().thenApply( ignore ->
{
Throwable queryError = Futures.completionErrorCause( error );
if ( queryError != null )
{
// connection has been acquired and there is an unconsumed error in result cursor
throw new CompletionException( queryError );
}
else
{
// either connection acquisition failed or
// there are no unconsumed errors in the result cursor
return null;
}
} ) );
return resultCursorStage.thenCompose( cursor ->
{
if ( cursor == null )
{
return completedFuture( null );
}
return cursor.failureAsync();
} ).thenCompose( error -> releaseResources().thenApply( ignore ->
{
Throwable queryError = Futures.completionErrorCause( error );
if ( queryError != null )
{
// connection has been acquired and there is an unconsumed error in result cursor
throw new CompletionException( queryError );
}
else
{
// either connection acquisition failed or
// there are no unconsumed errors in the result cursor
return null;
}
} ) );
}
return completedFuture( null );
}
Expand Down Expand Up @@ -275,7 +279,7 @@ CompletionStage<Boolean> currentConnectionIsOpen()
return connectionStage.handle( ( connection, error ) ->
error == null && // no acquisition error
connection != null && // some connection has actually been acquired
connection.isInUse() ); // and it's still being used
connection.isOpen() ); // and it's still open
}

private <T> T transaction( AccessMode mode, TransactionWork<T> work )
Expand Down Expand Up @@ -412,7 +416,7 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
{
ensureSessionIsOpen();

CompletionStage<InternalStatementResultCursor> cursorStage = ensureNoOpenTxBeforeRunningQuery()
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
.thenCompose( ignore -> acquireConnection( mode ) )
.thenCompose( connection ->
{
Expand All @@ -426,8 +430,9 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
}
} );

resultCursors.add( cursorStage );
return cursorStage;
resultCursorStage = newResultCursorStage.exceptionally( error -> null );

return newResultCursorStage;
}

private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
Expand All @@ -447,28 +452,46 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m

private CompletionStage<Connection> acquireConnection( AccessMode mode )
{
// memorize in local so same instance is transformed and used in callbacks
CompletionStage<Connection> currentAsyncConnectionStage = connectionStage;
CompletionStage<Connection> currentConnectionStage = connectionStage;

connectionStage = currentAsyncConnectionStage
.exceptionally( error -> null ) // handle previous acquisition failures
.thenCompose( connection ->
{
if ( connection != null && connection.tryMarkInUse() )
{
// previous acquisition attempt was successful and connection has not been released yet
// continue using same connection
return currentAsyncConnectionStage;
}
else
{
// previous acquisition attempt failed or connection has been released
// acquire new connection
return connectionProvider.acquireConnection( mode );
}
} );
CompletionStage<Connection> newConnectionStage = resultCursorStage.thenCompose( cursor ->
{
if ( cursor == null )
{
return completedFuture( null );
}
// make sure previous result is fully consumed and connection is released back to the pool
return cursor.failureAsync();
} ).thenCompose( error ->
{
if ( error == null )
{
// there is no unconsumed error, so one of the following is true:
// 1) this is first time connection is acquired in this session
// 2) previous result has been successful and is fully consumed
// 3) previous result failed and error has been consumed

// return existing connection, which should've been released back to the pool by now
return currentConnectionStage.exceptionally( ignore -> null );
}
else
{
// there exists unconsumed error, re-throw it
throw new CompletionException( error );
}
} ).thenCompose( existingConnection ->
{
if ( existingConnection != null && existingConnection.isOpen() )
{
// there somehow is an existing open connection, this should not happen, just a precondition
throw new IllegalStateException( "Existing open connection detected" );
}
return connectionProvider.acquireConnection( mode );
} );

connectionStage = newConnectionStage.exceptionally( error -> null );

return connectionStage;
return newConnectionStage;
}

private CompletionStage<Void> releaseResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ public class NettyConnection implements Connection
private final ChannelPool channelPool;
private final Clock clock;

private final AtomicBoolean open = new AtomicBoolean( true );
private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true );

private final NettyConnectionState state = new NettyConnectionState();

public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
{
this.channel = channel;
Expand All @@ -63,15 +62,9 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
}

@Override
public boolean isInUse()
{
return state.isInUse();
}

@Override
public boolean tryMarkInUse()
public boolean isOpen()
{
return state.markInUse();
return open.get();
}

@Override
Expand Down Expand Up @@ -109,7 +102,7 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
@Override
public void releaseInBackground()
{
if ( state.release() )
if ( open.compareAndSet( true, false ) )
{
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) );
}
Expand All @@ -118,7 +111,7 @@ public void releaseInBackground()
@Override
public CompletionStage<Void> releaseNow()
{
if ( state.forceRelease() )
if ( open.compareAndSet( true, false ) )
{
Promise<Void> releasePromise = channel.eventLoop().newPromise();
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ public RoutingConnection( Connection delegate, AccessMode accessMode, RoutingErr
this.errorHandler = errorHandler;
}

@Override
public boolean tryMarkInUse()
{
return delegate.tryMarkInUse();
}

@Override
public void enableAutoRead()
{
Expand Down Expand Up @@ -82,9 +76,9 @@ public void releaseInBackground()
}

@Override
public boolean isInUse()
public boolean isOpen()
{
return delegate.isInUse();
return delegate.isOpen();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@

public interface Connection
{
boolean isInUse();

boolean tryMarkInUse();
boolean isOpen();

void enableAutoRead();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
import org.junit.Test;
import org.mockito.InOrder;

import java.util.function.Consumer;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.v1.Transaction;

import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -220,6 +226,36 @@ public void shouldNotOverwriteBookmarkWithEmptyBookmark()
assertEquals( "Cat", tx.bookmark().maxBookmarkAsString() );
}

@Test
public void shouldReleaseConnectionWhenBeginFails()
{
RuntimeException error = new RuntimeException( "Wrong bookmark!" );
Connection connection = connectionWithBegin( handler -> handler.onFailure( error ) );
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );

try
{
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );
fail( "Exception expected" );
}
catch ( RuntimeException e )
{
assertEquals( error, e );
}

verify( connection ).releaseNow();
}

@Test
public void shouldNotReleaseConnectionWhenBeginSucceeds()
{
Connection connection = connectionWithBegin( handler -> handler.onSuccess( emptyMap() ) );
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );

verify( connection, never() ).releaseNow();
}

private static ExplicitTransaction beginTx( Connection connection )
{
return beginTx( connection, Bookmark.empty() );
Expand All @@ -236,4 +272,18 @@ private static ExplicitTransaction beginTx( Connection connection, NetworkSessio
ExplicitTransaction tx = new ExplicitTransaction( connection, session );
return getBlocking( tx.beginAsync( initialBookmark ) );
}

private static Connection connectionWithBegin( Consumer<ResponseHandler> beginBehaviour )
{
Connection connection = mock( Connection.class );

doAnswer( invocation ->
{
ResponseHandler beginHandler = invocation.getArgumentAt( 3, ResponseHandler.class );
beginBehaviour.accept( beginHandler );
return null;
} ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() );

return connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,24 @@ private static void finalize( Session session ) throws Exception
finalizeMethod.invoke( session );
}

private static LeakLoggingNetworkSession newSession( Logging logging, boolean inUseConnection )
private static LeakLoggingNetworkSession newSession( Logging logging, boolean openConnection )
{
return new LeakLoggingNetworkSession( connectionProviderMock( inUseConnection ), READ,
return new LeakLoggingNetworkSession( connectionProviderMock( openConnection ), READ,
new FixedRetryLogic( 0 ), logging );
}

private static ConnectionProvider connectionProviderMock( boolean inUseConnection )
private static ConnectionProvider connectionProviderMock( boolean openConnection )
{
ConnectionProvider provider = mock( ConnectionProvider.class );
Connection connection = connectionMock( inUseConnection );
Connection connection = connectionMock( openConnection );
when( provider.acquireConnection( any( AccessMode.class ) ) ).thenReturn( completedFuture( connection ) );
return provider;
}

private static Connection connectionMock( boolean inUse )
private static Connection connectionMock( boolean open )
{
Connection connection = mock( Connection.class );
when( connection.isInUse() ).thenReturn( inUse );
when( connection.isOpen() ).thenReturn( open );
return connection;
}
}
Loading