Skip to content

Commit

Permalink
fix: flow span lookup on error (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
manikmagar authored Apr 5, 2024
1 parent d5ca8bc commit 5429bef
Show file tree
Hide file tree
Showing 15 changed files with 101 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public interface TransactionStore {
String TRACE_PREV_CONTEXT_MAP_KEY = "OTEL_PREV_TRACE_CONTEXT";
String TRACE_ID = "traceId";
String SPAN_ID = "spanId";
String OTEL_FLOW_CONTEXT_ID = "_OTEL_FLOW_CONTEXT_ID";
String OTEL_FLOW_PREV_CONTEXT_ID = "_OTEL_FLOW_PREV_CONTEXT_ID";

/**
* A default implementation to get a mule correlation id as a local transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.ERROR_TYPE;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.*;
import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.*;

Expand Down Expand Up @@ -366,6 +367,7 @@ public TransactionMeta endTransaction(final TraceComponent traceComponent, Excep
openTelemetryConnection.setSpanStatus(traceComponent, rootSpan);
if (exception != null) {
rootSpan.recordException(exception);
rootSpan.setAttribute(ERROR_TYPE.getKey(), exception.getClass().getTypeName());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.stream.Stream;

import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.isFirstProcessor;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.*;

/**
Expand Down Expand Up @@ -116,13 +117,10 @@ public boolean intercept(ComponentLocation location) {
boolean intercept = false;
if (interceptorEnabled &&
muleNotificationProcessor.hasConnection()) {
String interceptPath = String.format("%s/processors/0", location.getRootContainerName());
// Intercept the first processor of the flow OR
// included processor/namespaces OR
// any processor/namespaces that are not excluded
ComponentIdentifier identifier = location.getComponentIdentifier().getIdentifier();
boolean firstProcessor = isFlowTypeContainer(location)
&& interceptPath.equalsIgnoreCase(location.getLocation());
boolean interceptConfigured = interceptInclusions.stream()
.anyMatch(mc -> mc.getNamespace().equalsIgnoreCase(identifier.getNamespace())
& (mc.getName().equalsIgnoreCase(identifier.getName())
Expand All @@ -131,7 +129,7 @@ public boolean intercept(ComponentLocation location) {
.noneMatch(mc -> mc.getNamespace().equalsIgnoreCase(identifier.getNamespace())
& (mc.getName().equalsIgnoreCase(identifier.getName())
|| "*".equalsIgnoreCase(mc.getName())));
intercept = firstProcessor
intercept = isFirstProcessor(location)
|| interceptConfigured;

if (intercept) {
Expand All @@ -153,10 +151,4 @@ public boolean intercept(ComponentLocation location) {
return intercept;
}

private boolean isFlowTypeContainer(ComponentLocation componentLocation) {
return componentLocation.getParts().get(0).getPartIdentifier()
.filter(c -> FLOW.equals(c.getType())
|| (SCOPE.equals(c.getType())))
.isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import java.util.concurrent.CompletableFuture;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_SCOPE_SUBFLOW_NAME;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.TRACE_CONTEXT_MAP_KEY;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.TRACE_PREV_CONTEXT_MAP_KEY;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.*;
import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.*;
import static com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil.getEventTransactionId;

Expand Down Expand Up @@ -70,6 +69,10 @@ public void before(
ProcessorComponent processorComponent = muleNotificationProcessor
.getProcessorComponent(location.getComponentIdentifier().getIdentifier());
switchTraceContext(event, TRACE_CONTEXT_MAP_KEY, TRACE_PREV_CONTEXT_MAP_KEY);
if (isFirstProcessor(location)) {
switchTraceContext(event, OTEL_FLOW_CONTEXT_ID, OTEL_FLOW_PREV_CONTEXT_ID);
event.addVariable(OTEL_FLOW_CONTEXT_ID, event.getContext().getId());
}
if (processorComponent == null) {
// when spanAllProcessor is false, and it's the first generic processor
String transactionId = getEventTransactionId(event);
Expand Down Expand Up @@ -147,6 +150,8 @@ public void before(
@Override
public void after(ComponentLocation location, InterceptionEvent event, Optional<Throwable> thrown) {
switchTraceContext(event, TRACE_PREV_CONTEXT_MAP_KEY, TRACE_CONTEXT_MAP_KEY);
if (isFlowRef(location))
switchTraceContext(event, OTEL_FLOW_PREV_CONTEXT_ID, OTEL_FLOW_CONTEXT_ID);
}

private void switchTraceContext(InterceptionEvent event, String removalContextKey, String newContextKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import com.avioconsulting.mule.opentelemetry.api.processor.ProcessorComponent;
import com.avioconsulting.mule.opentelemetry.api.store.SpanMeta;
import com.avioconsulting.mule.opentelemetry.api.store.TransactionMeta;
import com.avioconsulting.mule.opentelemetry.api.store.TransactionStore;
import com.avioconsulting.mule.opentelemetry.api.traces.TraceComponent;
import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection;
import com.avioconsulting.mule.opentelemetry.internal.processor.service.ProcessorComponentService;
import com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil;
import io.opentelemetry.api.trace.SpanKind;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.PipelineMessageNotification;
import org.slf4j.Logger;
Expand All @@ -21,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_SCOPE_SUBFLOW_NAME;
Expand Down Expand Up @@ -259,7 +262,21 @@ public void handleFlowEndEvent(PipelineMessageNotification notification) {
.withEventContextId(notification.getEvent().getContext().getId());
TransactionMeta transactionMeta = openTelemetryConnection.endTransaction(traceComponent,
notification.getException());
openTelemetryConnection.getMetricsProviders().captureFlowMetrics(transactionMeta,
if (transactionMeta == null) {
// If transaction isn't found by the current context,
// search by any context from variable
TypedValue<String> contextId = (TypedValue<String>) notification.getEvent().getVariables()
.get(TransactionStore.OTEL_FLOW_CONTEXT_ID);
if (contextId != null && contextId.getValue() != null) {
traceComponent = traceComponent.withEventContextId(contextId.getValue());
}
transactionMeta = openTelemetryConnection.endTransaction(traceComponent,
notification.getException());
}

openTelemetryConnection.getMetricsProviders().captureFlowMetrics(
Objects.requireNonNull(transactionMeta,
"Transaction for " + traceComponent.contextScopedLocation() + " cannot be null"),
notification.getResourceIdentifier(),
notification.getException());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import java.util.Objects;
import java.util.Optional;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_PROCESSOR_NAME;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.FLOW;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.ROUTE;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE;
import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_PROCESSOR_NAME;

public class ComponentsUtil {

private static List<TypedComponentIdentifier.ComponentType> ROUTE_IDENTIFIERS = Arrays.asList(ROUTE, SCOPE);
private static final List<TypedComponentIdentifier.ComponentType> ROUTE_IDENTIFIERS = Arrays.asList(ROUTE, SCOPE);

public static Optional<ComponentLocation> findLocation(String location,
ConfigurationComponentLocator configurationComponentLocator) {
Expand Down Expand Up @@ -120,4 +121,17 @@ public static boolean isFlowTrace(TraceComponent traceComponent) {
return traceComponent != null && traceComponent.getTags() != null
&& "flow".equalsIgnoreCase(traceComponent.getTags().get(MULE_APP_PROCESSOR_NAME.getKey()));
}

public static boolean isFirstProcessor(ComponentLocation location) {
String interceptPath = String.format("%s/processors/0", location.getRootContainerName());
return isFlowTypeContainer(location)
&& interceptPath.equalsIgnoreCase(location.getLocation());
}

public static boolean isFlowTypeContainer(ComponentLocation componentLocation) {
return !componentLocation.getParts().isEmpty() && componentLocation.getParts().get(0).getPartIdentifier()
.filter(c -> FLOW.equals(c.getType())
|| (SCOPE.equals(c.getType())))
.isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public abstract class AbstractMuleArtifactTraceTest extends MuleArtifactFunction

protected static final java.util.Queue<CoreEvent> CAPTURED = new ConcurrentLinkedDeque<>();

protected static DelegatedLoggingSpanTestExporter.Span getSpan(String spanKind, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(spanKind) && s.getSpanName().equals(spanName))
.findFirst().get();
}

@Before
public void beforeTest() {
Awaitility.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import org.jetbrains.annotations.NotNull;
import org.junit.Ignore;
import org.junit.Test;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.core.api.event.CoreEvent;

import java.util.*;
import java.util.stream.Collectors;

import static com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter.spanQueue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowableOfType;
import static org.awaitility.Awaitility.await;

public class MuleCoreFlowsTest extends AbstractMuleArtifactTraceTest {
Expand All @@ -24,15 +27,15 @@ protected String getConfigFile() {
public void testFlowControls() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:\\get-value")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(16));
}

@Test
public void testFlowControls_ScatterGather() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:scatter-gather:\\get-value")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(23));

Map<Object, Set<String>> groupedSpans = groupSpanByParent();
Expand Down Expand Up @@ -73,7 +76,7 @@ public void testFlowControls_ScatterGather() throws Exception {
"flow-controls:scatter-gather:\\get-value", "logger:LastLogger"));
softly.assertThat((Integer) groupedSpans.values().stream().mapToInt(Set::size).sum())
.as("Total grouped span count")
.isEqualTo(DelegatedLoggingSpanTestExporter.spanQueue.size());
.isEqualTo(spanQueue.size());
softly.assertThat(groupedSpans).as("Number of keys asserted").hasSize(10);

softly.assertAll();
Expand All @@ -82,14 +85,14 @@ public void testFlowControls_ScatterGather() throws Exception {
@NotNull
private Map<Object, Set<String>> groupSpanByParent() {
// Find the root span
DelegatedLoggingSpanTestExporter.Span root = DelegatedLoggingSpanTestExporter.spanQueue.stream()
DelegatedLoggingSpanTestExporter.Span root = spanQueue.stream()
.filter(span -> span.getParentSpanContext().getSpanId().equals("0000000000000000")).findFirst().get();

// Create a lookup of span id and name
Map<String, String> idNameMap = DelegatedLoggingSpanTestExporter.spanQueue.stream().collect(Collectors.toMap(
Map<String, String> idNameMap = spanQueue.stream().collect(Collectors.toMap(
DelegatedLoggingSpanTestExporter.Span::getSpanId, DelegatedLoggingSpanTestExporter.Span::getSpanName));

Map<Object, Set<String>> groupedSpans = DelegatedLoggingSpanTestExporter.spanQueue.stream()
Map<Object, Set<String>> groupedSpans = spanQueue.stream()
.collect(Collectors.groupingBy(
span -> idNameMap.getOrDefault(span.getParentSpanContext().getSpanId(), root.getSpanName()),
Collectors.mapping(DelegatedLoggingSpanTestExporter.Span::getSpanName, Collectors.toSet())));
Expand All @@ -101,15 +104,15 @@ private Map<Object, Set<String>> groupSpanByParent() {
public void testWithCorrelationId() throws Exception {
CoreEvent coreEvent = flowRunner("flow-scope-with-correlation-id")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(20));
}

@Test
public void testRouter_RoundRobin() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:round-robin:\\get-value")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(6));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
SoftAssertions softly = new SoftAssertions();
Expand All @@ -131,10 +134,24 @@ public void testRouter_RoundRobin() throws Exception {
public void testScopes_foreach() throws Exception {
CoreEvent coreEvent = flowRunner("mule-core-flows-scope_foreach")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(14));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
}

@Test
public void testFlowErrorPropagationSpans() throws Exception {
Exception muleException = catchThrowableOfType(() -> flowRunner("mule-core-flow-1")
.run(), Exception.class);
assertThat(muleException).hasMessage("Random failure").isNotNull();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(9));
assertThat(spanQueue)
.extracting("spanName")
.contains("mule-core-flow-1", "mule-core-flow-2", "mule-core-flow-3");
assertThat(getSpan("SERVER", "mule-core-flow-3").getAttributes())
.containsEntry("error.type", "org.mule.runtime.core.internal.exception.MessagingException");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,9 @@ public void testTraceContextExtraction() throws Exception {
.extracting("spanName", "spanKind", "traceId")
.containsOnly("GET /test/propagation/target", "SERVER", head.getTraceId());
}));
DelegatedLoggingSpanTestExporter.Span sourceServer = DelegatedLoggingSpanTestExporter.spanQueue.stream()
.filter(s -> s.getSpanKind().equals("SERVER") && s.getSpanName().equals("GET /test/propagation/source"))
.findFirst().get();
DelegatedLoggingSpanTestExporter.Span client = DelegatedLoggingSpanTestExporter.spanQueue.stream()
.filter(s -> s.getSpanKind().equals("CLIENT") && s.getSpanName().equals("/test/propagation/target"))
.findFirst().get();
DelegatedLoggingSpanTestExporter.Span targetServer = DelegatedLoggingSpanTestExporter.spanQueue.stream()
.filter(s -> s.getSpanKind().equals("SERVER") && s.getSpanName().equals("GET /test/propagation/target"))
.findFirst().get();
DelegatedLoggingSpanTestExporter.Span sourceServer = getSpan("SERVER", "GET /test/propagation/source");
DelegatedLoggingSpanTestExporter.Span client = getSpan("CLIENT", "/test/propagation/target");
DelegatedLoggingSpanTestExporter.Span targetServer = getSpan("SERVER", "GET /test/propagation/target");

assertThat(targetServer.getParentSpanContext())
.extracting("traceId", "spanId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,4 @@ private static void assertParentSpan(Span childSpan, String description, Span pa
.containsExactly(parentSpan.getTraceId(), parentSpan.getSpanId());
}

private static Span getSpan(String INTERNAL, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(INTERNAL) && s.getSpanName().equals(spanName))
.findFirst().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,4 @@ private static void assertParentSpan(DelegatedLoggingSpanTestExporter.Span child
.containsExactly(parentSpan.getTraceId(), parentSpan.getSpanId());
}

private static DelegatedLoggingSpanTestExporter.Span getSpan(String INTERNAL, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(INTERNAL) && s.getSpanName().equals(spanName))
.findFirst().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,4 @@ private static void assertParentSpan(DelegatedLoggingSpanTestExporter.Span child
.containsExactly(parentSpan.getTraceId(), parentSpan.getSpanId());
}

private static DelegatedLoggingSpanTestExporter.Span getSpan(String INTERNAL, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(INTERNAL) && s.getSpanName().equals(spanName))
.findFirst().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void verifyVariableReset() {
when(location.getLocation()).thenReturn("test-location");
when(location.getRootContainerName()).thenReturn("test-flow-name");
ComponentIdentifier ci = mock(ComponentIdentifier.class);
when(ci.getName()).thenReturn("some");
TypedComponentIdentifier tci = mock(TypedComponentIdentifier.class);
when(tci.getIdentifier()).thenReturn(ci);
when(location.getComponentIdentifier()).thenReturn(tci);
Expand Down Expand Up @@ -134,6 +135,7 @@ public void verifyVariableResetOnComponentNotFound() {
when(location.getLocation()).thenReturn("test-location");
when(location.getRootContainerName()).thenReturn("test-flow-name");
ComponentIdentifier ci = mock(ComponentIdentifier.class);
when(ci.getName()).thenReturn("some");
TypedComponentIdentifier tci = mock(TypedComponentIdentifier.class);
when(tci.getIdentifier()).thenReturn(ci);
when(location.getComponentIdentifier()).thenReturn(tci);
Expand Down Expand Up @@ -187,6 +189,7 @@ public void verifyVariableResetOnTraceComponentNotFound() {
when(location.getLocation()).thenReturn("test-location");
when(location.getRootContainerName()).thenReturn("test-flow-name");
ComponentIdentifier ci = mock(ComponentIdentifier.class);
when(ci.getName()).thenReturn("some");
TypedComponentIdentifier tci = mock(TypedComponentIdentifier.class);
when(tci.getIdentifier()).thenReturn(ci);
when(location.getComponentIdentifier()).thenReturn(tci);
Expand Down
4 changes: 2 additions & 2 deletions src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
<Logger name="com.avioconsulting.mule.opentelemetry.internal.anypoint.clients.AMCApplicationManagerApi" level="DEBUG">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="io.opentelemetry" level="TRACE">
<Logger name="io.opentelemetry" level="TRACE" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="com.avioconsulting.mule.opentelemetry" level="INFO">
<Logger name="com.avioconsulting.mule.opentelemetry" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.mule.service.http.impl.service.HttpMessageLogger" level="INFO">
Expand Down
Loading

0 comments on commit 5429bef

Please sign in to comment.