Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flow span lookup on error #175

Merged
merged 1 commit into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public interface TransactionStore {
String TRACE_PREV_CONTEXT_MAP_KEY = "OTEL_PREV_TRACE_CONTEXT";
String TRACE_ID = "traceId";
String SPAN_ID = "spanId";
String OTEL_FLOW_CONTEXT_ID = "_OTEL_FLOW_CONTEXT_ID";
String OTEL_FLOW_PREV_CONTEXT_ID = "_OTEL_FLOW_PREV_CONTEXT_ID";

/**
* A default implementation to get a mule correlation id as a local transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.ERROR_TYPE;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.*;
import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.*;

Expand Down Expand Up @@ -366,6 +367,7 @@ public TransactionMeta endTransaction(final TraceComponent traceComponent, Excep
openTelemetryConnection.setSpanStatus(traceComponent, rootSpan);
if (exception != null) {
rootSpan.recordException(exception);
rootSpan.setAttribute(ERROR_TYPE.getKey(), exception.getClass().getTypeName());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.stream.Stream;

import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.isFirstProcessor;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.*;

/**
Expand Down Expand Up @@ -116,13 +117,10 @@ public boolean intercept(ComponentLocation location) {
boolean intercept = false;
if (interceptorEnabled &&
muleNotificationProcessor.hasConnection()) {
String interceptPath = String.format("%s/processors/0", location.getRootContainerName());
// Intercept the first processor of the flow OR
// included processor/namespaces OR
// any processor/namespaces that are not excluded
ComponentIdentifier identifier = location.getComponentIdentifier().getIdentifier();
boolean firstProcessor = isFlowTypeContainer(location)
&& interceptPath.equalsIgnoreCase(location.getLocation());
boolean interceptConfigured = interceptInclusions.stream()
.anyMatch(mc -> mc.getNamespace().equalsIgnoreCase(identifier.getNamespace())
& (mc.getName().equalsIgnoreCase(identifier.getName())
Expand All @@ -131,7 +129,7 @@ public boolean intercept(ComponentLocation location) {
.noneMatch(mc -> mc.getNamespace().equalsIgnoreCase(identifier.getNamespace())
& (mc.getName().equalsIgnoreCase(identifier.getName())
|| "*".equalsIgnoreCase(mc.getName())));
intercept = firstProcessor
intercept = isFirstProcessor(location)
|| interceptConfigured;

if (intercept) {
Expand All @@ -153,10 +151,4 @@ public boolean intercept(ComponentLocation location) {
return intercept;
}

private boolean isFlowTypeContainer(ComponentLocation componentLocation) {
return componentLocation.getParts().get(0).getPartIdentifier()
.filter(c -> FLOW.equals(c.getType())
|| (SCOPE.equals(c.getType())))
.isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import java.util.concurrent.CompletableFuture;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_SCOPE_SUBFLOW_NAME;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.TRACE_CONTEXT_MAP_KEY;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.TRACE_PREV_CONTEXT_MAP_KEY;
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.*;
import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.*;
import static com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil.getEventTransactionId;

Expand Down Expand Up @@ -70,6 +69,10 @@ public void before(
ProcessorComponent processorComponent = muleNotificationProcessor
.getProcessorComponent(location.getComponentIdentifier().getIdentifier());
switchTraceContext(event, TRACE_CONTEXT_MAP_KEY, TRACE_PREV_CONTEXT_MAP_KEY);
if (isFirstProcessor(location)) {
switchTraceContext(event, OTEL_FLOW_CONTEXT_ID, OTEL_FLOW_PREV_CONTEXT_ID);
event.addVariable(OTEL_FLOW_CONTEXT_ID, event.getContext().getId());
}
if (processorComponent == null) {
// when spanAllProcessor is false, and it's the first generic processor
String transactionId = getEventTransactionId(event);
Expand Down Expand Up @@ -147,6 +150,8 @@ public void before(
@Override
public void after(ComponentLocation location, InterceptionEvent event, Optional<Throwable> thrown) {
switchTraceContext(event, TRACE_PREV_CONTEXT_MAP_KEY, TRACE_CONTEXT_MAP_KEY);
if (isFlowRef(location))
switchTraceContext(event, OTEL_FLOW_PREV_CONTEXT_ID, OTEL_FLOW_CONTEXT_ID);
}

private void switchTraceContext(InterceptionEvent event, String removalContextKey, String newContextKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import com.avioconsulting.mule.opentelemetry.api.processor.ProcessorComponent;
import com.avioconsulting.mule.opentelemetry.api.store.SpanMeta;
import com.avioconsulting.mule.opentelemetry.api.store.TransactionMeta;
import com.avioconsulting.mule.opentelemetry.api.store.TransactionStore;
import com.avioconsulting.mule.opentelemetry.api.traces.TraceComponent;
import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection;
import com.avioconsulting.mule.opentelemetry.internal.processor.service.ProcessorComponentService;
import com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil;
import io.opentelemetry.api.trace.SpanKind;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.PipelineMessageNotification;
import org.slf4j.Logger;
Expand All @@ -21,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_SCOPE_SUBFLOW_NAME;
Expand Down Expand Up @@ -259,7 +262,21 @@ public void handleFlowEndEvent(PipelineMessageNotification notification) {
.withEventContextId(notification.getEvent().getContext().getId());
TransactionMeta transactionMeta = openTelemetryConnection.endTransaction(traceComponent,
notification.getException());
openTelemetryConnection.getMetricsProviders().captureFlowMetrics(transactionMeta,
if (transactionMeta == null) {
// If transaction isn't found by the current context,
// search by any context from variable
TypedValue<String> contextId = (TypedValue<String>) notification.getEvent().getVariables()
.get(TransactionStore.OTEL_FLOW_CONTEXT_ID);
if (contextId != null && contextId.getValue() != null) {
traceComponent = traceComponent.withEventContextId(contextId.getValue());
}
transactionMeta = openTelemetryConnection.endTransaction(traceComponent,
notification.getException());
}

openTelemetryConnection.getMetricsProviders().captureFlowMetrics(
Objects.requireNonNull(transactionMeta,
"Transaction for " + traceComponent.contextScopedLocation() + " cannot be null"),
notification.getResourceIdentifier(),
notification.getException());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import java.util.Objects;
import java.util.Optional;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_PROCESSOR_NAME;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.FLOW;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.ROUTE;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE;
import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_PROCESSOR_NAME;

public class ComponentsUtil {

private static List<TypedComponentIdentifier.ComponentType> ROUTE_IDENTIFIERS = Arrays.asList(ROUTE, SCOPE);
private static final List<TypedComponentIdentifier.ComponentType> ROUTE_IDENTIFIERS = Arrays.asList(ROUTE, SCOPE);

public static Optional<ComponentLocation> findLocation(String location,
ConfigurationComponentLocator configurationComponentLocator) {
Expand Down Expand Up @@ -120,4 +121,17 @@ public static boolean isFlowTrace(TraceComponent traceComponent) {
return traceComponent != null && traceComponent.getTags() != null
&& "flow".equalsIgnoreCase(traceComponent.getTags().get(MULE_APP_PROCESSOR_NAME.getKey()));
}

public static boolean isFirstProcessor(ComponentLocation location) {
String interceptPath = String.format("%s/processors/0", location.getRootContainerName());
return isFlowTypeContainer(location)
&& interceptPath.equalsIgnoreCase(location.getLocation());
}

public static boolean isFlowTypeContainer(ComponentLocation componentLocation) {
return !componentLocation.getParts().isEmpty() && componentLocation.getParts().get(0).getPartIdentifier()
.filter(c -> FLOW.equals(c.getType())
|| (SCOPE.equals(c.getType())))
.isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public abstract class AbstractMuleArtifactTraceTest extends MuleArtifactFunction

protected static final java.util.Queue<CoreEvent> CAPTURED = new ConcurrentLinkedDeque<>();

protected static DelegatedLoggingSpanTestExporter.Span getSpan(String spanKind, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(spanKind) && s.getSpanName().equals(spanName))
.findFirst().get();
}

@Before
public void beforeTest() {
Awaitility.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import org.jetbrains.annotations.NotNull;
import org.junit.Ignore;
import org.junit.Test;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.core.api.event.CoreEvent;

import java.util.*;
import java.util.stream.Collectors;

import static com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter.spanQueue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowableOfType;
import static org.awaitility.Awaitility.await;

public class MuleCoreFlowsTest extends AbstractMuleArtifactTraceTest {
Expand All @@ -24,15 +27,15 @@ protected String getConfigFile() {
public void testFlowControls() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:\\get-value")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(16));
}

@Test
public void testFlowControls_ScatterGather() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:scatter-gather:\\get-value")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(23));

Map<Object, Set<String>> groupedSpans = groupSpanByParent();
Expand Down Expand Up @@ -73,7 +76,7 @@ public void testFlowControls_ScatterGather() throws Exception {
"flow-controls:scatter-gather:\\get-value", "logger:LastLogger"));
softly.assertThat((Integer) groupedSpans.values().stream().mapToInt(Set::size).sum())
.as("Total grouped span count")
.isEqualTo(DelegatedLoggingSpanTestExporter.spanQueue.size());
.isEqualTo(spanQueue.size());
softly.assertThat(groupedSpans).as("Number of keys asserted").hasSize(10);

softly.assertAll();
Expand All @@ -82,14 +85,14 @@ public void testFlowControls_ScatterGather() throws Exception {
@NotNull
private Map<Object, Set<String>> groupSpanByParent() {
// Find the root span
DelegatedLoggingSpanTestExporter.Span root = DelegatedLoggingSpanTestExporter.spanQueue.stream()
DelegatedLoggingSpanTestExporter.Span root = spanQueue.stream()
.filter(span -> span.getParentSpanContext().getSpanId().equals("0000000000000000")).findFirst().get();

// Create a lookup of span id and name
Map<String, String> idNameMap = DelegatedLoggingSpanTestExporter.spanQueue.stream().collect(Collectors.toMap(
Map<String, String> idNameMap = spanQueue.stream().collect(Collectors.toMap(
DelegatedLoggingSpanTestExporter.Span::getSpanId, DelegatedLoggingSpanTestExporter.Span::getSpanName));

Map<Object, Set<String>> groupedSpans = DelegatedLoggingSpanTestExporter.spanQueue.stream()
Map<Object, Set<String>> groupedSpans = spanQueue.stream()
.collect(Collectors.groupingBy(
span -> idNameMap.getOrDefault(span.getParentSpanContext().getSpanId(), root.getSpanName()),
Collectors.mapping(DelegatedLoggingSpanTestExporter.Span::getSpanName, Collectors.toSet())));
Expand All @@ -101,15 +104,15 @@ private Map<Object, Set<String>> groupSpanByParent() {
public void testWithCorrelationId() throws Exception {
CoreEvent coreEvent = flowRunner("flow-scope-with-correlation-id")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(20));
}

@Test
public void testRouter_RoundRobin() throws Exception {
CoreEvent coreEvent = flowRunner("flow-controls:round-robin:\\get-value")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(6));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
SoftAssertions softly = new SoftAssertions();
Expand All @@ -131,10 +134,24 @@ public void testRouter_RoundRobin() throws Exception {
public void testScopes_foreach() throws Exception {
CoreEvent coreEvent = flowRunner("mule-core-flows-scope_foreach")
.run();
await().untilAsserted(() -> assertThat(DelegatedLoggingSpanTestExporter.spanQueue)
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(14));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
}

@Test
public void testFlowErrorPropagationSpans() throws Exception {
Exception muleException = catchThrowableOfType(() -> flowRunner("mule-core-flow-1")
.run(), Exception.class);
assertThat(muleException).hasMessage("Random failure").isNotNull();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(9));
assertThat(spanQueue)
.extracting("spanName")
.contains("mule-core-flow-1", "mule-core-flow-2", "mule-core-flow-3");
assertThat(getSpan("SERVER", "mule-core-flow-3").getAttributes())
.containsEntry("error.type", "org.mule.runtime.core.internal.exception.MessagingException");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,9 @@ public void testTraceContextExtraction() throws Exception {
.extracting("spanName", "spanKind", "traceId")
.containsOnly("GET /test/propagation/target", "SERVER", head.getTraceId());
}));
DelegatedLoggingSpanTestExporter.Span sourceServer = DelegatedLoggingSpanTestExporter.spanQueue.stream()
.filter(s -> s.getSpanKind().equals("SERVER") && s.getSpanName().equals("GET /test/propagation/source"))
.findFirst().get();
DelegatedLoggingSpanTestExporter.Span client = DelegatedLoggingSpanTestExporter.spanQueue.stream()
.filter(s -> s.getSpanKind().equals("CLIENT") && s.getSpanName().equals("/test/propagation/target"))
.findFirst().get();
DelegatedLoggingSpanTestExporter.Span targetServer = DelegatedLoggingSpanTestExporter.spanQueue.stream()
.filter(s -> s.getSpanKind().equals("SERVER") && s.getSpanName().equals("GET /test/propagation/target"))
.findFirst().get();
DelegatedLoggingSpanTestExporter.Span sourceServer = getSpan("SERVER", "GET /test/propagation/source");
DelegatedLoggingSpanTestExporter.Span client = getSpan("CLIENT", "/test/propagation/target");
DelegatedLoggingSpanTestExporter.Span targetServer = getSpan("SERVER", "GET /test/propagation/target");

assertThat(targetServer.getParentSpanContext())
.extracting("traceId", "spanId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,4 @@ private static void assertParentSpan(Span childSpan, String description, Span pa
.containsExactly(parentSpan.getTraceId(), parentSpan.getSpanId());
}

private static Span getSpan(String INTERNAL, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(INTERNAL) && s.getSpanName().equals(spanName))
.findFirst().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,4 @@ private static void assertParentSpan(DelegatedLoggingSpanTestExporter.Span child
.containsExactly(parentSpan.getTraceId(), parentSpan.getSpanId());
}

private static DelegatedLoggingSpanTestExporter.Span getSpan(String INTERNAL, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(INTERNAL) && s.getSpanName().equals(spanName))
.findFirst().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,4 @@ private static void assertParentSpan(DelegatedLoggingSpanTestExporter.Span child
.containsExactly(parentSpan.getTraceId(), parentSpan.getSpanId());
}

private static DelegatedLoggingSpanTestExporter.Span getSpan(String INTERNAL, String spanName) {
return DelegatedLoggingSpanTestExporter.spanQueue
.stream()
.filter(s -> s.getSpanKind().equals(INTERNAL) && s.getSpanName().equals(spanName))
.findFirst().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void verifyVariableReset() {
when(location.getLocation()).thenReturn("test-location");
when(location.getRootContainerName()).thenReturn("test-flow-name");
ComponentIdentifier ci = mock(ComponentIdentifier.class);
when(ci.getName()).thenReturn("some");
TypedComponentIdentifier tci = mock(TypedComponentIdentifier.class);
when(tci.getIdentifier()).thenReturn(ci);
when(location.getComponentIdentifier()).thenReturn(tci);
Expand Down Expand Up @@ -134,6 +135,7 @@ public void verifyVariableResetOnComponentNotFound() {
when(location.getLocation()).thenReturn("test-location");
when(location.getRootContainerName()).thenReturn("test-flow-name");
ComponentIdentifier ci = mock(ComponentIdentifier.class);
when(ci.getName()).thenReturn("some");
TypedComponentIdentifier tci = mock(TypedComponentIdentifier.class);
when(tci.getIdentifier()).thenReturn(ci);
when(location.getComponentIdentifier()).thenReturn(tci);
Expand Down Expand Up @@ -187,6 +189,7 @@ public void verifyVariableResetOnTraceComponentNotFound() {
when(location.getLocation()).thenReturn("test-location");
when(location.getRootContainerName()).thenReturn("test-flow-name");
ComponentIdentifier ci = mock(ComponentIdentifier.class);
when(ci.getName()).thenReturn("some");
TypedComponentIdentifier tci = mock(TypedComponentIdentifier.class);
when(tci.getIdentifier()).thenReturn(ci);
when(location.getComponentIdentifier()).thenReturn(tci);
Expand Down
4 changes: 2 additions & 2 deletions src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
<Logger name="com.avioconsulting.mule.opentelemetry.internal.anypoint.clients.AMCApplicationManagerApi" level="DEBUG">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="io.opentelemetry" level="TRACE">
<Logger name="io.opentelemetry" level="TRACE" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="com.avioconsulting.mule.opentelemetry" level="INFO">
<Logger name="com.avioconsulting.mule.opentelemetry" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.mule.service.http.impl.service.HttpMessageLogger" level="INFO">
Expand Down
Loading
Loading