Skip to content

Java 8, CompletionStage in async and stress test improvements #406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.neo4j.driver.internal;

import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.async.AsyncConnection;
import org.neo4j.driver.internal.async.InternalFuture;
import org.neo4j.driver.internal.async.Futures;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;
Expand Down Expand Up @@ -53,7 +55,7 @@ public PooledConnection acquireConnection( AccessMode mode )
}

@Override
public InternalFuture<AsyncConnection> acquireAsyncConnection( AccessMode mode )
public CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode )
{
return asyncPool.acquire( address );
}
Expand All @@ -62,7 +64,7 @@ public InternalFuture<AsyncConnection> acquireAsyncConnection( AccessMode mode )
public void close() throws Exception
{
pool.close();
asyncPool.closeAsync().syncUninterruptibly();
Futures.getBlocking( asyncPool.closeAsync() );
}

public BoltServerAddress getAddress()
Expand Down
30 changes: 13 additions & 17 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.neo4j.driver.internal.async.AsyncConnectorImpl;
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.Futures;
import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
Expand Down Expand Up @@ -85,15 +86,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
try
{
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
asyncConnectionPool, eventLoopGroup );
asyncConnectionPool );
}
catch ( Throwable driverError )
{
// we need to close the connection pool if driver creation threw exception
try
{
connectionPool.close();
asyncConnectionPool.closeAsync().syncUninterruptibly();
Futures.getBlocking( asyncConnectionPool.closeAsync() );
}
catch ( Throwable closeError )
{
Expand All @@ -120,19 +121,17 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
}

private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
AsyncConnectionPool asyncConnectionPool )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
eventExecutorGroup );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
eventExecutorGroup );
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
Expand All @@ -144,13 +143,12 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* <b>This method is protected only for testing</b>
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
EventExecutorGroup eventExecutorGroup )
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
{
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand All @@ -160,16 +158,14 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* <b>This method is protected only for testing</b>
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand Down Expand Up @@ -251,9 +247,9 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
* <b>This method is protected only for testing</b>
*/
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup, Config config )
Config config )
{
return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import org.neo4j.driver.ResultResourcesHandler;
import org.neo4j.driver.internal.async.AsyncConnection;
import org.neo4j.driver.internal.async.InternalFuture;
import org.neo4j.driver.internal.async.InternalPromise;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
import org.neo4j.driver.internal.handlers.BookmarkResponseHandler;
Expand All @@ -33,9 +34,7 @@
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.StatementResultCursor;
Expand All @@ -45,8 +44,9 @@
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.types.TypeSystem;
import org.neo4j.driver.v1.util.Function;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.async.Futures.failedFuture;
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
import static org.neo4j.driver.v1.Values.ofValue;
import static org.neo4j.driver.v1.Values.value;
Expand Down Expand Up @@ -118,25 +118,23 @@ public void begin( Bookmark initialBookmark )
}
}

public InternalFuture<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
{
InternalPromise<ExplicitTransaction> beginTxPromise = asyncConnection.newPromise();

Map<String,Value> parameters = initialBookmark.asBeginTransactionParameters();
asyncConnection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );

if ( initialBookmark.isEmpty() )
{
asyncConnection.pullAll( NoOpResponseHandler.INSTANCE );
beginTxPromise.setSuccess( this );
return completedFuture( this );
}
else
{
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginTxPromise, this ) );
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginFuture, this ) );
asyncConnection.flush();
return beginFuture;
}

return beginTxPromise;
}

@Override
Expand Down Expand Up @@ -215,21 +213,15 @@ private void rollbackTx()
}

@Override
public Response<Void> commitAsync()
{
return internalCommitAsync();
}

InternalFuture<Void> internalCommitAsync()
public CompletionStage<Void> commitAsync()
{
if ( state == State.COMMITTED )
{
return asyncConnection.<Void>newPromise().setSuccess( null );
return completedFuture( null );
}
else if ( state == State.ROLLED_BACK )
{
return asyncConnection.<Void>newPromise().setFailure(
new ClientException( "Can't commit, transaction has already been rolled back" ) );
return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) );
}
else
{
Expand All @@ -238,21 +230,15 @@ else if ( state == State.ROLLED_BACK )
}

@Override
public Response<Void> rollbackAsync()
{
return internalRollbackAsync();
}

