Skip to content

Commit 2cbaf51

Browse files
committed
feat(concurrent): Improve loom support with structured concurrency
Add structured concurrency support Improve virtual thread tests Create a dedicated concurrency module for JDK21+
1 parent 86231f4 commit 2cbaf51

File tree

8 files changed

+313
-29
lines changed

8 files changed

+313
-29
lines changed
Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
ext {
2-
latestDepTestMinJavaVersionForTests = JavaVersion.VERSION_21
3-
}
4-
51
muzzle {
62
pass {
73
coreJdk()
@@ -10,17 +6,11 @@ muzzle {
106

117
apply from: "$rootDir/gradle/java.gradle"
128

13-
addTestSuite('latestDepTest')
14-
15-
compileLatestDepTestGroovy.configure {
16-
javaLauncher = getJavaLauncherFor(21)
17-
}
189
dependencies {
1910
testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
2011

2112
// test dependencies required for testing the executors we permit
2213
testImplementation 'org.apache.tomcat.embed:tomcat-embed-core:7.0.0'
2314
testImplementation deps.guava
2415
testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.9.Final'
25-
latestDepTestImplementation group: 'io.netty', name: 'netty-all', version: '4.+'
2616
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
ext {
2+
minJavaVersionForTests = JavaVersion.VERSION_21
3+
}
4+
5+
apply from: "$rootDir/gradle/java.gradle"
6+
apply plugin: 'idea'
7+
8+
muzzle {
9+
pass {
10+
coreJdk('21')
11+
}
12+
}
13+
14+
idea {
15+
module {
16+
jdkName = '21'
17+
}
18+
}
19+
20+
/*
21+
* Declare previewTest, a test suite that requires the Javac/Java --enable-preview feature flag
22+
*/
23+
addTestSuite('previewTest')
24+
// Configure groovy test file compilation
25+
compilePreviewTestGroovy.configure {
26+
javaLauncher = javaToolchains.launcherFor {
27+
languageVersion = JavaLanguageVersion.of(21)
28+
}
29+
options.compilerArgs.add("--enable-preview")
30+
}
31+
// Configure Java test files compilation
32+
compilePreviewTestJava.configure {
33+
options.compilerArgs.add("--enable-preview")
34+
}
35+
// Configure tests execution
36+
previewTest.configure {
37+
jvmArgs = ['--enable-preview']
38+
}
39+
// Require the preview test suite to run as part of module check
40+
tasks.named("check").configure {
41+
dependsOn "previewTest"
42+
}
43+
44+
dependencies {
45+
testImplementation project(':dd-java-agent:instrumentation:trace-annotation')
46+
}
47+
48+
// Set all compile tasks to use JDK21 but let instrumentation code targets 1.8 compatibility
49+
project.tasks.withType(AbstractCompile).configureEach {
50+
setJavaVersion(it, 21)
51+
}
52+
compileJava.configure {
53+
sourceCompatibility = JavaVersion.VERSION_1_8
54+
targetCompatibility = JavaVersion.VERSION_1_8
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package datadog.trace.instrumentation.java.concurrent.structuredconcurrency;
2+
3+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
4+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.FORK_JOIN_TASK;
5+
import static java.util.Collections.singleton;
6+
import static java.util.Collections.singletonMap;
7+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
8+
9+
import com.google.auto.service.AutoService;
10+
import datadog.trace.agent.tooling.ExcludeFilterProvider;
11+
import datadog.trace.agent.tooling.Instrumenter;
12+
import datadog.trace.agent.tooling.InstrumenterModule;
13+
import datadog.trace.api.Platform;
14+
import datadog.trace.bootstrap.ContextStore;
15+
import datadog.trace.bootstrap.InstrumentationContext;
16+
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
17+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
18+
import java.util.Collection;
19+
import java.util.Map;
20+
import net.bytebuddy.asm.Advice;
21+
22+
/**
23+
* This instrumentation captures the active span scope at StructuredTaskScope task creation
24+
* (SubtaskImpl). The scope is then activate and close through the Runnable instrumentation
25+
* (SubtaskImpl implementation Runnable).
26+
*/
27+
@SuppressWarnings("unused")
28+
@AutoService(InstrumenterModule.class)
29+
public class StructuredTaskScopeInstrumentation extends InstrumenterModule.Tracing
30+
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, ExcludeFilterProvider {
31+
32+
public StructuredTaskScopeInstrumentation() {
33+
super("java_concurrent", "structured_task_scope");
34+
}
35+
36+
@Override
37+
public String instrumentedType() {
38+
return "java.util.concurrent.StructuredTaskScope$SubtaskImpl";
39+
}
40+
41+
@Override
42+
public boolean isEnabled() {
43+
return Platform.isJavaVersionAtLeast(21) && super.isEnabled();
44+
}
45+
46+
@Override
47+
public Map<String, String> contextStore() {
48+
return singletonMap(
49+
"java.util.concurrent.StructuredTaskScope.SubtaskImpl", State.class.getName());
50+
}
51+
52+
@Override
53+
public void methodAdvice(MethodTransformer transformer) {
54+
transformer.applyAdvice(isConstructor(), getClass().getName() + "$ConstructorAdvice");
55+
}
56+
57+
@Override
58+
public Map<ExcludeFilter.ExcludeType, ? extends Collection<String>> excludedClasses() {
59+
// Prevent the ForkJoinPool instrumentation to enable the task scope too early on the carrier
60+
// thread rather than on the expected running thread, which is virtual by default.
61+
return singletonMap(
62+
FORK_JOIN_TASK, singleton("java.util.concurrent.ForkJoinTask$RunnableExecuteAction"));
63+
}
64+
65+
public static final class ConstructorAdvice {
66+
@Advice.OnMethodExit
67+
public static <T> void captureScope(
68+
@Advice.This Object task // StructuredTaskScope.SubtaskImpl (can't use the type)
69+
) {
70+
ContextStore<Object, State> contextStore =
71+
InstrumentationContext.get(
72+
"java.util.concurrent.StructuredTaskScope.SubtaskImpl",
73+
"datadog.trace.bootstrap.instrumentation.java.concurrent.State");
74+
capture(contextStore, task, true);
75+
}
76+
}
77+
}
Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package datadog.trace.instrumentation.java.concurrent;
1+
package datadog.trace.instrumentation.java.concurrent.virtualthread;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
44
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
@@ -11,12 +11,17 @@
1111
import com.google.auto.service.AutoService;
1212
import datadog.trace.agent.tooling.Instrumenter;
1313
import datadog.trace.agent.tooling.InstrumenterModule;
14+
import datadog.trace.api.Platform;
1415
import datadog.trace.bootstrap.InstrumentationContext;
1516
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
1617
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
1718
import java.util.Map;
1819
import net.bytebuddy.asm.Advice;
1920

21+
/**
22+
* Instruments {@code TaskRunner}, internal runnable for {@code ThreadPerTaskExecutor} (JDK 19+ as
23+
* preview, 21+ as stable), the executor with default virtual thread factory.
24+
*/
2025
@AutoService(InstrumenterModule.class)
2126
public final class TaskRunnerInstrumentation extends InstrumenterModule.Tracing
2227
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType {
@@ -29,6 +34,11 @@ public String instrumentedType() {
2934
return "java.util.concurrent.ThreadPerTaskExecutor$TaskRunner";
3035
}
3136

37+
@Override
38+
public boolean isEnabled() {
39+
return Platform.isJavaVersionAtLeast(19) && super.isEnabled();
40+
}
41+
3242
@Override
3343
public Map<String, String> contextStore() {
3444
return singletonMap("java.lang.Runnable", State.class.getName());
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import datadog.trace.agent.test.AgentTestRunner
2+
import datadog.trace.api.Trace
3+
4+
import java.util.concurrent.Callable
5+
import java.util.concurrent.StructuredTaskScope
6+
7+
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
8+
import static datadog.trace.agent.test.utils.TraceUtils.runnableUnderTrace
9+
import static java.time.Instant.now
10+
11+
class StructuredConcurrencyTest extends AgentTestRunner {
12+
/**
13+
* Tests the structured task scope with a single task.
14+
*/
15+
def "test single task"() {
16+
setup:
17+
def taskScope = new StructuredTaskScope.ShutdownOnFailure()
18+
def result = false
19+
20+
when:
21+
runUnderTrace("parent") {
22+
def task = taskScope.fork(new Callable<Boolean>() {
23+
@Trace(operationName = "child")
24+
@Override
25+
Boolean call() throws Exception {
26+
return true
27+
}
28+
})
29+
taskScope.joinUntil(now() + 1) // Wait for a second at maximum
30+
result = task.get()
31+
}
32+
taskScope.close()
33+
34+
then:
35+
result
36+
assertTraces(1) {
37+
sortSpansByStart()
38+
trace(2) {
39+
span(0) {
40+
parent()
41+
operationName "parent"
42+
}
43+
span(1) {
44+
childOfPrevious()
45+
operationName "child"
46+
}
47+
}
48+
}
49+
}
50+
51+
/**
52+
* Tests the structured task scope with a multiple tasks.
53+
* Here is the expected task/span structure:
54+
* <pre>
55+
* parent
56+
* |-- child1
57+
* |-- child2
58+
* \-- child3
59+
* </pre>
60+
*/
61+
def "test multiple tasks"() {
62+
setup:
63+
def taskScope = new StructuredTaskScope.ShutdownOnFailure()
64+
65+
when:
66+
runUnderTrace("parent") {
67+
taskScope.fork {
68+
runnableUnderTrace("child1") {}
69+
}
70+
taskScope.fork {
71+
runnableUnderTrace("child2") {}
72+
}
73+
taskScope.fork {
74+
runnableUnderTrace("child3") {}
75+
}
76+
taskScope.joinUntil(now() + 2) // Wait for two seconds at maximum
77+
}
78+
taskScope.close()
79+
80+
then:
81+
assertTraces(1) {
82+
sortSpansByStart()
83+
trace(4) {
84+
span {
85+
parent()
86+
operationName "parent"
87+
}
88+
def parent = span(0)
89+
span {
90+
childOf(parent)
91+
assert span.operationName.toString().startsWith("child")
92+
}
93+
span {
94+
childOf(parent)
95+
assert span.operationName.toString().startsWith("child")
96+
}
97+
span {
98+
childOf(parent)
99+
assert span.operationName.toString().startsWith("child")
100+
}
101+
}
102+
}
103+
}
104+
105+
/**
106+
* Tests the structured task scope with a multiple nested tasks.
107+
* Here is the expected task/span structure:
108+
* <pre>
109+
* parent
110+
* |-- child1
111+
* | |-- great-child1-1
112+
* | \-- great-child1-2
113+
* \-- child2
114+
* </pre>
115+
*/
116+
def "test nested tasks"() {
117+
setup:
118+
def taskScope = new StructuredTaskScope.ShutdownOnFailure()
119+
120+
when:
121+
runUnderTrace("parent") {
122+
taskScope.fork {
123+
runnableUnderTrace("child1") {
124+
taskScope.fork {
125+
runnableUnderTrace("great-child1-1") {}
126+
}
127+
taskScope.fork {
128+
runnableUnderTrace("great-child1-2") {}
129+
}
130+
}
131+
}
132+
taskScope.fork {
133+
runnableUnderTrace("child2") {}
134+
}
135+
taskScope.joinUntil(now() + 2) // Wait for two seconds at maximum
136+
}
137+
taskScope.close()
138+
139+
then:
140+
assertTraces(1) {
141+
sortSpansByStart()
142+
trace(5) {
143+
// Check parent span
144+
span {
145+
parent()
146+
operationName "parent"
147+
}
148+
def parent = span(0)
149+
// Check child and great child spans
150+
def child1 = null
151+
for (i in 0..<4) {
152+
span {
153+
def name = span.operationName.toString()
154+
if (name.startsWith("child")) {
155+
childOf(parent)
156+
if (name == "child1") {
157+
child1 = span
158+
}
159+
} else if (name.startsWith("great-child1")) {
160+
childOf(child1) // We can assume child1 will be set as spans are sorted by start time
161+
}
162+
}
163+
}
164+
}
165+
}
166+
}
167+
}

dd-java-agent/instrumentation/java-concurrent/src/latestDepTest/groovy/VirtualThreadTest.groovy renamed to dd-java-agent/instrumentation/java-concurrent/java-concurrent-21/src/test/groovy/VirtualThreadTest.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import java.util.concurrent.TimeUnit
1111
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
1212

1313
class VirtualThreadTest extends AgentTestRunner {
14-
1514
@Shared
1615
def executeRunnable = { e, c -> e.execute((Runnable) c) }
1716
@Shared

0 commit comments

Comments
 (0)