Skip to content

API updates #325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import org.neo4j.driver.internal.net.SocketConnector;
import org.neo4j.driver.internal.net.pooling.PoolSettings;
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
import org.neo4j.driver.internal.retry.ExponentialBackoff;
import org.neo4j.driver.internal.retry.RetryDecision;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
Expand All @@ -47,15 +51,18 @@

public class DriverFactory
{
public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings routingSettings, Config config )
public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings routingSettings,
RetrySettings retrySettings, Config config )
{
BoltServerAddress address = BoltServerAddress.from( uri );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
RetryLogic<RetryDecision> retryLogic = createRetryLogic( retrySettings );

try
{
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan );
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan,
retryLogic );
}
catch ( Throwable driverError )
{
Expand All @@ -73,14 +80,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}

private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
RetryLogic<RetryDecision> retryLogic )
{
switch ( scheme.toLowerCase() )
{
case "bolt":
return createDirectDriver( address, connectionPool, config, securityPlan );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic );
case "bolt+routing":
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan );
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
Expand All @@ -92,10 +100,10 @@ private Driver createDriver( BoltServerAddress address, String scheme, Connectio
* <b>This method is protected only for testing</b>
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan )
SecurityPlan securityPlan, RetryLogic<RetryDecision> retryLogic )
{
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand All @@ -105,14 +113,15 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* <b>This method is protected only for testing</b>
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
RetryLogic<RetryDecision> retryLogic )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand Down Expand Up @@ -180,9 +189,20 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu
* <p>
* <b>This method is protected only for testing</b>
*/
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, Config config )
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
RetryLogic<RetryDecision> retryLogic, Config config )
{
return new SessionFactoryImpl( connectionProvider, config, config.logging() );
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
}

