Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add helper methods for manual spans in mutiny pipelines #45478

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,60 @@ public void tracedWork() {
}
----

brunobat marked this conversation as resolved.
Show resolved Hide resolved
=== Mutiny
Methods returning reactive types can also be annotated with `@WithSpan` and `@AddingSpanAttributes` to create a new span or add attributes to the current span.

If you need to create spans manually within a mutiny pipeline, use `wrapWithSpan` method from `io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper`.

Example. Assuming you have the following pipeline:
[source,java]
----
Uni<String> uni = Uni.createFrom().item("hello")
//start trace here
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
//end trace here
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
wrap it like this:
[source,java]
----
import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan;
...
@Inject
Tracer tracer;
...
Context context = Context.current();
Uni<String> uni = Uni.createFrom().item("hello")
.transformToUni(m -> wrapWithSpan(tracer, Optional.of(context), "my-span-name",
Uni.createFrom().item(m)
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
for multi-pipelines it works similarly:
[source,java]
----
Multi.createFrom().items("Alice", "Bob", "Charlie")
.transformToMultiAndConcatenate(m -> TracingHelper.withTrace("my-span-name",
Multi.createFrom().item(m)
.onItem().transform(name -> "Hello " + name)
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
Instead of `transformToMultiAndConcatenate` you can use `transformToMultiAndMerge` if you don't care about the order of the items.

=== Quarkus Messaging - Kafka

When using the Quarkus Messaging extension for Kafka,
Expand Down
5 changes: 5 additions & 0 deletions extensions/opentelemetry/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@
<artifactId>vertx-web-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
package io.quarkus.opentelemetry.deployment.traces;

import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Optional;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporter;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporterProvider;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import jakarta.inject.Inject;

class MutinyTracingHelperTest {

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestSpanExporter.class, TestSpanExporterProvider.class)
.addAsResource(new StringAsset(TestSpanExporterProvider.class.getCanonicalName()),
"META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider"));

@Inject
private TestSpanExporter spanExporter;

@Inject
private Tracer tracer;

@Inject
private Vertx vertx;

@AfterEach
void tearDown() {
spanExporter.reset();
}

@Test
void testSimpleUniPipeline_noContext() {
final ContextProviderRunner contextRunner = new WithoutContextRunner();

simpleuniPipelineTest(contextRunner);
}

@Test
void testSimpleUniPipeline_onRootContext() {
final ContextProviderRunner contextRunner = new RootContextRunner();

simpleuniPipelineTest(contextRunner);
}

@Test
void testSimpleUniPipeline_onDuplicatedContext() {
final ContextProviderRunner contextRunner = new DuplicatedContextRunner();

simpleuniPipelineTest(contextRunner);
}

private void simpleuniPipelineTest(final ContextProviderRunner contextRunner) {
final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("Hello")
.emitOn(r -> contextRunner.runOnContext(r, vertx))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, "testSpan",
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan").startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " world";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem().assertItem("Hello world");

//ensure there are two spans with subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder("testSpan", "subspan");
assertChildSpan(spans, "testSpan", "subspan");
}

@Test
void testSpanWithExplicitParent() {
final ContextProviderRunner contextRunner = new RootContextRunner();

final String parentSpanName = "parentSpan";
final String pipelineSpanName = "pipelineSpan";
final String subspanName = "subspan";

final Span parentSpan = tracer.spanBuilder(parentSpanName).startSpan();
final io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.current().with(parentSpan);

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("Hello")
.emitOn(r -> contextRunner.runOnContext(r, vertx))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, Optional.of(parentContext),
pipelineSpanName,
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder(subspanName).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " world";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem().assertItem("Hello world");
parentSpan.end();

//ensure there are 3 spans with proper parent-child relationships
final List<SpanData> spans = spanExporter.getFinishedSpanItems(3);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, pipelineSpanName,
subspanName);
assertChildSpan(spans, parentSpanName, pipelineSpanName);
assertChildSpan(spans, pipelineSpanName, subspanName);
}

@Test
void testNestedPipeline_implicitParent() {
final ContextProviderRunner contextRunner = new RootContextRunner();

final String parentSpanName = "parentSpan";
final String childSpanName = "childSpan";

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("test")
.emitOn(r -> contextRunner.runOnContext(r, vertx))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, parentSpanName,
Uni.createFrom().item(m)
.onItem().transform(s -> s + " in outer span")
.onItem().transformToUni(m1 -> wrapWithSpan(tracer, childSpanName,
Uni.createFrom().item(m1)
.onItem().transform(s -> "now in inner span")))

))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem();

//ensure there are 2 spans with doSomething and doSomethingAsync as children of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName);
assertChildSpan(spans, parentSpanName, childSpanName);
}

