Skip to content

Support rx java #243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ _site/
*.logtjmeter
.checkstyle
.DS_Store
*.log
*.log
/spring-cloud-sleuth-core/nb-configuration.xml
/spring-cloud-sleuth-core/nbactions.xml
7 changes: 7 additions & 0 deletions docs/src/main/asciidoc/spring-cloud-sleuth.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,13 @@ In order to pass the tracing information you have to wrap the same logic in the
include::../../../../spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/hystrix/TraceCommandTests.java[tags=trace_hystrix_command,indent=0]
----

=== `RxJavaSchedulersHook` to add support for `RxJava` Schedulers

We're registering a custom https://github.com/ReactiveX/RxJava/wiki/Plugins#rxjavaschedulershook [`RxJavaSchedulersHook`]
that wraps all `Action0` instances into their Sleuth representative -
the `TraceAction`. The hook either starts or continues a span depending on the fact whether tracing was already going
on before the Action was scheduled. To disable the custom RxJavaSchedulersHook set the `spring.sleuth.rxjava.schedulers.hook.enabled` to `false`.

=== HTTP integration

Features from this section can be disabled by providing the `spring.sleuth.web.enabled` property with value equal to `false`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rx.plugins.RxJavaSchedulersHook;

/**
* {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration Auto-configuration} that
* enables support for RxJava via {@link RxJavaSchedulersHook}.
*
* @author Shivang Shah
* @since 1.0.0
*/
@Configuration
@AutoConfigureAfter(TraceAutoConfiguration.class)
@ConditionalOnBean(Tracer.class)
@ConditionalOnClass(RxJavaSchedulersHook.class)
@ConditionalOnProperty(value = "spring.sleuth.rxjava.schedulers.hook.enabled", matchIfMissing = true)
public class RxJavaAutoConfiguration {

@Bean
SleuthRxJavaSchedulersHook sleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) {
return new SleuthRxJavaSchedulersHook(tracer, traceKeys);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import rx.functions.Action0;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaSchedulersHook;
import rx.plugins.SleuthRxJavaPlugins;

/**
* {@link RxJavaSchedulersHook} that wraps a {@link Action0} into its tracing
* representation.
*
* @author Shivang Shah
* @since 1.0.0
*/
class SleuthRxJavaSchedulersHook extends RxJavaSchedulersHook {

private static final Log log = LogFactory.getLog(SleuthRxJavaSchedulersHook.class);

private static final String RXJAVA_COMPONENT = "rxjava";
private final Tracer tracer;
private final TraceKeys traceKeys;
private RxJavaSchedulersHook delegate;

SleuthRxJavaSchedulersHook(Tracer tracer, TraceKeys traceKeys) {
this.tracer = tracer;
this.traceKeys = traceKeys;
try {
this.delegate = SleuthRxJavaPlugins.getInstance().getSchedulersHook();
if (this.delegate instanceof SleuthRxJavaSchedulersHook) {
return;
}
RxJavaErrorHandler errorHandler = SleuthRxJavaPlugins.getInstance().getErrorHandler();
RxJavaObservableExecutionHook observableExecutionHook
= SleuthRxJavaPlugins.getInstance().getObservableExecutionHook();
logCurrentStateOfRxJavaPlugins(errorHandler, observableExecutionHook);
SleuthRxJavaPlugins.resetPlugins();
SleuthRxJavaPlugins.getInstance().registerSchedulersHook(this);
SleuthRxJavaPlugins.getInstance().registerErrorHandler(errorHandler);
SleuthRxJavaPlugins.getInstance().registerObservableExecutionHook(observableExecutionHook);
} catch (Exception e) {
log.error("Failed to register Sleuth RxJava SchedulersHook", e);
}
}

private void logCurrentStateOfRxJavaPlugins(RxJavaErrorHandler errorHandler,
RxJavaObservableExecutionHook observableExecutionHook) {
log.debug("Current RxJava plugins configuration is ["
+ "schedulersHook [" + this.delegate + "],"
+ "errorHandler [" + errorHandler + "],"
+ "observableExecutionHook [" + observableExecutionHook + "],"
+ "]");
log.debug("Registering Sleuth RxJava Schedulers Hook.");
}

@Override
public Action0 onSchedule(Action0 action) {
if (action instanceof TraceAction) {
return action;
}
Action0 wrappedAction = this.delegate != null
? this.delegate.onSchedule(action) : action;
if (wrappedAction instanceof TraceAction) {
return action;
}
return super.onSchedule(new TraceAction(this.tracer, this.traceKeys, wrappedAction));
}

static class TraceAction implements Action0 {

private final Action0 actual;
private Tracer tracer;
private TraceKeys traceKeys;
private Span parent;

public TraceAction(Tracer tracer, TraceKeys traceKeys, Action0 actual) {
this.tracer = tracer;
this.traceKeys = traceKeys;
this.parent = tracer.getCurrentSpan();
this.actual = actual;
}

@Override
public void call() {
Span span = this.parent;
boolean created = false;
if (span != null) {
span = this.tracer.continueSpan(span);
} else {
span = this.tracer.createSpan(RXJAVA_COMPONENT);
this.tracer.addTag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, RXJAVA_COMPONENT);
this.tracer.addTag(this.traceKeys.getAsync().getPrefix()
+ this.traceKeys.getAsync().getThreadNameKey(), Thread.currentThread().getName());
created = true;
}
try {
this.actual.call();
} finally {
if (created) {
this.tracer.close(span);
} else {
this.tracer.detach(span);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package rx.plugins;

/**
* {@link RxJavaPlugins} helper class to access the package scope method
* of {@link RxJavaPlugins#reset()}. Will disappear once this gets closed
* https://github.com/ReactiveX/RxJava/issues/2297
*
* @author Shivang Shah
* @since 1.0.0
*/
public class SleuthRxJavaPlugins extends RxJavaPlugins {

SleuthRxJavaPlugins() {
super();
}

public static void resetPlugins() {
getInstance().reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ org.springframework.cloud.sleuth.instrument.web.TraceWebAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.TraceWebClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.TraceWebAsyncClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.web.client.feign.TraceFeignClientAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration
org.springframework.cloud.sleuth.instrument.zuul.TraceZuulAutoConfiguration,\
org.springframework.cloud.sleuth.instrument.rxjava.RxJavaAutoConfiguration

# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package org.springframework.cloud.sleuth;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static org.assertj.core.api.BDDAssertions.then;

/**
Expand All @@ -36,24 +38,24 @@ public class LogTest {
ObjectMapper objectMapper = new ObjectMapper();

@Test public void ctor_missing_event() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("event");
this.thrown.expect(NullPointerException.class);
this.thrown.expectMessage("event");

new Log(1234L, null);
}

@Test public void serialization_round_trip() throws IOException {
Log log = new Log(1234L, "cs");

String serialized = objectMapper.writeValueAsString(log);
Log deserialized = objectMapper.readValue(serialized, Log.class);
String serialized = this.objectMapper.writeValueAsString(log);
Log deserialized = this.objectMapper.readValue(serialized, Log.class);

then(deserialized).isEqualTo(log);
}

@Test public void deserialize_missing_event() throws IOException {
thrown.expect(JsonMappingException.class);
this.thrown.expect(JsonMappingException.class);

objectMapper.readValue("{\"timestamp\": 1234}", Log.class);
this.objectMapper.readValue("{\"timestamp\": 1234}", Log.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.springframework.cloud.sleuth.instrument.rxjava;

import java.util.ArrayList;
import java.util.List;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.cloud.sleuth.Sampler;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.SpanReporter;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import rx.Observable;
import rx.functions.Action0;
import rx.plugins.SleuthRxJavaPlugins;
import rx.schedulers.Schedulers;
import static org.springframework.cloud.sleuth.assertions.SleuthAssertions.then;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {
SleuthRxJavaIntegrationTests.TestConfig.class})
public class SleuthRxJavaIntegrationTests {

@Autowired
Tracer tracer;
@Autowired
TraceKeys traceKeys;
@Autowired
Listener listener;
@Autowired
SleuthRxJavaSchedulersHook sleuthRxJavaSchedulersHook;
StringBuilder caller = new StringBuilder();

@Before
public void cleanTrace() {
this.listener.getEvents().clear();
}

@BeforeClass
@AfterClass
public static void cleanUp() {
SleuthRxJavaPlugins.resetPlugins();
}

@Test
public void should_create_new_span_when_no_current_span_when_rx_java_action_is_executed() {
Observable.defer(() -> Observable.just(
(Action0) () -> this.caller = new StringBuilder("actual_action")
)).subscribeOn(Schedulers.newThread()).toBlocking()
.subscribe(Action0::call);

then(this.caller.toString()).isEqualTo("actual_action");
then(this.tracer.getCurrentSpan()).isNull();
then(this.listener.getEvents().size()).isEqualTo(1);
then(this.listener.getEvents().get(0)).hasNameEqualTo("rxjava");
then(this.listener.getEvents().get(0)).hasATag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, "rxjava");
then(this.listener.getEvents().get(0)).isALocalComponentSpan();
}

@Test
public void should_continue_current_span_when_rx_java_action_is_executed() {
Span spanInCurrentThread = this.tracer.createSpan("current_span");
this.tracer.addTag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, "current_span");

Observable.defer(() -> Observable.just(
(Action0) () -> this.caller = new StringBuilder("actual_action")
)).subscribeOn(Schedulers.newThread()).toBlocking()
.subscribe(Action0::call);

then(this.caller.toString()).isEqualTo("actual_action");
then(this.tracer.getCurrentSpan()).isNotNull();
//making sure here that no new spans were created or reported as closed
then(this.listener.getEvents().size()).isEqualTo(0);
then(spanInCurrentThread).hasNameEqualTo(spanInCurrentThread.getName());
then(spanInCurrentThread).hasATag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, "current_span");
then(spanInCurrentThread).isALocalComponentSpan();
}

@Component
public static class Listener implements SpanReporter {

List<Span> events = new ArrayList<>();

public List<Span> getEvents() {
return this.events;
}

@Override
public void report(Span span) {
this.events.add(span);
}
}

@Configuration
@EnableAutoConfiguration
public static class TestConfig {

@Bean
Listener listener() {
return new Listener();
}

@Bean
Sampler alwaysSampler() {
return new AlwaysSampler();
}
}

}
Loading