2525import org .apache .commons .lang3 .concurrent .BasicThreadFactory ;
2626import org .apache .tinkerpop .gremlin .util .message .RequestMessage ;
2727import org .apache .tinkerpop .gremlin .util .message .ResponseMessage ;
28- import org .apache .tinkerpop .gremlin .util .message .ResponseStatusCode ;
2928
3029import java .util .ArrayList ;
30+ import java .util .HashMap ;
3131import java .util .List ;
32+ import java .util .Map ;
33+ import java .util .UUID ;
3234import java .util .concurrent .CompletableFuture ;
3335import java .util .concurrent .TimeUnit ;
3436import java .util .function .Consumer ;
@@ -49,7 +51,7 @@ public AbstractClient(final String threadPattern) {
4951
5052 @ Override
5153 public void submit (final RequestMessage requestMessage , final Consumer <ResponseMessage > callback ) throws Exception {
52- callbackResponseHandler .callback = callback ;
54+ callbackResponseHandler .callbackByRequestId . put ( requestMessage . getRequestId (), callback ) ;
5355 writeAndFlush (requestMessage );
5456 }
5557
@@ -65,7 +67,7 @@ public List<ResponseMessage> submit(final RequestMessage requestMessage) throws
6567 public CompletableFuture <List <ResponseMessage >> submitAsync (final RequestMessage requestMessage ) throws Exception {
6668 final List <ResponseMessage > results = new ArrayList <>();
6769 final CompletableFuture <List <ResponseMessage >> f = new CompletableFuture <>();
68- callbackResponseHandler .callback = response -> {
70+ callbackResponseHandler .callbackByRequestId . put ( requestMessage . getRequestId (), response -> {
6971 if (f .isDone ())
7072 throw new RuntimeException ("A terminating message was already encountered - no more messages should have been received" );
7173
@@ -75,19 +77,19 @@ public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage
7577 if (response .getStatus ().getCode ().isFinalResponse ()) {
7678 f .complete (results );
7779 }
78- };
80+ }) ;
7981
8082 writeAndFlush (requestMessage );
8183
8284 return f ;
8385 }
8486
8587 static class CallbackResponseHandler extends SimpleChannelInboundHandler <ResponseMessage > {
86- public Consumer <ResponseMessage > callback ;
88+ public Map < UUID , Consumer <ResponseMessage >> callbackByRequestId = new HashMap <>() ;
8789
8890 @ Override
8991 protected void channelRead0 (final ChannelHandlerContext channelHandlerContext , final ResponseMessage response ) throws Exception {
90- callback .accept (response );
92+ callbackByRequestId . get ( response . getRequestId ()) .accept (response );
9193 }
9294 }
9395}
0 commit comments