@@ -48,6 +48,7 @@ public class BasicPullResponseHandler implements PullResponseHandler {
4848 protected final MetadataExtractor metadataExtractor ;
4949 protected final Connection connection ;
5050 private final PullResponseCompletionListener completionListener ;
51+ private final boolean syncSignals ;
5152
5253 private State state ;
5354 private long toRequest ;
@@ -60,31 +61,105 @@ public BasicPullResponseHandler(
6061 Connection connection ,
6162 MetadataExtractor metadataExtractor ,
6263 PullResponseCompletionListener completionListener ) {
64+ this (query , runResponseHandler , connection , metadataExtractor , completionListener , false );
65+ }
66+
67+ public BasicPullResponseHandler (
68+ Query query ,
69+ RunResponseHandler runResponseHandler ,
70+ Connection connection ,
71+ MetadataExtractor metadataExtractor ,
72+ PullResponseCompletionListener completionListener ,
73+ boolean syncSignals ) {
6374 this .query = requireNonNull (query );
6475 this .runResponseHandler = requireNonNull (runResponseHandler );
6576 this .metadataExtractor = requireNonNull (metadataExtractor );
6677 this .connection = requireNonNull (connection );
6778 this .completionListener = requireNonNull (completionListener );
79+ this .syncSignals = syncSignals ;
6880
6981 this .state = State .READY_STATE ;
7082 }
7183
7284 @ Override
73- public synchronized void onSuccess (Map <String , Value > metadata ) {
74- assertRecordAndSummaryConsumerInstalled ();
75- state .onSuccess (this , metadata );
85+ public void onSuccess (Map <String , Value > metadata ) {
86+ State newState ;
87+ BiConsumer <Record , Throwable > recordConsumer = null ;
88+ BiConsumer <ResultSummary , Throwable > summaryConsumer = null ;
89+ ResultSummary summary = null ;
90+ Neo4jException exception = null ;
91+ synchronized (this ) {
92+ assertRecordAndSummaryConsumerInstalled ();
93+ state .onSuccess (this , metadata );
94+ newState = state ;
95+ if (newState == State .SUCCEEDED_STATE ) {
96+ completionListener .afterSuccess (metadata );
97+ try {
98+ summary = extractResultSummary (metadata );
99+ } catch (Neo4jException e ) {
100+ summary = extractResultSummary (emptyMap ());
101+ exception = e ;
102+ }
103+ recordConsumer = this .recordConsumer ;
104+ summaryConsumer = this .summaryConsumer ;
105+ if (syncSignals ) {
106+ complete (summaryConsumer , recordConsumer , summary , exception );
107+ }
108+ dispose ();
109+ } else if (newState == State .READY_STATE ) {
110+ if (toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE ) {
111+ request (toRequest );
112+ toRequest = 0 ;
113+ }
114+ // summary consumer use (null, null) to identify done handling of success with has_more
115+ this .summaryConsumer .accept (null , null );
116+ }
117+ }
118+ if (!syncSignals && newState == State .SUCCEEDED_STATE ) {
119+ complete (summaryConsumer , recordConsumer , summary , exception );
120+ }
76121 }
77122
78123 @ Override
79- public synchronized void onFailure (Throwable error ) {
80- assertRecordAndSummaryConsumerInstalled ();
81- state .onFailure (this , error );
124+ public void onFailure (Throwable error ) {
125+ BiConsumer <Record , Throwable > recordConsumer ;
126+ BiConsumer <ResultSummary , Throwable > summaryConsumer ;
127+ ResultSummary summary ;
128+ synchronized (this ) {
129+ assertRecordAndSummaryConsumerInstalled ();
130+ state .onFailure (this , error );
131+ completionListener .afterFailure (error );
132+ summary = extractResultSummary (emptyMap ());
133+ recordConsumer = this .recordConsumer ;
134+ summaryConsumer = this .summaryConsumer ;
135+ if (syncSignals ) {
136+ complete (summaryConsumer , recordConsumer , summary , error );
137+ }
138+ dispose ();
139+ }
140+ if (!syncSignals ) {
141+ complete (summaryConsumer , recordConsumer , summary , error );
142+ }
82143 }
83144
84145 @ Override
85- public synchronized void onRecord (Value [] fields ) {
86- assertRecordAndSummaryConsumerInstalled ();
87- state .onRecord (this , fields );
146+ public void onRecord (Value [] fields ) {
147+ State newState ;
148+ Record record = null ;
149+ synchronized (this ) {
150+ assertRecordAndSummaryConsumerInstalled ();
151+ state .onRecord (this , fields );
152+ newState = state ;
153+ if (newState == State .STREAMING_STATE ) {
154+ record = new InternalRecord (runResponseHandler .queryKeys (), fields );
155+ if (syncSignals ) {
156+ recordConsumer .accept (record , null );
157+ }
158+ }
159+ }
160+ if (!syncSignals && newState == State .STREAMING_STATE ) {
161+ recordConsumer .accept (record , null );
162+ }
88163 }
89164
90165 @ Override
@@ -99,38 +174,6 @@ public synchronized void cancel() {
99174 state .cancel (this );
100175 }
101176
102- protected void completeWithFailure (Throwable error ) {
103- completionListener .afterFailure (error );
104- complete (extractResultSummary (emptyMap ()), error );
105- }
106-
107- protected void completeWithSuccess (Map <String , Value > metadata ) {
108- completionListener .afterSuccess (metadata );
109- ResultSummary summary ;
110- Neo4jException exception = null ;
111- try {
112- summary = extractResultSummary (metadata );
113- } catch (Neo4jException e ) {
114- summary = extractResultSummary (emptyMap ());
115- exception = e ;
116- }
117- complete (summary , exception );
118- }
119-
120- protected void successHasMore () {
121- if (toRequest > 0 || toRequest == UNLIMITED_FETCH_SIZE ) {
122- request (toRequest );
123- toRequest = 0 ;
124- }
125- // summary consumer use (null, null) to identify done handling of success with has_more
126- summaryConsumer .accept (null , null );
127- }
128-
129- protected void handleRecord (Value [] fields ) {
130- Record record = new InternalRecord (runResponseHandler .queryKeys (), fields );
131- recordConsumer .accept (record , null );
132- }
133-
134177 protected void writePull (long n ) {
135178 connection .writeAndFlush (new PullMessage (n , runResponseHandler .queryId ()), this );
136179 }
@@ -198,12 +241,15 @@ private void assertRecordAndSummaryConsumerInstalled() {
198241 }
199242 }
200243
201- private void complete (ResultSummary summary , Throwable error ) {
244+ private void complete (
245+ BiConsumer <ResultSummary , Throwable > summaryConsumer ,
246+ BiConsumer <Record , Throwable > recordConsumer ,
247+ ResultSummary summary ,
248+ Throwable error ) {
202249 // we first inform the summary consumer to ensure when streaming finished, summary is definitely available.
203250 summaryConsumer .accept (summary , error );
204251 // record consumer use (null, null) to identify the end of record stream
205252 recordConsumer .accept (null , error );
206- dispose ();
207253 }
208254
209255 private void dispose () {
@@ -226,13 +272,11 @@ enum State {
226272 @ Override
227273 void onSuccess (BasicPullResponseHandler context , Map <String , Value > metadata ) {
228274 context .state (SUCCEEDED_STATE );
229- context .completeWithSuccess (metadata );
230275 }
231276
232277 @ Override
233278 void onFailure (BasicPullResponseHandler context , Throwable error ) {
234279 context .state (FAILURE_STATE );
235- context .completeWithFailure (error );
236280 }
237281
238282 @ Override
@@ -257,23 +301,19 @@ void cancel(BasicPullResponseHandler context) {
257301 void onSuccess (BasicPullResponseHandler context , Map <String , Value > metadata ) {
258302 if (metadata .getOrDefault ("has_more" , BooleanValue .FALSE ).asBoolean ()) {
259303 context .state (READY_STATE );
260- context .successHasMore ();
261304 } else {
262305 context .state (SUCCEEDED_STATE );
263- context .completeWithSuccess (metadata );
264306 }
265307 }
266308
267309 @ Override
268310 void onFailure (BasicPullResponseHandler context , Throwable error ) {
269311 context .state (FAILURE_STATE );
270- context .completeWithFailure (error );
271312 }
272313
273314 @ Override
274315 void onRecord (BasicPullResponseHandler context , Value [] fields ) {
275316 context .state (STREAMING_STATE );
276- context .handleRecord (fields );
277317 }
278318
279319 @ Override
@@ -295,14 +335,12 @@ void onSuccess(BasicPullResponseHandler context, Map<String, Value> metadata) {
295335 context .discardAll ();
296336 } else {
297337 context .state (SUCCEEDED_STATE );
298- context .completeWithSuccess (metadata );
299338 }
300339 }
301340
302341 @ Override
303342 void onFailure (BasicPullResponseHandler context , Throwable error ) {
304343 context .state (FAILURE_STATE );
305- context .completeWithFailure (error );
306344 }
307345
308346 @ Override
@@ -324,13 +362,11 @@ void cancel(BasicPullResponseHandler context) {
324362 @ Override
325363 void onSuccess (BasicPullResponseHandler context , Map <String , Value > metadata ) {
326364 context .state (SUCCEEDED_STATE );
327- context .completeWithSuccess (metadata );
328365 }
329366
330367 @ Override
331368 void onFailure (BasicPullResponseHandler context , Throwable error ) {
332369 context .state (FAILURE_STATE );
333- context .completeWithFailure (error );
334370 }
335371
336372 @ Override
@@ -352,13 +388,11 @@ void cancel(BasicPullResponseHandler context) {
352388 @ Override
353389 void onSuccess (BasicPullResponseHandler context , Map <String , Value > metadata ) {
354390 context .state (SUCCEEDED_STATE );
355- context .completeWithSuccess (metadata );
356391 }
357392
358393 @ Override
359394 void onFailure (BasicPullResponseHandler context , Throwable error ) {
360395 context .state (FAILURE_STATE );
361- context .completeWithFailure (error );
362396 }
363397
364398 @ Override
0 commit comments