Skip to content

Commit dbc0e4c

Browse files
committed
Expose transaction timeout and metadata in the API
This is the first draft of the API to allow transaction timeout and/or metadata be set for explicit and auto-commit transactions. Both settings can be specified in a `TransactionConfig` object which can be created like this: ``` TransactionConfig config = TransactionConfig.builder() .withTimeout(Duration.ofSeconds(5)) .withMetadata(Collections.singletonMap("key", "value")) .build(); ``` Explicit transactions can take config like this: ``` Transaction tx = session.beginTransaction(config); ``` and auto-commit transactions accept it in various overloads of `Session#run()` and `Session#runAsync()`: ``` session.run("CREATE ()", config); session.runAsync("RETURN $x", Collections.singletonMap("x", 1) config); ``` Transaction functions accept config like this: ``` session.readTransaction(tx -> {...}, config); session.writeTransactionAsync(tx -> {...}, config); ``` There also exists a possibility to specify default transaction configuration per driver. It can be configured in the driver's config: ``` Config driverConfig = Config.build() .withDefaultTransactionConfig(txConfig) .toConfig(); ``` More tests and javadocs will be added in subsequent commits.
1 parent bb31f06 commit dbc0e4c

29 files changed

+702
-95
lines changed

driver/src/main/java/org/neo4j/driver/internal/AbstractStatementRunner.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CompletionStage;
2323

2424
import org.neo4j.driver.internal.types.InternalTypeSystem;
25+
import org.neo4j.driver.internal.util.Extract;
2526
import org.neo4j.driver.internal.value.MapValue;
2627
import org.neo4j.driver.v1.Record;
2728
import org.neo4j.driver.v1.Statement;
@@ -32,10 +33,6 @@
3233
import org.neo4j.driver.v1.Values;
3334
import org.neo4j.driver.v1.types.TypeSystem;
3435

35-
import static org.neo4j.driver.internal.util.Extract.assertParameter;
36-
import static org.neo4j.driver.internal.util.Iterables.newHashMapWithSize;
37-
import static org.neo4j.driver.v1.Values.value;
38-
3936
abstract class AbstractStatementRunner implements StatementRunner
4037
{
4138
@Override
@@ -103,14 +100,6 @@ private static Value parameters( Map<String,Object> map )
103100
{
104101
return Values.EmptyMap;
105102
}
106-
107-
Map<String,Value> asValues = newHashMapWithSize( map.size() );
108-
for ( Map.Entry<String,Object> entry : map.entrySet() )
109-
{
110-
Object value = entry.getValue();
111-
assertParameter( value );
112-
asValues.put( entry.getKey(), value( value ) );
113-
}
114-
return new MapValue( asValues );
103+
return new MapValue( Extract.mapOfValues( map ) );
115104
}
116105
}

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.neo4j.driver.v1.StatementResult;
3232
import org.neo4j.driver.v1.StatementResultCursor;
3333
import org.neo4j.driver.v1.Transaction;
34+
import org.neo4j.driver.v1.TransactionConfig;
3435
import org.neo4j.driver.v1.exceptions.ClientException;
3536

3637
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
@@ -78,9 +79,9 @@ public ExplicitTransaction( Connection connection, NetworkSession session )
7879
this.resultCursors = new ResultCursorsHolder();
7980
}
8081

