Skip to content

Commit

Permalink
fix #1242 Improve metrics on Flux and Mono
Browse files Browse the repository at this point in the history
 - split Mono implementation from Flux implementation
 - Mono doesn't record onNext delays nor requests
 - rename meter 'reactor.subscribe.to.terminate' to 'reactor.flow
 .duration'
 - remove 'reactor' prefix from tag names and rename some:
   - 'reactor.termination.type' to 'status'
   - 'reactor.sequence.name' to 'flow'
   - 'reactor.sequence.type' to 'type'
 - duration meter tagged with `status == ERROR` is lazily created to
   also be tagged with the Exception's class name
  • Loading branch information
simonbasle committed Jun 15, 2018
1 parent a879741 commit 1c64c8a
Show file tree
Hide file tree
Showing 8 changed files with 1,189 additions and 140 deletions.
91 changes: 53 additions & 38 deletions reactor-core/src/main/java/reactor/core/publisher/FluxMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,44 +90,51 @@ static boolean isMicrometerAvailable() {
* Meter that counts the number of events received from a malformed source (ie an
* onNext after an onComplete).
*/
static final String METER_MALFORMED = "reactor.malformed.source";
static final String METER_MALFORMED = "reactor.malformed.source";
/**
* Meter that counts the number of subscriptions to a sequence.
*/
static final String METER_SUBSCRIBED = "reactor.subscribed";
static final String METER_SUBSCRIBED = "reactor.subscribed";
/**
* Meter that times the duration between the subscription and the sequence's terminal
* event. The timer is also using the {@link #TAG_TERMINATION_TYPE} tag to determine
* event. The timer is also using the {@link #TAG_STATUS} tag to determine
* which kind of event terminated the sequence.
*/
static final String METER_SUBSCRIBE_TO_TERMINATE = "reactor.subscribe.to.terminate";
static final String METER_FLOW_DURATION = "reactor.flow.duration";
/**
* Meter that times the delays between each onNext (or between the first onNext and
* the onSubscribe event).
*/
static final String METER_ON_NEXT_DELAY = "reactor.onNext.delay";
static final String METER_ON_NEXT_DELAY = "reactor.onNext.delay";
/**
* Meter that tracks the request amount, in {@link Flux#name(String) named}
* sequences only.
*/
static final String METER_REQUESTED = "reactor.requested";
static final String METER_REQUESTED = "reactor.requested";

/**
* Tag used by {@link #METER_SUBSCRIBE_TO_TERMINATE} to mark what kind of terminating
* Tag used by {@link #METER_FLOW_DURATION} to mark what kind of terminating
* event occurred: {@link #TAGVALUE_ON_COMPLETE}, {@link #TAGVALUE_ON_ERROR} or
* {@link #TAGVALUE_CANCEL}.
*/
static final String TAG_TERMINATION_TYPE = "reactor.termination.type";
static final String TAG_STATUS = "status";

/**
* Tag used by {@link #METER_FLOW_DURATION} when {@link #TAG_STATUS}
* is {@link #TAGVALUE_ON_ERROR}, to store the exception that occurred.
*/
static final String TAG_EXCEPTION = "exception";

/**
* Tag bearing the sequence's name, as given by the {@link Flux#name(String)} operator.
*/
static final String TAG_SEQUENCE_NAME = "reactor.sequence.name";
static final String TAG_SEQUENCE_NAME = "flow";
/**
* Tag bearing the sequence's type, {@link Flux} or {@link Mono}.
* @see #TAGVALUE_FLUX
* @see #TAGVALUE_MONO
*/
static final String TAG_SEQUENCE_TYPE = "reactor.sequence.type";
static final String TAG_SEQUENCE_TYPE = "type";

//... tag values are free-for-all
static final String TAGVALUE_ON_ERROR = "onError";
Expand Down Expand Up @@ -206,11 +213,11 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (registryCandidate != null) {
registry = registryCandidate;
}
source.subscribe(new MicrometerMetricsSubscriber<>(actual, registry,
Clock.SYSTEM, this.name, this.tags, false));
source.subscribe(new MicrometerFluxMetricsSubscriber<>(actual, registry,
Clock.SYSTEM, this.name, this.tags));
}

