Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.
In the imperative world, debugging is usually pretty straightforward. You can read the stacktrace and see where the problem originated. Was it entirely a failure of your code? Did the failure occur in some library code? If so, what part of your code called the library, potentially passing in improper parameters that ultimately caused the failure?
When you shift to asynchronous code, things can get much more complicated.
Consider the following stack trace:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)
There is a lot going on there. We get an IndexOutOfBoundsException, which tells us that a source emitted more than one item.
We can quickly assume that this source is a Flux or a Mono, as confirmed by the next line, which mentions MonoSingle. So it appears to be some sort of complaint from a single operator.
Referring to the Javadoc for the Mono#single operator, we see that single has a contract: The source must emit exactly one element. It appears we had a source that emitted more than one and thus violated that contract.
Can we dig deeper and identify that source? The following rows are not very helpful. They take us through the internals of what seems to be a reactive chain, through multiple calls to subscribe and request.
By skimming over these rows, we can at least start to form a picture of the kind of chain that went wrong: It seems to involve a MonoSingle, a FluxFlatMap, and a FluxRange (each gets several rows in the trace, but overall these three classes are involved). So a range().flatMap().single() chain maybe?
But what if we use that pattern a lot in our application? This still does not tell us much, and simply searching for single is not going to find the problem. Then the last line refers to some of our code. Finally, we are getting close.
Hold on, though. When we go to the source file, all we see is that a pre-existing Flux is subscribed to, as follows:
toDebug.subscribe(System.out::println, Throwable::printStackTrace);
All of this happened at subscription time, but the Flux itself was not declared there. Worse, when we go to where the variable is declared, we see the following:
public Mono<String> toDebug; //please overlook the public class attribute
The variable is not instantiated where it is declared. We must assume a worst-case scenario where we find out that there could be a few different code paths that set it in the application. We remain unsure of which one caused the issue.
Note: This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error.
What we want to find out more easily is where the operator was added into the chain, that is, where the Flux was declared. We usually refer to that as the “assembly” of the Flux.
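The gap between assembly and execution can be reproduced without Reactor. The following is a minimal plain-Java sketch, not Reactor code: `SingleOrFail`, `declareChain`, and `runAndCatch` are hypothetical names standing in for a library operator, the assembly site, and the subscription site. It shows that the stack trace of a lazily raised failure contains the method that triggered the work but not the method that declared it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class AssemblySiteLost {

    /** Stand-in for a library operator: it fails lazily, only when get() is called. */
    public static final class SingleOrFail implements Supplier<String> {
        private final List<String> source;

        SingleOrFail(List<String> source) {
            this.source = source;
        }

        @Override
        public String get() {
            if (source.size() > 1) {
                throw new IndexOutOfBoundsException("Source emitted more than one item");
            }
            return source.get(0);
        }
    }

    /** "Assembly": declares the deferred work and returns before anything runs. */
    public static Supplier<String> declareChain() {
        return new SingleOrFail(Arrays.asList("a", "b"));
    }

    /** "Subscription": triggers the work and hands back the failure, if any. */
    public static RuntimeException runAndCatch(Supplier<String> chain) {
        try {
            chain.get();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    /** Returns true if any frame of the stack trace belongs to the named method. */
    public static boolean traceMentions(Throwable t, String methodName) {
        for (StackTraceElement frame : t.getStackTrace()) {
            if (frame.getMethodName().equals(methodName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RuntimeException failure = runAndCatch(declareChain());
        // The execution site is still on the stack when the failure is raised.
        System.out.println("runAndCatch in trace:  " + traceMentions(failure, "runAndCatch"));
        // The assembly site returned before the failure, so it is absent.
        System.out.println("declareChain in trace: " + traceMentions(failure, "declareChain"));
    }
}
```

Running `main` should report the execution site as present and the assembly site as absent, which is the situation the stack trace above leaves us in.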
Warning: This section describes the easiest but also the slowest way to enable the debugging capabilities, because it captures the stacktrace on every operator. See The checkpoint() Alternative for a more fine-grained way of debugging, and Production-ready Global Debugging for a more advanced and performant global option.
Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.
Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.
This is done by calling Hooks.onOperatorDebug() at application start (or at least before the incriminated Flux or Mono can be instantiated), as follows:
Hooks.onOperatorDebug();
This starts instrumenting the calls to the Flux (and Mono) operator methods (where they are assembled into the chain) by wrapping the construction of the operator and capturing a stack trace there. Since this is done when the operator chain is declared, the hook should be activated before that, so the safest way is to activate it right at the start of your application.
Later on, if an exception occurs, the failing operator is able to refer to that capture and append it to the stack trace. We call this captured assembly information a traceback.
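The capture-then-append idea can be sketched in plain Java. This is an illustration under stated assumptions, not Reactor's implementation: `AssemblySnapshot` and `Traced` are hypothetical names, and the wrapper works on a `Supplier` instead of a reactive operator. The stack trace is captured when the wrapper is constructed and attached to the original error as a suppressed exception when the wrapped step fails.

```java
import java.util.function.Supplier;

public class AssemblyTracebackSketch {

    /** Hypothetical marker exception whose stack trace is the assembly site. */
    public static final class AssemblySnapshot extends RuntimeException {
        AssemblySnapshot(String operator) {
            super("Assembly trace from producer [" + operator + "]");
        }
    }

    /** Wraps a deferred step and remembers where the wrapper was constructed. */
    public static final class Traced<T> implements Supplier<T> {
        private final Supplier<T> delegate;
        private final AssemblySnapshot snapshot;

        Traced(String operator, Supplier<T> delegate) {
            this.delegate = delegate;
            // The costly part: a stack trace is filled in for every wrapped step.
            this.snapshot = new AssemblySnapshot(operator);
        }

        @Override
        public T get() {
            try {
                return delegate.get();
            } catch (RuntimeException e) {
                // Attach the assembly information to the original error.
                e.addSuppressed(snapshot);
                throw e;
            }
        }
    }

    /** User code that assembles the failing step (the name echoes the guide's example). */
    public static Supplier<String> scatterAndGather() {
        return new Traced<String>("single", () -> {
            throw new IndexOutOfBoundsException("Source emitted more than one item");
        });
    }

    /** Triggers the deferred step and returns the failure it raised, if any. */
    public static RuntimeException runAndCatch(Supplier<String> chain) {
        try {
            chain.get();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        RuntimeException failure = runAndCatch(scatterAndGather());
        // Prints the original error followed by a "Suppressed:" section
        // whose frames point at scatterAndGather.
        failure.printStackTrace();
    }
}
```

The sketch also makes the cost visible: one `Throwable` is created per wrapped step at assembly time, whether or not an error ever occurs.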
In the next section, we see how the stack trace differs and how to interpret that new information.
When we reuse our initial example but activate the operatorStacktrace debug feature, the stack trace is as follows:
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:375) (1)
...
(2)
...
at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1000)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: (3)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (4)
reactor.core.publisher.Flux.single(Flux.java:6676)
reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949)
reactor.guide.GuideTests.populateDebug(GuideTests.java:962)
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s): (5)
|_ Flux.single ⇢ reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949) (6)
(1) This is new: We see the wrapper operator that captures the stack.
(2) Apart from that, the first section of the stack trace is still the same for the most part, showing a bit of the operator’s internals (so we removed a bit of the snippet here).
(3) This is where the traceback starts to appear.
(4) First, we get some details about where the operator was assembled.
(5) We also get a traceback of the error as it propagated through the operator chain, from first to last (error site to subscribe site).
(6) Each operator that saw the error is mentioned along with the user class and line where it was used.
The captured stack trace is appended to the original error as a suppressed OnAssemblyException. There are two parts to it, but the first section is the most interesting. It shows the path of construction for the operator that triggered the exception. Here, it shows that the single that caused our issue was created in the scatterAndGather method, itself called from a populateDebug method that got executed through JUnit.
Now that we are armed with enough information to find the culprit, we can have a meaningful look at that scatterAndGather method:
private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
               .single(); (1)
}
(1) Sure enough, here is our single.
Now we can see that the root cause of the error was a flatMap that performs several HTTP calls to a few URLs but that is chained with single, which is too restrictive. After a short git blame and a quick discussion with the author of that line, we find out he meant to use the less restrictive take(1) instead.
We have solved our problem.
Now consider the following line in the stack trace:
Error has been observed by the following operator(s):
That second part of the debug stack trace was not necessarily interesting in this particular example, because the error was actually happening in the last operator in the chain (the one closest to subscribe). Considering another example might make it clearer:
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();
Now imagine that, inside findAllUserByName, there is a map that fails. Here, we would see the following final traceback:
Error has been observed by the following operator(s):
|_ Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
|_ Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
|_ Flux.filter ⇢ reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
|_ Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)
|_ Flux.elapsed ⇢ reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
|_ Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:41)
This corresponds to the section of the chain of operators that gets notified of the error:
- The exception originates in the first map.
- It is seen by a second map (both in fact correspond to the findAllUserByName method).
- It is then seen by a filter and a transform, which indicate that part of the chain is constructed by a reusable transformation function (here, the applyFilters utility method).
- Finally, it is seen by an elapsed and a transform. Once again, elapsed is applied by the transformation function of that second transform.
Tip: As tracebacks are appended to original errors as suppressed exceptions, this can somewhat interfere with another type of exception that uses this mechanism: composite exceptions. Such exceptions can be created directly via Exceptions.multiple(Throwable…), or by some operators that might join multiple erroring sources (like Flux#flatMapDelayError). They can be unwrapped into a List via Exceptions.unwrapMultiple(Throwable), in which case the traceback would be considered a component of the composite and be part of the returned List. If that is somehow not desirable, tracebacks can be identified thanks to the Exceptions.isTraceback(Throwable) check, and excluded from such an unwrap by using Exceptions.unwrapMultipleExcludingTracebacks(Throwable) instead.
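The interference described in the tip can be illustrated with a plain-Java sketch. It does not call Reactor's `Exceptions` utilities. `Traceback`, `composite`, and `unwrap` are hypothetical stand-ins, on the assumption that both the composite's components and the traceback are carried as suppressed exceptions. Unwrapping every suppressed exception then also returns the traceback, unless it is filtered out by type.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeUnwrapSketch {

    /** Hypothetical stand-in for an assembly traceback (not Reactor's class). */
    public static final class Traceback extends RuntimeException {
        public Traceback(String message) {
            // No cause, suppression enabled, no stack trace of its own.
            super(message, null, true, false);
        }
    }

    /** Builds a composite error that carries its components as suppressed exceptions. */
    public static RuntimeException composite(Throwable... components) {
        RuntimeException multiple = new RuntimeException("Multiple exceptions");
        for (Throwable component : components) {
            multiple.addSuppressed(component);
        }
        return multiple;
    }

    /** Lists the suppressed exceptions, optionally skipping tracebacks. */
    public static List<Throwable> unwrap(Throwable composite, boolean excludeTracebacks) {
        List<Throwable> parts = new ArrayList<>();
        for (Throwable suppressed : composite.getSuppressed()) {
            if (excludeTracebacks && suppressed instanceof Traceback) {
                continue;
            }
            parts.add(suppressed);
        }
        return parts;
    }

    public static void main(String[] args) {
        RuntimeException error = composite(
                new IllegalStateException("first source failed"),
                new IllegalArgumentException("second source failed"));
        // A debugging layer later attaches its traceback through the same mechanism.
        error.addSuppressed(new Traceback("Assembly trace from producer [example]"));

        System.out.println("all suppressed:       " + unwrap(error, false).size());
        System.out.println("excluding tracebacks: " + unwrap(error, true).size());
    }
}
```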
We deal with a form of instrumentation here, and creating a stack trace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort.
The debug mode is global and affects every single operator assembled into a Flux or a Mono inside the application. This has the benefit of allowing after-the-fact debugging: Whatever the error, we can obtain additional information to debug it.
As we saw earlier, this global knowledge comes at the cost of an impact on performance (due to the number of populated stack traces). That cost can be reduced if we have an idea of likely problematic operators. However, we usually do not know which operators are likely to be problematic unless we observed an error in the wild, saw we were missing assembly information, and then modified the code to activate assembly tracking, hoping to observe the same error again.
In that scenario, we have to switch into debugging mode and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.
If you can identify reactive chains that you assemble in your application for which serviceability is critical, you can achieve a mix of both techniques with the checkpoint() operator.
You can chain this operator into a method chain. The checkpoint operator works like the hook version but only for its link of that particular chain.
There is also a checkpoint(String) variant that lets you add a unique String identifier to the assembly traceback. This way, the stack trace is omitted and you rely on the description to identify the assembly site. checkpoint(String) imposes less processing cost than a regular checkpoint.
checkpoint(String) includes “light” in its output (which can be handy when searching), as shown in the following example:
... Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly site of producer [reactor.core.publisher.ParallelSource] is identified by light checkpoint [light checkpoint identifier].
Last but not least, if you want to add a more generic description to the checkpoint but still rely on the stack trace mechanism to identify the assembly site, you can force that behavior by using the checkpoint("description", true) version. We are now back to the initial message for the traceback, augmented with a description, as shown in the following example:
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed by the following operator(s):
|_ ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
(1) descriptionCorrelation1234 is the description provided in the checkpoint.
The description could be a static identifier or user-readable description or a wider correlation ID (for instance, coming from a header in the case of an HTTP request).
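Why a description-only checkpoint is cheaper can be shown with a plain-Java sketch. The `Snapshot` class and the three factory methods are hypothetical and only mimic the three variants discussed above. The sketch assumes the saving comes from skipping stack trace capture, which plain Java exposes through the `writableStackTrace` flag of the protected `RuntimeException` constructor.

```java
public class CheckpointCostSketch {

    /** Hypothetical snapshot: a message plus, optionally, a captured stack trace. */
    public static final class Snapshot extends RuntimeException {
        Snapshot(String message, boolean captureStack) {
            // The last flag controls whether the JVM walks and stores the stack.
            super(message, null, true, captureStack);
        }
    }

    /** Mimics checkpoint(): stack trace captured, no description. */
    public static Snapshot full() {
        return new Snapshot("Assembly trace", true);
    }

    /** Mimics checkpoint(String): description only, no stack walk. */
    public static Snapshot light(String description) {
        return new Snapshot("identified by light checkpoint [" + description + "]", false);
    }

    /** Mimics checkpoint(String, true): description and stack trace. */
    public static Snapshot describedWithStack(String description) {
        return new Snapshot("described as [" + description + "]", true);
    }

    public static void main(String[] args) {
        System.out.println("full frames:      " + full().getStackTrace().length);
        System.out.println("light frames:     " + light("orders-pipeline").getStackTrace().length);
        System.out.println("described frames: " + describedWithStack("orders-pipeline").getStackTrace().length);
    }
}
```

In this sketch the light variant carries no frames at all, so the description has to be unique enough to locate the assembly site by a text search.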
Note: When both global debugging and local checkpoint() are enabled, checkpointed snapshot stacks are appended as suppressed error output after the observing operator graph and following the same declarative order.
Project Reactor comes with a separate Java Agent that instruments your code and adds debugging info without paying the cost of capturing the stacktrace on every operator call. The behaviour is very similar to Activating Debug Mode - aka tracebacks, but without the runtime performance overhead.
To use it in your app, you must add it as a dependency.
The following example shows how to add reactor-tools as a dependency in Maven:
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-tools</artifactId>
        (1)
    </dependency>
</dependencies>
(1) If you use the BOM, you do not need to specify a <version>.
The following example shows how to add reactor-tools as a dependency in Gradle (in the dependencies block):
dependencies {
    compile 'io.projectreactor:reactor-tools'
}
It also needs to be explicitly initialized with:
ReactorDebugAgent.init();
Tip: Since the implementation instruments your classes when they are loaded, the best place to put it is before everything else in your main(String[]) method:
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}
You may also re-process existing classes if you cannot run the init eagerly (e.g. in the tests):
ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
Warning: Be aware that the re-processing takes a couple of seconds due to the need to iterate over all loaded classes and apply the transformation. Use it only if you see that some call-sites are not instrumented.
ReactorDebugAgent is implemented as a Java Agent and uses ByteBuddy to perform the self-attach. Self-attach may not work on some JVMs. Refer to ByteBuddy’s documentation for more details.
If your environment does not support ByteBuddy’s self-attachment, you can run reactor-tools as a Java Agent:
java -javaagent:reactor-tools.jar -jar app.jar
It is also possible to run reactor-tools at build time. To do so, you need to apply it as a plugin for ByteBuddy’s build instrumentation.
Warning: The transformation will only be applied to your project’s classes. The classpath libraries will not be instrumented.
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-tools</artifactId>
        (1)
        <classifier>original</classifier> (2)
        <scope>runtime</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>net.bytebuddy</groupId>
            <artifactId>byte-buddy-maven-plugin</artifactId>
            <configuration>
                <transformations>
                    <transformation>
                        <plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
                    </transformation>
                </transformations>
            </configuration>
        </plugin>
    </plugins>
</build>
(1) If you use the BOM, you do not need to specify a <version>.
(2) classifier here is important.
plugins {
    id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}
configurations {
    byteBuddyPlugin
}
dependencies {
    byteBuddyPlugin(
        group: 'io.projectreactor',
        name: 'reactor-tools',
        (1)
        classifier: 'original', (2)
    )
}
byteBuddy {
    transformation {
        plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
        classPath = configurations.byteBuddyPlugin
    }
}
(1) If you use the BOM, you do not need to specify a version.
(2) classifier here is important.
In addition to stack trace debugging and analysis, another powerful tool to have in your toolkit is the ability to trace and log events in an asynchronous sequence.
The log() operator can do just that. Chained inside a sequence, it peeks at every event of the Flux or Mono upstream of it (including onNext, onError, and onComplete as well as subscriptions, cancellations, and requests).
The log operator uses the Loggers utility class, which picks up common logging frameworks such as Log4J and Logback through SLF4J and defaults to logging to the console if SLF4J is unavailable.
The console fallback uses System.err for the WARN and ERROR log levels and System.out for everything else.
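The routing rule just described is small enough to sketch in plain Java. This is only an illustration of the rule as stated above, not Reactor's `Loggers` code: the `Level` enum, `streamFor`, and `log` are hypothetical names.

```java
import java.io.PrintStream;

public class ConsoleFallbackSketch {

    /** Hypothetical level set, ordered from least to most severe. */
    public enum Level { TRACE, DEBUG, INFO, WARN, ERROR }

    /** Picks the error stream for WARN and ERROR, the standard stream otherwise. */
    public static PrintStream streamFor(Level level, PrintStream out, PrintStream err) {
        return (level == Level.WARN || level == Level.ERROR) ? err : out;
    }

    /** Writes one line to the console stream that matches the level. */
    public static void log(Level level, String message) {
        streamFor(level, System.out, System.err).println("[" + level + "] " + message);
    }

    public static void main(String[] args) {
        log(Level.INFO, "onNext(1)");                // goes to System.out
        log(Level.ERROR, "onError(boom)");           // goes to System.err
    }
}
```

One practical consequence is that redirecting only standard output of a process captures the INFO-level signal traces but not the warnings and errors.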
If you prefer a JDK java.util.logging fallback, as in 3.0.x, you can get it by setting the reactor.logging.fallback system property to JDK.
In all cases, when logging in production you should take care to configure the underlying logging framework to use its most asynchronous and non-blocking approach (for instance, an AsyncAppender in Logback or AsyncLogger in Log4j 2).
For instance, suppose we have Logback activated and configured and a chain like range(1,10).take(3). By placing a log() before the take, we can get some insight into how it works and what kind of events it propagates upstream to the range, as the following example shows:
Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();
This prints out the following (through the logger’s console appender):
10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(unbounded) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)
Here, in addition to the logger’s own formatter (time, thread, level, message), the log() operator outputs a few things in its own format:
(1) reactor.Flux.Range.1 is an automatic category for the log, in case you use the operator several times in a chain. It lets you distinguish which operator’s events are logged (in this case, the range). You can overwrite the identifier with your own custom category by using the log(String) method signature. After a few separating characters, the actual event gets printed. Here, we get an onSubscribe call, a request call, three onNext calls, and a cancel call. For the first line, onSubscribe, we get the implementation of the Subscription, which usually corresponds to the operator-specific implementation. Between square brackets, we get additional information, including whether the operator can be automatically optimized through synchronous or asynchronous fusion.
(2) On the second line, we can see that an unbounded request was propagated up from downstream.
(3) Then the range sends three values in a row.
(4) On the last line, we see cancel().
The last line, (4), is the most interesting. We can see the take in action there. It operates by cutting the sequence short after it has seen enough elements emitted. In short, take() causes the source to cancel() once it has emitted the user-requested amount.
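The signal order in that log can be reproduced with the JDK's own Reactive Streams interfaces. The following is a simplified sketch built on `java.util.concurrent.Flow` (Java 9 or later), not Reactor's `range`, `log`, or `take` implementations: `TakeCancelSketch` and `run` are hypothetical names, the source is synchronous, and the code skips most of the rules a compliant publisher must follow. It records each signal a probe placed between the source and a take-like subscriber would observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class TakeCancelSketch {

    /** Runs a range of `count` integers from `start` into a subscriber that cancels after `take` items. */
    public static List<String> run(final int start, final int count, final int take) {
        final List<String> events = new ArrayList<>();

        // Simplified synchronous range source that records what it receives and emits.
        Flow.Publisher<Integer> range = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private boolean cancelled;

            @Override
            public void request(long n) {
                events.add(n == Long.MAX_VALUE ? "request(unbounded)" : "request(" + n + ")");
                long emitted = 0;
                while (!cancelled && emitted < n && next < start + count) {
                    Integer value = next++;
                    emitted++;
                    events.add("onNext(" + value + ")");
                    subscriber.onNext(value);
                }
                if (!cancelled && next == start + count) {
                    events.add("onComplete()");
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                events.add("cancel()");
                cancelled = true;
            }
        });

        // Take-like subscriber: asks for everything, then cancels once it has seen enough.
        range.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription upstream;
            private int seen;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                upstream = subscription;
                events.add("onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                if (++seen == take) {
                    upstream.cancel();
                }
            }

            @Override
            public void onError(Throwable error) {
                events.add("onError(" + error + ")");
            }

            @Override
            public void onComplete() {
                // Completion is already recorded by the source.
            }
        });

        return events;
    }

    public static void main(String[] args) {
        // Same shape as the guide's range(1, 10).take(3) example.
        run(1, 10, 3).forEach(System.out::println);
    }
}
```

With these arguments the recorded sequence is onSubscribe, request(unbounded), onNext(1), onNext(2), onNext(3), cancel(), with no onComplete(), because the cancellation stops the source before it reaches the end of the range.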