Skip to content

Handle thread interruption by closing the channel #441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 72 additions & 30 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static java.lang.String.format;
import static org.neo4j.driver.internal.security.SecurityPlan.insecure;
Expand All @@ -78,26 +79,12 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r

ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config );

try
{
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
eventExecutorGroup, securityPlan, retryLogic );
Futures.blockingGet( driver.verifyConnectivity() );
return driver;
}
catch ( Throwable driverError )
{
// we need to close the connection pool if driver creation threw exception
try
{
Futures.blockingGet( connectionPool.close() );
}
catch ( Throwable closeError )
{
driverError.addSuppressed( closeError );
}
throw driverError;
}
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
eventExecutorGroup, securityPlan, retryLogic );

verifyConnectivity( driver, connectionPool, config );

return driver;
}

protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
Expand All @@ -123,17 +110,26 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address,
ConnectionPool connectionPool, Config config, RoutingSettings routingSettings,
EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
try
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
eventExecutorGroup );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
eventExecutorGroup );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
}
catch ( Throwable driverError )
{
// we need to close the connection pool if driver creation threw exception
closeConnectionPoolAndSuppressError( connectionPool, driverError );
throw driverError;
}
}

Expand Down Expand Up @@ -313,4 +309,50 @@ private static void assertNoRoutingContext( URI uri, RoutingSettings routingSett
"Routing parameters are not supported with scheme 'bolt'. Given URI: '" + uri + "'" );
}
}

private static void verifyConnectivity( InternalDriver driver, ConnectionPool connectionPool, Config config )
{
try
{
// block to verify connectivity, close connection pool if thread gets interrupted
Futures.blockingGet( driver.verifyConnectivity(),
() -> closeConnectionPoolOnThreadInterrupt( connectionPool, config.logging() ) );
}
catch ( Throwable connectionError )
{
if ( Thread.currentThread().isInterrupted() )
{
// current thread has been interrupted while verifying connectivity
// connection pool should've been closed
throw new ServiceUnavailableException( "Unable to create driver. Thread has been interrupted.",
connectionError );
}

// we need to close the connection pool if driver creation threw exception
closeConnectionPoolAndSuppressError( connectionPool, connectionError );
throw connectionError;
}
}

private static void closeConnectionPoolAndSuppressError( ConnectionPool connectionPool, Throwable mainError )
{
try
{
Futures.blockingGet( connectionPool.close() );
}
catch ( Throwable closeError )
{
if ( mainError != closeError )
{
mainError.addSuppressed( closeError );
}
}
}

private static void closeConnectionPoolOnThreadInterrupt( ConnectionPool pool, Logging logging )
{
Logger log = logging.getLog( Driver.class.getSimpleName() );
log.warn( "Driver creation interrupted while verifying connectivity. Connection pool will be closed" );
pool.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.blockingGet;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.v1.Values.value;

Expand Down Expand Up @@ -148,7 +147,8 @@ public void failure()
@Override
public void close()
{
blockingGet( closeAsync() );
Futures.blockingGet( closeAsync(),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the transaction" ) );
}

CompletionStage<Void> closeAsync()
Expand Down Expand Up @@ -272,8 +272,9 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
@Override
public StatementResult run( Statement statement )
{
StatementResultCursor cursor = blockingGet( run( statement, false ) );
return new InternalStatementResult( cursor );
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) );
return new InternalStatementResult( connection, cursor );
}

@Override
Expand Down Expand Up @@ -368,7 +369,17 @@ private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursor
{
return ( ignore, commitOrRollbackError ) ->
{
if ( cursorFailure != null )
if ( cursorFailure != null && commitOrRollbackError != null )
{
Throwable cause1 = Futures.completionExceptionCause( cursorFailure );
Throwable cause2 = Futures.completionExceptionCause( commitOrRollbackError );
if ( cause1 != cause2 )
{
cause1.addSuppressed( cause2 );
}
throw Futures.asCompletionException( cause1 );
}
else if ( cursorFailure != null )
{
throw Futures.asCompletionException( cursorFailure );
}
Expand All @@ -392,4 +403,9 @@ private BiConsumer<Object,Throwable> transactionClosed( State newState )
session.setBookmark( bookmark );
};
}

