Skip to content

Commit

Permalink
fix reactor#2160 Accommodate ASYNC fusion in Mono.metrics()
Browse files Browse the repository at this point in the history
Reviewed-in: reactor#2168
Co-authored-by: Simon Baslé <sbasle@pivotal.io>
  • Loading branch information
alex-dukhno and simonbasle authored May 27, 2020
1 parent c44dc6c commit 7c401de
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ final public void cancel() {
}

@Override
final public void onComplete() {
public void onComplete() {
if (done) {
return;
}
Expand All @@ -124,7 +124,7 @@ final public void onError(Throwable e) {
}

@Override
final public void onNext(T t) {
public void onNext(T t) {
if (done) {
FluxMetrics.recordMalformed(commonTags, registry);
Operators.onNextDropped(t, actual.currentContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,42 @@ public void clear() {
}
}

@Override
public void onComplete() {
if (mode == ASYNC) {
actual.onComplete();
}
else {
if (done) {
return;
}
done = true;
FluxMetrics.recordOnComplete(commonTags, registry, subscribeToTerminateSample);
actual.onComplete();
}
}

@Override
public void onNext(T t) {
if (mode == ASYNC) {
actual.onNext(null);
}
else {
if (done) {
FluxMetrics.recordMalformed(commonTags, registry);
Operators.onNextDropped(t, actual.currentContext());
return;
}
done = true;
//TODO looks like we don't count onNext: `Mono.empty()` vs `Mono.just("foo")`
FluxMetrics.recordOnComplete(commonTags,
registry,
subscribeToTerminateSample);
actual.onNext(t);
actual.onComplete();
}
}

@Override
public boolean isEmpty() {
return qs == null || qs.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package reactor.core.publisher;

import java.time.Duration;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.micrometer.core.instrument.Counter;
Expand All @@ -36,6 +38,7 @@
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -456,4 +459,29 @@ public void flowDurationTagsConsistency() {

assertThat(uniqueTagKeySets).hasSize(1);
}

@Test
public void ensureFuseablePropagateOnComplete_inCaseOfAsyncFusion() {
Flux.fromIterable(Arrays.asList(1, 2, 3))
.metrics()
.flatMapIterable(Arrays::asList)
.as(StepVerifier::create)
.expectNext(1, 2, 3)
.expectComplete()
.verify(Duration.ofMillis(500));
}

@Test
public void ensureOnNextInAsyncModeIsCapableToPropagateNulls() {
Flux.using(() -> "irrelevant",
irrelevant -> Mono.fromSupplier(() -> Arrays.asList(1, 2, 3)),
irrelevant -> {
})
.metrics()
.flatMapIterable(Function.identity())
.as(StepVerifier::create)
.expectNext(1, 2, 3)
.expectComplete()
.verify(Duration.ofMillis(500));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package reactor.core.publisher;

import java.time.Duration;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.micrometer.core.instrument.Counter;
Expand All @@ -38,6 +40,7 @@
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.publisher.MonoMetrics.MetricsSubscriber;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -389,4 +392,28 @@ public void flowDurationTagsConsistency() {
assertThat(uniqueTagKeySets).hasSize(1);
}

@Test
public void ensureFuseablePropagateOnComplete_inCaseOfAsyncFusion() {
Mono.fromSupplier(() -> Arrays.asList(1, 2, 3))
.metrics()
.flatMapIterable(Function.identity())
.as(StepVerifier::create)
.expectNext(1, 2, 3)
.expectComplete()
.verify(Duration.ofMillis(500));
}

@Test
public void ensureOnNextInAsyncModeIsCapableToPropagateNulls() {
Mono.using(() -> "irrelevant",
irrelevant -> Mono.fromSupplier(() -> Arrays.asList(1, 2, 3)),
irrelevant -> {
})
.metrics()
.flatMapIterable(Function.identity())
.as(StepVerifier::create)
.expectNext(1, 2, 3)
.expectComplete()
.verify(Duration.ofMillis(500));
}
}

0 comments on commit 7c401de

Please sign in to comment.