Skip to content

Commit

Permalink
Allowing CosmosPageFlux.handle consumers to get chained (#20040)
Browse files Browse the repository at this point in the history
  • Loading branch information
FabianMeiswinkel authored Mar 23, 2021
1 parent 9b585e2 commit 9554519
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ public final class CosmosPagedFlux<T> extends ContinuablePagedFlux<String, T, Fe
/**
* Handle for invoking "side-effects" on each FeedResponse returned by CosmosPagedFlux
*
* @param feedResponseConsumer handler
* @param newFeedResponseConsumer handler
* @return CosmosPagedFlux instance with attached handler
*/
@Beta(value = Beta.SinceVersion.V4_6_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> feedResponseConsumer) {
return new CosmosPagedFlux<T>(this.optionsFluxFunction, feedResponseConsumer);
public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> newFeedResponseConsumer) {
if (this.feedResponseConsumer != null) {
return new CosmosPagedFlux<T>(
this.optionsFluxFunction,
this.feedResponseConsumer.andThen(newFeedResponseConsumer));
} else {
return new CosmosPagedFlux<T>(this.optionsFluxFunction, newFeedResponseConsumer);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,38 @@ public void readAllItemsByPageWithCosmosPagedFluxHandler() throws Exception {
cosmosAsyncContainer.readAllItems(cosmosQueryRequestOptions, ObjectNode.class);

AtomicInteger handleCount = new AtomicInteger();
AtomicInteger chainedHandleCount = new AtomicInteger();
AtomicInteger yetAnotherChainedHandleCount = new AtomicInteger();
cosmosPagedFlux = cosmosPagedFlux.handle(feedResponse -> {
CosmosDiagnostics cosmosDiagnostics = feedResponse.getCosmosDiagnostics();
if (cosmosDiagnostics != null) {
handleCount.incrementAndGet();
}
});

cosmosPagedFlux = cosmosPagedFlux.handle(feedResponse -> {
CosmosDiagnostics cosmosDiagnostics = feedResponse.getCosmosDiagnostics();
if (cosmosDiagnostics != null) {
chainedHandleCount.incrementAndGet();
}
});

cosmosPagedFlux = cosmosPagedFlux.handle(feedResponse -> {
CosmosDiagnostics cosmosDiagnostics = feedResponse.getCosmosDiagnostics();
if (cosmosDiagnostics != null) {
yetAnotherChainedHandleCount.incrementAndGet();
}
});

AtomicInteger feedResponseCount = new AtomicInteger();
cosmosPagedFlux.byPage().toIterable().forEach(feedResponse -> {
feedResponseCount.incrementAndGet();
});

assertThat(handleCount.get() >= 1).isTrue();
assertThat(handleCount.get()).isEqualTo(feedResponseCount.get());
assertThat(handleCount.get()).isEqualTo(chainedHandleCount.get());
assertThat(handleCount.get()).isEqualTo(yetAnotherChainedHandleCount.get());
}

@Test(groups = { "simple" }, timeOut = TIMEOUT)
Expand Down

0 comments on commit 9554519

Please sign in to comment.