Skip to content

Commit 09f5c53

Browse files
author
Karthick Sankarachary
committed
In serial mode, resolve the fields in given order
1 parent aa07818 commit 09f5c53

File tree

4 files changed

+50
-33
lines changed

4 files changed

+50
-33
lines changed

src/main/java/graphql/async/ExecutionFuture.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@
88
import java.util.concurrent.CompletableFuture;
99
import java.util.concurrent.ExecutorService;
1010
import java.util.concurrent.ForkJoinPool;
11-
import java.util.function.Function;
1211

1312
import graphql.ExceptionWhileDataFetching;
1413
import graphql.ExecutionResult;
1514
import graphql.ExecutionResultImpl;
16-
import graphql.GraphQLError;
1715

1816
public class ExecutionFuture {
1917

@@ -31,17 +29,17 @@ public static CompletableFuture<ExecutionResult> completable(ExecutionResult exe
3129
Object executionData = executionResult != null ? executionResult.getData() : null;
3230
if (executionData instanceof CompletableFuture) {
3331
return ((CompletableFuture) executionData)
34-
.exceptionally((throwable) -> {
32+
.exceptionally(throwable -> {
3533
((ExecutionResultImpl) executionResult).addErrors(Arrays.asList(
3634
new ExceptionWhileDataFetching((Throwable) throwable)
3735
));
3836
return executionResult;
3937
})
4038
.thenApplyAsync(
41-
(completedData) -> {
42-
((ExecutionResultImpl) executionResult).setData(completedData);
43-
return executionResult;
44-
}, executorService);
39+
completedData -> {
40+
((ExecutionResultImpl) executionResult).setData(completedData);
41+
return executionResult;
42+
}, executorService);
4543
} else if (executionData instanceof Map) {
4644
final Map<String, Object> results = (Map<String, Object>) executionData;
4745
List<CompletableFuture<Object>> completables = new ArrayList<>();
@@ -50,19 +48,19 @@ public static CompletableFuture<ExecutionResult> completable(ExecutionResult exe
5048
if (fieldValue instanceof CompletableFuture) {
5149
completables.add(
5250
((CompletableFuture) fieldValue)
53-
.thenAccept((fieldValueDone) -> results.put(fieldName, fieldValueDone)));
51+
.thenAccept(fieldValueDone -> results.put(fieldName, fieldValueDone)));
5452
}
5553
}
5654
if (!completables.isEmpty()) {
5755
return CompletableFuture.allOf(array(completables))
58-
.exceptionally((throwable) -> {
56+
.exceptionally(throwable -> {
5957
((ExecutionResultImpl) executionResult).addErrors(Arrays.asList(
6058
new ExceptionWhileDataFetching(throwable)
6159
));
6260
return null;
6361
})
6462
.thenApply(
65-
(completablesDone) -> executionResult);
63+
completablesDone -> executionResult);
6664
}
6765
}
6866
return CompletableFuture.completedFuture(executionResult);

src/main/java/graphql/async/GraphQL.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import graphql.ExecutionResult;
99
import graphql.execution.AsyncExecutionStrategy;
1010
import graphql.execution.ExecutionStrategy;
11-
import graphql.execution.SimpleExecutionStrategy;
1211
import graphql.schema.GraphQLSchema;
1312

1413
import static graphql.Assert.assertNotNull;
@@ -44,7 +43,7 @@ public GraphQL(GraphQLSchema graphQLSchema) {
4443
*/
4544
public GraphQL(GraphQLSchema graphQLSchema, ExecutionStrategy queryStrategy) {
4645
//noinspection deprecation
47-
super(graphQLSchema, queryStrategy, AsyncExecutionStrategy.parallel());
46+
super(graphQLSchema, queryStrategy, AsyncExecutionStrategy.serial());
4847
}
4948

5049
/**
@@ -59,8 +58,12 @@ public GraphQL(GraphQLSchema graphQLSchema, ExecutionStrategy queryStrategy) {
5958
public GraphQL(GraphQLSchema graphQLSchema, ExecutionStrategy queryStrategy,
6059
ExecutionStrategy mutationStrategy) {
6160
super(graphQLSchema, queryStrategy, mutationStrategy);
62-
assert queryStrategy instanceof AsyncExecutionStrategy;
63-
assert mutationStrategy instanceof AsyncExecutionStrategy;
61+
assert queryStrategy instanceof AsyncExecutionStrategy :
62+
"Async graphql requires an async query strategy";
63+
assert mutationStrategy instanceof AsyncExecutionStrategy :
64+
"Async graphql requires an async mutation strategy";
65+
assert ((AsyncExecutionStrategy) mutationStrategy).isSerial() :
66+
"Async graphql requires a serial mutation strategy";
6467
}
6568

6669
/**

src/main/java/graphql/execution/AsyncExecutionStrategy.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package graphql.execution;
22

33
import java.util.ArrayList;
4-
import java.util.Arrays;
54
import java.util.LinkedHashMap;
65
import java.util.List;
76
import java.util.Map;
@@ -11,7 +10,6 @@
1110
import java.util.concurrent.ForkJoinPool;
1211
import java.util.function.Function;
1312
import java.util.stream.Collectors;
14-
import java.util.stream.Stream;
1513

1614
import graphql.ExceptionWhileDataFetching;
1715
import graphql.ExecutionResult;
@@ -21,8 +19,8 @@
2119
import graphql.schema.GraphQLObjectType;
2220
import graphql.schema.GraphQLType;
2321

24-
import static graphql.async.ExecutionFuture.completable;
2522
import static graphql.async.ExecutionFuture.array;
23+
import static graphql.async.ExecutionFuture.completable;
2624

2725
/**
2826
* <p>AsyncExecutionStrategy implements the {@link ExecutionStrategy} in a non-blocking manner.</p>
@@ -42,8 +40,8 @@
4240
*/
4341
public class AsyncExecutionStrategy extends ExecutionStrategy {
4442

45-
protected ExecutorService executorService;
4643
protected boolean serial;
44+
protected ExecutorService executorService;
4745

4846
public static AsyncExecutionStrategy serial() {
4947
return new AsyncExecutionStrategy(true);
@@ -70,6 +68,10 @@ protected AsyncExecutionStrategy(boolean serial, ExecutorService executorService
7068
this.executorService = executorService;
7169
}
7270

71+
public boolean isSerial() {
72+
return serial;
73+
}
74+
7375
/**
7476
* Resolve the given fields in parallel and return an execution result without blocking.
7577
*
@@ -87,17 +89,32 @@ public ExecutionResult execute(final ExecutionContext executionContext,
8789
Set<String> fieldNames = fields.keySet();
8890

8991
// Create tasks to resolve each of the fields
90-
(serial ? fieldNames.stream() : fieldNames.parallelStream()).forEach((fieldName) -> {
91-
fieldFutures.put(fieldName, CompletableFuture.supplyAsync(
92-
() -> resolveField(executionContext, parentType, source, fields.get(fieldName)),
93-
executorService));
94-
});
92+
CompletableFuture<ExecutionResult> previousField = CompletableFuture.completedFuture(null);
93+
for (String fieldName : fieldNames) {
94+
CompletableFuture<ExecutionResult> fieldFuture;
95+
if (serial) {
96+
// Block the current field until the previous field is done
97+
fieldFuture = previousField.thenCompose(result -> CompletableFuture.supplyAsync(
98+
() -> resolveField(executionContext, parentType, source, fields.get(fieldName)),
99+
executorService
100+
));
101+
previousField = fieldFuture;
102+
} else {
103+
// Resolve every field in parallel, independent of each other
104+
fieldFuture = CompletableFuture.supplyAsync(
105+
() -> resolveField(executionContext, parentType, source, fields.get(fieldName)),
106+
executorService
107+
);
108+
}
109+
fieldFutures.put(fieldName, fieldFuture);
110+
}
111+
95112

96113
// Prepare a completable for the map of field results
97114
CompletableFuture<Map<String, Object>> resultsFuture = CompletableFuture
98115
// First, wait for all the tasks above to complete
99116
.allOf(array(fieldFutures.values()))
100-
.exceptionally((throwable) -> {
117+
.exceptionally(throwable -> {
101118
executionContext.addError(new ExceptionWhileDataFetching(throwable));
102119
return null;
103120
})
@@ -107,25 +124,25 @@ public ExecutionResult execute(final ExecutionContext executionContext,
107124
return CompletableFuture
108125
.allOf(array(
109126
fieldNames.stream()
110-
.map((fieldName) -> {
127+
.map(fieldName -> {
111128
final ExecutionResult fieldResult = fieldFutures.get(fieldName).join();
112129
fieldResults.put(fieldName, fieldResult);
113130
return completable(fieldResult, executorService);
114131
})
115132
.collect(Collectors.<CompletableFuture<ExecutionResult>>toList())
116133
))
117-
.exceptionally((throwable) -> {
134+
.exceptionally(throwable -> {
118135
executionContext.addError(new ExceptionWhileDataFetching(throwable));
119136
return null;
120137
})
121-
.thenApplyAsync((resultsDone) -> fieldResults, executorService);
138+
.thenApplyAsync(resultsDone -> fieldResults, executorService);
122139
}, executorService)
123140
// Last, collect the results of the field into a map
124-
.exceptionally((throwable) -> {
141+
.exceptionally(throwable -> {
125142
executionContext.addError(new ExceptionWhileDataFetching(throwable));
126143
return new LinkedHashMap<>();
127144
})
128-
.thenApplyAsync((fieldResults) -> {
145+
.thenApplyAsync(fieldResults -> {
129146
Map<String, Object> results = new LinkedHashMap<>();
130147
for (String fieldName : fieldResults.keySet()) {
131148
ExecutionResult fieldResult = fieldResults.get(fieldName);
@@ -155,10 +172,10 @@ protected ExecutionResult completeValue(final ExecutionContext executionContext,
155172
return ((CompletableFuture<ExecutionResult>) result)
156173
.thenComposeAsync(
157174
(Function<Object, CompletableFuture<ExecutionResult>>)
158-
(completedResult) -> completable(
175+
completedResult -> completable(
159176
executionStrategy.completeValue(executionContext, fieldType, fields, completedResult),
160177
executorService), executorService)
161-
.exceptionally((throwable) -> {
178+
.exceptionally(throwable -> {
162179
executionContext.addError(new ExceptionWhileDataFetching(throwable));
163180
return null;
164181
})
@@ -180,7 +197,7 @@ protected ExecutionResult completeValueForList(ExecutionContext executionContext
180197
super.completeValueForList(executionContext, fieldType, fields, result);
181198
List<Object> completableResults = (List<Object>) executionResult.getData();
182199
List<Object> completedResults = new ArrayList<>();
183-
completableResults.forEach((completedResult) -> {
200+
completableResults.forEach(completedResult -> {
184201
completedResults.add((completedResult instanceof CompletableFuture) ?
185202
((CompletableFuture) completedResult).join() : completedResult);
186203
});

src/test/groovy/graphql/execution/AsyncExecutionStrategyTest.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ class AsyncExecutionStrategyTest extends Specification {
6464
new ThreadPoolExecutor.CallerRunsPolicy());
6565

6666
def graphQL = GraphQL.newAsyncGraphQL(NewsSchema.newsSchema)
67-
.queryExecutionStrategy(AsyncExecutionStrategy.serial(threadPoolExecutor))
6867
.build()
6968

7069
def received = new AtomicReference<Map>()

0 commit comments

Comments
 (0)