Skip to content

A PR for reactive streams support #151

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 41 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6e30220
Add a proof-of-concept for "Observer-like" batch loading
AlexandreCarlton May 12, 2024
95540ff
reactive streams support branch
bbakerman May 17, 2024
2cdba8a
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 17, 2024
1d78255
reactive streams support branch - merged master
bbakerman May 17, 2024
2032e33
Merge remote-tracking branch 'origin/master' into observer-batch-load…
AlexandreCarlton May 18, 2024
6b5a732
Eliminate *BatchObserver in favour of Publisher
AlexandreCarlton May 18, 2024
68d7f54
Use internal Assertions over Java's raw assert
AlexandreCarlton May 18, 2024
a3132b7
Remove handling of Throwable passed into onNext
AlexandreCarlton May 18, 2024
fbeffae
Expose `new*DataLoader` methods for *PublisherBatchLoader
AlexandreCarlton May 18, 2024
b2a662d
Copy/tweak original/ DataLoader tests for publisher equivalents
AlexandreCarlton May 18, 2024
0d0b2f8
Rename '*PublisherBatchLoader' to 'BatchPublisher'
AlexandreCarlton May 18, 2024
14002f6
Ensure DataLoaderSubscriber is only called by one thread
AlexandreCarlton May 18, 2024
0f303a8
Document Subscriber#onNext invocation order
AlexandreCarlton May 18, 2024
ce115fd
Merge branch 'reactive-streams-branch' into observer-batch-loader-pro…
bbakerman May 19, 2024
288be41
Merge pull request #148 from AlexandreCarlton/observer-batch-loader-p…
bbakerman May 19, 2024
e16fa65
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 20, 2024
a93112a
reactive streams support branch - getting it compiling
bbakerman May 20, 2024
74567fe
Making the Subscribers use a common base class
bbakerman May 21, 2024
4396624
Making the Subscribers use a common base class- synchronized on each …
bbakerman May 21, 2024
8a64483
Making the Subscribers use a common base class- now with failing test…
bbakerman May 21, 2024
3e8ac9c
Making the Subscribers use a common base class- fail the overall CF o…
bbakerman May 21, 2024
eb2b40c
Inline BatchPublisher tests into DataLoaderTest
AlexandreCarlton May 20, 2024
651e561
Fix MappedBatchPublisher loaders to work without cache
AlexandreCarlton May 20, 2024
8295396
Merge pull request #155 from AlexandreCarlton/migrate-publisher-tests
bbakerman May 22, 2024
86ec5c8
Merge remote-tracking branch 'origin/reactive-streams-branch' into re…
bbakerman May 22, 2024
6d3c4eb
Making the Subscribers use a common base class - merged in main branch
bbakerman May 22, 2024
3fddb8b
Merge pull request #154 from graphql-java/reactive-streams-common-pub…
bbakerman May 22, 2024
034c68f
More tests for Publishers
bbakerman May 22, 2024
b09ac60
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 23, 2024
5d826b8
Merge remote-tracking branch 'origin/master' into reactive-streams-br…
bbakerman May 23, 2024
8b344db
Now the builds pass - broken out the fixtures
bbakerman May 23, 2024
e9bfc2b
Merge pull request #158 from graphql-java/reactive-streams-branch-ext…
bbakerman May 23, 2024
91d3036
This moves the reactive code pout into its own package because DataLo…
bbakerman May 24, 2024
e98621b
renamed classes inline with their counterparts
bbakerman May 24, 2024
6523015
made them non public and created a static factory support class
bbakerman May 24, 2024
170ccf8
reorged method placement
bbakerman May 24, 2024
77fd0dd
Merge pull request #159 from graphql-java/reactive-streams-branch-mov…
bbakerman May 24, 2024
4b9356e
Added javadoc to publisher interfaces
bbakerman May 24, 2024
3c3cc99
Have MappedBatchPublisher take in a Set<K> keys
AlexandreCarlton May 26, 2024
2e82858
Add README sections for `*BatchPublisher`
AlexandreCarlton May 26, 2024
c3e6ee5
Merge pull request #160 from AlexandreCarlton/add-documentation-for-p…
bbakerman May 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More tests for Publishers
  • Loading branch information