81-
public CompletionStage<ExplicitTransaction> beginAsync( Bookmarks initialBookmarks )
82+
public CompletionStage<ExplicitTransaction> beginAsync( Bookmarks initialBookmarks, TransactionConfig config )
8283
{
83-
return protocol.beginTransaction( connection, initialBookmarks )
84+
return protocol.beginTransaction( connection, initialBookmarks, config )
8485
.handle( ( ignore, beginError ) ->
8586
{
8687
if ( beginError != null )

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.neo4j.driver.internal.util.Futures;
2424
import org.neo4j.driver.v1.AccessMode;
2525
import org.neo4j.driver.v1.Logging;
26+
import org.neo4j.driver.v1.TransactionConfig;
2627

2728
import static java.lang.System.lineSeparator;
2829

@@ -31,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession
3132
private final String stackTrace;
3233

3334
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
34-
Logging logging )
35+
TransactionConfig defaultTransactionConfig, Logging logging )
3536
{
36-
super( connectionProvider, mode, retryLogic, logging );
37+
super( connectionProvider, mode, retryLogic, defaultTransactionConfig, logging );
3738
this.stackTrace = captureStackTrace();
3839
}
3940

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.Map;
2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
@@ -36,9 +37,11 @@
3637
import org.neo4j.driver.v1.StatementResult;
3738
import org.neo4j.driver.v1.StatementResultCursor;
3839
import org.neo4j.driver.v1.Transaction;
40+
import org.neo4j.driver.v1.TransactionConfig;
3941
import org.neo4j.driver.v1.TransactionWork;
4042
import org.neo4j.driver.v1.exceptions.ClientException;
4143

44+
import static java.util.Collections.emptyMap;
4245
import static java.util.concurrent.CompletableFuture.completedFuture;
4346
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4447
import static org.neo4j.driver.internal.util.Futures.failedFuture;
@@ -50,6 +53,7 @@ public class NetworkSession extends AbstractStatementRunner implements Session
5053
private final ConnectionProvider connectionProvider;
5154
private final AccessMode mode;
5255
private final RetryLogic retryLogic;
56+
private final TransactionConfig defaultTransactionConfig;
5357
protected final Logger logger;
5458

5559
private volatile Bookmarks bookmarks = Bookmarks.empty();
@@ -60,18 +64,37 @@ public class NetworkSession extends AbstractStatementRunner implements Session
6064
private final AtomicBoolean open = new AtomicBoolean( true );
6165

6266
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
63-
Logging logging )
67+
TransactionConfig defaultTransactionConfig, Logging logging )
6468
{
6569
this.connectionProvider = connectionProvider;
6670
this.mode = mode;
6771
this.retryLogic = retryLogic;
72+
this.defaultTransactionConfig = defaultTransactionConfig;
6873
this.logger = new PrefixedLogger( "[" + hashCode() + "]", logging.getLog( LOG_NAME ) );
6974
}
7075

