|
16 | 16 | import graphql.servlet.core.GraphQLServletListener;
|
17 | 17 | import graphql.servlet.core.internal.GraphQLRequest;
|
18 | 18 | import graphql.servlet.core.internal.VariableMapper;
|
19 |
| -import graphql.servlet.input.BatchInputPreProcessResult; |
20 |
| -import graphql.servlet.input.BatchInputPreProcessor; |
21 |
| -import graphql.servlet.input.GraphQLBatchedInvocationInput; |
22 |
| -import graphql.servlet.input.GraphQLInvocationInputFactory; |
23 |
| -import graphql.servlet.input.GraphQLSingleInvocationInput; |
| 19 | +import graphql.servlet.input.*; |
24 | 20 | import org.reactivestreams.Publisher;
|
25 | 21 | import org.reactivestreams.Subscriber;
|
26 | 22 | import org.reactivestreams.Subscription;
|
27 | 23 | import org.slf4j.Logger;
|
28 | 24 | import org.slf4j.LoggerFactory;
|
29 |
| -import reactor.core.publisher.Flux; |
30 | 25 |
|
31 | 26 | import javax.servlet.AsyncContext;
|
32 | 27 | import javax.servlet.AsyncEvent;
|
33 | 28 | import javax.servlet.AsyncListener;
|
34 | 29 | import javax.servlet.Servlet;
|
35 |
| -import javax.servlet.ServletException; |
36 | 30 | import javax.servlet.http.HttpServlet;
|
37 | 31 | import javax.servlet.http.HttpServletRequest;
|
38 | 32 | import javax.servlet.http.HttpServletResponse;
|
39 | 33 | import javax.servlet.http.Part;
|
40 |
| -import java.io.BufferedInputStream; |
41 |
| -import java.io.ByteArrayOutputStream; |
42 |
| -import java.io.IOException; |
43 |
| -import java.io.InputStream; |
44 |
| -import java.io.Writer; |
45 |
| -import java.util.ArrayList; |
46 |
| -import java.util.Arrays; |
47 |
| -import java.util.HashMap; |
48 |
| -import java.util.Iterator; |
49 |
| -import java.util.List; |
50 |
| -import java.util.Map; |
51 |
| -import java.util.Objects; |
52 |
| -import java.util.Optional; |
| 34 | +import java.io.*; |
| 35 | +import java.util.*; |
53 | 36 | import java.util.concurrent.CountDownLatch;
|
54 | 37 | import java.util.concurrent.atomic.AtomicReference;
|
55 | 38 | import java.util.function.BiConsumer;
|
@@ -358,13 +341,13 @@ private void doRequest(HttpServletRequest request, HttpServletResponse response,
|
358 | 341 | }
|
359 | 342 |
|
360 | 343 | @Override
|
361 |
| - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { |
| 344 | + protected void doGet(HttpServletRequest req, HttpServletResponse resp) { |
362 | 345 | init();
|
363 | 346 | doRequestAsync(req, resp, getHandler);
|
364 | 347 | }
|
365 | 348 |
|
366 | 349 | @Override
|
367 |
| - protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { |
| 350 | + protected void doPost(HttpServletRequest req, HttpServletResponse resp) { |
368 | 351 | init();
|
369 | 352 | doRequestAsync(req, resp, postHandler);
|
370 | 353 | }
|
@@ -402,7 +385,7 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
|
402 | 385 | } else {
|
403 | 386 | publisher = new SingleSubscriberPublisher<>();
|
404 | 387 | ((SingleSubscriberPublisher<ExecutionResult>) publisher).offer(result);
|
405 |
| - publisher = Flux.merge(publisher, (Publisher<DeferredExecutionResult>) result.getExtensions().get(GraphQL.DEFERRED_RESULTS)); |
| 388 | + publisher = new MultiPublisher(publisher, (Publisher<DeferredExecutionResult>) result.getExtensions().get(GraphQL.DEFERRED_RESULTS)); |
406 | 389 | }
|
407 | 390 | publisher.subscribe(subscriber);
|
408 | 391 | if (isInAsyncThread) {
|
@@ -551,7 +534,6 @@ public void onStartAsync(AsyncEvent event) {
|
551 | 534 | }
|
552 | 535 | }
|
553 | 536 |
|
554 |
| - |
555 | 537 | private static class ExecutionResultSubscriber implements Subscriber<ExecutionResult> {
|
556 | 538 |
|
557 | 539 | private final AtomicReference<Subscription> subscriptionRef;
|
@@ -598,4 +580,21 @@ public void await() throws InterruptedException {
|
598 | 580 | completedLatch.await();
|
599 | 581 | }
|
600 | 582 | }
|
| 583 | + |
| 584 | + private static class MultiPublisher<T> implements Publisher<T> { |
| 585 | + |
| 586 | + private List<Publisher<T>> publishers; |
| 587 | + |
| 588 | + @SafeVarargs |
| 589 | + MultiPublisher(Publisher<T>... publishers) { |
| 590 | + this.publishers = Arrays.asList(publishers); |
| 591 | + } |
| 592 | + |
| 593 | + @Override |
| 594 | + public void subscribe(Subscriber<? super T> s) { |
| 595 | + publishers.forEach(publisher -> publisher.subscribe(s)); |
| 596 | + } |
| 597 | + |
| 598 | + } |
| 599 | + |
601 | 600 | }
|
0 commit comments