bbakerman committed May 22, 2024
commit 034c68f4ca4faa17cf762230891324b04b3df7be
2 changes: 1 addition & 1 deletion src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
V value = entry.getValue();

Object callContext = callContextByKey.get(key);
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());

onNextValue(key, value, callContext, futures);

Expand Down
190 changes: 162 additions & 28 deletions src/test/java/org/dataloader/DataLoaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package org.dataloader;

import org.awaitility.Duration;
import org.dataloader.fixtures.CustomCacheMap;
import org.dataloader.fixtures.JsonObject;
import org.dataloader.fixtures.TestKit;
import org.dataloader.fixtures.User;
import org.dataloader.fixtures.UserManager;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.impl.DataLoaderAssertionException;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -35,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
Expand All @@ -47,6 +50,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.*;
import static org.awaitility.Awaitility.await;
import static org.dataloader.DataLoaderFactory.newDataLoader;
import static org.dataloader.DataLoaderFactory.newMappedDataLoader;
Expand Down Expand Up @@ -104,7 +108,7 @@ public void basic_map_batch_loading() {
mapOfResults.put(k, k);
}
});
return CompletableFuture.completedFuture(mapOfResults);
return completedFuture(mapOfResults);
};
DataLoader<String, String> loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader);

Expand Down Expand Up @@ -424,7 +428,7 @@ public void should_Allow_priming_the_cache_with_a_future(TestDataLoaderFactory f
List<Collection<String>> loadCalls = new ArrayList<>();
DataLoader<String, String> identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls);

DataLoader<String, String> dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A"));
DataLoader<String, String> dlFluency = identityLoader.prime("A", completedFuture("A"));
assertThat(dlFluency, equalTo(identityLoader));