InternalFuture<Void> internalRollbackAsync()
public CompletionStage<Void> rollbackAsync()
{
if ( state == State.COMMITTED )
{
return asyncConnection.<Void>newPromise()
.setFailure( new ClientException( "Can't rollback, transaction has already been committed" ) );
return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) );
}
else if ( state == State.ROLLED_BACK )
{
return asyncConnection.<Void>newPromise().setSuccess( null );
return completedFuture( null );
}
else
{
Expand All @@ -262,51 +248,39 @@ else if ( state == State.ROLLED_BACK )

private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
{
return new BiConsumer<Void,Throwable>()
return ( ignore, error ) ->
{
@Override
public void accept( Void result, Throwable error )
{
asyncConnection.release();
session.asyncTransactionClosed( ExplicitTransaction.this );
}
asyncConnection.release();
session.asyncTransactionClosed( ExplicitTransaction.this );
};
}

private InternalFuture<Void> doCommitAsync()
private CompletionStage<Void> doCommitAsync()
{
InternalPromise<Void> commitTxPromise = asyncConnection.newPromise();
CompletableFuture<Void> commitFuture = new CompletableFuture<>();

asyncConnection.run( COMMIT_QUERY, Collections.<String,Value>emptyMap(), NoOpResponseHandler.INSTANCE );
asyncConnection.pullAll( new CommitTxResponseHandler( commitTxPromise, this ) );
asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
asyncConnection.pullAll( new CommitTxResponseHandler( commitFuture, this ) );
asyncConnection.flush();

return commitTxPromise.thenApply( new Function<Void,Void>()
return commitFuture.thenApply( ignore ->
{
@Override
public Void apply( Void ignore )
{
ExplicitTransaction.this.state = State.COMMITTED;
return null;
}
ExplicitTransaction.this.state = State.COMMITTED;
return null;
} );
}

private InternalFuture<Void> doRollbackAsync()
private CompletionStage<Void> doRollbackAsync()
{
InternalPromise<Void> rollbackTxPromise = asyncConnection.newPromise();
asyncConnection.run( ROLLBACK_QUERY, Collections.<String,Value>emptyMap(), NoOpResponseHandler.INSTANCE );
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackTxPromise ) );
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackFuture ) );
asyncConnection.flush();

return rollbackTxPromise.thenApply( new Function<Void,Void>()
return rollbackFuture.thenApply( ignore ->
{
@Override
public Void apply( Void ignore )
{
ExplicitTransaction.this.state = State.ROLLED_BACK;
return null;
}
ExplicitTransaction.this.state = State.ROLLED_BACK;
return null;
} );
}

Expand All @@ -317,7 +291,7 @@ public StatementResult run( String statementText, Value statementParameters )
}

@Override
public Response<StatementResultCursor> runAsync( String statementText, Value parameters )
public CompletionStage<StatementResultCursor> runAsync( String statementText, Value parameters )
{
return runAsync( new Statement( statementText, parameters ) );
}
Expand All @@ -329,7 +303,7 @@ public StatementResult run( String statementText )
}

@Override
public Response<StatementResultCursor> runAsync( String statementTemplate )
public CompletionStage<StatementResultCursor> runAsync( String statementTemplate )
{
return runAsync( statementTemplate, Values.EmptyMap );
}
Expand All @@ -342,7 +316,8 @@ public StatementResult run( String statementText, Map<String,Object> statementPa
}

@Override
public Response<StatementResultCursor> runAsync( String statementTemplate, Map<String,Object> statementParameters )
public CompletionStage<StatementResultCursor> runAsync( String statementTemplate,
Map<String,Object> statementParameters )
{
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
return runAsync( statementTemplate, params );
Expand All @@ -356,7 +331,7 @@ public StatementResult run( String statementTemplate, Record statementParameters
}

@Override
public Response<StatementResultCursor> runAsync( String statementTemplate, Record statementParameters )
public CompletionStage<StatementResultCursor> runAsync( String statementTemplate, Record statementParameters )
{
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
return runAsync( statementTemplate, params );
Expand Down Expand Up @@ -388,7 +363,7 @@ public StatementResult run( Statement statement )
}

@Override
public Response<StatementResultCursor> runAsync( Statement statement )
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
{
ensureNotFailed();
return QueryRunner.runAsync( asyncConnection, statement, this );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal;

import io.netty.util.concurrent.EventExecutorGroup;

import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
Expand All @@ -32,9 +30,9 @@ class LeakLoggingNetworkSession extends NetworkSession
private final String stackTrace;

LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup, Logging logging )
Logging logging )
{
super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
super( connectionProvider, mode, retryLogic, logging );
this.stackTrace = captureStackTrace();
}

Expand Down
Loading