Debugging Reactor

Switching from an imperative and synchronous programming paradigm to a reactive and asynchronous one can sometimes be daunting. One of the steepest steps in the learning curve is how to analyze and debug when something goes wrong.

In the imperative world, debugging is usually pretty straightforward. You can read the stacktrace and see where the problem originated. Was it entirely a failure of your code? Did the failure occur in some library code? If so, what part of your code called the library, potentially passing in improper parameters that ultimately caused the failure?

The Typical Reactor Stack Trace

When you shift to asynchronous code, things can get much more complicated.

Consider the following stack trace:

Example 1. A typical Reactor stack trace
java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
	at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
	at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
	at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)

There is a lot going on there. We get an IndexOutOfBoundsException, which tells us that a source emitted more than one item.

We can probably quickly come to assume that this source is a Flux or a Mono, as confirmed by the next line, which mentions MonoSingle. So it appears to be some sort of complaint from a single operator.

Referring to the Javadoc for the Mono#single operator, we see that single has a contract: The source must emit exactly one element. It appears we had a source that emitted more than one and thus violated that contract.

Can we dig deeper and identify that source? The following rows are not very helpful. They take us through the internals of what seems to be a reactive chain, through multiple calls to subscribe and request.

By skimming over these rows, we can at least start to form a picture of the kind of chain that went wrong: It seems to involve a MonoSingle, a FluxFlatMap, and a FluxRange (each gets several rows in the trace, but overall these three classes are involved). So a range().flatMap().single() chain maybe?

But what if we use that pattern a lot in our application? This still does not tell us much, and simply searching for single is not going to find the problem. Then the last line refers to some of our code. Finally, we are getting close.

Hold on, though. When we go to the source file, all we see is that a pre-existing Flux is subscribed to, as follows:

toDebug.subscribe(System.out::println, Throwable::printStackTrace);

All of this happened at subscription time, but the Flux itself was not declared there. Worse, when we go to where the variable is declared, we see the following:

public Mono<String> toDebug; //please overlook the public class attribute

The variable is not instantiated where it is declared. We must assume a worst-case scenario where we find out that there could be a few different code paths that set it in the application. We remain unsure of which one caused the issue.

Note
This is kind of the Reactor equivalent of a runtime error, as opposed to a compilation error.

What we want to find out more easily is where the operator was added into the chain - that is, where the Flux was declared. We usually refer to that as the “assembly” of the Flux.

Activating Debug Mode - aka tracebacks

Warning
This section describes the easiest but also the slowest way to enable debugging capabilities, because it captures the stacktrace on every operator. See The checkpoint() Alternative for a more fine-grained way of debugging, and Production-ready Global Debugging for a more advanced and performant global option.

Even though the stacktrace was still able to convey some information for someone with a bit of experience, we can see that it is not ideal by itself in more advanced cases.

Fortunately, Reactor comes with assembly-time instrumentation that is designed for debugging.

This is done by activating the Hooks.onOperatorDebug() hook at application start (or at least before the incriminated Flux or Mono can be instantiated), as follows:

Hooks.onOperatorDebug();

This starts instrumenting the calls to the Flux (and Mono) operator methods (where they are assembled into the chain) by wrapping the construction of the operator and capturing a stack trace there. Since this is done when the operator chain is declared, the hook should be activated before that, so the safest way is to activate it right at the start of your application.
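For instance, a minimal sketch of activating the hook right at application start (the chain below is hypothetical and deliberately violates the single() contract, so the resulting error carries a traceback pointing at its assembly site):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

public class DebugHookExample {

    public static void main(String[] args) {
        // Activate assembly-time instrumentation before any operator chain is declared.
        Hooks.onOperatorDebug();

        // flatMap emits two items per input, so single() fails downstream and the
        // IndexOutOfBoundsException is enriched with a traceback for this assembly site.
        Flux.range(1, 5)
            .flatMap(i -> Flux.just(i, i * 10))
            .single()
            .subscribe(System.out::println, Throwable::printStackTrace);
    }
}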

Later on, if an exception occurs, the failing operator is able to refer to that capture and append it to the stack trace. We call this captured assembly information a traceback.

In the next section, we see how the stack trace differs and how to interpret that new information.

Reading a Stack Trace in Debug Mode

When we reuse our initial example but activate the operatorStacktrace debug feature, the stack trace is as follows:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:375) (1)
...
(2)
...
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
	at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1000)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: (3)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (4)
	reactor.core.publisher.Flux.single(Flux.java:6676)
	reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949)
	reactor.guide.GuideTests.populateDebug(GuideTests.java:962)
	org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s): (5)
	|_	Flux.single ⇢ reactor.guide.GuideTests.scatterAndGather(GuideTests.java:949) (6)
  1. This is new: We see the wrapper operator that captures the stack.

  2. Apart from that, the first section of the stack trace is still the same for the most part, showing a bit of the operator’s internals (so we removed a bit of the snippet here).

  3. This is where the traceback starts to appear.

  4. First, we get some details about where the operator was assembled.

  5. We also get a traceback of the error as it propagated through the operator chain, from first to last (error site to subscribe site).

  6. Each operator that saw the error is mentioned along with the user class and line where it was used.

The captured stack trace is appended to the original error as a suppressed OnAssemblyException. There are two parts to it, but the first section is the most interesting. It shows the path of construction for the operator that triggered the exception. Here, it shows that the single that caused our issue was created in the scatterAndGather method, itself called from a populateDebug method that got executed through JUnit.

Now that we are armed with enough information to find the culprit, we can have a meaningful look at that scatterAndGather method:

private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
               .single(); (1)
}
  1. Sure enough, here is our single.

Now we can see the root cause of the error: a flatMap that performs several HTTP calls to a few URLs but that is chained with single, which is too restrictive. After a short git blame and a quick discussion with the author of that line, we find out that they meant to use the less restrictive take(1) instead.

We have solved our problem.
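One way the suggested fix could look, as a sketch only (doRequest is the per-URL call from the example; take(1) bounds the flatMap output to at most one element, so the trailing single() merely converts back to a Mono and would now only fail on an empty result):

private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
               .take(1)
               .single();
}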

Now consider the following line in the stack trace:

Error has been observed by the following operator(s):

That second part of the debug stack trace was not necessarily interesting in this particular example, because the error was actually happening in the last operator in the chain (the one closest to subscribe). Considering another example might make it clearer:

FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();

Now imagine that, inside findAllUserByName, there is a map that fails. Here, we would see the following final traceback:

Error has been observed by the following operator(s):
	|_	Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
	|_	Flux.map ⇢ reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
	|_	Flux.filter ⇢ reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
	|_	Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)
	|_	Flux.elapsed ⇢ reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
	|_	Flux.transform ⇢ reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:41)

This corresponds to the section of the chain of operators that gets notified of the error:

  1. The exception originates in the first map.

  2. It is seen by a second map (both in fact correspond to the findAllUserByName method).

  3. It is then seen by a filter and a transform, which indicate that part of the chain is constructed by a reusable transformation function (here, the applyFilters utility method).

  4. Finally, it is seen by an elapsed and a transform. Once again, elapsed is applied by the transformation function of that second transform.

Tip
As tracebacks are appended to original errors as suppressed exceptions, this can somewhat interfere with another type of exception that uses this mechanism: composite exceptions. Such exceptions can be created directly via Exceptions.multiple(Throwable...), or by some operators that might join multiple erroring sources (like Flux#flatMapDelayError). They can be unwrapped into a List via Exceptions.unwrapMultiple(Throwable), in which case the traceback would be considered a component of the composite and be part of the returned List. If that is somehow not desirable, tracebacks can be identified thanks to the Exceptions.isTraceback(Throwable) check and excluded from such an unwrap by using Exceptions.unwrapMultipleExcludingTracebacks(Throwable) instead.
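As a small sketch of that last approach (the logComponents helper and the incoming composite are hypothetical; the Exceptions methods are the ones named in the tip above):

import java.util.List;

import reactor.core.Exceptions;

// Keeps only the "real" component errors of a composite, dropping any traceback
// that debug mode may have attached as a suppressed exception.
void logComponents(Throwable composite) {
    List<Throwable> components = Exceptions.unwrapMultipleExcludingTracebacks(composite);
    components.forEach(t -> System.err.println("component failure: " + t));
}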

We deal with a form of instrumentation here, and creating a stack trace is costly. That is why this debugging feature should only be activated in a controlled manner, as a last resort.

The checkpoint() Alternative

The debug mode is global and affects every single operator assembled into a Flux or a Mono inside the application. This has the benefit of allowing after-the-fact debugging: Whatever the error, we can obtain additional information to debug it.

As we saw earlier, this global knowledge comes at the cost of an impact on performance (due to the number of populated stack traces). That cost can be reduced if we have an idea of likely problematic operators. However, we usually do not know which operators are likely to be problematic unless we observed an error in the wild, saw we were missing assembly information, and then modified the code to activate assembly tracking, hoping to observe the same error again.

In that scenario, we have to switch into debugging mode and make preparations in order to better observe a second occurrence of the error, this time capturing all the additional information.

If you can identify reactive chains that you assemble in your application for which serviceability is critical, you can achieve a mix of both techniques with the checkpoint() operator.

You can chain this operator into a method chain. The checkpoint operator works like the hook version but only for its link of that particular chain.

There is also a checkpoint(String) variant that lets you add a unique String identifier to the assembly traceback. This way, the stack trace is omitted and you rely on the description to identify the assembly site. checkpoint(String) imposes less processing cost than a regular checkpoint.

checkpoint(String) includes “light” in its output (which can be handy when searching), as shown in the following example:

...
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.ParallelSource] is identified by light checkpoint [light checkpoint identifier].

Last but not least, if you want to add a more generic description to the checkpoint but still rely on the stack trace mechanism to identify the assembly site, you can force that behavior by using the checkpoint("description", true) version. We are now back to the initial message for the traceback, augmented with a description, as shown in the following example:

Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed by the following operator(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
  1. descriptionCorrelation1234 is the description provided in the checkpoint.

The description could be a static identifier, a user-readable description, or a wider correlation ID (for instance, coming from a header in the case of an HTTP request).
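Putting the three variants side by side, the following is a hypothetical sketch (orderIds, lookupOrder, enrich, Order, and requestId are placeholders, not part of the examples above):

Flux<Order> pipeline = orderIds
        .flatMap(this::lookupOrder)
        .checkpoint()                                    // full traceback captured at this assembly site
        .map(this::enrich)
        .checkpoint("enrichment step")                   // light: description only, no stack capture
        .filter(Order::isValid)
        .checkpoint("orderPipeline-" + requestId, true); // description plus a forced full traceback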

Note
When both global debugging and local checkpoint() are enabled, checkpointed snapshot stacks are appended as suppressed error output after the observing operator graph and following the same declarative order.

Production-ready Global Debugging

Project Reactor comes with a separate Java Agent that instruments your code and adds debugging info without paying the cost of capturing the stacktrace on every operator call. The behaviour is very similar to Activating Debug Mode - aka tracebacks, but without the runtime performance overhead.

To use it in your app, you must add it as a dependency.

The following example shows how to add reactor-tools as a dependency in Maven:

Example 2. reactor-tools in Maven, in <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    (1)
</dependency>
  1. If you use the BOM, you do not need to specify a <version>.

The following example shows how to add reactor-tools as a dependency in Gradle:

Example 3. reactor-tools in Gradle, amend the dependencies block
dependencies {
   implementation 'io.projectreactor:reactor-tools'
}

It also needs to be explicitly initialized with:

ReactorDebugAgent.init();
Tip
Since the implementation will instrument your classes when they are loaded, the best place to put it is before everything else in your main(String[]) method:
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}

You may also re-process existing classes if you cannot run the init eagerly (e.g. in the tests):

ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
Warning
Be aware that the re-processing takes a couple of seconds due to the need to iterate over all loaded classes and apply the transformation. Use it only if you see that some call-sites are not instrumented.
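For instance, in a JUnit 5 test class this might look like the following sketch (the class and method names are hypothetical):

import org.junit.jupiter.api.BeforeAll;

import reactor.tools.agent.ReactorDebugAgent;

class DebuggingIntegrationTest {

    @BeforeAll
    static void instrument() {
        ReactorDebugAgent.init();
        // Test classes (and the code under test) may already be loaded at this point,
        // so re-process them to add the debug instrumentation.
        ReactorDebugAgent.processExistingClasses();
    }
}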

Limitations

ReactorDebugAgent is implemented as a Java Agent and uses ByteBuddy to perform the self-attach. Self-attach may not work on some JVMs; refer to ByteBuddy's documentation for more details.

Running ReactorDebugAgent as a Java Agent

If your environment does not support ByteBuddy’s self-attachment, you can run reactor-tools as a Java Agent:

java -javaagent:reactor-tools.jar -jar app.jar

Running ReactorDebugAgent at build time

It is also possible to run reactor-tools at build time. To do so, you need to apply it as a plugin for ByteBuddy’s build instrumentation.

Warning
The transformation will only be applied to your project’s classes. The classpath libraries will not be instrumented.
Example 4. reactor-tools with ByteBuddy’s Maven plugin
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-tools</artifactId>
		(1)
		<classifier>original</classifier> (2)
		<scope>runtime</scope>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>net.bytebuddy</groupId>
			<artifactId>byte-buddy-maven-plugin</artifactId>
			<configuration>
				<transformations>
					<transformation>
						<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
					</transformation>
				</transformations>
			</configuration>
		</plugin>
	</plugins>
</build>
  1. If you use the BOM, you do not need to specify a <version>.

  2. classifier here is important.

Example 5. reactor-tools with ByteBuddy’s Gradle plugin
plugins {
	id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}

configurations {
	byteBuddyPlugin
}

dependencies {
	byteBuddyPlugin(
			group: 'io.projectreactor',
			name: 'reactor-tools',
			(1)
			classifier: 'original', (2)
	)
}

byteBuddy {
	transformation {
		plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
		classPath = configurations.byteBuddyPlugin
	}
}
  1. If you use the BOM, you do not need to specify a version.

  2. classifier here is important.

Logging a Sequence

In addition to stack trace debugging and analysis, another powerful tool to have in your toolkit is the ability to trace and log events in an asynchronous sequence.

The log() operator can do just that. Chained inside a sequence, it peeks at every event of the Flux or Mono upstream of it (including onNext, onError, and onComplete as well as subscriptions, cancellations, and requests).

A note on logging implementation

The log operator uses the Loggers utility class, which picks up common logging frameworks such as Log4J and Logback through SLF4J and defaults to logging to the console if SLF4J is unavailable.

The console fallback uses System.err for the WARN and ERROR log levels and System.out for everything else.

If you prefer a JDK java.util.logging fallback, as in 3.0.x, you can get it by setting the reactor.logging.fallback system property to JDK.

In all cases, when logging in production you should take care to configure the underlying logging framework to use its most asynchronous and non-blocking approach — for instance, an AsyncAppender in Logback or AsyncLogger in Log4j 2.

For instance, suppose we have Logback activated and configured and a chain like range(1,10).take(3). By placing a log() before the take, we can get some insight into how it works and what kind of events it propagates upstream to the range, as the following example shows:

Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();

This prints out the following (through the logger’s console appender):

10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(unbounded) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)

Here, in addition to the logger’s own formatter (time, thread, level, message), the log() operator outputs a few things in its own format:

  1. reactor.Flux.Range.1 is an automatic category for the log, in case you use the operator several times in a chain. It lets you distinguish which operator’s events are logged (in this case, the range). You can overwrite the identifier with your own custom category by using the log(String) method signature. After a few separating characters, the actual event gets printed. Here, we get an onSubscribe call, a request call, three onNext calls, and a cancel call. For the first line, onSubscribe, we get the implementation of the Subscriber, which usually corresponds to the operator-specific implementation. Between square brackets, we get additional information, including whether the operator can be automatically optimized through synchronous or asynchronous fusion.

  2. On the second line, we can see that an unbounded request was propagated up from downstream.

  3. Then the range sends three values in a row.

  4. On the last line, we see cancel().

The last line, (4), is the most interesting. We can see the take in action there. It operates by cutting the sequence short after it has seen enough elements emitted. In short, take() causes the source to cancel() once it has emitted the user-requested amount.
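If the default output is too verbose, the log operator also has overloads to narrow it down. The following is a sketch only (the category name is arbitrary) that uses the log(String, Level, SignalType...) variant to set a custom category and log only the onNext and cancel signals:

import java.util.logging.Level;

import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

Flux<Integer> flux = Flux.range(1, 10)
                         .log("my.custom.category", Level.INFO, SignalType.ON_NEXT, SignalType.CANCEL)
                         .take(3);
flux.subscribe();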