CompletableFuture<String> future1 = identityLoader.load("A");
Expand Down Expand Up @@ -992,7 +996,7 @@ public void batches_multiple_requests_with_max_batch_size(TestDataLoaderFactory

identityLoader.dispatch();

CompletableFuture.allOf(f1, f2, f3).join();
allOf(f1, f2, f3).join();

assertThat(f1.join(), equalTo(1));
assertThat(f2.join(), equalTo(2));
Expand Down Expand Up @@ -1035,13 +1039,13 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa

AtomicBoolean v4Called = new AtomicBoolean();

CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> {
supplyAsync(nullValue).thenAccept(v1 -> {
identityLoader.load("a");
CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> {
supplyAsync(nullValue).thenAccept(v2 -> {
identityLoader.load("b");
CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> {
supplyAsync(nullValue).thenAccept(v3 -> {
identityLoader.load("c");
CompletableFuture.supplyAsync(nullValue).thenAccept(
supplyAsync(nullValue).thenAccept(
v4 -> {
identityLoader.load("d");
v4Called.set(true);
Expand All @@ -1058,12 +1062,68 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa
singletonList(asList("a", "b", "c", "d"))));
}

@ParameterizedTest
@MethodSource("dataLoaderFactories")
public void should_blowup_after_N_keys(TestDataLoaderFactory factory) {
if (!(factory instanceof TestReactiveDataLoaderFactory)) {
return;
}
//
// if we blow up after emitting N keys, the N keys should work but the rest of the keys
// should be exceptional
DataLoader<Integer, Integer> identityLoader = ((TestReactiveDataLoaderFactory) factory).idLoaderBlowsUpsAfterN(3, new DataLoaderOptions(), new ArrayList<>());
CompletableFuture<Integer> cf1 = identityLoader.load(1);
CompletableFuture<Integer> cf2 = identityLoader.load(2);
CompletableFuture<Integer> cf3 = identityLoader.load(3);
CompletableFuture<Integer> cf4 = identityLoader.load(4);
CompletableFuture<Integer> cf5 = identityLoader.load(5);
identityLoader.dispatch();
await().until(cf5::isDone);

assertThat(cf1.join(), equalTo(1));
assertThat(cf2.join(), equalTo(2));
assertThat(cf3.join(), equalTo(3));
assertThat(cf4.isCompletedExceptionally(), is(true));
assertThat(cf5.isCompletedExceptionally(), is(true));

}

@ParameterizedTest
@MethodSource("dataLoaderFactories")
public void should_assert_values_size_equals_key_size(TestDataLoaderFactory factory) {
//
// what happens if we want 4 values but are only given 2 back say
//
DataLoader<String, String> identityLoader = factory.onlyReturnsNValues(2, new DataLoaderOptions(), new ArrayList<>());
CompletableFuture<String> cf1 = identityLoader.load("A");
CompletableFuture<String> cf2 = identityLoader.load("B");
CompletableFuture<String> cf3 = identityLoader.load("C");
CompletableFuture<String> cf4 = identityLoader.load("D");
identityLoader.dispatch();

await().atMost(Duration.FIVE_HUNDRED_MILLISECONDS).until(() -> cf1.isDone() && cf2.isDone() && cf3.isDone() && cf4.isDone());

if (factory instanceof ListDataLoaderFactory | factory instanceof PublisherDataLoaderFactory) {
assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class));
assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class));
assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class));
assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class));
} else {
// with the maps it's ok to have fewer results
assertThat(cf1.join(), equalTo("A"));
assertThat(cf2.join(), equalTo("B"));
assertThat(cf3.join(), equalTo(null));
assertThat(cf4.join(), equalTo(null));
}

}

@Test
public void can_call_a_loader_from_a_loader() throws Exception {
List<Collection<String>> deepLoadCalls = new ArrayList<>();
DataLoader<String, String> deepLoader = newDataLoader(keys -> {
deepLoadCalls.add(keys);
return CompletableFuture.completedFuture(keys);
return completedFuture(keys);
});

List<Collection<String>> aLoadCalls = new ArrayList<>();
Expand All @@ -1083,7 +1143,7 @@ public void can_call_a_loader_from_a_loader() throws Exception {
CompletableFuture<String> b1 = bLoader.load("B1");
CompletableFuture<String> b2 = bLoader.load("B2");

CompletableFuture.allOf(
allOf(
aLoader.dispatch(),
deepLoader.dispatch(),
bLoader.dispatch(),
Expand All @@ -1109,11 +1169,10 @@ public void can_call_a_loader_from_a_loader() throws Exception {
public void should_allow_composition_of_data_loader_calls() {
UserManager userManager = new UserManager();

BatchLoader<Long, User> userBatchLoader = userIds -> CompletableFuture
.supplyAsync(() -> userIds
.stream()
.map(userManager::loadUserById)
.collect(Collectors.toList()));
BatchLoader<Long, User> userBatchLoader = userIds -> supplyAsync(() -> userIds
.stream()
.map(userManager::loadUserById)
.collect(Collectors.toList()));
DataLoader<Long, User> userLoader = newDataLoader(userBatchLoader);

AtomicBoolean gandalfCalled = new AtomicBoolean(false);
Expand Down Expand Up @@ -1160,17 +1219,26 @@ private static Stream<Arguments> dataLoaderFactories() {

public interface TestDataLoaderFactory {
<K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls);

<K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Collection<K>> loadCalls);

<K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options, List<Collection<K>> loadCalls);

DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions options, List<Collection<Integer>> loadCalls);

DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls);
}

public interface TestReactiveDataLoaderFactory {
<K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls);
}

private static class ListDataLoaderFactory implements TestDataLoaderFactory {
@Override
public <K> DataLoader<K, K> idLoader(DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newDataLoader(keys -> {
loadCalls.add(new ArrayList<>(keys));
return CompletableFuture.completedFuture(keys);
return completedFuture(keys);
}, options);
}

Expand All @@ -1189,7 +1257,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(DataLoaderOptions options
loadCalls.add(new ArrayList<>(keys));

List<Object> errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList());
return CompletableFuture.completedFuture(errors);
return completedFuture(errors);
}, options);
}

Expand All @@ -1206,7 +1274,15 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(DataLoaderOptions o
errors.add(new IllegalStateException("Error"));
}
}
return CompletableFuture.completedFuture(errors);
return completedFuture(errors);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newDataLoader(keys -> {
loadCalls.add(new ArrayList<>(keys));
return completedFuture(keys.subList(0, N));
}, options);
}
}
Expand All @@ -1220,7 +1296,7 @@ public <K> DataLoader<K, K> idLoader(
loadCalls.add(new ArrayList<>(keys));
Map<K, K> map = new HashMap<>();
keys.forEach(k -> map.put(k, k));
return CompletableFuture.completedFuture(map);
return completedFuture(map);
}, options);
}

