Skip to content

Commit

Permalink
Fill jvm.thread.state attribute for jvm.thread.count metric on jdk8 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Nov 15, 2024
1 parent c631801 commit 6bafd2b
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -55,11 +57,34 @@ public final class Threads {

/** Register observers for java runtime class metrics. */
public static List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry) {
return INSTANCE.registerObservers(openTelemetry, ManagementFactory.getThreadMXBean());
return INSTANCE.registerObservers(openTelemetry, !isJava9OrNewer());
}

private List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, boolean useThread) {
if (useThread) {
return registerObservers(openTelemetry, Threads::getThreads);
}
return registerObservers(openTelemetry, ManagementFactory.getThreadMXBean());
}

// Visible for testing
List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, ThreadMXBean threadBean) {
return registerObservers(
openTelemetry,
isJava9OrNewer() ? Threads::java9AndNewerCallback : Threads::java8Callback,
threadBean);
}

// Visible for testing
List<AutoCloseable> registerObservers(
OpenTelemetry openTelemetry, Supplier<Thread[]> threadSupplier) {
return registerObservers(openTelemetry, Threads::java8ThreadCallback, threadSupplier);
}

private static <T> List<AutoCloseable> registerObservers(
OpenTelemetry openTelemetry,
Function<T, Consumer<ObservableLongMeasurement>> callbackProvider,
T threadInfo) {
Meter meter = JmxRuntimeMetricsUtil.getMeter(openTelemetry);
List<AutoCloseable> observables = new ArrayList<>();

Expand All @@ -68,8 +93,7 @@ List<AutoCloseable> registerObservers(OpenTelemetry openTelemetry, ThreadMXBean
.upDownCounterBuilder("jvm.thread.count")
.setDescription("Number of executing platform threads.")
.setUnit("{thread}")
.buildWithCallback(
isJava9OrNewer() ? java9AndNewerCallback(threadBean) : java8Callback(threadBean)));
.buildWithCallback(callbackProvider.apply(threadInfo)));

return observables;
}
Expand Down Expand Up @@ -104,6 +128,36 @@ private static Consumer<ObservableLongMeasurement> java8Callback(ThreadMXBean th
};
}

private static Consumer<ObservableLongMeasurement> java8ThreadCallback(
Supplier<Thread[]> supplier) {
return measurement -> {
Map<Attributes, Long> counts = new HashMap<>();
for (Thread thread : supplier.get()) {
Attributes threadAttributes = threadAttributes(thread);
counts.compute(threadAttributes, (k, value) -> value == null ? 1 : value + 1);
}
counts.forEach((threadAttributes, count) -> measurement.record(count, threadAttributes));
};
}

// Visible for testing
static Thread[] getThreads() {
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
while (threadGroup.getParent() != null) {
threadGroup = threadGroup.getParent();
}
// use a slightly larger array in case new threads are created
int count = threadGroup.activeCount() + 10;
Thread[] threads = new Thread[count];
int resultSize = threadGroup.enumerate(threads);
if (resultSize == threads.length) {
return threads;
}
Thread[] result = new Thread[resultSize];
System.arraycopy(threads, 0, result, 0, resultSize);
return result;
}

private static Consumer<ObservableLongMeasurement> java9AndNewerCallback(
ThreadMXBean threadBean) {
return measurement -> {
Expand Down Expand Up @@ -132,5 +186,12 @@ private static Attributes threadAttributes(ThreadInfo threadInfo) {
JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState);
}

private static Attributes threadAttributes(Thread thread) {
boolean isDaemon = thread.isDaemon();
String threadState = thread.getState().name().toLowerCase(Locale.ROOT);
return Attributes.of(
JvmAttributes.JVM_THREAD_DAEMON, isDaemon, JvmAttributes.JVM_THREAD_STATE, threadState);
}

private Threads() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.EnabledOnJre;
Expand All @@ -41,7 +44,7 @@ class ThreadsStableSemconvTest {

@Test
@EnabledOnJre(JRE.JAVA_8)
void registerObservers_Java8() {
void registerObservers_Java8Jmx() {
when(threadBean.getThreadCount()).thenReturn(7);
when(threadBean.getDaemonThreadCount()).thenReturn(2);

Expand Down Expand Up @@ -75,6 +78,45 @@ void registerObservers_Java8() {
equalTo(JVM_THREAD_DAEMON, false))))));
}

@Test
void registerObservers_Java8Thread() {
Thread threadInfo1 = mock(Thread.class, new ThreadInfoAnswer(false, Thread.State.RUNNABLE));
Thread threadInfo2 = mock(Thread.class, new ThreadInfoAnswer(true, Thread.State.WAITING));

Thread[] threads = new Thread[] {threadInfo1, threadInfo2};

Threads.INSTANCE
.registerObservers(testing.getOpenTelemetry(), () -> threads)
.forEach(cleanup::deferCleanup);

testing.waitAndAssertMetrics(
"io.opentelemetry.runtime-telemetry-java8",
"jvm.thread.count",
metrics ->
metrics.anySatisfy(
metricData ->
assertThat(metricData)
.hasInstrumentationScope(EXPECTED_SCOPE)
.hasDescription("Number of executing platform threads.")
.hasUnit("{thread}")
.hasLongSumSatisfying(
sum ->
sum.isNotMonotonic()
.hasPointsSatisfying(
point ->
point
.hasValue(1)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, false),
equalTo(JVM_THREAD_STATE, "runnable")),
point ->
point
.hasValue(1)
.hasAttributesSatisfying(
equalTo(JVM_THREAD_DAEMON, true),
equalTo(JVM_THREAD_STATE, "waiting"))))));
}

@Test
@EnabledForJreRange(min = JRE.JAVA_9)
void registerObservers_Java9AndNewer() {
Expand Down Expand Up @@ -120,6 +162,13 @@ void registerObservers_Java9AndNewer() {
equalTo(JVM_THREAD_STATE, "waiting"))))));
}

@Test
void getThreads() {
Thread[] threads = Threads.getThreads();
Set<Thread> set = new HashSet<>(Arrays.asList(threads));
assertThat(set).contains(Thread.currentThread());
}

static final class ThreadInfoAnswer implements Answer<Object> {

private final boolean isDaemon;
Expand All @@ -135,7 +184,7 @@ public Object answer(InvocationOnMock invocation) {
String methodName = invocation.getMethod().getName();
if (methodName.equals("isDaemon")) {
return isDaemon;
} else if (methodName.equals("getThreadState")) {
} else if (methodName.equals("getThreadState") || methodName.equals("getState")) {
return state;
}
return null;
Expand Down

0 comments on commit 6bafd2b

Please sign in to comment.