Skip to content

Commit

Permalink
fix: router span relation (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
manikmagar authored May 2, 2024
1 parent b2d869f commit 335bbc0
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,21 @@ public void addProcessorSpan(TraceComponent traceComponent, String containerName
traceComponent.getTags().forEach(spanBuilder::setAttribute);

String parentLocation = getRouteContainerLocation(traceComponent);
if (parentLocation != null && traceComponent.getLocation().endsWith("/0")) {
if (parentLocation != null) {
// Create parent span for the first processor in the chain /0
SpanMeta parentSpan = addRouteSpan(traceComponent, parentLocation,
TraceComponent parentTrace = TraceComponent.of(parentLocation)
.withLocation(parentLocation)
.withTags(Collections.emptyMap())
.withTransactionId(traceComponent.getTransactionId())
.withSpanName(parentLocation)
.withSpanKind(SpanKind.INTERNAL)
.withEventContextId(traceComponent.getEventContextId())
.withStartTime(traceComponent.getStartTime());
// if (!getTransactionStore().processorSpanExists(traceComponent)) {
SpanMeta parentSpan = addRouteSpan(parentTrace, traceComponent, parentLocation,
getLocationParent(parentLocation));
spanBuilder.setParent(parentSpan.getContext());
// }
}
if (parentLocation == null) {
parentLocation = containerName;
Expand All @@ -325,15 +335,8 @@ public void addProcessorSpan(TraceComponent traceComponent, String containerName
traceComponent, spanBuilder);
}

private SpanMeta addRouteSpan(TraceComponent childTrace, String parentLocation, String rootContainerName) {
TraceComponent parentTrace = TraceComponent.of(parentLocation)
.withLocation(parentLocation)
.withTags(Collections.emptyMap())
.withTransactionId(childTrace.getTransactionId())
.withSpanName(parentLocation)
.withSpanKind(SpanKind.INTERNAL)
.withEventContextId(childTrace.getEventContextId())
.withStartTime(childTrace.getStartTime());
private SpanMeta addRouteSpan(TraceComponent parentTrace, TraceComponent childTrace, String parentLocation,
String rootContainerName) {
SpanBuilder spanBuilder = this.spanBuilder(parentLocation)
.setParent(childTrace.getContext())
.setSpanKind(SpanKind.INTERNAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.*;

import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.ROUTER;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE;

/**
* This processor handles any specific operations or sources from Mule Core
Expand All @@ -21,16 +22,14 @@
*/
public class MuleCoreProcessorComponent extends AbstractProcessorComponent {

private List<ComponentType> scopes = Arrays.asList(ROUTER);

@Override
protected String getNamespace() {
return NAMESPACE_MULE;
}

@Override
protected List<String> getOperations() {
return Collections.singletonList("flow-ref");
return Arrays.asList("flow-ref", "choice", "first-successful", "scatter-gather", "round-robin");
}

@Override
Expand All @@ -40,9 +39,8 @@ protected List<String> getSources() {

@Override
public boolean canHandle(ComponentIdentifier componentIdentifier) {
return super.canHandle(componentIdentifier)
|| (componentIdentifier instanceof TypedComponentIdentifier
&& scopes.contains(((TypedComponentIdentifier) componentIdentifier).getType()));
System.out.println("Can handle " + componentIdentifier + " " + super.canHandle(componentIdentifier));
return super.canHandle(componentIdentifier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public SpanMeta addProcessorSpan(String containerName, TraceComponent traceCompo
Span span = spanBuilder.startSpan();
ProcessorSpan ps = new ProcessorSpan(span, traceComponent.getLocation(), transactionId,
traceComponent.getStartTime(), flowName).setTags(traceComponent.getTags());
childSpans.put(traceComponent.contextScopedLocation(), ps);
childSpans.putIfAbsent(traceComponent.contextScopedLocation(), ps);
return ps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public static String getRouteContainerLocation(TraceComponent traceComponent) {
if (parts.size() > 2) {
int routeIndex = parts.size() - 3;
LocationPart parentPart = parts.get(routeIndex);
System.out.println("parentPart: " + parentPart);
parentLocation = parentPart
.getPartIdentifier()
.filter(ComponentsUtil::isRoute)
Expand Down Expand Up @@ -114,7 +115,8 @@ public static String getLocationParent(String location) {

public static boolean isRoute(TypedComponentIdentifier tci) {
Objects.requireNonNull(tci, "Component Identifier cannot be null");
return tci.getIdentifier().getName().equals("route");
return tci.getIdentifier().getName().equals("route")
|| ROUTE.equals(tci.getType());
}

public static boolean isFlowTrace(TraceComponent traceComponent) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.avioconsulting.mule.opentelemetry;

import com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter;
import org.assertj.core.api.SoftAssertions;
import org.jetbrains.annotations.NotNull;
import org.junit.Ignore;
import org.junit.Test;
import org.mule.runtime.core.api.event.CoreEvent;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter.spanQueue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowableOfType;
import static org.awaitility.Awaitility.await;

public class MuleCoreFlowsNoSpanAllTest extends AbstractMuleArtifactTraceTest {

@Override
protected String getConfigFile() {
return "mule-core-flows.xml";
}

@Override
protected void doSetUpBeforeMuleContextCreation() throws Exception {
super.doSetUpBeforeMuleContextCreation();
System.setProperty("mule.otel.span.processors.enable", "false");
}

@Override
protected void doTearDownAfterMuleContextDispose() throws Exception {
super.doTearDownAfterMuleContextDispose();
System.clearProperty("mule.otel.span.processors.enable");
}

@Test
public void testFlowControls_Choice() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:choice-\\get-value")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(5));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:choice-\\get-value", val -> assertThat(val)
.contains(
"flow-controls:choice-\\get-value",
"choice:Choice-Control"));
softly.assertThat(groupedSpans)
.hasEntrySatisfying("choice:Choice-Control", val -> assertThat(val)
.containsAnyOf(
"flow-controls:choice-\\get-value/processors/1/route/0",
"flow-controls:choice-\\get-value/processors/1/route/1"));
if (groupedSpans.containsKey("flow-controls:choice-\\get-value/processors/1/route/0")) {
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/0", val -> assertThat(val)
.contains(
"flow-ref:flow-ref"));
} else {
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/1", val -> assertThat(val)
.contains(
"flow-ref:flow-ref"));
}
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-ref:flow-ref", val -> assertThat(val)
.contains(
"simple-flow-logger"));
softly.assertAll();
}

@NotNull
private Map<Object, Set<String>> groupSpanByParent() {
// Find the root span
DelegatedLoggingSpanTestExporter.Span root = spanQueue.stream()
.filter(span -> span.getParentSpanContext().getSpanId().equals("0000000000000000")).findFirst().get();

// Create a lookup of span id and name
Map<String, String> idNameMap = spanQueue.stream().collect(Collectors.toMap(
DelegatedLoggingSpanTestExporter.Span::getSpanId, DelegatedLoggingSpanTestExporter.Span::getSpanName));

Map<Object, Set<String>> groupedSpans = spanQueue.stream()
.collect(Collectors.groupingBy(
span -> idNameMap.getOrDefault(span.getParentSpanContext().getSpanId(), root.getSpanName()),
Collectors.mapping(DelegatedLoggingSpanTestExporter.Span::getSpanName, Collectors.toSet())));
return groupedSpans;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,84 @@ public void testFlowControls() throws Exception {
.hasSize(16));
}

@Test
public void testFlowControls_Choice() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:choice-\\get-value")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(8));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:choice-\\get-value", val -> assertThat(val)
.contains(
"flow-controls:choice-\\get-value",
"logger:FirstLogger",
"choice:Choice-Control"));
softly.assertThat(groupedSpans)
.hasEntrySatisfying("choice:Choice-Control", val -> assertThat(val)
.containsAnyOf(
"flow-controls:choice-\\get-value/processors/1/route/0",
"flow-controls:choice-\\get-value/processors/1/route/1"));
if (groupedSpans.containsKey("flow-controls:choice-\\get-value/processors/1/route/0")) {
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/0", val -> assertThat(val)
.contains(
"logger:ChoiceWhen", "flow-ref:flow-ref"));
} else {
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:choice-\\get-value/processors/1/route/1", val -> assertThat(val)
.contains(
"logger:ChoiceDefault", "flow-ref:flow-ref"));
}
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-ref:flow-ref", val -> assertThat(val)
.contains(
"simple-flow-logger"));
softly.assertThat(groupedSpans)
.hasEntrySatisfying("simple-flow-logger", val -> assertThat(val)
.contains(
"logger:SimpleLogger"));
softly.assertAll();
}

@Test
public void testFlowControls_FirstSuccessful() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:first-successful-\\get-value")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(5));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:first-successful-\\get-value", val -> assertThat(val)
.contains(
"flow-controls:first-successful-\\get-value",
"logger:FirstLogger",
"first-successful:First-Successful-Control"));
softly.assertThat(groupedSpans)
.hasEntrySatisfying("first-successful:First-Successful-Control", val -> assertThat(val)
.containsAnyOf(
"flow-controls:first-successful-\\get-value/processors/1/route/0",
"flow-controls:first-successful-\\get-value/processors/1/route/1"));
if (groupedSpans.containsKey("flow-controls:first-successful-\\get-value/processors/1/route/0")) {
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:first-successful-\\get-value/processors/1/route/0",
val -> assertThat(val)
.contains(
"logger:FirstSuccess1"));
} else {
softly.assertThat(groupedSpans)
.hasEntrySatisfying("flow-controls:first-successful-\\get-value/processors/1/route/1",
val -> assertThat(val)
.contains(
"logger:FirstSuccess2"));
}
softly.assertAll();
}

@Test
public void testFlowControls_ScatterGather() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:scatter-gather:\\get-value")
Expand All @@ -39,7 +117,7 @@ public void testFlowControls_ScatterGather() throws Exception {
.hasSize(23));

Map<Object, Set<String>> groupedSpans = groupSpanByParent();

System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("scatter-gather:Scatter-Gather-Control", val -> assertThat(val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public String getId() {
return id;
}

// @Override
public String getRootId() {
return id;
}

@Override
public String getCorrelationId() {
return correlationId;
Expand Down
30 changes: 30 additions & 0 deletions src/test/resources/mule-core-flows.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/
</route>
</scatter-gather>
</flow>

<flow name="flow-controls:first-successful-\get-value" >
<logger level="INFO" doc:name="FirstLogger" />
<first-successful doc:name="First-Successful-Control" >
<route >
<logger level="INFO" doc:name="FirstSuccess1" />
</route>
<route >
<logger level="INFO" doc:name="FirstSuccess2" />
</route>
</first-successful>
</flow>

<flow name="flow-controls:choice-\get-value" >
<logger level="INFO" doc:name="FirstLogger" />
<choice doc:name="Choice-Control" >
<when expression="#[random() * 10 &gt; 5]">
<logger level="INFO" doc:name="ChoiceWhen" />
<flow-ref name="simple-flow-logger"/>
</when>
<otherwise >
<logger level="INFO" doc:name="ChoiceDefault" />
<flow-ref name="simple-flow-logger"/>
</otherwise>
</choice>
</flow>
<flow name="flow-controls:round-robin:\get-value" >
<logger level="INFO" doc:name="FirstLogger" />
<round-robin doc:name="Round-Robin-Control" >
Expand Down Expand Up @@ -142,6 +168,10 @@ http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/
<logger level="INFO" message="#['Simple flow']" doc:name="SimpleLogger" />
</sub-flow>

<flow name="simple-flow-logger">
<logger level="INFO" message="#['Simple flow']" doc:name="SimpleLogger" />
</flow>

<flow name="mule-core-flow-1" >
<logger level="INFO" doc:name="FirstLogger" />
<flow-ref name="mule-core-flow-2"/>
Expand Down

0 comments on commit 335bbc0

Please sign in to comment.