/**
* Creates new {@link RetryLogic<RetryDecision>}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected RetryLogic<RetryDecision> createRetryLogic( RetrySettings settings )
{
return ExponentialBackoff.create( settings, createClock() );
}

private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,21 @@ public final Session session()

@Override
public final Session session( AccessMode mode )
{
return session( mode, null );
}

@Override
public final Session session( String bookmark )
{
return session( AccessMode.WRITE, bookmark );
}

@Override
public final Session session( AccessMode mode, String bookmark )
{
assertOpen();
Session session = sessionFactory.newInstance( mode );
Session session = sessionFactory.newInstance( mode, bookmark );
if ( closed.get() )
{
// the driver is already closed and we either 1. obtain this session from the old session pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal;

import org.neo4j.driver.internal.retry.RetryDecision;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Logging;
Expand All @@ -28,9 +30,10 @@ class LeakLoggingNetworkSession extends NetworkSession
{
private final String stackTrace;

LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, Logging logging )
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode,
RetryLogic<RetryDecision> retryLogic, Logging logging )
{
super( connectionProvider, mode, logging );
super( connectionProvider, mode, retryLogic, logging );
this.stackTrace = captureStackTrace();
}

Expand Down
124 changes: 108 additions & 16 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.neo4j.driver.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.retry.RetryDecision;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.PooledConnection;
Expand All @@ -37,13 +41,15 @@
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.types.TypeSystem;
import org.neo4j.driver.v1.util.Function;

import static org.neo4j.driver.v1.Values.value;

public class NetworkSession implements Session, SessionResourcesHandler
{
private final ConnectionProvider connectionProvider;
private final AccessMode mode;
private final RetryLogic<RetryDecision> retryLogic;
protected final Logger logger;

private String lastBookmark;
Expand All @@ -52,10 +58,12 @@ public class NetworkSession implements Session, SessionResourcesHandler

private final AtomicBoolean isOpen = new AtomicBoolean( true );

public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, Logging logging )
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic<RetryDecision> retryLogic,
Logging logging )
{
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
this.logger = logging.getLog( "Session-" + hashCode() );
}

Expand Down Expand Up @@ -92,12 +100,13 @@ public StatementResult run( Statement statement )
ensureNoOpenTransactionBeforeRunningSession();

syncAndCloseCurrentConnection();
currentConnection = acquireConnection();
currentConnection = acquireConnection( mode );

return run( currentConnection, statement, this );
}

public static StatementResult run( Connection connection, Statement statement, SessionResourcesHandler resourcesHandler )
public static StatementResult run( Connection connection, Statement statement,
SessionResourcesHandler resourcesHandler )
{
InternalStatementResult result = new InternalStatementResult( connection, resourcesHandler, null, statement );
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ),
Expand All @@ -116,7 +125,7 @@ public synchronized void reset()
if ( currentTransaction != null )
{
currentTransaction.markToClose();
lastBookmark = currentTransaction.bookmark();
updateLastBookmarkFrom( currentTransaction );
currentTransaction = null;
}
if ( currentConnection != null )
Expand Down Expand Up @@ -155,28 +164,38 @@ public void close()
}
}
}

syncAndCloseCurrentConnection();
}

@Override
public Transaction beginTransaction()
public synchronized Transaction beginTransaction()
{
return beginTransaction( null );
return beginTransaction( mode );
}

@Override
public synchronized Transaction beginTransaction( String bookmark )
{
ensureSessionIsOpen();
ensureNoOpenTransactionBeforeOpeningTransaction();
lastBookmark = bookmark;
return beginTransaction();
}

syncAndCloseCurrentConnection();
currentConnection = acquireConnection();
@Override
public <T> T readTransaction( Function<Transaction,T> work )
{
return transaction( AccessMode.READ, work );
}

currentTransaction = new ExplicitTransaction( currentConnection, this, bookmark );
currentConnection.setResourcesHandler( this );
return currentTransaction;
@Override
public <T> T writeTransaction( Function<Transaction,T> work )
{
return transaction( AccessMode.WRITE, work );
}

void setLastBookmark( String bookmark )
{
lastBookmark = bookmark;
}

@Override
Expand All @@ -203,7 +222,7 @@ public synchronized void onTransactionClosed( ExplicitTransaction tx )
if ( currentTransaction != null && currentTransaction == tx )
{
closeCurrentConnection();
lastBookmark = currentTransaction.bookmark();
updateLastBookmarkFrom( currentTransaction );
currentTransaction = null;
}
}
Expand All @@ -225,6 +244,47 @@ public synchronized void onConnectionError( boolean recoverable )
}
}

private synchronized <T> T transaction( AccessMode mode, Function<Transaction,T> work )
{
RetryDecision decision = null;
List<Throwable> errors = null;

while ( true )
{
try ( Transaction tx = beginTransaction( mode ) )
{
return work.apply( tx );
}
catch ( Throwable newError )
{
decision = retryLogic.apply( newError, decision );

if ( decision.shouldRetry() )
{
errors = recordError( newError, errors );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just directly chain the errors? why we have to first remember them (in recordError) and then build the error chain (in addSuppressed)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Printed stacktrace of a chain of suppressed methods looks quite strange, every next suppressed trace is shifted to the right. With this approach all previous errors will be on the same level :)

}
else
{
addSuppressed( newError, errors );
throw newError;
}
}
}
}

private synchronized Transaction beginTransaction( AccessMode mode )
{
ensureSessionIsOpen();
ensureNoOpenTransactionBeforeOpeningTransaction();

syncAndCloseCurrentConnection();
currentConnection = acquireConnection( mode );

currentTransaction = new ExplicitTransaction( currentConnection, this, lastBookmark );
currentConnection.setResourcesHandler( this );
return currentTransaction;
}

private void ensureNoUnrecoverableError()
{
if ( currentConnection != null && currentConnection.hasUnrecoverableErrors() )
Expand Down Expand Up @@ -268,7 +328,7 @@ private void ensureSessionIsOpen()
}
}

private PooledConnection acquireConnection()
private PooledConnection acquireConnection( AccessMode mode )
{
PooledConnection connection = connectionProvider.acquireConnection( mode );
logger.debug( "Acquired connection " + connection.hashCode() );
Expand Down Expand Up @@ -312,4 +372,36 @@ private void closeCurrentConnection( boolean sync )
logger.debug( "Released connection " + connection.hashCode() );
}
}

private void updateLastBookmarkFrom( ExplicitTransaction tx )
{
if ( tx.bookmark() != null )
{
lastBookmark = tx.bookmark();
}
}

private static List<Throwable> recordError( Throwable error, List<Throwable> errors )
{
if ( errors == null )
{
errors = new ArrayList<>();
}
errors.add( error );
return errors;
}

private static void addSuppressed( Throwable error, List<Throwable> suppressedErrors )
{
if ( suppressedErrors != null )
{
for ( Throwable suppressedError : suppressedErrors )
{
if ( error != suppressedError )
{
error.addSuppressed( suppressedError );
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@

public interface SessionFactory extends AutoCloseable
{
Session newInstance( AccessMode mode );
Session newInstance( AccessMode mode, String bookmark );
}
Loading