Skip to content

Commit 833a988

Browse files
authored
ServiceTalk async context propagation instrumentation (#7241)
1 parent 08f0979 commit 833a988

File tree

6 files changed

+334
-0
lines changed

6 files changed

+334
-0
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
plugins {
2+
id 'java-test-fixtures'
3+
}
4+
5+
muzzle {
6+
pass {
7+
group = 'io.servicetalk'
8+
module = 'servicetalk-concurrent-api'
9+
// prev versions missing ContextMap
10+
versions = '[0.41.12,)'
11+
assertInverse = true
12+
}
13+
pass {
14+
group = 'io.servicetalk'
15+
module = 'servicetalk-context-api'
16+
versions = '[0.1.0,)'
17+
assertInverse = true
18+
}
19+
}
20+
21+
ext {
22+
minJavaVersionForTests = JavaVersion.VERSION_11
23+
}
24+
25+
apply from: "$rootDir/gradle/java.gradle"
26+
27+
addTestSuiteForDir('latestDepTest', 'test')
28+
29+
dependencies {
30+
compileOnly group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '0.42.45'
31+
compileOnly group: 'io.servicetalk', name: 'servicetalk-context-api', version: '0.42.45'
32+
33+
testImplementation group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '0.42.0'
34+
testImplementation group: 'io.servicetalk', name: 'servicetalk-context-api', version: '0.42.0'
35+
36+
latestDepTestImplementation group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '+'
37+
latestDepTestImplementation group: 'io.servicetalk', name: 'servicetalk-context-api', version: '+'
38+
}
39+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package datadog.trace.instrumentation.servicetalk;
2+
3+
import datadog.trace.agent.tooling.InstrumenterModule;
4+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
5+
import java.util.Collections;
6+
import java.util.Map;
7+
8+
public abstract class AbstractAsyncContextInstrumentation extends InstrumenterModule.Tracing {
9+
10+
public AbstractAsyncContextInstrumentation() {
11+
super("servicetalk", "servicetalk-concurrent");
12+
}
13+
14+
@Override
15+
public Map<String, String> contextStore() {
16+
return Collections.singletonMap(
17+
"io.servicetalk.context.api.ContextMap", AgentSpan.class.getName());
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package datadog.trace.instrumentation.servicetalk;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
5+
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
6+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
7+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
8+
9+
import com.google.auto.service.AutoService;
10+
import datadog.trace.agent.tooling.Instrumenter;
11+
import datadog.trace.agent.tooling.InstrumenterModule;
12+
import datadog.trace.bootstrap.InstrumentationContext;
13+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
14+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
15+
import io.servicetalk.context.api.ContextMap;
16+
import net.bytebuddy.asm.Advice;
17+
18+
@AutoService(InstrumenterModule.class)
19+
public class ContextMapInstrumentation extends AbstractAsyncContextInstrumentation
20+
implements Instrumenter.ForSingleType {
21+
22+
@Override
23+
public String instrumentedType() {
24+
return "io.servicetalk.concurrent.api.CopyOnWriteContextMap";
25+
}
26+
27+
@Override
28+
public void methodAdvice(MethodTransformer transformer) {
29+
transformer.applyAdvice(
30+
isConstructor()
31+
.and(isPrivate())
32+
.and(takesArguments(1))
33+
.and(
34+
takesArgument(
35+
0,
36+
named("io.servicetalk.concurrent.api.CopyOnWriteContextMap$CopyContextMap"))),
37+
getClass().getName() + "$Construct");
38+
}
39+
40+
private static final class Construct {
41+
@Advice.OnMethodExit(suppress = Throwable.class)
42+
public static void exit(@Advice.This ContextMap contextMap) {
43+
InstrumentationContext.get(ContextMap.class, AgentSpan.class)
44+
.put(contextMap, AgentTracer.activeSpan());
45+
}
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package datadog.trace.instrumentation.servicetalk;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
4+
5+
import com.google.auto.service.AutoService;
6+
import datadog.trace.agent.tooling.Instrumenter;
7+
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import datadog.trace.bootstrap.InstrumentationContext;
9+
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
10+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
11+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
12+
import io.servicetalk.context.api.ContextMap;
13+
import net.bytebuddy.asm.Advice;
14+
15+
@AutoService(InstrumenterModule.class)
16+
public class ContextPreservingInstrumentation extends AbstractAsyncContextInstrumentation
17+
implements Instrumenter.ForKnownTypes {
18+
19+
@Override
20+
public String[] knownMatchingTypes() {
21+
return new String[] {
22+
"io.servicetalk.concurrent.api.ContextPreservingBiConsumer",
23+
"io.servicetalk.concurrent.api.ContextPreservingBiFunction",
24+
"io.servicetalk.concurrent.api.ContextPreservingCallable",
25+
"io.servicetalk.concurrent.api.ContextPreservingCancellable",
26+
"io.servicetalk.concurrent.api.ContextPreservingCompletableSubscriber",
27+
"io.servicetalk.concurrent.api.ContextPreservingConsumer",
28+
"io.servicetalk.concurrent.api.ContextPreservingFunction",
29+
"io.servicetalk.concurrent.api.ContextPreservingRunnable",
30+
"io.servicetalk.concurrent.api.ContextPreservingSingleSubscriber",
31+
"io.servicetalk.concurrent.api.ContextPreservingSubscriber",
32+
"io.servicetalk.concurrent.api.ContextPreservingSubscription",
33+
};
34+
}
35+
36+
@Override
37+
public void methodAdvice(MethodTransformer transformer) {
38+
transformer.applyAdvice(
39+
namedOneOf(
40+
"accept",
41+
"apply",
42+
"call",
43+
"cancel",
44+
"onComplete",
45+
"onError",
46+
"onSuccess",
47+
"request",
48+
"onNext",
49+
"onSubscribe",
50+
"run"),
51+
getClass().getName() + "$Wrapper");
52+
}
53+
54+
public static final class Wrapper {
55+
@Advice.OnMethodEnter(suppress = Throwable.class)
56+
public static AgentScope enter(@Advice.FieldValue("saved") final ContextMap contextMap) {
57+
AgentSpan parent =
58+
InstrumentationContext.get(ContextMap.class, AgentSpan.class).get(contextMap);
59+
if (parent != null) {
60+
return AgentTracer.activateSpan(parent);
61+
}
62+
return null;
63+
}
64+
65+
@Advice.OnMethodExit(suppress = Throwable.class)
66+
public static void exit(@Advice.Enter final AgentScope agentScope) {
67+
if (agentScope != null) {
68+
agentScope.close();
69+
}
70+
}
71+
}
72+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import datadog.trace.agent.test.AgentTestRunner
2+
import datadog.trace.bootstrap.instrumentation.api.AgentScope
3+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
4+
import io.servicetalk.concurrent.api.AsyncContext
5+
import io.servicetalk.context.api.ContextMap
6+
7+
import java.util.concurrent.ExecutorService
8+
import java.util.concurrent.Executors
9+
10+
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
11+
12+
class ContextPreservingInstrumentationTest extends AgentTestRunner {
13+
14+
def "wrapBiConsumer"() {
15+
setup:
16+
def parent = startParentContext()
17+
def wrapped =
18+
asyncContextProvider.wrapBiConsumer({ t, u -> childSpan() }, parent.contextMap)
19+
20+
when:
21+
runInSeparateThread{ wrapped.accept(null, null) }
22+
parent.releaseParentSpan()
23+
24+
then:
25+
assertParentChildTrace()
26+
}
27+
28+
def "wrapBiFunction"() {
29+
setup:
30+
def parent = startParentContext()
31+
def wrapped =
32+
asyncContextProvider.wrapBiFunction({ t, u -> childSpan() }, parent.contextMap)
33+
34+
when:
35+
runInSeparateThread{ wrapped.apply(null, null) }
36+
parent.releaseParentSpan()
37+
38+
then:
39+
assertParentChildTrace()
40+
}
41+
42+
def "wrapCallable"() {
43+
setup:
44+
def parent = startParentContext()
45+
def wrapped =
46+
asyncContextProvider.wrapCallable({ -> childSpan() }, parent.contextMap)
47+
48+
when:
49+
runInSeparateThread{ wrapped.call() }
50+
parent.releaseParentSpan()
51+
52+
then:
53+
assertParentChildTrace()
54+
}
55+
56+
def "wrapConsumer"() {
57+
setup:
58+
def parent = startParentContext()
59+
def wrapped =
60+
asyncContextProvider.wrapConsumer({ t -> childSpan() }, parent.contextMap)
61+
62+
when:
63+
runInSeparateThread{ wrapped.accept(null) }
64+
parent.releaseParentSpan()
65+
66+
then:
67+
assertParentChildTrace()
68+
}
69+
70+
def "wrapFunction"() {
71+
setup:
72+
def parent = startParentContext()
73+
def wrapped =
74+
asyncContextProvider.wrapFunction({ t -> childSpan() }, parent.contextMap)
75+
76+
when:
77+
runInSeparateThread { wrapped.apply(null) }
78+
parent.releaseParentSpan()
79+
80+
then:
81+
assertParentChildTrace()
82+
}
83+
84+
def "wrapRunnable"() {
85+
setup:
86+
def parent = startParentContext()
87+
def wrapped =
88+
asyncContextProvider.wrapRunnable({ -> childSpan() }, parent.contextMap)
89+
90+
when:
91+
runInSeparateThread(wrapped)
92+
parent.releaseParentSpan()
93+
94+
then:
95+
assertParentChildTrace()
96+
}
97+
98+
ExecutorService executor = Executors.newFixedThreadPool(5)
99+
def asyncContextProvider = AsyncContext.provider
100+
101+
def cleanup() {
102+
if (executor != null) {
103+
executor.shutdown()
104+
}
105+
}
106+
107+
private runInSeparateThread(Runnable runnable) {
108+
executor.submit(runnable).get()
109+
}
110+
111+
/**
112+
* Captures async context. Also uses continuation to prevent the span from being reported until it is released.
113+
*/
114+
private class ParentContext {
115+
final ContextMap contextMap = AsyncContext.context().copy()
116+
final AgentScope.Continuation spanContinuation = AgentTracer.capture()
117+
118+
def releaseParentSpan() {
119+
spanContinuation.cancel()
120+
}
121+
}
122+
123+
private startParentContext() {
124+
runUnderTrace("parent") {
125+
new ParentContext()
126+
}
127+
}
128+
129+
/**
130+
* Asserts a parent-child trace meaning that async context propagation works correctly.
131+
*/
132+
private void assertParentChildTrace() {
133+
assertTraces(1) {
134+
trace(2) {
135+
sortSpansByStart()
136+
span {
137+
operationName "parent"
138+
tags {
139+
defaultTags()
140+
}
141+
}
142+
span {
143+
childOf span(0)
144+
operationName "child"
145+
tags {
146+
defaultTags()
147+
}
148+
}
149+
}
150+
}
151+
}
152+
153+
private childSpan() {
154+
AgentTracer.startSpan("test", "child").finish()
155+
}
156+
}

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ include ':dd-java-agent:instrumentation:scala-promise:scala-promise-2.10'
401401
include ':dd-java-agent:instrumentation:scala-promise:scala-promise-2.13'
402402
include ':dd-java-agent:instrumentation:scalatest'
403403
include ':dd-java-agent:instrumentation:selenium'
404+
include ':dd-java-agent:instrumentation:servicetalk'
404405
include ':dd-java-agent:instrumentation:servlet'
405406
include ':dd-java-agent:instrumentation:servlet-common'
406407
include ':dd-java-agent:instrumentation:servlet:request-2'

0 commit comments

Comments
 (0)