7176
@Override
7277
public StatementResult run( Statement statement )
7378
{
74-
StatementResultCursor cursor = Futures.blockingGet( run( statement, false ),
79+
return run( statement, TransactionConfig.empty() );
80+
}
81+
82+
@Override
83+
public StatementResult run( String statement, TransactionConfig config )
84+
{
85+
return run( statement, emptyMap(), config );
86+
}
87+
88+
@Override
89+
public StatementResult run( String statement, Map<String,Object> parameters, TransactionConfig config )
90+
{
91+
return run( new Statement( statement, parameters ), config );
92+
}
93+
94+
@Override
95+
public StatementResult run( Statement statement, TransactionConfig config )
96+
{
97+
StatementResultCursor cursor = Futures.blockingGet( run( statement, config, false ),
7598
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) );
7699

77100
// query executed, it is safe to obtain a connection in a blocking way
@@ -81,9 +104,27 @@ public StatementResult run( Statement statement )
81104

82105
@Override
83106
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
107+
{
108+
return runAsync( statement, TransactionConfig.empty() );
109+
}
110+
111+
@Override
112+
public CompletionStage<StatementResultCursor> runAsync( String statement, TransactionConfig config )
113+
{
114+
return runAsync( statement, emptyMap(), config );
115+
}
116+
117+
@Override
118+
public CompletionStage<StatementResultCursor> runAsync( String statement, Map<String,Object> parameters, TransactionConfig config )
119+
{
120+
return runAsync( new Statement( statement, parameters ), config );
121+
}
122+
123+
@Override
124+
public CompletionStage<StatementResultCursor> runAsync( Statement statement, TransactionConfig config )
84125
{
85126
//noinspection unchecked
86-
return (CompletionStage) run( statement, true );
127+
return (CompletionStage) run( statement, config, true );
87128
}
88129

89130
@Override
@@ -131,7 +172,13 @@ public CompletionStage<Void> closeAsync()
131172
@Override
132173
public Transaction beginTransaction()
133174
{
134-
return beginTransaction( mode );
175+
return beginTransaction( defaultTransactionConfig );
176+
}
177+
178+
@Override
179+
public Transaction beginTransaction( TransactionConfig config )
180+
{
181+
return beginTransaction( mode, config );
135182
}
136183

137184
@Deprecated
@@ -144,33 +191,63 @@ public Transaction beginTransaction( String bookmark )
144191

145192
@Override
146193
public CompletionStage<Transaction> beginTransactionAsync()
194+
{
195+
return beginTransactionAsync( defaultTransactionConfig );
196+
}
197+
198+
@Override
199+
public CompletionStage<Transaction> beginTransactionAsync( TransactionConfig config )
147200
{
148201
//noinspection unchecked
149-
return (CompletionStage) beginTransactionAsync( mode );
202+
return (CompletionStage) beginTransactionAsync( mode, config );
150203
}
151204

152205
@Override
153206
public <T> T readTransaction( TransactionWork<T> work )
154207
{
155-
return transaction( AccessMode.READ, work );
208+
return readTransaction( work, defaultTransactionConfig );
209+
}
210+
211+
@Override
212+
public <T> T readTransaction( TransactionWork<T> work, TransactionConfig config )
213+
{
214+
return transaction( AccessMode.READ, work, config );
156215
}
157216

158217
@Override
159218
public <T> CompletionStage<T> readTransactionAsync( TransactionWork<CompletionStage<T>> work )
160219
{
161-
return transactionAsync( AccessMode.READ, work );
220+
return readTransactionAsync( work, defaultTransactionConfig );
221+
}
222+
223+
@Override
224+
public <T> CompletionStage<T> readTransactionAsync( TransactionWork<CompletionStage<T>> work, TransactionConfig config )
225+
{
226+
return transactionAsync( AccessMode.READ, work, config );
162227
}
163228

164229
@Override
165230
public <T> T writeTransaction( TransactionWork<T> work )
166231
{
167-
return transaction( AccessMode.WRITE, work );
232+
return writeTransaction( work, defaultTransactionConfig );
233+
}
234+
235+
@Override
236+
public <T> T writeTransaction( TransactionWork<T> work, TransactionConfig config )
237+
{
238+
return transaction( AccessMode.WRITE, work, config );
168239
}
169240

170241
@Override
171242
public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionStage<T>> work )
172243
{
173-
return transactionAsync( AccessMode.WRITE, work );
244+
return writeTransactionAsync( work, defaultTransactionConfig );
245+
}
246+
247+
@Override
248+
public <T> CompletionStage<T> writeTransactionAsync( TransactionWork<CompletionStage<T>> work, TransactionConfig config )
249+
{
250+
return transactionAsync( AccessMode.WRITE, work, config );
174251
}
175252

176253
void setBookmarks( Bookmarks bookmarks )
@@ -225,15 +302,15 @@ CompletionStage<Boolean> currentConnectionIsOpen()
225302
connection.isOpen() ); // and it's still open
226303
}
227304

228-
private <T> T transaction( AccessMode mode, TransactionWork<T> work )
305+
private <T> T transaction( AccessMode mode, TransactionWork<T> work, TransactionConfig config )
229306
{
230307
// use different code path compared to async so that work is executed in the caller thread
231308
// caller thread will also be the one who sleeps between retries;
232309
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
233310
// event loop thread will bock and wait for itself to read some data
234311
return retryLogic.retry( () ->
235312
{
236-
try ( Transaction tx = beginTransaction( mode ) )
313+
try ( Transaction tx = beginTransaction( mode, config ) )
237314
{
238315
try
239316
{
@@ -252,12 +329,12 @@ private <T> T transaction( AccessMode mode, TransactionWork<T> work )
252329
} );
253330
}
254331

255-
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work )
332+
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work, TransactionConfig config )
256333
{
257334
return retryLogic.retryAsync( () ->
258335
{
259336
CompletableFuture<T> resultFuture = new CompletableFuture<>();
260-
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode );
337+
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode, config );
261338

262339
txFuture.whenComplete( ( tx, completionError ) ->
263340
{
@@ -358,26 +435,27 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
358435
}
359436
}
360437

361-
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean waitForRunResponse )
438+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, TransactionConfig config, boolean waitForRunResponse )
362439
{
363440
ensureSessionIsOpen();
364441

365442
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
366443
.thenCompose( ignore -> acquireConnection( mode ) )
367-
.thenCompose( connection -> connection.protocol().runInAutoCommitTransaction( connection, statement, waitForRunResponse ) );
444+
.thenCompose( connection ->
445+
connection.protocol().runInAutoCommitTransaction( connection, statement, bookmarks, config, waitForRunResponse ) );
368446

369447
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
370448

371449
return newResultCursorStage;
372450
}
373451