@Test
void testNestedPipeline_explicitNoParent() {
final ContextProviderRunner contextRunner = new RootContextRunner();

final String parentSpanName = "parentSpan";
final String childSpanName = "childSpan";

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("test")
.emitOn(r -> contextRunner.runOnContext(r, vertx))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, parentSpanName,
Uni.createFrom().item(m)
.onItem().transform(s -> s + " in outer span")
.onItem().transformToUni(m1 -> wrapWithSpan(tracer, Optional.empty(), childSpanName,
Uni.createFrom().item(m1)
.onItem().transform(s -> "now in inner span")))

))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem();

//ensure there are 2 spans but without parent-child relationship
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName);
assertThat(spans.stream()
.filter(span -> span.getName().equals(childSpanName))
.findAny()
.orElseThrow()
.getParentSpanId()).isEqualTo("0000000000000000");//signifies no parent
}

@Test
void testSimpleMultiPipeline_Concatenate() {
final ContextProviderRunner contextRunner = new RootContextRunner();

final AssertSubscriber<String> subscriber = Multi.createFrom()
.items("test1", "test2", "test3")
.emitOn(r -> contextRunner.runOnContext(r, vertx))
.onItem()
.transformToUniAndConcatenate(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m,
//the traced pipeline
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan " + s).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " transformed";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(AssertSubscriber.create(3));

subscriber.awaitCompletion().assertItems("test1 transformed", "test2 transformed", "test3 transformed");

//ensure there are six spans with three pairs of subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
for (int i = 1; i <= 3; i++) {
final int currentI = i;
assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue();
assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue();
assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI);
}
}

@Test
void testSimpleMultiPipeline_Merge() {
final ContextProviderRunner contextRunner = new RootContextRunner();

final AssertSubscriber<String> subscriber = Multi.createFrom()
.items("test1", "test2", "test3")
.emitOn(r -> contextRunner.runOnContext(r, vertx))
.onItem()
.transformToUniAndMerge(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m,
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan " + s).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " transformed";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(AssertSubscriber.create(3));

subscriber.awaitCompletion();

//ensure there are six spans with three pairs of subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
for (int i = 1; i <= 3; i++) {
final int currentI = i;
assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue();
assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue();
assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI);
}
}

private static void assertChildSpan(final List<SpanData> spans, final String parentSpanName,
final String childSpanName1) {
assertThat(spans.stream()
.filter(span -> span.getName().equals(childSpanName1))
.findAny()
.orElseThrow()
.getParentSpanId()).isEqualTo(
spans.stream().filter(span -> span.getName().equals(parentSpanName)).findAny().get().getSpanId());
}

private interface ContextProviderRunner {
void runOnContext(Runnable runnable, Vertx vertx);
}

private static class WithoutContextRunner implements ContextProviderRunner {

@Override
public void runOnContext(final Runnable runnable, final Vertx vertx) {
assertThat(QuarkusContextStorage.getVertxContext()).isNull();
runnable.run();
}
}

private static class RootContextRunner implements ContextProviderRunner {
@Override
public void runOnContext(final Runnable runnable, final Vertx vertx) {
final Context rootContext = VertxContext.getRootContext(vertx.getOrCreateContext());
assertThat(rootContext).isNotNull();
assertThat(VertxContext.isDuplicatedContext(rootContext)).isFalse();
assertThat(rootContext).isNotEqualTo(QuarkusContextStorage.getVertxContext());

rootContext.runOnContext(v -> runnable.run());
}
}

private static class DuplicatedContextRunner implements ContextProviderRunner {
@Override
public void runOnContext(final Runnable runnable, final Vertx vertx) {
final Context duplicatedContext = VertxContext.createNewDuplicatedContext(vertx.getOrCreateContext());
assertThat(duplicatedContext).isNotNull();
assertThat(VertxContext.isDuplicatedContext(duplicatedContext)).isTrue();

duplicatedContext.runOnContext(v -> runnable.run());
}
}

}
Loading
Loading