Skip to content

Defer support #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Test cleanup - no need for its own field
  • Loading branch information
JacobMountain committed Aug 7, 2019
commit 110813b4b007ed53e6b543111c36000d6ea5a50c
31 changes: 11 additions & 20 deletions src/main/java/graphql/servlet/AbstractGraphQLHttpServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.io.ByteStreams;
import com.google.common.io.CharStreams;
import graphql.DeferredExecutionResult;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.execution.reactive.SingleSubscriberPublisher;
Expand Down Expand Up @@ -379,15 +378,16 @@ private void query(GraphQLQueryInvoker queryInvoker, GraphQLObjectMapper graphQL
AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
asyncContext.addListener(new SubscriptionAsyncListener(subscriptionRef));
ExecutionResultSubscriber subscriber = new ExecutionResultSubscriber(subscriptionRef, asyncContext, graphQLObjectMapper);
Publisher<ExecutionResult> publisher;
List<Publisher<ExecutionResult>> publishers = new ArrayList<>();
if (result.getData() instanceof Publisher) {
publisher = result.getData();
publishers.add(result.getData());
} else {
publisher = new SingleSubscriberPublisher<>();
((SingleSubscriberPublisher<ExecutionResult>) publisher).offer(result);
publisher = new MultiPublisher<>(publisher, (Publisher<ExecutionResult>) result.getExtensions().get(GraphQL.DEFERRED_RESULTS));
publishers.add(new StaticDataPublisher<>(result));
final Publisher<ExecutionResult> deferredResultsPublisher = (Publisher<ExecutionResult>) result.getExtensions().get(GraphQL.DEFERRED_RESULTS);
publishers.add(deferredResultsPublisher);
}
publisher.subscribe(subscriber);
publishers.forEach(it -> it.subscribe(subscriber));

if (isInAsyncThread) {
// We need to delay the completion of async context until after the subscription has terminated, otherwise the AsyncContext is prematurely closed.
try {
Expand Down Expand Up @@ -581,20 +581,11 @@ public void await() throws InterruptedException {
}
}

private static class MultiPublisher<T> implements Publisher<T> {

private List<Publisher<T>> publishers;

@SafeVarargs
MultiPublisher(Publisher<T>... publishers) {
this.publishers = Arrays.asList(publishers);
private static class StaticDataPublisher<T> extends SingleSubscriberPublisher<T> implements Publisher<T> {
StaticDataPublisher(T data) {
super();
super.offer(data);
}

@Override
public void subscribe(Subscriber<? super T> s) {
publishers.forEach(publisher -> publisher.subscribe(s));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -286,23 +286,23 @@ class AbstractGraphQLHttpServletSpec extends Specification {

def "deferred query over HTTP GET"() {
setup:
request.addParameter('query', 'query { deferred(arg:"test") @defer }')
request.addParameter('query', 'query { echo(arg:"test") @defer }')

when:
servlet.doGet(request, response)

then:
response.getStatus() == STATUS_OK
response.getContentType() == CONTENT_TYPE_SERVER_SENT_EVENTS
getSubscriptionResponseContent()[0].data.deferred == null
getSubscriptionResponseContent()[0].data.echo == null

when:
subscriptionLatch.await(1, TimeUnit.SECONDS)

then:
def content = getSubscriptionResponseContent()
content[1].data == "test"
content[1].path == ["deferred"]
content[1].path == ["echo"]
}

def "Batch Execution Handler allows limiting batches and sending error messages."() {
Expand Down Expand Up @@ -1056,7 +1056,7 @@ class AbstractGraphQLHttpServletSpec extends Specification {
setup:
servlet = TestUtils.createDefaultServlet()
request.setContent(mapper.writeValueAsBytes([
query: 'query { deferred(arg:"test") @defer }'
query: 'query { echo(arg:"test") @defer }'
]))
request.setAsyncSupported(true)

Expand All @@ -1066,15 +1066,15 @@ class AbstractGraphQLHttpServletSpec extends Specification {
then:
response.getStatus() == STATUS_OK
response.getContentType() == CONTENT_TYPE_SERVER_SENT_EVENTS
getSubscriptionResponseContent()[0].data.deferred == null
getSubscriptionResponseContent()[0].data.echo == null

when:
subscriptionLatch.await(1, TimeUnit.SECONDS)

then:
def content = getSubscriptionResponseContent()
content[1].data == "test"
content[1].path == ["deferred"]
content[1].path == ["echo"]
}

def "errors before graphql schema execution return internal server error"() {
Expand Down
14 changes: 0 additions & 14 deletions src/test/groovy/graphql/servlet/TestUtils.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,6 @@ class TestUtils {
field.type(new GraphQLNonNull(Scalars.GraphQLString))
field.dataFetcher({ env -> null })
}
.field { GraphQLFieldDefinition.Builder field ->
field.name("deferred")
field.type(Scalars.GraphQLString)
field.argument { argument ->
argument.name("arg")
argument.type(Scalars.GraphQLString)
}
field.dataFetcher({ env ->
return CompletableFuture.supplyAsync( {
Thread.sleep(100)
env.arguments.arg
})
})
}
.build()

GraphQLObjectType mutation = GraphQLObjectType.newObject()
Expand Down