374-
private Transaction beginTransaction( AccessMode mode )
452+
private Transaction beginTransaction( AccessMode mode, TransactionConfig config )
375453
{
376-
return Futures.blockingGet( beginTransactionAsync( mode ),
454+
return Futures.blockingGet( beginTransactionAsync( mode, config ),
377455
() -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) );
378456
}
379457

380-
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
458+
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode, TransactionConfig config )
381459
{
382460
ensureSessionIsOpen();
383461

@@ -387,7 +465,7 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m
387465
.thenCompose( connection ->
388466
{
389467
ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
390-
return tx.beginAsync( bookmarks );
468+
return tx.beginAsync( bookmarks, config );
391469
} );
392470

393471
// update the reference to the only known transaction

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@
2626
import org.neo4j.driver.v1.Config;
2727
import org.neo4j.driver.v1.Logging;
2828
import org.neo4j.driver.v1.Session;
29+
import org.neo4j.driver.v1.TransactionConfig;
2930

3031
public class SessionFactoryImpl implements SessionFactory
3132
{
3233
private final ConnectionProvider connectionProvider;
3334
private final RetryLogic retryLogic;
35+
private final TransactionConfig defaultTransactionConfig;
3436
private final Logging logging;
3537
private final boolean leakedSessionsLoggingEnabled;
3638

@@ -39,6 +41,7 @@ public class SessionFactoryImpl implements SessionFactory
3941
this.connectionProvider = connectionProvider;
4042
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
4143
this.retryLogic = retryLogic;
44+
this.defaultTransactionConfig = config.defaultTransactionConfig();
4245
this.logging = config.logging();
4346
}
4447

@@ -78,7 +81,7 @@ private NetworkSession createSession( ConnectionProvider connectionProvider, Ret
7881
AccessMode mode, Logging logging )
7982
{
8083
return leakedSessionsLoggingEnabled
81-
? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, logging )
82-
: new NetworkSession( connectionProvider, mode, retryLogic, logging );
84+
? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, defaultTransactionConfig, logging )
85+
: new NetworkSession( connectionProvider, mode, retryLogic, defaultTransactionConfig, logging );
8386
}
8487
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import java.util.concurrent.CompletionException;
2323
import java.util.concurrent.CompletionStage;
2424

25+
import org.neo4j.driver.internal.Bookmarks;
2526
import org.neo4j.driver.internal.spi.Connection;
2627
import org.neo4j.driver.internal.util.Futures;
2728
import org.neo4j.driver.internal.util.ServerVersion;
2829
import org.neo4j.driver.v1.Record;
2930
import org.neo4j.driver.v1.Statement;
3031
import org.neo4j.driver.v1.StatementResultCursor;
32+
import org.neo4j.driver.v1.TransactionConfig;
3133
import org.neo4j.driver.v1.exceptions.ClientException;
3234

3335
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
@@ -60,7 +62,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6062
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
6163
{
6264
return connection.protocol()
63-
.runInAutoCommitTransaction( connection, procedure, true )
65+
.runInAutoCommitTransaction( connection, procedure, Bookmarks.empty(), TransactionConfig.empty(), true )
6466
.thenCompose( StatementResultCursor::listAsync );
6567
}
6668

0 commit comments

Comments
 (0)