diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java index 53516ed..18b5cf2 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java @@ -11,6 +11,7 @@ import com.avioconsulting.mule.opentelemetry.internal.OpenTelemetryOperations; import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection; import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnectionProvider; +import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.AsyncMessageNotificationListener; import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MetricEventNotificationListener; import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MuleMessageProcessorNotificationListener; import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MulePipelineMessageNotificationListener; @@ -190,6 +191,7 @@ public void start() throws MuleException { new MuleMessageProcessorNotificationListener(muleNotificationProcessor)); notificationListenerRegistry.registerListener( new MulePipelineMessageNotificationListener(muleNotificationProcessor)); + notificationListenerRegistry.registerListener(new AsyncMessageNotificationListener(muleNotificationProcessor)); notificationListenerRegistry.registerListener(new MetricEventNotificationListener(muleNotificationProcessor), extensionNotification -> METRIC_NOTIFICATION_DATA_TYPE .isCompatibleWith(extensionNotification.getData().getDataType())); diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactory.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactory.java index 98bc233..69aaa73 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactory.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactory.java @@ -2,6 +2,7 @@ import com.avioconsulting.mule.opentelemetry.api.config.MuleComponent; import com.avioconsulting.mule.opentelemetry.internal.config.OpenTelemetryExtensionConfiguration; +import com.avioconsulting.mule.opentelemetry.internal.processor.MuleCoreProcessorComponent; import com.avioconsulting.mule.opentelemetry.internal.processor.MuleNotificationProcessor; import org.mule.runtime.api.component.ComponentIdentifier; import org.mule.runtime.api.component.location.ComponentLocation; @@ -18,7 +19,6 @@ import java.util.stream.Stream; import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.isFirstProcessor; -import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.*; /** * ProcessorInterceptorFactory can intercept processors. This is injected @@ -72,7 +72,8 @@ private void setupInterceptableComponents(MuleNotificationProcessor muleNotifica Stream.of(excludedNamespaces.split(",")) .forEach(ns -> interceptExclusions.add(new MuleComponent(ns, "*"))); - interceptInclusions.add(new MuleComponent("mule", "flow-ref")); + MuleCoreProcessorComponent.CORE_INTERCEPT_SCOPE_ROUTERS + .forEach(c -> interceptInclusions.add(new MuleComponent("mule", c))); if (muleNotificationProcessor.getTraceLevelConfiguration() != null) { if (muleNotificationProcessor.getTraceLevelConfiguration().getInterceptionDisabledComponents() != null) diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/notifications/listeners/AsyncMessageNotificationListener.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/notifications/listeners/AsyncMessageNotificationListener.java new file mode 100644 index 0000000..e1eec4c --- /dev/null +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/notifications/listeners/AsyncMessageNotificationListener.java @@ -0,0 +1,42 @@ +package com.avioconsulting.mule.opentelemetry.internal.notifications.listeners; + +import com.avioconsulting.mule.opentelemetry.internal.processor.MuleNotificationProcessor; +import org.mule.runtime.api.notification.AsyncMessageNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Async work is scheduled and completed beyond the lifecycle of `mule:async` + * processor execution. + * This dedicated listener for AsyncMessageNotification will let us process the + * spans for Aynch chain of processors. + * + * @since 2.1.2 + */ +public class AsyncMessageNotificationListener extends AbstractMuleNotificationListener + implements org.mule.runtime.api.notification.AsyncMessageNotificationListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncMessageNotificationListener.class); + + public AsyncMessageNotificationListener(MuleNotificationProcessor muleNotificationProcessor) { + super(muleNotificationProcessor); + } + + @Override + public void onNotification(AsyncMessageNotification notification) { + LOGGER.trace("===> Received {}:{}", notification.getClass().getName(), notification.getActionName()); + switch (Integer.parseInt(notification.getAction().getIdentifier())) { + case AsyncMessageNotification.PROCESS_ASYNC_SCHEDULED: + LOGGER.trace("Scheduled {}:{} - {}", notification.getEventName(), + notification.getComponent().getIdentifier().getName(), + notification.getEvent().getContext().getId()); + muleNotificationProcessor.handleAsyncScheduledEvent(notification); + break; + case AsyncMessageNotification.PROCESS_ASYNC_COMPLETE: + LOGGER.trace("Completed {} - {}", notification.getEventName(), + notification.getEvent().getContext().getId()); + muleNotificationProcessor.handleProcessorEndEvent(notification); + break; + } + } +} diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/opentelemetry/sdk/test/DelegatedLoggingSpanTestExporter.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/opentelemetry/sdk/test/DelegatedLoggingSpanTestExporter.java index c7081b5..ed1d758 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/opentelemetry/sdk/test/DelegatedLoggingSpanTestExporter.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/opentelemetry/sdk/test/DelegatedLoggingSpanTestExporter.java @@ -31,6 +31,8 @@ public CompletableResultCode export(Collection spans) { span.setSpanStatusDescription(spanData.getStatus().getDescription()); span.setInstrumentationName(spanData.getInstrumentationScopeInfo().getName()); span.setInstrumentationVersion(spanData.getInstrumentationScopeInfo().getVersion()); + span.setStartEpocNanos(spanData.getStartEpochNanos()); + span.setEndEpocNanos(spanData.getEndEpochNanos()); Map attributes = new HashMap<>(); spanData.getAttributes().forEach((key, value) -> attributes.put(key.getKey(), value)); span.setAttributes(attributes); @@ -70,6 +72,8 @@ public static class Span { private SpanContext parentSpanContext; private SpanContext spanContext; private String spanStatusDescription; + private long startEpocNanos; + private long endEpocNanos; public String getInstrumentationName() { return instrumentationName; @@ -170,12 +174,30 @@ public String toString() { ", spanId='" + spanId + '\'' + ", spanKind='" + spanKind + '\'' + ", spanStatus='" + spanStatus + '\'' + - ", spanStatusDescription='" + spanStatusDescription + '\'' + ", attributes=" + attributes + ", parentSpanContext=" + parentSpanContext + ", spanContext=" + spanContext + + ", spanStatusDescription='" + spanStatusDescription + '\'' + + ", startEpocNanos=" + startEpocNanos + + ", endEpocNanos=" + endEpocNanos + '}'; } + + public void setStartEpocNanos(long startEpocNanos) { + this.startEpocNanos = startEpocNanos; + } + + public long getStartEpocNanos() { + return startEpocNanos; + } + + public void setEndEpocNanos(long endEpocNanos) { + this.endEpocNanos = endEpocNanos; + } + + public long getEndEpocNanos() { + return endEpocNanos; + } } public static class SpanContext { diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java index dba55e6..7a6cddc 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleCoreProcessorComponent.java @@ -4,16 +4,11 @@ import com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil; import org.mule.runtime.api.component.Component; import org.mule.runtime.api.component.ComponentIdentifier; -import org.mule.runtime.api.component.TypedComponentIdentifier; -import org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType; import org.mule.runtime.api.metadata.TypedValue; import org.mule.runtime.api.notification.EnrichedServerNotification; import java.util.*; -import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.ROUTER; -import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE; - /** * This processor handles any specific operations or sources from Mule Core * namespace that are needed for overall tracing. @@ -22,6 +17,16 @@ */ public class MuleCoreProcessorComponent extends AbstractProcessorComponent { + /** + * These core containers are to be processed in interceptor + */ + public static final List CORE_INTERCEPT_SCOPE_ROUTERS = Arrays.asList("flow-ref", "choice", + "first-successful", + "until-successful", + "scatter-gather", "round-robin", "foreach", "parallel-foreach", "try"); + + private static List CORE_PROCESSORS; + @Override protected String getNamespace() { return NAMESPACE_MULE; @@ -29,7 +34,7 @@ protected String getNamespace() { @Override protected List getOperations() { - return Arrays.asList("flow-ref", "choice", "first-successful", "scatter-gather", "round-robin"); + return CORE_PROCESSORS; } @Override @@ -37,6 +42,11 @@ protected List getSources() { return Collections.emptyList(); } + public MuleCoreProcessorComponent() { + CORE_PROCESSORS = new ArrayList<>(CORE_INTERCEPT_SCOPE_ROUTERS); + CORE_PROCESSORS.add("async"); + } + @Override public boolean canHandle(ComponentIdentifier componentIdentifier) { return super.canHandle(componentIdentifier); diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java index 7ea66aa..57f1aaf 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java @@ -13,6 +13,8 @@ import org.mule.runtime.api.component.ComponentIdentifier; import org.mule.runtime.api.component.location.ConfigurationComponentLocator; import org.mule.runtime.api.metadata.TypedValue; +import org.mule.runtime.api.notification.AsyncMessageNotification; +import org.mule.runtime.api.notification.EnrichedServerNotification; import org.mule.runtime.api.notification.MessageProcessorNotification; import org.mule.runtime.api.notification.PipelineMessageNotification; import org.slf4j.Logger; @@ -117,14 +119,40 @@ public void init(OpenTelemetryConnection connection, public void handleProcessorStartEvent(MessageProcessorNotification notification) { String location = notification.getComponent().getLocation().getLocation(); + if (ComponentsUtil.isAsyncScope(notification.getComponent().getLocation().getComponentIdentifier())) { + // Async scopes are handled via AsyncMessageNotifications. + // Creating one here will create duplicate spans + return; + } if (interceptSpannedComponents.contains(location)) { logger.trace( "Component {} will be processed by interceptor, skipping notification processing to create span", location); return; } + processComponentStartSpan(notification); + } + + /** + * Process the {@link AsyncMessageNotification} to capture start of the span. + * + * @param notification + * AsyncMessageNotification + */ + public void handleAsyncScheduledEvent(AsyncMessageNotification notification) { + processComponentStartSpan(notification); + } + + /** + * A common and generic start of the span based on + * {@link EnrichedServerNotification}. + * + * @param notification + * {@link EnrichedServerNotification} + */ + private void processComponentStartSpan(EnrichedServerNotification notification) { try { - ProcessorComponent processorComponent = getProcessorComponent(notification); + ProcessorComponent processorComponent = getProcessorComponent(notification.getComponent().getIdentifier()); if (processorComponent != null) { logger.trace("Handling '{}:{}' processor start event context id {} correlation id {} ", notification.getResourceIdentifier(), notification.getComponent().getIdentifier(), @@ -179,10 +207,10 @@ public ProcessorComponent getProcessorComponent(ComponentIdentifier identifier) return processorComponent; } - public void handleProcessorEndEvent(MessageProcessorNotification notification) { + public void handleProcessorEndEvent(EnrichedServerNotification notification) { String location = notification.getComponent().getLocation().getLocation(); try { - ProcessorComponent processorComponent = getProcessorComponent(notification); + ProcessorComponent processorComponent = getProcessorComponent(notification.getComponent().getIdentifier()); if (processorComponent != null) { logger.trace("Handling '{}:{}' processor end event context id {} correlation id {} ", notification.getResourceIdentifier(), notification.getComponent().getIdentifier(), diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java index a35858f..1064865 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/store/FlowSpan.java @@ -97,6 +97,8 @@ public SpanMeta addProcessorSpan(String containerName, TraceComponent traceCompo Span span = spanBuilder.startSpan(); ProcessorSpan ps = new ProcessorSpan(span, traceComponent.getLocation(), transactionId, traceComponent.getStartTime(), flowName).setTags(traceComponent.getTags()); + LOGGER.trace("Adding span for {}:{} - {}", traceComponent.contextScopedLocation(), traceComponent.getSpanName(), + span.getSpanContext().getSpanId()); childSpans.putIfAbsent(traceComponent.contextScopedLocation(), ps); return ps; } @@ -142,7 +144,7 @@ public SpanMeta endProcessorSpan(TraceComponent traceComponent, Consumer s + getRootSpanName() + " trace transaction " + transactionId + " context " + getSpan().getSpanContext().toString()); - + LOGGER.trace("Removing span for {} - {}", traceComponent.contextScopedLocation(), removed.getSpanId()); endRouteSpans(traceComponent, endTime); removed.setEndTime(endTime); diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java index a3d0981..8dc78cf 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java @@ -135,4 +135,8 @@ public static boolean isFlowTypeContainer(ComponentLocation componentLocation) { || (SCOPE.equals(c.getType()))) .isPresent(); } + + public static boolean isAsyncScope(TypedComponentIdentifier identifier) { + return SCOPE.equals(identifier.getType()) && identifier.getIdentifier().getName().equals("async"); + } } diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java index 3382857..40e9cff 100644 --- a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java +++ b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java @@ -72,6 +72,36 @@ public void testFlowControls_Choice() throws Exception { softly.assertAll(); } + @Test + public void testScopes_foreach() throws Exception { + CoreEvent coreEvent = flowRunner("mule-core-flows-scope_foreach") + .run(); + await().untilAsserted(() -> assertThat(spanQueue) + .hasSize(12)); + Map> groupedSpans = groupSpanByParent(); + System.out.println(groupedSpans); + } + + @Test + public void testFlowScopes() throws Exception { + CoreEvent coreEvent = flowRunner("mule-core-flows-scope-no-generic-span") + .run(); + await().untilAsserted(() -> assertThat(spanQueue) + .hasSize(20)); + Map> groupedSpans = groupSpanByParent(); + System.out.println(groupedSpans); + } + + @Test + public void testFlowWithGenericSpansOnly() throws Exception { + CoreEvent coreEvent = flowRunner("simple-flow-to-flow") + .run(); + await().untilAsserted(() -> assertThat(spanQueue) + .hasSize(3)); + Map> groupedSpans = groupSpanByParent(); + System.out.println(groupedSpans); + } + @NotNull private Map> groupSpanByParent() { // Find the root span diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java index bf28023..7296ae4 100644 --- a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java +++ b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java @@ -5,7 +5,6 @@ import org.jetbrains.annotations.NotNull; import org.junit.Ignore; import org.junit.Test; -import org.mule.runtime.api.exception.DefaultMuleException; import org.mule.runtime.core.api.event.CoreEvent; import java.util.*; @@ -213,9 +212,47 @@ public void testScopes_foreach() throws Exception { CoreEvent coreEvent = flowRunner("mule-core-flows-scope_foreach") .run(); await().untilAsserted(() -> assertThat(spanQueue) - .hasSize(14)); + .hasSize(24)); Map> groupedSpans = groupSpanByParent(); System.out.println(groupedSpans); + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("mule-core-flows-scope_foreach", val -> assertThat(val) + .containsOnly( + "mule-core-flows-scope_foreach", + "foreach:For Each", + "logger:FirstLogger", + "logger:LastLogger")); + softly.assertThat(groupedSpans).hasEntrySatisfying("foreach:For Each", + val -> assertThat(val).containsOnly("/test-simple", "logger:ForEachLogger")); + softly.assertThat(groupedSpans).hasEntrySatisfying("/test-simple", + val -> assertThat(val).containsOnly("GET /test-simple")); + softly.assertThat(groupedSpans).hasEntrySatisfying("GET /test-simple", + val -> assertThat(val).containsOnly("set-payload:Set Payload")); + } + + @Test + public void testScopes_async() throws Exception { + CoreEvent coreEvent = flowRunner("mule-core-flows-async-scope") + .run(); + await().untilAsserted(() -> assertThat(spanQueue) + .hasSize(7)); + Map> groupedSpans = groupSpanByParent(); + System.out.println(groupedSpans); + SoftAssertions softly = new SoftAssertions(); + softly.assertThat(groupedSpans) + .hasEntrySatisfying("mule-core-flows-async-scope", val -> assertThat(val) + .containsOnly( + "mule-core-flows-async-scope", + "logger:FirstLogger", + "async:Async")); + softly.assertThat(groupedSpans).hasEntrySatisfying("async:Async", + val -> assertThat(val).containsOnly("logger:AsyncLogger", "/test-simple")); + softly.assertThat(groupedSpans).hasEntrySatisfying("/test-simple", + val -> assertThat(val).containsOnly("GET /test-simple")); + softly.assertThat(groupedSpans).hasEntrySatisfying("GET /test-simple", + val -> assertThat(val).containsOnly("set-payload:Set Payload")); + softly.assertAll(); } @Test diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactoryTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactoryTest.java index c00b3c1..79c9cc6 100644 --- a/src/test/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactoryTest.java +++ b/src/test/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/MessageProcessorTracingInterceptorFactoryTest.java @@ -279,7 +279,15 @@ public void getInterceptInclusions() { .getInterceptInclusions()) .as("Default Inclusion") .isNotEmpty() - .containsOnly(new MuleComponent("mule", "flow-ref")); + .containsOnly(new MuleComponent("mule", "flow-ref"), + new MuleComponent("mule", "choice"), + new MuleComponent("mule", "first-successful"), + new MuleComponent("mule", "until-successful"), + new MuleComponent("mule", "scatter-gather"), + new MuleComponent("mule", "round-robin"), + new MuleComponent("mule", "foreach"), + new MuleComponent("mule", "parallel-foreach"), + new MuleComponent("mule", "try")); } @Test @@ -294,6 +302,14 @@ public void getInterceptInclusionsWithTraceLevelConfig() { .as("Default with Trace level Inclusion") .isNotEmpty() .containsOnly(new MuleComponent("mule", "flow-ref"), - new MuleComponent("mule", "logger")); + new MuleComponent("mule", "logger"), + new MuleComponent("mule", "choice"), + new MuleComponent("mule", "first-successful"), + new MuleComponent("mule", "until-successful"), + new MuleComponent("mule", "scatter-gather"), + new MuleComponent("mule", "round-robin"), + new MuleComponent("mule", "foreach"), + new MuleComponent("mule", "parallel-foreach"), + new MuleComponent("mule", "try")); } } \ No newline at end of file diff --git a/src/test/resources/mule-core-flows.xml b/src/test/resources/mule-core-flows.xml index 8b17920..1f6abae 100644 --- a/src/test/resources/mule-core-flows.xml +++ b/src/test/resources/mule-core-flows.xml @@ -1,11 +1,13 @@ +http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/tracing/current/mule-tracing.xsd +http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd"> + + + - + + + + + + + + + + + + + @@ -148,11 +165,29 @@ http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/ + + + + + + + + + + + + + + + + + + - + - + @@ -164,10 +199,18 @@ http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/ + + + + + - + + + + @@ -184,4 +227,14 @@ http://www.mulesoft.org/schema/mule/tracing http://www.mulesoft.org/schema/mule/ + + + + + + + + + +