From 335bbc04ce074388ac0b6752d8f71ac864ab5b9a Mon Sep 17 00:00:00 2001 From: Manik Magar Date: Fri, 3 May 2024 01:03:11 +0530 Subject: [PATCH] fix: router span relation (#179) --- .../connection/OpenTelemetryConnection.java | 25 ++--- .../processor/MuleCoreProcessorComponent.java | 10 +- .../internal/store/FlowSpan.java | 2 +- .../internal/util/ComponentsUtil.java | 4 +- .../MuleCoreFlowsNoSpanAllTest.java | 92 +++++++++++++++++++ .../mule/opentelemetry/MuleCoreFlowsTest.java | 80 +++++++++++++++- .../test/util/TestInterceptionEvent.java | 5 + src/test/resources/mule-core-flows.xml | 30 ++++++ 8 files changed, 228 insertions(+), 20 deletions(-) create mode 100644 src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java index 5798e25..9f4884c 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java @@ -311,11 +311,21 @@ public void addProcessorSpan(TraceComponent traceComponent, String containerName traceComponent.getTags().forEach(spanBuilder::setAttribute); String parentLocation = getRouteContainerLocation(traceComponent); - if (parentLocation != null && traceComponent.getLocation().endsWith("/0")) { + if (parentLocation != null) { // Create parent span for the first processor in the chain /0 - SpanMeta parentSpan = addRouteSpan(traceComponent, parentLocation, + TraceComponent parentTrace = TraceComponent.of(parentLocation) + .withLocation(parentLocation) + .withTags(Collections.emptyMap()) + .withTransactionId(traceComponent.getTransactionId()) + .withSpanName(parentLocation) + .withSpanKind(SpanKind.INTERNAL) + .withEventContextId(traceComponent.getEventContextId()) + .withStartTime(traceComponent.getStartTime()); + // if (!getTransactionStore().processorSpanExists(traceComponent)) { + SpanMeta parentSpan = addRouteSpan(parentTrace, traceComponent, parentLocation, getLocationParent(parentLocation)); spanBuilder.setParent(parentSpan.getContext()); + // } } if (parentLocation == null) { parentLocation = containerName; @@ -325,15 +335,8 @@ public void addProcessorSpan(TraceComponent traceComponent, String containerName traceComponent, spanBuilder); } - private SpanMeta addRouteSpan(TraceComponent childTrace, String parentLocation, String rootContainerName) { - TraceComponent parentTrace = TraceComponent.of(parentLocation) - .withLocation(parentLocation) - .withTags(Collections.emptyMap()) - .withTransactionId(childTrace.getTransactionId()) - .withSpanName(parentLocation) - .withSpanKind(SpanKind.INTERNAL) - .withEventContextId(childTrace.getEventContextId()) - .withStartTime(childTrace.getStartTime()); + private SpanMeta addRouteSpan(TraceComponent parentTrace, TraceComponent childTrace, String parentLocation, + String rootContainerName) { SpanBuilder spanBuilder = this.spanBuilder(parentLocation) .setParent(childTrace.getContext()) .setSpanKind(SpanKind.INTERNAL) diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java index cd04873..cbdb811 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java @@ -12,6 +12,7 @@ import java.util.*; import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.ROUTER; +import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE; /** * This processor handles any specific operations or sources from Mule Core @@ -21,8 +22,6 @@ */ public class MuleCoreProcessorComponent extends AbstractProcessorComponent { - private List scopes = Arrays.asList(ROUTER); - @Override protected String getNamespace() { return NAMESPACE_MULE; @@ -30,7 +29,7 @@ protected String getNamespace() { @Override protected List getOperations() { - return Collections.singletonList("flow-ref"); + return Arrays.asList("flow-ref", "choice", "first-successful", "scatter-gather", "round-robin"); } @Override @@ -40,9 +39,8 @@ protected List getSources() { @Override public boolean canHandle(ComponentIdentifier componentIdentifier) { - return super.canHandle(componentIdentifier) - || (componentIdentifier instanceof TypedComponentIdentifier - && scopes.contains(((TypedComponentIdentifier) componentIdentifier).getType())); + System.out.println("Can handle " + componentIdentifier + " " + super.canHandle(componentIdentifier)); + return super.canHandle(componentIdentifier); } @Override diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java index a3a5292..a35858f 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java @@ -97,7 +97,7 @@ public SpanMeta addProcessorSpan(String containerName, TraceComponent traceCompo Span span = spanBuilder.startSpan(); ProcessorSpan ps = new ProcessorSpan(span, traceComponent.getLocation(), transactionId, traceComponent.getStartTime(), flowName).setTags(traceComponent.getTags()); - childSpans.put(traceComponent.contextScopedLocation(), ps); + childSpans.putIfAbsent(traceComponent.contextScopedLocation(), ps); return ps; } diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java index 73b7e94..7bba230 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java @@ -71,6 +71,7 @@ public static String getRouteContainerLocation(TraceComponent traceComponent) { if (parts.size() > 2) { int routeIndex = parts.size() - 3; LocationPart parentPart = parts.get(routeIndex); + System.out.println("parentPart: " + parentPart); parentLocation = parentPart .getPartIdentifier() .filter(ComponentsUtil::isRoute) @@ -114,7 +115,8 @@ public static String getLocationParent(String location) { public static boolean isRoute(TypedComponentIdentifier tci) { Objects.requireNonNull(tci, "Component Identifier cannot be null"); - return tci.getIdentifier().getName().equals("route"); + return tci.getIdentifier().getName().equals("route") + || ROUTE.equals(tci.getType()); } public static boolean isFlowTrace(TraceComponent traceComponent) { diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java new file mode 100644 index 0000000..3382857 --- /dev/null +++ b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java @@ -0,0 +1,92 @@ +package com.avioconsulting.mule.opentelemetry; + +import com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter; +import org.assertj.core.api.SoftAssertions; +import org.jetbrains.annotations.NotNull; +import org.junit.Ignore; +import org.junit.Test; +import org.mule.runtime.core.api.event.CoreEvent; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter.spanQueue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowableOfType; +import static org.awaitility.Awaitility.await; + +public class MuleCoreFlowsNoSpanAllTest extends AbstractMuleArtifactTraceTest { + + @Override + protected String getConfigFile() { + return "mule-core-flows.xml"; + } + + @Override + protected void doSetUpBeforeMuleContextCreation() throws Exception { + super.doSetUpBeforeMuleContextCreation(); + System.setProperty("mule.otel.span.processors.enable", "false"); + } + + @Override + protected void doTearDownAfterMuleContextDispose() throws Exception { + super.doTearDownAfterMuleContextDispose(); + System.clearProperty("mule.otel.span.processors.enable"); + } + + @Test + public void testFlowControls_Choice() throws Exception { + CoreEvent coreEvent = flowRunner("flow-controls:choice-\\get-value") + .run(); + await().untilAsserted(() -> assertThat(spanQueue) + .hasSize(5)); + Map> groupedSpans = groupSpanByParent(); + System.out.println(groupedSpans); + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:choice-\\get-value", val -> assertThat(val) + .contains( + "flow-controls:choice-\\get-value", + "choice:Choice-Control")); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("choice:Choice-Control", val -> assertThat(val) + .containsAnyOf( + "flow-controls:choice-\\get-value/processors/1/route/0", + "flow-controls:choice-\\get-value/processors/1/route/1")); + if (groupedSpans.containsKey("flow-controls:choice-\\get-value/processors/1/route/0")) { + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/0", val -> assertThat(val) + .contains( + "flow-ref:flow-ref")); + } else { + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/1", val -> assertThat(val) + .contains( + "flow-ref:flow-ref")); + } + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-ref:flow-ref", val -> assertThat(val) + .contains( + "simple-flow-logger")); + softly.assertAll(); + } + + @NotNull + private Map> groupSpanByParent() { + // Find the root span + DelegatedLoggingSpanTestExporter.Span root = spanQueue.stream() + .filter(span -> span.getParentSpanContext().getSpanId().equals("0000000000000000")).findFirst().get(); + + // Create a lookup of span id and name + Map idNameMap = spanQueue.stream().collect(Collectors.toMap( + DelegatedLoggingSpanTestExporter.Span::getSpanId, DelegatedLoggingSpanTestExporter.Span::getSpanName)); + + Map> groupedSpans = spanQueue.stream() + .collect(Collectors.groupingBy( + span -> idNameMap.getOrDefault(span.getParentSpanContext().getSpanId(), root.getSpanName()), + Collectors.mapping(DelegatedLoggingSpanTestExporter.Span::getSpanName, Collectors.toSet()))); + return groupedSpans; + } + +} diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java index d04d177..bf28023 100644 --- a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java +++ b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java @@ -31,6 +31,84 @@ public void testFlowControls() throws Exception { .hasSize(16)); } + @Test + public void testFlowControls_Choice() throws Exception { + CoreEvent coreEvent = flowRunner("flow-controls:choice-\\get-value") + .run(); + await().untilAsserted(() -> assertThat(spanQueue) + .hasSize(8)); + Map> groupedSpans = groupSpanByParent(); + System.out.println(groupedSpans); + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:choice-\\get-value", val -> assertThat(val) + .contains( + "flow-controls:choice-\\get-value", + "logger:FirstLogger", + "choice:Choice-Control")); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("choice:Choice-Control", val -> assertThat(val) + .containsAnyOf( + "flow-controls:choice-\\get-value/processors/1/route/0", + "flow-controls:choice-\\get-value/processors/1/route/1")); + if (groupedSpans.containsKey("flow-controls:choice-\\get-value/processors/1/route/0")) { + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/0", val -> assertThat(val) + .contains( + "logger:ChoiceWhen", "flow-ref:flow-ref")); + } else { + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/1", val -> assertThat(val) + .contains( + "logger:ChoiceDefault", "flow-ref:flow-ref")); + } + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-ref:flow-ref", val -> assertThat(val) + .contains( + "simple-flow-logger")); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("simple-flow-logger", val -> assertThat(val) + .contains( + "logger:SimpleLogger")); + softly.assertAll(); + } + + @Test + public void testFlowControls_FirstSuccessful() throws Exception { + CoreEvent coreEvent = flowRunner("flow-controls:first-successful-\\get-value") + .run(); + await().untilAsserted(() -> assertThat(spanQueue) + .hasSize(5)); + Map> groupedSpans = groupSpanByParent(); + System.out.println(groupedSpans); + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:first-successful-\\get-value", val -> assertThat(val) + .contains( + "flow-controls:first-successful-\\get-value", + "logger:FirstLogger", + "first-successful:First-Successful-Control")); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("first-successful:First-Successful-Control", val -> assertThat(val) + .containsAnyOf( + "flow-controls:first-successful-\\get-value/processors/1/route/0", + "flow-controls:first-successful-\\get-value/processors/1/route/1")); + if (groupedSpans.containsKey("flow-controls:first-successful-\\get-value/processors/1/route/0")) { + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:first-successful-\\get-value/processors/1/route/0", + val -> assertThat(val) + .contains( + "logger:FirstSuccess1")); + } else { + softly.assertThat(groupedSpans) + .hasEntrySatisfying("flow-controls:first-successful-\\get-value/processors/1/route/1", + val -> assertThat(val) + .contains( + "logger:FirstSuccess2")); + } + softly.assertAll(); + } + @Test public void testFlowControls_ScatterGather() throws Exception { CoreEvent coreEvent = flowRunner("flow-controls:scatter-gather:\\get-value") @@ -39,7 +117,7 @@ public void testFlowControls_ScatterGather() throws Exception { .hasSize(23)); Map> groupedSpans = groupSpanByParent(); - + System.out.println(groupedSpans); SoftAssertions softly = new SoftAssertions(); softly.assertThat(groupedSpans) .hasEntrySatisfying("scatter-gather:Scatter-Gather-Control", val -> assertThat(val) diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/test/util/TestInterceptionEvent.java b/src/test/java/com/avioconsulting/mule/opentelemetry/test/util/TestInterceptionEvent.java index e169817..523bb91 100644 --- a/src/test/java/com/avioconsulting/mule/opentelemetry/test/util/TestInterceptionEvent.java +++ b/src/test/java/com/avioconsulting/mule/opentelemetry/test/util/TestInterceptionEvent.java @@ -124,6 +124,11 @@ public String getId() { return id; } + // @Override + public String getRootId() { + return id; + } + @Override public String getCorrelationId() { return correlationId; diff --git a/src/test/resources/mule-core-flows.xml b/src/test/resources/mule-core-flows.xml index 48f1c6e..8b17920 100644 --- a/src/test/resources/mule-core-flows.xml +++ b/src/test/resources/mule-core-flows.xml @@ -57,6 +57,32 @@ http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/ + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -142,6 +168,10 @@ http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/ + + + +