diff --git a/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts b/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts index 2a4293ad9f9f..b5aa31a0f46b 100644 --- a/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts +++ b/instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts @@ -40,12 +40,4 @@ tasks { jvmTarget = "1.8" } } - - val compileTestKotlin by existing(AbstractCompile::class) - - named("compileTestGroovy") { - // Note: look like it should be `classpath += files(sourceSets.test.kotlin.classesDirectory)` - // instead, but kotlin plugin doesn't support it (yet?) - classpath = classpath.plus(files(compileTestKotlin.get().destinationDir)) - } } diff --git a/instrumentation/kotlinx-coroutines/javaagent/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy b/instrumentation/kotlinx-coroutines/javaagent/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy deleted file mode 100644 index 8ab030cf46be..000000000000 --- a/instrumentation/kotlinx-coroutines/javaagent/src/test/groovy/KotlinCoroutineInstrumentationTest.groovy +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ThreadPoolDispatcherKt - -class KotlinCoroutineInstrumentationTest extends AgentInstrumentationSpecification { - - static dispatchersToTest = [ - Dispatchers.Default, - Dispatchers.IO, - Dispatchers.Unconfined, - ThreadPoolDispatcherKt.newFixedThreadPoolContext(2, "Fixed-Thread-Pool"), - ThreadPoolDispatcherKt.newSingleThreadContext("Single-Thread"), - ] - - def "kotlin traced across channels"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) - - when: - kotlinTest.tracedAcrossChannels() - - then: - assertTraces(1) { - trace(0, 7) { - span(0) { - name "parent" - attributes { - } - } - (0..2).each { - span("produce_$it") { - childOf span(0) - attributes { - } - } - span("consume_$it") { - childOf span(0) - attributes { - } - } - } - } - } - - where: - dispatcher << dispatchersToTest - } - - def "kotlin cancellation prevents trace"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) - - when: - kotlinTest.tracePreventedByCancellation() - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - attributes { - } - } - span("preLaunch") { - childOf span(0) - attributes { - } - } - } - } - - where: - dispatcher << dispatchersToTest - } - - def "kotlin propagates across nested jobs"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) - - when: - kotlinTest.tracedAcrossThreadsWithNested() - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - attributes { - } - } - span("nested") { - childOf span(0) - attributes { - } - } - } - } - - where: - dispatcher << dispatchersToTest - } - - def "kotlin either deferred completion"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default) - - when: - kotlinTest.traceWithDeferred() - - then: - assertTraces(1) { - trace(0, 5) { - span(0) { - name "parent" - attributes { - } - } - span("future1") { - childOf span(0) - attributes { - } - } - span("keptPromise") { - childOf span(0) - attributes { - } - } - span("keptPromise2") { - childOf span(0) - attributes { - } - } - span("brokenPromise") { - childOf span(0) - attributes { - } - } - } - } - - where: - dispatcher << dispatchersToTest - } - - def "kotlin first completed deferred"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default) - - when: - kotlinTest.tracedWithDeferredFirstCompletions() - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - attributes { - } - } - span("timeout1") { - childOf span(0) - attributes { - } - } - span("timeout2") { - childOf span(0) - attributes { - } - } - span("timeout3") { - childOf span(0) - attributes { - } - } - } - } - - where: - dispatcher << dispatchersToTest - } - - def "test concurrent suspend functions"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(Dispatchers.Default) - int numIters = 100 - HashSet seenItersA = new HashSet<>() - HashSet seenItersB = new HashSet<>() - HashSet expectedIters = new HashSet<>((0L..(numIters - 1)).toList()) - - when: - kotlinTest.launchConcurrentSuspendFunctions(numIters) - - then: - // This generates numIters each of "a calls a2" and "b calls b2" traces. Each - // trace should have a single pair of spans (a and a2) and each of those spans - // should have the same iteration number (attribute "iter"). - // The traces are in some random order, so let's keep track and make sure we see - // each iteration # exactly once - assertTraces(numIters * 2) { - for (int i = 0; i < numIters * 2; i++) { - trace(i, 2) { - boolean a = false - long iter = -1 - span(0) { - a = span.name.matches("a") - iter = span.getAttributes().get(AttributeKey.longKey("iter")) - (a ? seenItersA : seenItersB).add(iter) - name(a ? "a" : "b") - } - span(1) { - name(a ? "a2" : "b2") - childOf(span(0)) - assert span.getAttributes().get(AttributeKey.longKey("iter")) == iter - - } - } - } - } - assert seenItersA.equals(expectedIters) - assert seenItersB.equals(expectedIters) - } - - def "kotlin traced mono"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) - - when: - kotlinTest.tracedMono() - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - attributes { - } - } - span("child") { - childOf span(0) - attributes { - } - } - } - } - - where: - dispatcher << dispatchersToTest - } - - def "kotlin traced mono with context propagation operator"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) - - when: - kotlinTest.tracedMonoContextPropagationOperator() - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - attributes { - } - } - span("child") { - childOf span(0) - attributes { - } - } - } - } - - where: - dispatcher << dispatchersToTest - } - - def "kotlin traced flux"() { - setup: - KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher) - - when: - kotlinTest.tracedFlux() - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - attributes { - } - } - (0..2).each { - span("child_$it") { - childOf span(0) - attributes { - } - } - } - } - } - - where: - dispatcher << dispatchersToTest - } -} diff --git a/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/KotlinCoroutineTests.kt b/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/KotlinCoroutineTests.kt deleted file mode 100644 index 4afd2e48c67c..000000000000 --- a/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/KotlinCoroutineTests.kt +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.trace.Tracer -import io.opentelemetry.context.Context -import io.opentelemetry.extension.kotlin.asContextElement -import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.channels.produce -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.reactive.collect -import kotlinx.coroutines.reactor.ReactorContext -import kotlinx.coroutines.reactor.flux -import kotlinx.coroutines.reactor.mono -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.selects.select -import kotlinx.coroutines.withContext -import kotlinx.coroutines.withTimeout -import kotlinx.coroutines.yield -import java.util.concurrent.TimeUnit - -class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) { - val tracer: Tracer = GlobalOpenTelemetry.getTracer("test") - - fun tracedAcrossChannels() = runTest { - - val producer = produce { - repeat(3) { - tracedChild("produce_$it") - send(it) - } - } - - producer.consumeAsFlow().onEach { - tracedChild("consume_$it") - }.collect() - } - - fun tracePreventedByCancellation() { - - kotlin.runCatching { - runTest { - tracedChild("preLaunch") - - launch(start = CoroutineStart.UNDISPATCHED) { - throw Exception("Child Error") - } - - yield() - - tracedChild("postLaunch") - } - } - } - - fun tracedAcrossThreadsWithNested() = runTest { - val goodDeferred = async { 1 } - - launch { - goodDeferred.await() - launch { tracedChild("nested") } - } - } - - fun traceWithDeferred() = runTest { - - val keptPromise = CompletableDeferred() - val brokenPromise = CompletableDeferred() - val afterPromise = async { - keptPromise.await() - tracedChild("keptPromise") - } - val afterPromise2 = async { - keptPromise.await() - tracedChild("keptPromise2") - } - val failedAfterPromise = async { - brokenPromise - .runCatching { await() } - .onFailure { tracedChild("brokenPromise") } - } - - launch { - tracedChild("future1") - keptPromise.complete(true) - brokenPromise.completeExceptionally(IllegalStateException()) - } - - listOf(afterPromise, afterPromise2, failedAfterPromise).awaitAll() - } - - /** - * @return Number of expected spans in the trace - */ - fun tracedWithDeferredFirstCompletions() = runTest { - - val children = listOf( - async { - tracedChild("timeout1") - false - }, - async { - tracedChild("timeout2") - false - }, - async { - tracedChild("timeout3") - true - } - ) - - withTimeout(TimeUnit.SECONDS.toMillis(30)) { - select { - children.forEach { child -> - child.onAwait { it } - } - } - } - } - - fun tracedMono(): Unit = runTest { - mono(dispatcher) { - tracedChild("child") - }.awaitSingle() - } - - fun tracedMonoContextPropagationOperator(): Unit = runTest { - val currentContext = Context.current() - // clear current context to ensure that ContextPropagationOperator is used for context propagation - withContext(Context.root().asContextElement()) { - val mono = mono(dispatcher) { - // extract context from reactor and propagate it into coroutine - val reactorContext = coroutineContext[ReactorContext.Key]?.context - val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current()) - withContext(otelContext.asContextElement()) { - tracedChild("child") - } - } - ContextPropagationOperator.runWithContext(mono, currentContext).awaitSingle() - } - } - - fun tracedFlux() = runTest { - flux(dispatcher) { - repeat(3) { - tracedChild("child_$it") - send(it) - } - }.collect { - } - } - - fun launchConcurrentSuspendFunctions(numIters: Int) { - runBlocking { - for (i in 0 until numIters) { - GlobalScope.launch { - a(i.toLong()) - } - GlobalScope.launch { - b(i.toLong()) - } - } - } - } - - suspend fun a(iter: Long) { - var span = tracer.spanBuilder("a").startSpan() - span.setAttribute("iter", iter) - withContext(span.asContextElement()) { - delay(10) - a2(iter) - } - span.end() - } - - suspend fun a2(iter: Long) { - var span = tracer.spanBuilder("a2").startSpan() - span.setAttribute("iter", iter) - withContext(span.asContextElement()) { - delay(10) - } - span.end() - } - - suspend fun b(iter: Long) { - var span = tracer.spanBuilder("b").startSpan() - span.setAttribute("iter", iter) - withContext(span.asContextElement()) { - delay(10) - b2(iter) - } - span.end() - } - - suspend fun b2(iter: Long) { - var span = tracer.spanBuilder("b2").startSpan() - span.setAttribute("iter", iter) - withContext(span.asContextElement()) { - delay(10) - } - span.end() - } - - fun tracedChild(opName: String) { - tracer.spanBuilder(opName).startSpan().end() - } - - private fun runTest(block: suspend CoroutineScope.() -> T): T { - val parentSpan = tracer.spanBuilder("parent").startSpan() - val parentScope = parentSpan.makeCurrent() - try { - return runBlocking(dispatcher, block = block) - } finally { - parentSpan.end() - parentScope.close() - } - } -} diff --git a/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt b/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt new file mode 100644 index 000000000000..43be9b9dc8fb --- /dev/null +++ b/instrumentation/kotlinx-coroutines/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt @@ -0,0 +1,524 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines + +import io.opentelemetry.context.Context +import io.opentelemetry.extension.kotlin.asContextElement +import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat +import io.opentelemetry.sdk.trace.data.SpanData +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.asExecutor +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactive.collect +import kotlinx.coroutines.reactor.ReactorContext +import kotlinx.coroutines.reactor.flux +import kotlinx.coroutines.reactor.mono +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.selects.select +import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeout +import kotlinx.coroutines.yield +import org.awaitility.Awaitility.await +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.extension.ExtensionContext +import org.junit.jupiter.api.extension.RegisterExtension +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.Arguments.arguments +import org.junit.jupiter.params.provider.ArgumentsProvider +import org.junit.jupiter.params.provider.ArgumentsSource +import java.time.Duration +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.function.Consumer +import java.util.stream.Stream + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@ExperimentalCoroutinesApi +class KotlinCoroutinesInstrumentationTest { + + companion object { + val threadPool = Executors.newFixedThreadPool(2) + val singleThread = Executors.newSingleThreadExecutor() + } + + @AfterAll + fun shutdown() { + threadPool.shutdown() + singleThread.shutdown() + } + + @RegisterExtension val testing = AgentInstrumentationExtension.create() + + val tracer = testing.openTelemetry.getTracer("test") + + @ParameterizedTest + @ArgumentsSource(DispatchersSource::class) + fun `traced across channels`(dispatcher: CoroutineDispatcher) { + runTest(dispatcher) { + val producer = produce { + repeat(3) { + tracedChild("produce_$it") + send(it) + } + } + + producer.consumeAsFlow().onEach { + tracedChild("consume_$it") + }.collect() + } + + testing.waitAndAssertTraces( + { trace -> + // TODO(anuraaga): Need hasSpansSatisfyingExactlyInAnyOrder sometimes + trace.satisfiesExactlyInAnyOrder( + Consumer { + assertThat(it) + .hasName("parent") + .hasNoParent() + }, + Consumer { + assertThat(it) + .hasName("produce_0") + .hasParent(trace.getSpan(0)) + }, + Consumer { + assertThat(it) + .hasName("consume_0") + .hasParent(trace.getSpan(0)) + }, + Consumer { + assertThat(it) + .hasName("produce_1") + .hasParent(trace.getSpan(0)) + }, + Consumer { + assertThat(it) + .hasName("consume_1") + .hasParent(trace.getSpan(0)) + }, + Consumer { + assertThat(it) + .hasName("produce_2") + .hasParent(trace.getSpan(0)) + }, + Consumer { + assertThat(it) + .hasName("consume_2") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + @ParameterizedTest + @ArgumentsSource(DispatchersSource::class) + fun `cancellation prevents trace`(dispatcher: CoroutineDispatcher) { + runCatching { + runTest(dispatcher) { + tracedChild("preLaunch") + + launch(start = CoroutineStart.UNDISPATCHED) { + throw Exception("Child Error") + } + + yield() + + tracedChild("postLaunch") + } + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly( + { + it.hasName("parent") + .hasNoParent() + }, + { + it.hasName("preLaunch") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + @ParameterizedTest + @ArgumentsSource(DispatchersSource::class) + fun `propagates across nested jobs`(dispatcher: CoroutineDispatcher) { + runTest(dispatcher) { + val goodDeferred = async { 1 } + + launch { + goodDeferred.await() + launch { tracedChild("nested") } + } + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly( + { + it.hasName("parent") + .hasNoParent() + }, + { + it.hasName("nested") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + @Test + fun `deferred completion`() { + runTest(Dispatchers.Default) { + val keptPromise = CompletableDeferred() + val brokenPromise = CompletableDeferred() + val afterPromise = async { + keptPromise.await() + tracedChild("keptPromise") + } + val afterPromise2 = async { + listOf(afterPromise, keptPromise).awaitAll() + tracedChild("keptPromise2") + } + val failedAfterPromise = async { + brokenPromise + .runCatching { await() } + .onFailure { tracedChild("brokenPromise") } + } + + launch { + tracedChild("future1") + keptPromise.complete(true) + brokenPromise.completeExceptionally(IllegalStateException()) + } + + listOf(afterPromise, afterPromise2, failedAfterPromise).awaitAll() + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly( + { + it.hasName("parent") + .hasNoParent() + }, + { + it.hasName("future1") + .hasParent(trace.getSpan(0)) + }, + { + it.hasName("keptPromise") + .hasParent(trace.getSpan(0)) + }, + { + it.hasName("keptPromise2") + .hasParent(trace.getSpan(0)) + }, + { + it.hasName("brokenPromise") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + @Test + fun `first completed deferred`() { + runTest(Dispatchers.Default) { + val children = listOf( + async { + tracedChild("timeout1") + false + }, + async { + tracedChild("timeout2") + false + }, + async { + tracedChild("timeout3") + true + } + ) + + withTimeout(TimeUnit.SECONDS.toMillis(30)) { + select { + children.forEach { child -> + child.onAwait { it } + } + } + } + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly( + { + it.hasName("parent") + .hasNoParent() + }, + { + it.hasName("timeout1") + .hasParent(trace.getSpan(0)) + }, + { + it.hasName("timeout2") + .hasParent(trace.getSpan(0)) + }, + { + it.hasName("timeout3") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + @Test + fun `concurrent suspend functions`() { + val numIters = 100 + runBlocking { + for (i in 0 until numIters) { + GlobalScope.launch { + a(i.toLong()) + } + GlobalScope.launch { + b(i.toLong()) + } + } + } + + // This generates numIters each of "a calls a2" and "b calls b2" traces. Each + // trace should have a single pair of spans (a and a2) and each of those spans + // should have the same iteration number (attribute "iter"). + // The traces are in some random order, so let's keep track and make sure we see + // each iteration # exactly once + val assertions = mutableListOf>>() + for (i in 0 until numIters) { + assertions.add { trace -> + assertThat(trace).satisfiesExactly( + Consumer { + assertThat(it) + .hasName("a") + .hasNoParent() + }, + Consumer { + assertThat(it) + .hasName("a2") + .hasParent(trace.get(0)) + }, + ) + } + assertions.add { trace -> + assertThat(trace).satisfiesExactly( + Consumer { + assertThat(it) + .hasName("b") + .hasNoParent() + }, + Consumer { + assertThat(it) + .hasName("b2") + .hasParent(trace.get(0)) + }, + ) + } + } + + await().atMost(Duration.ofSeconds(30)).untilAsserted { + val traces = testing.waitForTraces(assertions.size) + assertThat(traces).satisfiesExactlyInAnyOrder(*assertions.toTypedArray()) + } + } + + @ParameterizedTest + @ArgumentsSource(DispatchersSource::class) + fun `traced mono`(dispatcher: CoroutineDispatcher) { + runTest(dispatcher) { + mono(dispatcher) { + tracedChild("child") + }.awaitSingle() + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly( + { + it.hasName("parent") + .hasNoParent() + }, + { + it.hasName("child") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + @ParameterizedTest + @ArgumentsSource(DispatchersSource::class) + fun `traced mono with context propagation operator`(dispatcher: CoroutineDispatcher) { + runTest(dispatcher) { + val currentContext = Context.current() + // clear current context to ensure that ContextPropagationOperator is used for context propagation + withContext(Context.root().asContextElement()) { + val mono = mono(dispatcher) { + // extract context from reactor and propagate it into coroutine + val reactorContext = coroutineContext[ReactorContext.Key]?.context + val otelContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, Context.current()) + withContext(otelContext.asContextElement()) { + tracedChild("child") + } + } + ContextPropagationOperator.runWithContext(mono, currentContext).awaitSingle() + } + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly( + { + it.hasName("parent") + .hasNoParent() + }, + { + it.hasName("child") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + @ParameterizedTest + @ArgumentsSource(DispatchersSource::class) + fun `traced flux`(dispatcher: CoroutineDispatcher) { + runTest(dispatcher) { + flux(dispatcher) { + repeat(3) { + tracedChild("child_$it") + send(it) + } + }.collect { + } + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly( + { + it.hasName("parent") + .hasNoParent() + }, + { + it.hasName("child_0") + .hasParent(trace.getSpan(0)) + }, + { + it.hasName("child_1") + .hasParent(trace.getSpan(0)) + }, + { + it.hasName("child_2") + .hasParent(trace.getSpan(0)) + }, + ) + } + ) + } + + private fun tracedChild(opName: String) { + tracer.spanBuilder(opName).startSpan().end() + } + + private fun runTest(dispatcher: CoroutineDispatcher, block: suspend CoroutineScope.() -> T): T { + val parentSpan = tracer.spanBuilder("parent").startSpan() + val parentScope = parentSpan.makeCurrent() + try { + return runBlocking(dispatcher, block = block) + } finally { + parentSpan.end() + parentScope.close() + } + } + + private suspend fun a(iter: Long) { + var span = tracer.spanBuilder("a").startSpan() + span.setAttribute("iter", iter) + withContext(span.asContextElement()) { + delay(10) + a2(iter) + } + span.end() + } + + private suspend fun a2(iter: Long) { + var span = tracer.spanBuilder("a2").startSpan() + span.setAttribute("iter", iter) + withContext(span.asContextElement()) { + delay(10) + } + span.end() + } + + private suspend fun b(iter: Long) { + var span = tracer.spanBuilder("b").startSpan() + span.setAttribute("iter", iter) + withContext(span.asContextElement()) { + delay(10) + b2(iter) + } + span.end() + } + + private suspend fun b2(iter: Long) { + var span = tracer.spanBuilder("b2").startSpan() + span.setAttribute("iter", iter) + withContext(span.asContextElement()) { + delay(10) + } + span.end() + } + + class DispatchersSource : ArgumentsProvider { + override fun provideArguments(context: ExtensionContext?): Stream = + Stream.of( + // Round-trip through Executor for global dispatchers since it seems ParameterizedTest tries to automatically + // close Closeable arguments with no way to avoid it. + arguments(Dispatchers.Default.asExecutor().asCoroutineDispatcher()), + arguments(Dispatchers.IO.asExecutor().asCoroutineDispatcher()), + arguments(Dispatchers.Unconfined.asExecutor().asCoroutineDispatcher()), + arguments(threadPool.asCoroutineDispatcher()), + arguments(singleThread.asCoroutineDispatcher()), + ) + } +} diff --git a/instrumentation/ktor-1.0/library/build.gradle.kts b/instrumentation/ktor-1.0/library/build.gradle.kts index 8e9bdd488860..e4d7e01cc64b 100644 --- a/instrumentation/ktor-1.0/library/build.gradle.kts +++ b/instrumentation/ktor-1.0/library/build.gradle.kts @@ -26,12 +26,4 @@ tasks { jvmTarget = "1.8" } } - - val compileTestKotlin by existing(AbstractCompile::class) - - named("compileTestGroovy") { - // Note: look like it should be `classpath += files(sourceSets.test.kotlin.classesDirectory)` - // instead, but kotlin plugin doesn't support it (yet?) - classpath = classpath.plus(files(compileTestKotlin.get().destinationDir)) - } }