Skip to content

Commit cd34055

Browse files
committed
Defer support with RxJava
1 parent 8ea7d83 commit cd34055

File tree

5 files changed

+97
-8
lines changed

5 files changed

+97
-8
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ dependencies {
5353
// Servlet
5454
compile 'javax.servlet:javax.servlet-api:3.1.0'
5555
compile 'javax.websocket:javax.websocket-api:1.1'
56+
compile 'io.projectreactor:reactor-core:3.0.5.RELEASE'
5657

5758
// GraphQL
5859
compile "com.graphql-java:graphql-java:$LIB_GRAPHQL_JAVA_VER"

src/main/java/graphql/servlet/AbstractGraphQLHttpServlet.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import com.google.common.io.ByteStreams;
44
import com.google.common.io.CharStreams;
5+
import graphql.DeferredExecutionResult;
56
import graphql.ExecutionResult;
7+
import graphql.GraphQL;
8+
import graphql.execution.reactive.SingleSubscriberPublisher;
69
import graphql.introspection.IntrospectionQuery;
710
import graphql.schema.GraphQLFieldDefinition;
811
import graphql.servlet.config.GraphQLConfiguration;
@@ -23,6 +26,7 @@
2326
import org.reactivestreams.Subscription;
2427
import org.slf4j.Logger;
2528
import org.slf4j.LoggerFactory;
29+
import reactor.core.publisher.Flux;
2630

2731
import javax.servlet.AsyncContext;
2832
import javax.servlet.AsyncEvent;
@@ -373,7 +377,9 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
373377
HttpServletRequest req, HttpServletResponse resp) throws IOException {
374378
ExecutionResult result = queryInvoker.query(invocationInput);
375379

376-
if (!(result.getData() instanceof Publisher)) {
380+
boolean isDeferred = Objects.nonNull(result.getExtensions()) && result.getExtensions().containsKey(GraphQL.DEFERRED_RESULTS);
381+
382+
if (!(result.getData() instanceof Publisher || isDeferred)) {
377383
resp.setContentType(APPLICATION_JSON_UTF8);
378384
resp.setStatus(STATUS_OK);
379385
resp.getWriter().write(graphQLObjectMapper.serializeResultAsJson(result));
@@ -390,7 +396,15 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
390396
AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
391397
asyncContext.addListener(new SubscriptionAsyncListener(subscriptionRef));
392398
ExecutionResultSubscriber subscriber = new ExecutionResultSubscriber(subscriptionRef, asyncContext, graphQLObjectMapper);
393-
((Publisher<ExecutionResult>) result.getData()).subscribe(subscriber);
399+
Publisher<ExecutionResult> publisher;
400+
if (result.getData() instanceof Publisher) {
401+
publisher = result.getData();
402+
} else {
403+
publisher = new SingleSubscriberPublisher<>();
404+
((SingleSubscriberPublisher<ExecutionResult>) publisher).offer(result);
405+
publisher = Flux.merge(publisher, (Publisher<DeferredExecutionResult>) result.getExtensions().get(GraphQL.DEFERRED_RESULTS));
406+
}
407+
publisher.subscribe(subscriber);
394408
if (isInAsyncThread) {
395409
// We need to delay the completion of async context until after the subscription has terminated, otherwise the AsyncContext is prematurely closed.
396410
try {

src/main/java/graphql/servlet/core/GraphQLObjectMapper.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@
55
import com.fasterxml.jackson.databind.MappingIterator;
66
import com.fasterxml.jackson.databind.ObjectMapper;
77
import com.fasterxml.jackson.databind.ObjectReader;
8-
import graphql.ExecutionResult;
9-
import graphql.ExecutionResultImpl;
10-
import graphql.GraphQLError;
8+
import graphql.*;
9+
import graphql.execution.ExecutionPath;
1110
import graphql.servlet.config.ConfiguringObjectMapperProvider;
1211
import graphql.servlet.config.ObjectMapperConfigurer;
1312
import graphql.servlet.config.ObjectMapperProvider;
@@ -117,12 +116,19 @@ public ExecutionResult sanitizeErrors(ExecutionResult executionResult) {
117116
} else {
118117
errors = null;
119118
}
120-
121119
return new ExecutionResultImpl(data, errors, extensions);
122120
}
123121

124122
public Map<String, Object> createResultFromExecutionResult(ExecutionResult executionResult) {
125-
return convertSanitizedExecutionResult(sanitizeErrors(executionResult));
123+
ExecutionResult sanitizedExecutionResult = sanitizeErrors(executionResult);
124+
if (executionResult instanceof DeferredExecutionResult) {
125+
sanitizedExecutionResult = DeferredExecutionResultImpl
126+
.newDeferredExecutionResult()
127+
.from(executionResult)
128+
.path(ExecutionPath.fromList(((DeferredExecutionResult) executionResult).getPath()))
129+
.build();
130+
}
131+
return convertSanitizedExecutionResult(sanitizedExecutionResult);
126132
}
127133

128134
public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult executionResult) {
@@ -144,6 +150,10 @@ public Map<String, Object> convertSanitizedExecutionResult(ExecutionResult execu
144150
result.put("data", executionResult.getData());
145151
}
146152

153+
if (executionResult instanceof DeferredExecutionResult) {
154+
result.put("path", ((DeferredExecutionResult) executionResult).getPath());
155+
}
156+
147157
return result;
148158
}
149159

src/test/groovy/graphql/servlet/AbstractGraphQLHttpServletSpec.groovy

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,28 @@ class AbstractGraphQLHttpServletSpec extends Specification {
283283
getBatchedResponseContent()[1].data.echo == "test"
284284
}
285285

286+
287+
def "deferred query over HTTP GET"() {
288+
setup:
289+
request.addParameter('query', 'query { deferred(arg:"test") @defer }')
290+
291+
when:
292+
servlet.doGet(request, response)
293+
294+
then:
295+
response.getStatus() == STATUS_OK
296+
response.getContentType() == CONTENT_TYPE_SERVER_SENT_EVENTS
297+
getSubscriptionResponseContent()[0].data.deferred == null
298+
299+
when:
300+
subscriptionLatch.await(1, TimeUnit.SECONDS)
301+
302+
then:
303+
def content = getSubscriptionResponseContent()
304+
content[1].data == "test"
305+
content[1].path == ["deferred"]
306+
}
307+
286308
def "Batch Execution Handler allows limiting batches and sending error messages."() {
287309
setup:
288310
servlet = TestUtils.createBatchCustomizedServlet({ env -> env.arguments.arg }, { env -> env.arguments.arg }, { env ->
@@ -1030,6 +1052,31 @@ class AbstractGraphQLHttpServletSpec extends Specification {
10301052
getSubscriptionResponseContent()[1].data.echo == "Second\n\ntest"
10311053
}
10321054

1055+
def "defer query over HTTP POST"() {
1056+
setup:
1057+
servlet = TestUtils.createDefaultServlet()
1058+
request.setContent(mapper.writeValueAsBytes([
1059+
query: 'query { deferred(arg:"test") @defer }'
1060+
]))
1061+
request.setAsyncSupported(true)
1062+
1063+
when:
1064+
servlet.doPost(request, response)
1065+
1066+
then:
1067+
response.getStatus() == STATUS_OK
1068+
response.getContentType() == CONTENT_TYPE_SERVER_SENT_EVENTS
1069+
getSubscriptionResponseContent()[0].data.deferred == null
1070+
1071+
when:
1072+
subscriptionLatch.await(1, TimeUnit.SECONDS)
1073+
1074+
then:
1075+
def content = getSubscriptionResponseContent()
1076+
content[1].data == "test"
1077+
content[1].path == ["deferred"]
1078+
}
1079+
10331080
def "errors before graphql schema execution return internal server error"() {
10341081
setup:
10351082
servlet = SimpleGraphQLHttpServlet.newBuilder(GraphQLInvocationInputFactory.newBuilder {

src/test/groovy/graphql/servlet/TestUtils.groovy

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package graphql.servlet
22

33
import com.google.common.io.ByteStreams
4+
import graphql.Directives
45
import graphql.Scalars
56
import graphql.execution.reactive.SingleSubscriberPublisher
67
import graphql.schema.*
@@ -15,6 +16,7 @@ import graphql.servlet.core.ApolloScalars
1516
import graphql.servlet.input.BatchInputPreProcessor
1617
import graphql.servlet.context.ContextSetting
1718

19+
import java.util.concurrent.CompletableFuture
1820
import java.util.concurrent.atomic.AtomicReference
1921

2022
class TestUtils {
@@ -95,7 +97,7 @@ class TestUtils {
9597
static def createGraphQlSchema(DataFetcher queryDataFetcher = { env -> env.arguments.arg },
9698
DataFetcher mutationDataFetcher = { env -> env.arguments.arg },
9799
DataFetcher subscriptionDataFetcher = { env ->
98-
AtomicReference<SingleSubscriberPublisher<String>> publisherRef = new AtomicReference<>();
100+
AtomicReference<SingleSubscriberPublisher<String>> publisherRef = new AtomicReference<>()
99101
publisherRef.set(new SingleSubscriberPublisher<>({ subscription ->
100102
publisherRef.get().offer(env.arguments.arg)
101103
publisherRef.get().noMoreData()
@@ -118,6 +120,20 @@ class TestUtils {
118120
field.type(new GraphQLNonNull(Scalars.GraphQLString))
119121
field.dataFetcher({ env -> null })
120122
}
123+
.field { GraphQLFieldDefinition.Builder field ->
124+
field.name("deferred")
125+
field.type(Scalars.GraphQLString)
126+
field.argument { argument ->
127+
argument.name("arg")
128+
argument.type(Scalars.GraphQLString)
129+
}
130+
field.dataFetcher({ env ->
131+
return CompletableFuture.supplyAsync( {
132+
Thread.sleep(100)
133+
env.arguments.arg
134+
})
135+
})
136+
}
121137
.build()
122138

123139
GraphQLObjectType mutation = GraphQLObjectType.newObject()
@@ -174,6 +190,7 @@ class TestUtils {
174190
.mutation(mutation)
175191
.subscription(subscription)
176192
.additionalType(ApolloScalars.Upload)
193+
.additionalDirective(Directives.DeferDirective)
177194
.build()
178195
}
179196

0 commit comments

Comments
 (0)