private void terminateConnectionOnThreadInterrupt( String reason )
{
connection.terminateAndRelease( reason );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Session;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.blockingGet;

public class InternalDriver implements Driver
{
Expand Down Expand Up @@ -105,7 +105,7 @@ private Session newSession( AccessMode mode, Bookmark bookmark )
@Override
public void close()
{
blockingGet( closeAsync() );
Futures.blockingGet( closeAsync() );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.neo4j.driver.internal;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.StatementResultCursor;
Expand All @@ -28,15 +31,15 @@
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.util.Function;

import static org.neo4j.driver.internal.util.Futures.blockingGet;

public class InternalStatementResult implements StatementResult
{
private final Connection connection;
private final StatementResultCursor cursor;
private List<String> keys;

public InternalStatementResult( StatementResultCursor cursor )
public InternalStatementResult( Connection connection, StatementResultCursor cursor )
{
this.connection = connection;
this.cursor = cursor;
}

Expand Down Expand Up @@ -114,4 +117,14 @@ public void remove()
{
throw new ClientException( "Removing records from a result is not supported." );
}

private <T> T blockingGet( CompletionStage<T> stage )
{
return Futures.blockingGet( stage, this::terminateConnectionOnThreadInterrupt );
}

private void terminateConnectionOnThreadInterrupt()
{
connection.terminateAndRelease( "Thread interrupted while waiting for result to arrive" );
}
}
44 changes: 37 additions & 7 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.neo4j.driver.v1.types.TypeSystem;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.blockingGet;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.v1.Values.value;

Expand Down Expand Up @@ -132,8 +131,12 @@ public CompletionStage<StatementResultCursor> runAsync( String statementText, Va
@Override
public StatementResult run( Statement statement )
{
StatementResultCursor cursor = blockingGet( run( statement, false ) );
return new InternalStatementResult( cursor );
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );

// query executed, it is safe to obtain a connection in a blocking way
Connection connection = Futures.getNow( connectionStage );
return new InternalStatementResult( connection, cursor );
}

@Override
Expand All @@ -152,7 +155,8 @@ public boolean isOpen()
@Override
public void close()
{
blockingGet( closeAsync() );
Futures.blockingGet( closeAsync(),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the session" ) );
}

@Override
Expand Down Expand Up @@ -188,7 +192,7 @@ public CompletionStage<Void> closeAsync()
@Override
public Transaction beginTransaction()
{
return blockingGet( beginTransactionAsync( mode ) );
return beginTransaction( mode );
}

@Deprecated
Expand Down Expand Up @@ -247,7 +251,8 @@ public String lastBookmark()
@Override
public void reset()
{
blockingGet( resetAsync() );
Futures.blockingGet( resetAsync(),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) );
}

private CompletionStage<Void> resetAsync()
Expand Down Expand Up @@ -287,7 +292,7 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
// event loop thread will bock and wait for itself to read some data
return retryLogic.retry( () ->
{
try ( Transaction tx = blockingGet( beginTransactionAsync( mode ) ) )
try ( Transaction tx = beginTransaction( mode ) )
{
try
{
Expand Down Expand Up @@ -422,6 +427,12 @@ private CompletionStage<InternalStatementResultCursor> run( Statement statement,
return newResultCursorStage;
}

private Transaction beginTransaction( AccessMode mode )
{
return Futures.blockingGet( beginTransactionAsync( mode ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
}

private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
{
ensureSessionIsOpen();
Expand Down Expand Up @@ -515,6 +526,25 @@ private CompletionStage<Void> releaseConnection()
} );
}

private void terminateConnectionOnThreadInterrupt( String reason )
{
// try to get current connection if it has been acquired
Connection connection = null;
try
{
connection = Futures.getNow( connectionStage );
}
catch ( Throwable ignore )
{
// ignore errors because handing interruptions is best effort
}

if ( connection != null )
{
connection.terminateAndRelease( reason );
}
}

private CompletionStage<Void> ensureNoOpenTxBeforeRunningQuery()
{
return ensureNoOpenTx( "Statements cannot be run directly on a session with an open transaction; " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class ChannelAttributes
private static final AttributeKey<Long> CREATION_TIMESTAMP = newInstance( "creationTimestamp" );
private static final AttributeKey<Long> LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
private static final AttributeKey<InboundMessageDispatcher> MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
private static final AttributeKey<String> TERMINATION_REASON = newInstance( "terminationReason" );

private ChannelAttributes()
{
Expand Down Expand Up @@ -89,6 +90,16 @@ public static void setMessageDispatcher( Channel channel, InboundMessageDispatch
setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher );
}

public static String terminationReason( Channel channel )
{
return get( channel, TERMINATION_REASON );
}

public static void setTerminationReason( Channel channel, String reason )
{
setOnce( channel, TERMINATION_REASON, reason );
}

private static <T> T get( Channel channel, AttributeKey<T> key )
{
return channel.attr( key ).get();
Expand Down
Loading