Skip to content

Commit

Permalink
fix: async scope spans (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
manikmagar authored May 21, 2024
1 parent b8534a7 commit 65840ea
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.avioconsulting.mule.opentelemetry.internal.OpenTelemetryOperations;
import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection;
import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnectionProvider;
import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.AsyncMessageNotificationListener;
import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MetricEventNotificationListener;
import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MuleMessageProcessorNotificationListener;
import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MulePipelineMessageNotificationListener;
Expand Down Expand Up @@ -190,6 +191,7 @@ public void start() throws MuleException {
new MuleMessageProcessorNotificationListener(muleNotificationProcessor));
notificationListenerRegistry.registerListener(
new MulePipelineMessageNotificationListener(muleNotificationProcessor));
notificationListenerRegistry.registerListener(new AsyncMessageNotificationListener(muleNotificationProcessor));
notificationListenerRegistry.registerListener(new MetricEventNotificationListener(muleNotificationProcessor),
extensionNotification -> METRIC_NOTIFICATION_DATA_TYPE
.isCompatibleWith(extensionNotification.getData().getDataType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.avioconsulting.mule.opentelemetry.api.config.MuleComponent;
import com.avioconsulting.mule.opentelemetry.internal.config.OpenTelemetryExtensionConfiguration;
import com.avioconsulting.mule.opentelemetry.internal.processor.MuleCoreProcessorComponent;
import com.avioconsulting.mule.opentelemetry.internal.processor.MuleNotificationProcessor;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.location.ComponentLocation;
Expand All @@ -18,7 +19,6 @@
import java.util.stream.Stream;

import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.isFirstProcessor;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.*;

/**
* ProcessorInterceptorFactory can intercept processors. This is injected
Expand Down Expand Up @@ -72,7 +72,8 @@ private void setupInterceptableComponents(MuleNotificationProcessor muleNotifica
Stream.of(excludedNamespaces.split(","))
.forEach(ns -> interceptExclusions.add(new MuleComponent(ns, "*")));

interceptInclusions.add(new MuleComponent("mule", "flow-ref"));
MuleCoreProcessorComponent.CORE_INTERCEPT_SCOPE_ROUTERS
.forEach(c -> interceptInclusions.add(new MuleComponent("mule", c)));

if (muleNotificationProcessor.getTraceLevelConfiguration() != null) {
if (muleNotificationProcessor.getTraceLevelConfiguration().getInterceptionDisabledComponents() != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.avioconsulting.mule.opentelemetry.internal.notifications.listeners;

import com.avioconsulting.mule.opentelemetry.internal.processor.MuleNotificationProcessor;
import org.mule.runtime.api.notification.AsyncMessageNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Async work is scheduled and completed beyond the lifecycle of `mule:async`
* processor execution.
* This dedicated listener for AsyncMessageNotification will let us process the
* spans for Aynch chain of processors.
*
* @since 2.1.2
*/
public class AsyncMessageNotificationListener extends AbstractMuleNotificationListener
implements org.mule.runtime.api.notification.AsyncMessageNotificationListener<AsyncMessageNotification> {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncMessageNotificationListener.class);

public AsyncMessageNotificationListener(MuleNotificationProcessor muleNotificationProcessor) {
super(muleNotificationProcessor);
}

@Override
public void onNotification(AsyncMessageNotification notification) {
LOGGER.trace("===> Received {}:{}", notification.getClass().getName(), notification.getActionName());
switch (Integer.parseInt(notification.getAction().getIdentifier())) {
case AsyncMessageNotification.PROCESS_ASYNC_SCHEDULED:
LOGGER.trace("Scheduled {}:{} - {}", notification.getEventName(),
notification.getComponent().getIdentifier().getName(),
notification.getEvent().getContext().getId());
muleNotificationProcessor.handleAsyncScheduledEvent(notification);
break;
case AsyncMessageNotification.PROCESS_ASYNC_COMPLETE:
LOGGER.trace("Completed {} - {}", notification.getEventName(),
notification.getEvent().getContext().getId());
muleNotificationProcessor.handleProcessorEndEvent(notification);
break;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public CompletableResultCode export(Collection<SpanData> spans) {
span.setSpanStatusDescription(spanData.getStatus().getDescription());
span.setInstrumentationName(spanData.getInstrumentationScopeInfo().getName());
span.setInstrumentationVersion(spanData.getInstrumentationScopeInfo().getVersion());
span.setStartEpocNanos(spanData.getStartEpochNanos());
span.setEndEpocNanos(spanData.getEndEpochNanos());
Map<String, Object> attributes = new HashMap<>();
spanData.getAttributes().forEach((key, value) -> attributes.put(key.getKey(), value));
span.setAttributes(attributes);
Expand Down Expand Up @@ -70,6 +72,8 @@ public static class Span {
private SpanContext parentSpanContext;
private SpanContext spanContext;
private String spanStatusDescription;
private long startEpocNanos;
private long endEpocNanos;

public String getInstrumentationName() {
return instrumentationName;
Expand Down Expand Up @@ -170,12 +174,30 @@ public String toString() {
", spanId='" + spanId + '\'' +
", spanKind='" + spanKind + '\'' +
", spanStatus='" + spanStatus + '\'' +
", spanStatusDescription='" + spanStatusDescription + '\'' +
", attributes=" + attributes +
", parentSpanContext=" + parentSpanContext +
", spanContext=" + spanContext +
", spanStatusDescription='" + spanStatusDescription + '\'' +
", startEpocNanos=" + startEpocNanos +
", endEpocNanos=" + endEpocNanos +
'}';
}

public void setStartEpocNanos(long startEpocNanos) {
this.startEpocNanos = startEpocNanos;
}

public long getStartEpocNanos() {
return startEpocNanos;
}

public void setEndEpocNanos(long endEpocNanos) {
this.endEpocNanos = endEpocNanos;
}

public long getEndEpocNanos() {
return endEpocNanos;
}
}

public static class SpanContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@
import com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.TypedComponentIdentifier;
import org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.EnrichedServerNotification;

import java.util.*;

import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.ROUTER;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE;

/**
* This processor handles any specific operations or sources from Mule Core
* namespace that are needed for overall tracing.
Expand All @@ -22,21 +17,36 @@
*/
public class MuleCoreProcessorComponent extends AbstractProcessorComponent {

/**
* These core containers are to be processed in interceptor
*/
public static final List<String> CORE_INTERCEPT_SCOPE_ROUTERS = Arrays.asList("flow-ref", "choice",
"first-successful",
"until-successful",
"scatter-gather", "round-robin", "foreach", "parallel-foreach", "try");

private static List<String> CORE_PROCESSORS;

@Override
protected String getNamespace() {
return NAMESPACE_MULE;
}

@Override
protected List<String> getOperations() {
return Arrays.asList("flow-ref", "choice", "first-successful", "scatter-gather", "round-robin");
return CORE_PROCESSORS;
}

@Override
protected List<String> getSources() {
return Collections.emptyList();
}

public MuleCoreProcessorComponent() {
CORE_PROCESSORS = new ArrayList<>(CORE_INTERCEPT_SCOPE_ROUTERS);
CORE_PROCESSORS.add("async");
}

@Override
public boolean canHandle(ComponentIdentifier componentIdentifier) {
return super.canHandle(componentIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.AsyncMessageNotification;
import org.mule.runtime.api.notification.EnrichedServerNotification;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.api.notification.PipelineMessageNotification;
import org.slf4j.Logger;
Expand Down Expand Up @@ -117,14 +119,40 @@ public void init(OpenTelemetryConnection connection,

public void handleProcessorStartEvent(MessageProcessorNotification notification) {
String location = notification.getComponent().getLocation().getLocation();
if (ComponentsUtil.isAsyncScope(notification.getComponent().getLocation().getComponentIdentifier())) {
// Async scopes are handled via AsyncMessageNotifications.
// Creating one here will create duplicate spans
return;
}
if (interceptSpannedComponents.contains(location)) {
logger.trace(
"Component {} will be processed by interceptor, skipping notification processing to create span",
location);
return;
}
processComponentStartSpan(notification);
}

/**
* Process the {@link AsyncMessageNotification} to capture start of the span.
*
* @param notification
* AsyncMessageNotification
*/
public void handleAsyncScheduledEvent(AsyncMessageNotification notification) {
processComponentStartSpan(notification);
}

/**
* A common and generic start of the span based on
* {@link EnrichedServerNotification}.
*
* @param notification
* {@link EnrichedServerNotification}
*/
private void processComponentStartSpan(EnrichedServerNotification notification) {
try {
ProcessorComponent processorComponent = getProcessorComponent(notification);
ProcessorComponent processorComponent = getProcessorComponent(notification.getComponent().getIdentifier());
if (processorComponent != null) {
logger.trace("Handling '{}:{}' processor start event context id {} correlation id {} ",
notification.getResourceIdentifier(), notification.getComponent().getIdentifier(),
Expand Down Expand Up @@ -179,10 +207,10 @@ public ProcessorComponent getProcessorComponent(ComponentIdentifier identifier)
return processorComponent;
}

public void handleProcessorEndEvent(MessageProcessorNotification notification) {
public void handleProcessorEndEvent(EnrichedServerNotification notification) {
String location = notification.getComponent().getLocation().getLocation();
try {
ProcessorComponent processorComponent = getProcessorComponent(notification);
ProcessorComponent processorComponent = getProcessorComponent(notification.getComponent().getIdentifier());
if (processorComponent != null) {
logger.trace("Handling '{}:{}' processor end event context id {} correlation id {} ",
notification.getResourceIdentifier(), notification.getComponent().getIdentifier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public SpanMeta addProcessorSpan(String containerName, TraceComponent traceCompo
Span span = spanBuilder.startSpan();
ProcessorSpan ps = new ProcessorSpan(span, traceComponent.getLocation(), transactionId,
traceComponent.getStartTime(), flowName).setTags(traceComponent.getTags());
LOGGER.trace("Adding span for {}:{} - {}", traceComponent.contextScopedLocation(), traceComponent.getSpanName(),
span.getSpanContext().getSpanId());
childSpans.putIfAbsent(traceComponent.contextScopedLocation(), ps);
return ps;
}
Expand Down Expand Up @@ -142,7 +144,7 @@ public SpanMeta endProcessorSpan(TraceComponent traceComponent, Consumer<Span> s
+ getRootSpanName()
+ " trace transaction " + transactionId + " context "
+ getSpan().getSpanContext().toString());

LOGGER.trace("Removing span for {} - {}", traceComponent.contextScopedLocation(), removed.getSpanId());
endRouteSpans(traceComponent, endTime);

removed.setEndTime(endTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ public static boolean isFlowTypeContainer(ComponentLocation componentLocation) {
|| (SCOPE.equals(c.getType())))
.isPresent();
}

public static boolean isAsyncScope(TypedComponentIdentifier identifier) {
return SCOPE.equals(identifier.getType()) && identifier.getIdentifier().getName().equals("async");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,36 @@ public void testFlowControls_Choice() throws Exception {
softly.assertAll();
}

@Test
public void testScopes_foreach() throws Exception {
CoreEvent coreEvent = flowRunner("mule-core-flows-scope_foreach")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(12));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
}

@Test
public void testFlowScopes() throws Exception {
CoreEvent coreEvent = flowRunner("mule-core-flows-scope-no-generic-span")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(20));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
}

@Test
public void testFlowWithGenericSpansOnly() throws Exception {
CoreEvent coreEvent = flowRunner("simple-flow-to-flow")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(3));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
}

@NotNull
private Map<Object, Set<String>> groupSpanByParent() {
// Find the root span
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.jetbrains.annotations.NotNull;
import org.junit.Ignore;
import org.junit.Test;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.core.api.event.CoreEvent;

import java.util.*;
Expand Down Expand Up @@ -213,9 +212,47 @@ public void testScopes_foreach() throws Exception {
CoreEvent coreEvent = flowRunner("mule-core-flows-scope_foreach")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(14));
.hasSize(24));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("mule-core-flows-scope_foreach", val -> assertThat(val)
.containsOnly(
"mule-core-flows-scope_foreach",
"foreach:For Each",
"logger:FirstLogger",
"logger:LastLogger"));
softly.assertThat(groupedSpans).hasEntrySatisfying("foreach:For Each",
val -> assertThat(val).containsOnly("/test-simple", "logger:ForEachLogger"));
softly.assertThat(groupedSpans).hasEntrySatisfying("/test-simple",
val -> assertThat(val).containsOnly("GET /test-simple"));
softly.assertThat(groupedSpans).hasEntrySatisfying("GET /test-simple",
val -> assertThat(val).containsOnly("set-payload:Set Payload"));
}

@Test
public void testScopes_async() throws Exception {
CoreEvent coreEvent = flowRunner("mule-core-flows-async-scope")
.run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(7));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("mule-core-flows-async-scope", val -> assertThat(val)
.containsOnly(
"mule-core-flows-async-scope",
"logger:FirstLogger",
"async:Async"));
softly.assertThat(groupedSpans).hasEntrySatisfying("async:Async",
val -> assertThat(val).containsOnly("logger:AsyncLogger", "/test-simple"));
softly.assertThat(groupedSpans).hasEntrySatisfying("/test-simple",
val -> assertThat(val).containsOnly("GET /test-simple"));
softly.assertThat(groupedSpans).hasEntrySatisfying("GET /test-simple",
val -> assertThat(val).containsOnly("set-payload:Set Payload"));
softly.assertAll();
}

@Test
Expand Down
Loading

0 comments on commit 65840ea

Please sign in to comment.