Expand All @@ -1239,7 +1315,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(
loadCalls.add(new ArrayList<>(keys));
Map<K, Object> errorByKey = new HashMap<>();
keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error")));
return CompletableFuture.completedFuture(errorByKey);
return completedFuture(errorByKey);
}, options);
}

Expand All @@ -1257,16 +1333,28 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
errorByKey.put(key, new IllegalStateException("Error"));
}
}
return CompletableFuture.completedFuture(errorByKey);
return completedFuture(errorByKey);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newMappedDataLoader(keys -> {
loadCalls.add(new ArrayList<>(keys));

Map<String, String> collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap(
k -> k, v -> v
));
return completedFuture(collect);
}, options);
}
}

private static class PublisherDataLoaderFactory implements TestDataLoaderFactory {
private static class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {

@Override
public <K> DataLoader<K, K> idLoader(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Flux.fromIterable(keys).subscribe(subscriber);
Expand All @@ -1283,7 +1371,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col

@Override
public <K> DataLoader<K, Object> idLoaderAllExceptions(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Stream<Try<Object>> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error")));
Expand All @@ -1293,7 +1381,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(

@Override
public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
return newPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

Expand All @@ -1308,13 +1396,36 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
Flux.fromIterable(errors).subscribe(subscriber);
}, options);
}

@Override
public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<K> nKeys = keys.subList(0, N);
Flux<K> subFlux = Flux.fromIterable(nKeys);
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
.subscribe(subscriber);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<String> nKeys = keys.subList(0, N);
Flux.fromIterable(nKeys)
.subscribe(subscriber);
}, options);
}
}

private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory {
private static class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory {

@Override
public <K> DataLoader<K, K> idLoader(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Map<K, K> map = new HashMap<>();
Expand All @@ -1333,7 +1444,7 @@ public <K> DataLoader<K, K> idLoaderBlowsUps(DataLoaderOptions options, List<Col

@Override
public <K> DataLoader<K, Object> idLoaderAllExceptions(
DataLoaderOptions options, List<Collection<K>> loadCalls) {
DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Stream<Map.Entry<K, Try<Object>>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error"))));
Expand All @@ -1343,7 +1454,7 @@ public <K> DataLoader<K, Object> idLoaderAllExceptions(

@Override
public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
DataLoaderOptions options, List<Collection<Integer>> loadCalls) {
return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

Expand All @@ -1358,6 +1469,29 @@ public DataLoader<Integer, Object> idLoaderOddEvenExceptions(
Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber);
}, options);
}

@Override
public <K> DataLoader<K, K> idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List<Collection<K>> loadCalls) {
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<K> nKeys = keys.subList(0, N);
Flux<Map.Entry<K, K>> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k));
subFlux.concatWith(Flux.error(new IllegalStateException("Error")))
.subscribe(subscriber);
}, options);
}

@Override
public DataLoader<String, String> onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList<Object> loadCalls) {
return newMappedPublisherDataLoader((keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));

List<String> nKeys = keys.subList(0, N);
Flux.fromIterable(nKeys).map(k -> Map.entry(k, k))
.subscribe(subscriber);
}, options);
}
}

private static class ThrowingCacheMap extends CustomCacheMap {
Expand Down