Skip to content

Commit 0b55f09

Browse files
authored
Merge pull request #521 from lutovich/1.7-bolt-protocol-commit-cleanup
Simplify `BoltProtocol#commitTransaction()`
2 parents 841f92b + 06c1220 commit 0b55f09

File tree

7 files changed

+114
-26
lines changed

7 files changed

+114
-26
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,12 @@ private CompletionStage<Void> doCommitAsync()
248248
return failedFuture( new ClientException( "Transaction can't be committed. " +
249249
"It has been rolled back either because of an error or explicit termination" ) );
250250
}
251-
return protocol.commitTransaction( connection, this );
251+
return protocol.commitTransaction( connection )
252+
.thenApply( newBookmarks ->
253+
{
254+
setBookmarks( newBookmarks );
255+
return null;
256+
} );
252257
}
253258

254259
private CompletionStage<Void> doRollbackAsync()
@@ -260,7 +265,7 @@ private CompletionStage<Void> doRollbackAsync()
260265
return protocol.rollbackTransaction( connection );
261266
}
262267

263-
private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursorFailure )
268+
private static BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursorFailure )
264269
{
265270
return ( ignore, commitOrRollbackError ) ->
266271
{

driver/src/main/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandler.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,36 +23,32 @@
2323
import java.util.concurrent.CompletableFuture;
2424

2525
import org.neo4j.driver.internal.Bookmarks;
26-
import org.neo4j.driver.internal.ExplicitTransaction;
2726
import org.neo4j.driver.internal.spi.ResponseHandler;
2827
import org.neo4j.driver.v1.Value;
2928

3029
import static java.util.Objects.requireNonNull;
3130

3231
public class CommitTxResponseHandler implements ResponseHandler
3332
{
34-
private final CompletableFuture<Void> commitFuture;
35-
private final ExplicitTransaction tx;
33+
private final CompletableFuture<Bookmarks> commitFuture;
3634

37-
public CommitTxResponseHandler( CompletableFuture<Void> commitFuture, ExplicitTransaction tx )
35+
public CommitTxResponseHandler( CompletableFuture<Bookmarks> commitFuture )
3836
{
3937
this.commitFuture = requireNonNull( commitFuture );
40-
this.tx = requireNonNull( tx );
4138
}
4239

4340
@Override
4441
public void onSuccess( Map<String,Value> metadata )
4542
{
4643
Value bookmarkValue = metadata.get( "bookmark" );
47-
if ( bookmarkValue != null )
44+
if ( bookmarkValue == null )
4845
{
49-
if ( tx != null )
50-
{
51-
tx.setBookmarks( Bookmarks.from( bookmarkValue.asString() ) );
52-
}
46+
commitFuture.complete( null );
47+
}
48+
else
49+
{
50+
commitFuture.complete( Bookmarks.from( bookmarkValue.asString() ) );
5351
}
54-
55-
commitFuture.complete( null );
5652
}
5753

5854
@Override

driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,9 @@ public interface BoltProtocol
7070
* Commit the explicit transaction.
7171
*
7272
* @param connection the connection to use.
73-
* @param tx the explicit transaction being committed. Parameter is needed to update bookmark.
74-
* @return a completion stage completed when transaction is committed or completed exceptionally when there was a failure.
73+
* @return a completion stage completed with a bookmark when transaction is committed or completed exceptionally when there was a failure.
7574
*/
76-
CompletionStage<Void> commitTransaction( Connection connection, ExplicitTransaction tx );
75+
CompletionStage<Bookmarks> commitTransaction( Connection connection );
7776

7877
/**
7978
* Rollback the explicit transaction.

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ public CompletionStage<Void> beginTransaction( Connection connection, Bookmarks
108108
}
109109

110110
@Override
111-
public CompletionStage<Void> commitTransaction( Connection connection, ExplicitTransaction tx )
111+
public CompletionStage<Bookmarks> commitTransaction( Connection connection )
112112
{
113-
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
113+
CompletableFuture<Bookmarks> commitFuture = new CompletableFuture<>();
114114

115-
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, tx );
115+
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture );
116116
connection.writeAndFlush(
117117
COMMIT_MESSAGE, NoOpResponseHandler.INSTANCE,
118118
PullAllMessage.PULL_ALL, pullAllHandler );

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ public CompletionStage<Void> beginTransaction( Connection connection, Bookmarks
101101
}
102102

103103
@Override
104-
public CompletionStage<Void> commitTransaction( Connection connection, ExplicitTransaction tx )
104+
public CompletionStage<Bookmarks> commitTransaction( Connection connection )
105105
{
106-
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
107-
connection.writeAndFlush( COMMIT, new CommitTxResponseHandler( commitFuture, tx ) );
106+
CompletableFuture<Bookmarks> commitFuture = new CompletableFuture<>();
107+
connection.writeAndFlush( COMMIT, new CommitTxResponseHandler( commitFuture ) );
108108
return commitFuture;
109109
}
110110

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.handlers;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
25+
import org.neo4j.driver.internal.Bookmarks;
26+
import org.neo4j.driver.v1.Value;
27+
28+
import static java.util.Collections.emptyMap;
29+
import static java.util.Collections.singletonMap;
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
import static org.junit.jupiter.api.Assertions.assertNull;
32+
import static org.junit.jupiter.api.Assertions.assertThrows;
33+
import static org.neo4j.driver.v1.Values.value;
34+
import static org.neo4j.driver.v1.util.TestUtil.await;
35+
36+
class CommitTxResponseHandlerTest
37+
{
38+
private final CompletableFuture<Bookmarks> future = new CompletableFuture<>();
39+
private final CommitTxResponseHandler handler = new CommitTxResponseHandler( future );
40+
41+
@Test
42+
void shouldHandleSuccessWithoutBookmark()
43+
{
44+
handler.onSuccess( emptyMap() );
45+
46+
assertNull( await( future ) );
47+
}
48+
49+
@Test
50+
void shouldHandleSuccessWithBookmark()
51+
{
52+
String bookmarkString = "neo4j:bookmark:v1:tx12345";
53+
54+
handler.onSuccess( singletonMap( "bookmark", value( bookmarkString ) ) );
55+
56+
assertEquals( Bookmarks.from( bookmarkString ), await( future ) );
57+
}
58+
59+
@Test
60+
void shouldHandleFailure()
61+
{
62+
RuntimeException error = new RuntimeException( "Hello" );
63+
64+
handler.onFailure( error );
65+
66+
RuntimeException receivedError = assertThrows( RuntimeException.class, () -> await( future ) );
67+
assertEquals( error, receivedError );
68+
}
69+
70+
@Test
71+
void shouldFailToHandleRecord()
72+
{
73+
assertThrows( UnsupportedOperationException.class, () -> handler.onRecord( new Value[]{value( 42 )} ) );
74+
}
75+
}

driver/src/test/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1Test.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,14 @@
6666
import static org.junit.jupiter.api.Assertions.assertTrue;
6767
import static org.mockito.ArgumentMatchers.any;
6868
import static org.mockito.ArgumentMatchers.eq;
69+
import static org.mockito.Mockito.doAnswer;
6970
import static org.mockito.Mockito.mock;
7071
import static org.mockito.Mockito.verify;
72+
import static org.mockito.Mockito.when;
7173
import static org.neo4j.driver.internal.util.Futures.blockingGet;
7274
import static org.neo4j.driver.v1.Values.value;
75+
import static org.neo4j.driver.v1.util.TestUtil.DEFAULT_TEST_PROTOCOL;
76+
import static org.neo4j.driver.v1.util.TestUtil.await;
7377
import static org.neo4j.driver.v1.util.TestUtil.connectionMock;
7478

7579
public class BoltProtocolV1Test
@@ -168,15 +172,24 @@ void shouldBeginTransactionWithBookmark()
168172
@Test
169173
void shouldCommitTransaction()
170174
{
171-
Connection connection = connectionMock();
175+
String bookmarkString = "neo4j:bookmark:v1:tx1909";
172176

173-
CompletionStage<Void> stage = protocol.commitTransaction( connection, mock( ExplicitTransaction.class ) );
177+
Connection connection = mock( Connection.class );
178+
when( connection.protocol() ).thenReturn( DEFAULT_TEST_PROTOCOL );
179+
doAnswer( invocation ->
180+
{
181+
ResponseHandler commitHandler = invocation.getArgument( 3 );
182+
commitHandler.onSuccess( singletonMap( "bookmark", value( bookmarkString ) ) );
183+
return null;
184+
} ).when( connection ).writeAndFlush( eq( new RunMessage( "COMMIT" ) ), any(), any(), any() );
185+
186+
CompletionStage<Bookmarks> stage = protocol.commitTransaction( connection );
174187

175188
verify( connection ).writeAndFlush(
176189
eq( new RunMessage( "COMMIT" ) ), eq( NoOpResponseHandler.INSTANCE ),
177190
eq( PullAllMessage.PULL_ALL ), any( CommitTxResponseHandler.class ) );
178191

179-
assertNull( Futures.blockingGet( stage ) );
192+
assertEquals( Bookmarks.from( bookmarkString ), await( stage ) );
180193
}
181194

182195
@Test

0 commit comments

Comments
 (0)