static class MicrometerMetricsSubscriber<T> implements InnerOperator<T,T> {
static class MicrometerFluxMetricsSubscriber<T> implements InnerOperator<T,T> {

final CoreSubscriber<? super T> actual;
final MeterRegistry registry;
Expand All @@ -228,45 +235,44 @@ static class MicrometerMetricsSubscriber<T> implements InnerOperator<T,T> {
Fuseable.QueueSubscription<T> qs;
Subscription s;

final Timer onNextIntervalTimer;
final Timer subscribeToCompleteTimer;
final Timer subscribeToErrorTimer;
final Timer subscribeToCancelTimer;
final Timer onNextIntervalTimer;
final Timer subscribeToCompleteTimer;
final Timer.Builder subscribeToErrorTimerBuilder;
final Timer subscribeToCancelTimer;

MicrometerMetricsSubscriber(CoreSubscriber<? super T> actual,
MicrometerFluxMetricsSubscriber(CoreSubscriber<? super T> actual,
MeterRegistry registry,
Clock clock,
String sequenceName,
List<Tag> sequenceTags,
boolean monoSource) {
List<Tag> sequenceTags) {
this.actual = actual;
this.registry = registry;
this.clock = clock;

List<Tag> commonTags = new ArrayList<>();
commonTags.add(Tag.of(TAG_SEQUENCE_NAME, sequenceName));
commonTags.add(Tag.of(TAG_SEQUENCE_TYPE, monoSource ? TAGVALUE_MONO : TAGVALUE_FLUX));
commonTags.add(Tag.of(TAG_SEQUENCE_TYPE, TAGVALUE_FLUX));
commonTags.addAll(sequenceTags);

this.subscribeToCompleteTimer = Timer
.builder(METER_SUBSCRIBE_TO_TERMINATE)
.builder(METER_FLOW_DURATION)
.tags(commonTags)
.tag(TAG_TERMINATION_TYPE, TAGVALUE_ON_COMPLETE)
.tag(TAG_STATUS, TAGVALUE_ON_COMPLETE)
.description("Times the duration elapsed between a subscription and the onComplete termination of the sequence")
.register(registry);
this.subscribeToErrorTimer = Timer
.builder(METER_SUBSCRIBE_TO_TERMINATE)
.tags(commonTags)
.tag(TAG_TERMINATION_TYPE, TAGVALUE_ON_ERROR)
.description("Times the duration elapsed between a subscription and the onError termination of the sequence")
.register(registry);
this.subscribeToCancelTimer = Timer
.builder(METER_SUBSCRIBE_TO_TERMINATE)
.builder(METER_FLOW_DURATION)
.tags(commonTags)
.tag(TAG_TERMINATION_TYPE, TAGVALUE_CANCEL)
.tag(TAG_STATUS, TAGVALUE_CANCEL)
.description("Times the duration elapsed between a subscription and the cancellation of the sequence")
.register(registry);

this.subscribeToErrorTimerBuilder = Timer
.builder(METER_FLOW_DURATION)
.tags(commonTags)
.tag(TAG_STATUS, TAGVALUE_ON_ERROR)
.description("Times the duration elapsed between a subscription and the onError termination of the sequence, with the exception name as a tag");

this.onNextIntervalTimer = Timer
.builder(METER_ON_NEXT_DELAY)
.tags(commonTags)
Expand Down Expand Up @@ -326,7 +332,13 @@ public void onError(Throwable e) {
done = true;
//we don't record the time between last onNext and onError,
// because it would skew the onNext count by one
this.subscribeToTerminateSample.stop(subscribeToErrorTimer);

//register a timer for that particular exception
Timer timer = subscribeToErrorTimerBuilder
.tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName())
.register(registry);
//record error termination
this.subscribeToTerminateSample.stop(timer);

actual.onError(e);
}
Expand Down Expand Up @@ -389,15 +401,14 @@ public void cancel() {
*
* @param <T>
*/
static final class MicrometerMetricsFuseableSubscriber<T> extends MicrometerMetricsSubscriber<T>
static final class MicrometerFluxMetricsFuseableSubscriber<T> extends MicrometerFluxMetricsSubscriber<T>
implements Fuseable, Fuseable.QueueSubscription<T> {

private boolean syncFused;

MicrometerMetricsFuseableSubscriber(CoreSubscriber<? super T> actual,
MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags,
boolean monoSource) {
super(actual, registry, clock, sequenceName, sequenceTags, monoSource);
MicrometerFluxMetricsFuseableSubscriber(CoreSubscriber<? super T> actual,
MeterRegistry registry, Clock clock, String sequenceName, List<Tag> sequenceTags) {
super(actual, registry, clock, sequenceName, sequenceTags);
}

@Override
Expand Down Expand Up @@ -433,8 +444,12 @@ public T poll() {
}
return v;
} catch (Throwable e) {
//register a timer for that particular exception
Timer timer = subscribeToErrorTimerBuilder
.tag(FluxMetrics.TAG_EXCEPTION, e.getClass().getName())
.register(registry);
//record error termination
this.subscribeToTerminateSample.stop(subscribeToErrorTimer);
this.subscribeToTerminateSample.stop(timer);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.List;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
Expand Down Expand Up @@ -72,8 +71,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (registryCandidate != null) {
registry = registryCandidate;
}
source.subscribe(new FluxMetrics.MicrometerMetricsFuseableSubscriber<>(actual, registry,
Clock.SYSTEM, this.name, this.tags, false));
source.subscribe(new FluxMetrics.MicrometerFluxMetricsFuseableSubscriber<>(actual, registry,
Clock.SYSTEM, this.name, this.tags));
}

}
Loading

0 comments on commit 1c64c8a

Please sign in to comment.