Skip to content

Commit

Permalink
Add reactor#1123 metrics to Flux (reactor#1183)
Browse files Browse the repository at this point in the history
This commit adds a `metrics()` operator to Flux and Mono, which is
no-op if the Micrometer library is not on the classpath.

If it is, the upstream events are captured and various instrumentation
are performed.

The operator is Fuseable and uses the information provided by the
`name()` and `tags()` operators, if used.

Meter names are stored as private constants in the FluxMetrics class
for reference.
  • Loading branch information
simonbasle authored May 18, 2018
1 parent 89a8e0e commit 425bf5c
Show file tree
Hide file tree
Showing 10 changed files with 1,658 additions and 1 deletion.
7 changes: 7 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ ext {
}
}

// Metrics
micrometerVersion = 'latest.release' //TODO specify a version of micrometer?

// Logging
slf4jVersion = '1.7.12'
logbackVersion = '1.1.2'
Expand Down Expand Up @@ -249,6 +252,9 @@ project('reactor-core') {
//Optional Logging Operator
optional "org.slf4j:slf4j-api:$slf4jVersion"

//Optional Metrics
optional "io.micrometer:micrometer-core:$micrometerVersion"

optional("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")

//Optional JDK 9 Converter
Expand All @@ -257,6 +263,7 @@ project('reactor-core') {
testCompile 'junit:junit:4.12'

testRuntime "ch.qos.logback:logback-classic:$logbackVersion"
testRuntime "io.micrometer:micrometer-core:$micrometerVersion"
// Testing
testCompile(project(":reactor-test")) {
exclude module: 'reactor-core'
Expand Down
21 changes: 21 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -5431,6 +5431,27 @@ public final Flux<T> mergeWith(Publisher<? extends T> other) {
return merge(this, other);
}

/**
* Activate metrics for this sequence, provided there is an instrumentation facade
* on the classpath (otherwise this method is a pure no-op).
* <p>
* Metrics are gathered on {@link Subscriber} events, and it is recommended to also
* {@link #name(String) name} (and optionally {@link #tag(String, String) tag}) the
* sequence.
*
* @return an instrumented {@link Flux}
*/
public final Flux<T> metrics() {
if (!FluxMetrics.isMicrometerAvailable()) {
return this;
}

if (this instanceof Fuseable) {
return onAssembly(new FluxMetricsFuseable<>(this));
}
return onAssembly(new FluxMetrics<>(this));
}

/**
* Give a name to this sequence, which can be retrieved using {@link Scannable#name()}
* as long as this is the first reachable {@link Scannable#parents()}.
Expand Down
Loading

0 comments on commit 425bf5c

Please sign in to comment.