Skip to content

Commit 192d33b

Browse files
authored
Merge pull request #405 from lutovich/1.5-async-result-cursor-api
Improvements for StatementResultCursor API
2 parents 8290bab + 6eef420 commit 192d33b

33 files changed

+1714
-251
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.neo4j.driver.internal;
2020

2121
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.channel.EventLoopGroup;
23+
import io.netty.util.concurrent.EventExecutorGroup;
2224

2325
import java.io.IOException;
2426
import java.net.URI;
@@ -72,14 +74,18 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7274
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
7375
SecurityPlan securityPlan = createSecurityPlan( address, config );
7476
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
75-
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );
7677

77-
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config );
78+
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
79+
EventLoopGroup eventLoopGroup = bootstrap.config().group();
80+
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );
81+
82+
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
83+
config );
7884

7985
try
8086
{
8187
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
82-
asyncConnectionPool );
88+
asyncConnectionPool, eventLoopGroup );
8389
}
8490
catch ( Throwable driverError )
8591
{
@@ -98,14 +104,13 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
98104
}
99105

100106
private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
101-
Config config )
107+
Bootstrap bootstrap, Config config )
102108
{
103109
Clock clock = createClock();
104110
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
105111
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
106112
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
107113
activeChannelTracker, config.logging(), clock );
108-
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
109114
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
110115
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
111116
config.maxConnectionPoolSize(),
@@ -116,16 +121,18 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
116121

117122
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
118123
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
119-
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
124+
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
120125
{
121126
String scheme = uri.getScheme().toLowerCase();
122127
switch ( scheme )
123128
{
124129
case BOLT_URI_SCHEME:
125130
assertNoRoutingContext( uri, routingSettings );
126-
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
131+
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
132+
eventExecutorGroup );
127133
case BOLT_ROUTING_URI_SCHEME:
128-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
134+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
135+
eventExecutorGroup );
129136
default:
130137
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
131138
}
@@ -137,11 +144,13 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
137144
* <b>This method is protected only for testing</b>
138145
*/
139146
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
140-
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
147+
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
148+
EventExecutorGroup eventExecutorGroup )
141149
{
142150
ConnectionProvider connectionProvider =
143151
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
144-
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
152+
SessionFactory sessionFactory =
153+
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
145154
return createDriver( config, securityPlan, sessionFactory );
146155
}
147156

@@ -151,14 +160,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
151160
* <b>This method is protected only for testing</b>
152161
*/
153162
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
154-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
163+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
164+
EventExecutorGroup eventExecutorGroup )
155165
{
156166
if ( !securityPlan.isRoutingCompatible() )
157167
{
158168
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
159169
}
160170
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
161-
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
171+
SessionFactory sessionFactory =
172+
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
162173
return createDriver( config, securityPlan, sessionFactory );
163174
}
164175

@@ -239,20 +250,21 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
239250
* <p>
240251
* <b>This method is protected only for testing</b>
241252
*/
242-
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
243-
RetryLogic retryLogic, Config config )
253+
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
254+
EventExecutorGroup eventExecutorGroup, Config config )
244255
{
245-
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
256+
return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
246257
}
247258

248259
/**
249260
* Creates new {@link RetryLogic >}.
250261
* <p>
251262
* <b>This method is protected only for testing</b>
252263
*/
253-
protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging )
264+
protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup,
265+
Logging logging )
254266
{
255-
return new ExponentialBackoffRetryLogic( settings, createClock(), logging );
267+
return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
256268
}
257269

258270
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
3434
import org.neo4j.driver.internal.spi.Connection;
3535
import org.neo4j.driver.internal.types.InternalTypeSystem;
36+
import org.neo4j.driver.internal.util.BiConsumer;
3637
import org.neo4j.driver.v1.Record;
3738
import org.neo4j.driver.v1.Response;
3839
import org.neo4j.driver.v1.Statement;
@@ -219,7 +220,7 @@ public Response<Void> commitAsync()
219220
return internalCommitAsync();
220221
}
221222

222-
private InternalFuture<Void> internalCommitAsync()
223+
InternalFuture<Void> internalCommitAsync()
223224
{
224225
if ( state == State.COMMITTED )
225226
{
@@ -259,12 +260,12 @@ else if ( state == State.ROLLED_BACK )
259260
}
260261
}
261262

262-
private Runnable releaseConnectionAndNotifySession()
263+
private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
263264
{
264-
return new Runnable()
265+
return new BiConsumer<Void,Throwable>()
265266
{
266267
@Override
267-
public void run()
268+
public void accept( Void result, Throwable error )
268269
{
269270
asyncConnection.release();
270271
session.asyncTransactionClosed( ExplicitTransaction.this );

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class InternalStatementResult implements StatementResult
5050
{
5151
this.statement = statement;
5252
this.connection = connection;
53-
this.runResponseHandler = new RunResponseHandler( null );
53+
this.runResponseHandler = new RunResponseHandler( null, null );
5454
this.pullAllResponseHandler = new RecordsResponseHandler( runResponseHandler );
5555
this.resourcesHandler = resourcesHandler;
5656
}

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import io.netty.util.concurrent.EventExecutorGroup;
22+
2123
import org.neo4j.driver.internal.retry.RetryLogic;
2224
import org.neo4j.driver.internal.spi.ConnectionProvider;
2325
import org.neo4j.driver.v1.AccessMode;
@@ -30,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession
3032
private final String stackTrace;
3133

3234
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
33-
Logging logging )
35+
EventExecutorGroup eventExecutorGroup, Logging logging )
3436
{
35-
super( connectionProvider, mode, retryLogic, logging );
37+
super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
3638
this.stackTrace = captureStackTrace();
3739
}
3840

0 commit comments

Comments
 (0)