Skip to content

Commit

Permalink
fix #1242 Further metrics improvements around ASYNC fusion
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Jun 20, 2018
1 parent e013678 commit dfde3f4
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 19 deletions.
30 changes: 25 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/FluxMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -405,20 +405,40 @@ public void cancel() {
static final class MicrometerFluxMetricsFuseableSubscriber<T> extends MicrometerFluxMetricsSubscriber<T>
implements Fuseable, Fuseable.QueueSubscription<T> {

private boolean syncFused;
private int fusionMode;

MicrometerFluxMetricsFuseableSubscriber(CoreSubscriber<? super T> actual,
MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) {
super(actual, registry, clock, sequenceName, sequenceTags);
}

@Override
public void onNext(T t) {
if (this.fusionMode == Fuseable.ASYNC) {
actual.onNext(null);
return;
}

if (done) {
this.malformedSourceCounter.increment();
Operators.onNextDropped(t, actual.currentContext());
return;
}

//record the delay since previous onNext/onSubscribe. This also records the count.
long last = this.lastNextEventNanos;
this.lastNextEventNanos = clock.monotonicTime();
this.onNextIntervalTimer.record(lastNextEventNanos - last, TimeUnit.NANOSECONDS);

actual.onNext(t);
}

@Override
public int requestFusion(int mode) {
//Simply negotiate the fusion by delegating:
if (qs != null) {
int negotiated = qs.requestFusion(mode);
this.syncFused = negotiated == Fuseable.SYNC;
return negotiated;
this.fusionMode = qs.requestFusion(mode);
return fusionMode;
}
return Fuseable.NONE; //should not happen unless requestFusion called before subscribe
}
Expand All @@ -432,7 +452,7 @@ public T poll() {
try {
T v = qs.poll();

if (v == null && syncFused) {
if (v == null && fusionMode == SYNC) {
//this is also a complete event
this.subscribeToTerminateSample.stop(subscribeToCompleteTimer);
}
Expand Down
19 changes: 14 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/MonoMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,20 +228,29 @@ public void cancel() {
static final class MicrometerMonoMetricsFuseableSubscriber<T> extends MicrometerMonoMetricsSubscriber<T>
implements Fuseable, Fuseable.QueueSubscription<T> {

private boolean syncFused;
private int mode;

MicrometerMonoMetricsFuseableSubscriber(CoreSubscriber<? super T> actual,
MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) {
super(actual, registry, clock, sequenceName, sequenceTags);
}

@Override
public void onNext(T t) {
// if (this.mode == ASYNC) {
// actual.onNext(null);
// }
// else {
super.onNext(t);
// }
}

@Override
public int requestFusion(int mode) {
//Simply negotiate the fusion by delegating:
if (qs != null) {
int negotiated = qs.requestFusion(mode);
this.syncFused = negotiated == Fuseable.SYNC;
return negotiated;
this.mode = qs.requestFusion(mode);
return this.mode;
}
return Fuseable.NONE; //should not happen unless requestFusion called before subscribe
}
Expand All @@ -255,7 +264,7 @@ public T poll() {
try {
T v = qs.poll();

if (v == null && syncFused) {
if (v == null && this.mode == SYNC) {
//this is also a complete event
this.subscribeToTerminateSample.stop(subscribeToCompleteTimer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;

import static org.assertj.core.api.Assertions.*;
Expand Down Expand Up @@ -315,13 +316,20 @@ public void onNextTimerCountsFuseable() {

@Test
public void subscribeToCompleteFuseable() {
//not really fuseable, goes through onComplete path, but tests FluxMetricsFuseable at least
Flux<String> source = Flux.just(1)
.delayElements(Duration.ofMillis(100))
.doOnNext(v -> {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
e.printStackTrace();
}
})
.map(i -> "foo");
new FluxMetricsFuseable<>(source, registry)
.log()
.blockLast();
StepVerifier.create(new FluxMetricsFuseable<>(source, registry))
.expectFusion(Fuseable.SYNC) //just only supports SYNC
.expectNext("foo")
.verifyComplete();

Timer stcCompleteTimer = registry.find(METER_FLOW_DURATION)
.tag(TAG_STATUS, TAGVALUE_ON_COMPLETE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -38,7 +39,9 @@
import reactor.core.Disposable;
import reactor.core.Fuseable;
import reactor.core.publisher.MonoMetrics.MicrometerMonoMetricsFuseableSubscriber;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.*;
import static reactor.core.publisher.FluxMetrics.*;
Expand Down Expand Up @@ -301,10 +304,16 @@ public void noOnNextTimerFuseable() {

@Test
public void subscribeToCompleteFuseable() {
Mono<String> source = Mono.delay(Duration.ofMillis(100))
.map(i -> "foo");
new MonoMetricsFuseable<>(source, registry)
.block();
Mono<String> source = Mono.fromCallable(() -> {
Thread.sleep(100);
return "foo";
});

StepVerifier.create(new MonoMetricsFuseable<>(source, registry))
.expectFusion(Fuseable.ASYNC)
.expectNext("foo")
.verifyComplete();


Timer stcCompleteTimer = registry.find(METER_FLOW_DURATION)
.tag(TAG_STATUS, TAGVALUE_ON_COMPLETE)
Expand Down

0 comments on commit dfde3f4

Please sign in to comment.