Skip to content

Expose transaction timeout and metadata in the API #518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.Extract;
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
Expand All @@ -32,10 +33,6 @@
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.types.TypeSystem;

import static org.neo4j.driver.internal.util.Extract.assertParameter;
import static org.neo4j.driver.internal.util.Iterables.newHashMapWithSize;
import static org.neo4j.driver.v1.Values.value;

abstract class AbstractStatementRunner implements StatementRunner
{
@Override
Expand Down Expand Up @@ -103,14 +100,6 @@ private static Value parameters( Map<String,Object> map )
{
return Values.EmptyMap;
}

Map<String,Value> asValues = newHashMapWithSize( map.size() );
for ( Map.Entry<String,Object> entry : map.entrySet() )
{
Object value = entry.getValue();
assertParameter( value );
asValues.put( entry.getKey(), value( value ) );
}
return new MapValue( asValues );
return new MapValue( Extract.mapOfValues( map ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.TransactionConfig;
import org.neo4j.driver.v1.exceptions.ClientException;

import static org.neo4j.driver.internal.util.Futures.completedWithNull;
Expand Down Expand Up @@ -78,9 +79,9 @@ public ExplicitTransaction( Connection connection, NetworkSession session )
this.resultCursors = new ResultCursorsHolder();
}

public CompletionStage<ExplicitTransaction> beginAsync( Bookmarks initialBookmarks )
public CompletionStage<ExplicitTransaction> beginAsync( Bookmarks initialBookmarks, TransactionConfig config )
{
return protocol.beginTransaction( connection, initialBookmarks )
return protocol.beginTransaction( connection, initialBookmarks, config )
.handle( ( ignore, beginError ) ->
{
if ( beginError != null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class LeakLoggingNetworkSession extends NetworkSession
{
private final String stackTrace;

LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
Logging logging )
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, Logging logging )
{
super( connectionProvider, mode, retryLogic, logging );
this.stackTrace = captureStackTrace();
Expand Down
115 changes: 95 additions & 20 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
Expand All @@ -36,9 +37,11 @@
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.TransactionConfig;
import org.neo4j.driver.v1.TransactionWork;
import org.neo4j.driver.v1.exceptions.ClientException;

import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
Expand All @@ -59,8 +62,7 @@ public class NetworkSession extends AbstractStatementRunner implements Session

private final AtomicBoolean open = new AtomicBoolean( true );

public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
Logging logging )
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, Logging logging )
{
this.connectionProvider = connectionProvider;
this.mode = mode;
Expand All @@ -71,7 +73,25 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
@Override
public StatementResult run( Statement statement )
{
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
return run( statement, TransactionConfig.empty() );
}

@Override
public StatementResult run( String statement, TransactionConfig config )
{
return run( statement, emptyMap(), config );
}

@Override
public StatementResult run( String statement, Map<String,Object> parameters, TransactionConfig config )
{
return run( new Statement( statement, parameters ), config );
}

@Override
public StatementResult run( Statement statement, TransactionConfig config )
{
StatementResultCursor cursor = Futures.blockingGet( run( statement, config, false ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );

// query executed, it is safe to obtain a connection in a blocking way
Expand All @@ -81,9 +101,27 @@ public StatementResult run( Statement statement )

@Override
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
{
return runAsync( statement, TransactionConfig.empty() );
}

@Override
public CompletionStage<StatementResultCursor> runAsync( String statement, TransactionConfig config )
{
return runAsync( statement, emptyMap(), config );
}

@Override
public CompletionStage<StatementResultCursor> runAsync( String statement, Map<String,Object> parameters, TransactionConfig config )
{
return runAsync( new Statement( statement, parameters ), config );
}

@Override
public CompletionStage<StatementResultCursor> runAsync( Statement statement, TransactionConfig config )
{
//noinspection unchecked
return (CompletionStage) run( statement, true );
return (CompletionStage) run( statement, config, true );
}

@Override
Expand Down Expand Up @@ -131,7 +169,13 @@ public CompletionStage<Void> closeAsync()
@Override
public Transaction beginTransaction()
{
return beginTransaction( mode );
return beginTransaction( TransactionConfig.empty() );
}

@Override
public Transaction beginTransaction( TransactionConfig config )
{
return beginTransaction( mode, config );
}

@Deprecated
Expand All @@ -144,33 +188,63 @@ public Transaction beginTransaction( String bookmark )

@Override
public CompletionStage<Transaction> beginTransactionAsync()
{
return beginTransactionAsync( TransactionConfig.empty() );
}

@Override
public CompletionStage<Transaction> beginTransactionAsync( TransactionConfig config )
{
//noinspection unchecked
return (CompletionStage) beginTransactionAsync( mode );
return (CompletionStage) beginTransactionAsync( mode, config );
}

@Override
public <T> T readTransaction( TransactionWork<T> work )
{
return transaction( AccessMode.READ, work );
return readTransaction( work, TransactionConfig.empty() );
}

@Override
public <T> T readTransaction( TransactionWork<T> work, TransactionConfig config )
{
return transaction( AccessMode.READ, work, config );
}

@Override
public <T> CompletionStage<T> readTransactionAsync( TransactionWork<CompletionStage<T>> work )
{
return transactionAsync( AccessMode.READ, work );
return readTransactionAsync( work, TransactionConfig.empty() );
}

@Override
public <T> CompletionStage<T> readTransactionAsync( TransactionWork<CompletionStage<T>> work, TransactionConfig config )
{
return transactionAsync( AccessMode.READ, work, config );
}

@Override
public <T> T writeTransaction( TransactionWork<T> work )
{
return transaction( AccessMode.WRITE, work );
return writeTransaction( work, TransactionConfig.empty() );
}

@Override
public <T> T writeTransaction( TransactionWork<T> work, TransactionConfig config )
{
return transaction( AccessMode.WRITE, work, config );
}

@Override
public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionStage<T>> work )
{
return transactionAsync( AccessMode.WRITE, work );
return writeTransactionAsync( work, TransactionConfig.empty() );
}

@Override
public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionStage<T>> work, TransactionConfig config )
{
return transactionAsync( AccessMode.WRITE, work, config );
}

void setBookmarks( Bookmarks bookmarks )
Expand Down Expand Up @@ -225,15 +299,15 @@ CompletionStage<Boolean> currentConnectionIsOpen()
connection.isOpen() ); // and it's still open
}

private <T> T transaction( AccessMode mode, TransactionWork<T> work )
private <T> T transaction( AccessMode mode, TransactionWork<T> work, TransactionConfig config )
{
// use different code path compared to async so that work is executed in the caller thread
// caller thread will also be the one who sleeps between retries;
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
// event loop thread will bock and wait for itself to read some data
return retryLogic.retry( () ->
{
try ( Transaction tx = beginTransaction( mode ) )
try ( Transaction tx = beginTransaction( mode, config ) )
{
try
{
Expand All @@ -252,12 +326,12 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
} );
}

private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work )
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work, TransactionConfig config )
{
return retryLogic.retryAsync( () ->
{
CompletableFuture<T> resultFuture = new CompletableFuture<>();
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode );
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode, config );

txFuture.whenComplete( ( tx, completionError ) ->
{
Expand Down Expand Up @@ -358,26 +432,27 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
}
}

private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
private CompletionStage<InternalStatementResultCursor> run( Statement statement, TransactionConfig config, boolean waitForRunResponse )
{
ensureSessionIsOpen();

CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
.thenCompose( ignore -> acquireConnection( mode ) )
.thenCompose( connection -> connection.protocol().runInAutoCommitTransaction( connection, statement, waitForRunResponse ) );
.thenCompose( connection ->
connection.protocol().runInAutoCommitTransaction( connection, statement, bookmarks, config, waitForRunResponse ) );

resultCursorStage = newResultCursorStage.exceptionally( error -> null );

return newResultCursorStage;
}

private Transaction beginTransaction( AccessMode mode )
private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
{
return Futures.blockingGet( beginTransactionAsync( mode ),
return Futures.blockingGet( beginTransactionAsync( mode, config ),
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
}

private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode, TransactionConfig config )
{
ensureSessionIsOpen();

Expand All @@ -387,7 +462,7 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m
.thenCompose( connection ->
{
ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
return tx.beginAsync( bookmarks );
return tx.beginAsync( bookmarks, config );
} );

// update the reference to the only known transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.TransactionConfig;
import org.neo4j.driver.v1.exceptions.ClientException;

import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
Expand Down Expand Up @@ -60,7 +62,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
{
return connection.protocol()
.runInAutoCommitTransaction( connection, procedure, true )
.runInAutoCommitTransaction( connection, procedure, Bookmarks.empty(), TransactionConfig.empty(), true )
.thenCompose( StatementResultCursor::listAsync );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.TransactionConfig;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;

Expand Down Expand Up @@ -62,9 +63,10 @@ public interface BoltProtocol
*
* @param connection the connection to use.
* @param bookmarks the bookmarks. Never null, should be {@link Bookmarks#empty()} when absent.
* @param config the transaction configuration. Never null, should be {@link TransactionConfig#empty()} when absent.
* @return a completion stage completed when transaction is started or completed exceptionally when there was a failure.
*/
CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks );
CompletionStage<Void> beginTransaction( Connection connection, Bookmarks bookmarks, TransactionConfig config );

/**
* Commit the explicit transaction.
Expand All @@ -87,12 +89,15 @@ public interface BoltProtocol
*
* @param connection the network connection to use.
* @param statement the cypher to execute.
* @param bookmarks the bookmarks. Never null, should be {@link Bookmarks#empty()} when absent.
* @param config the transaction config for the implicitly started auto-commit transaction.
* @param waitForRunResponse {@code true} for async query execution and {@code false} for blocking query
* execution. Makes returned cursor stage be chained after the RUN response arrives. Needed to have statement
* keys populated.
* @return stage with cursor.
*/
CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement, boolean waitForRunResponse );
CompletionStage<InternalStatementResultCursor> runInAutoCommitTransaction( Connection connection, Statement statement,
Bookmarks bookmarks, TransactionConfig config, boolean waitForRunResponse );

/**
* Execute the given statement in a running explicit transaction, i.e. {@link Transaction#run(Statement)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
import java.util.Objects;

import org.neo4j.driver.internal.Bookmarks;
import org.neo4j.driver.v1.TransactionConfig;
import org.neo4j.driver.v1.Value;

public class BeginMessage extends TransactionStartingMessage
{
public static final byte SIGNATURE = 0x11;

public BeginMessage( Bookmarks bookmarks, TransactionConfig config )
{
this( bookmarks, config.timeout(), config.metadata() );
}

public BeginMessage( Bookmarks bookmarks, Duration txTimeout, Map<String,Value> txMetadata )
{
super( bookmarks, txTimeout, txMetadata );
Expand Down
Loading