Skip to content

Commit

Permalink
fix: trace transaction id generation
Browse files Browse the repository at this point in the history
  • Loading branch information
manikmagar committed Mar 18, 2024
1 parent b300a8b commit cb1d4a1
Show file tree
Hide file tree
Showing 18 changed files with 258 additions and 110 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.avioconsulting.mule</groupId>
<artifactId>mule-opentelemetry-module</artifactId>
<version>2.0.0-M4-SNAPSHOT</version>
<version>2.0.0-M5.1-Scopes.3-SNAPSHOT</version>
<packaging>mule-extension</packaging>

<parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.notification.EnrichedServerNotification;

public interface ProcessorComponent {
Expand All @@ -23,7 +23,17 @@ public interface ProcessorComponent {
*/
TraceComponent getStartTraceComponent(EnrichedServerNotification notification);

TraceComponent getStartTraceComponent(Component component, Message message, String correlationId);
/**
* Build a {@link TraceComponent} for start of the {@link Component} processing
* given {@link Event}
*
* @param component
* {@link Component}
* @param event
* {@link Event}
* @return TraceComponent
*/
TraceComponent getStartTraceComponent(Component component, Event event);

/**
* Build a {@link TraceComponent} for end of a flow-like container or a message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface TransactionStore {
* @return {@link String} transaction id
*/
default String transactionIdFor(Event muleEvent) {
return muleEvent.getCorrelationId();
return muleEvent.getContext().getId();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.runtime.parameter.CorrelationInfo;
import org.mule.runtime.extension.api.runtime.parameter.ParameterResolver;

import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -14,9 +16,9 @@ public class OpenTelemetryOperations {
@DisplayName("Get Trace Context")
@Alias("get-trace-context")
public Map<String, String> getTraceContext(@Connection Supplier<OpenTelemetryConnection> openTelemetryConnection,
@DisplayName("Trace Transaction Id") @Optional(defaultValue = "#[vars.OTEL_TRACE_CONTEXT.TRACE_TRANSACTION_ID]") ParameterResolver<String> traceTransactionId,
CorrelationInfo correlationInfo) {
String transactionId = correlationInfo.getCorrelationId();
return openTelemetryConnection.get().getTraceContext(transactionId);
return openTelemetryConnection.get().getTraceContext(traceTransactionId.resolve());
}

/**
Expand All @@ -36,9 +38,10 @@ public Map<String, String> getTraceContext(@Connection Supplier<OpenTelemetryCon
*/
@DisplayName("Add Custom Tags")
public void addCustomTags(@Connection Supplier<OpenTelemetryConnection> openTelemetryConnection,
@DisplayName("Trace Transaction Id") @Optional(defaultValue = "#[vars.OTEL_TRACE_CONTEXT.TRACE_TRANSACTION_ID]") ParameterResolver<String> traceTransactionId,
Map<String, String> tags,
CorrelationInfo correlationInfo) {
openTelemetryConnection.get().getTransactionStore().addTransactionTags(correlationInfo.getCorrelationId(),
openTelemetryConnection.get().getTransactionStore().addTransactionTags(traceTransactionId.resolve(),
"custom",
tags);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.avioconsulting.mule.opentelemetry.internal.config.OpenTelemetryExtensionConfiguration;
import com.avioconsulting.mule.opentelemetry.internal.processor.MuleNotificationProcessor;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.TypedComponentIdentifier;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.interception.ProcessorInterceptor;
Expand All @@ -14,11 +15,11 @@

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.FLOW;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE;
import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.*;

/**
* ProcessorInterceptorFactory can intercept processors. This is injected
Expand All @@ -36,7 +37,8 @@ public class MessageProcessorTracingInterceptorFactory implements ProcessorInter
public static final String MULE_OTEL_INTERCEPTOR_PROCESSOR_ENABLE_PROPERTY_NAME = "mule.otel.interceptor.processor.enable";
private final boolean interceptorEnabled = Boolean
.parseBoolean(System.getProperty(MULE_OTEL_INTERCEPTOR_PROCESSOR_ENABLE_PROPERTY_NAME, "true"));

private final List<TypedComponentIdentifier.ComponentType> interceptableComponentTypes = Arrays.asList(SCOPE, ROUTE,
ROUTER);
/**
* {@link MuleNotificationProcessor} instance for getting opentelemetry
* connection supplier by processor.
Expand Down Expand Up @@ -135,6 +137,8 @@ public boolean intercept(ComponentLocation location) {
|| "*".equalsIgnoreCase(mc.getName())));
intercept = firstProcessor
|| interceptConfigured;
// ||
// interceptableComponentTypes.contains(location.getComponentIdentifier().getType());

if (intercept) {
// This factory executes during application initialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.TRACE_PREV_CONTEXT_MAP_KEY;
import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.findLocation;
import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.isFlowRef;
import static com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil.getEventTransactionId;

/**
* Interceptor to set tracing context information in flow a variable
Expand All @@ -50,6 +51,7 @@ public class ProcessorTracingInterceptor implements ProcessorInterceptor {
* {@link MuleNotificationProcessor} if configured fully to acquire
* connection supplier.
* @param configurationComponentLocator
* to locate mule components
*/
public ProcessorTracingInterceptor(MuleNotificationProcessor muleNotificationProcessor,
ConfigurationComponentLocator configurationComponentLocator) {
Expand All @@ -71,8 +73,7 @@ public void before(
switchTraceContext(event, TRACE_CONTEXT_MAP_KEY, TRACE_PREV_CONTEXT_MAP_KEY);
if (processorComponent == null) {
// when spanAllProcessor is false, and it's the first generic processor
String transactionId = muleNotificationProcessor.getOpenTelemetryConnection().getTransactionStore()
.transactionIdFor(event);
String transactionId = getEventTransactionId(event);
event.addVariable(TRACE_CONTEXT_MAP_KEY,
muleNotificationProcessor.getOpenTelemetryConnection().getTraceContext(transactionId));
} else {
Expand All @@ -93,8 +94,7 @@ public void before(
switchTraceContext(event, TRACE_PREV_CONTEXT_MAP_KEY, TRACE_CONTEXT_MAP_KEY);
return;
}
TraceComponent traceComponent = processorComponent.getStartTraceComponent(component, event.getMessage(),
event.getCorrelationId());
TraceComponent traceComponent = processorComponent.getStartTraceComponent(component, event);
if (traceComponent == null) {
LOGGER.warn("Could not build a trace component for {} at {}",
location.getComponentIdentifier().getIdentifier(), location.getLocation());
Expand All @@ -105,9 +105,7 @@ public void before(
location.getComponentIdentifier().getIdentifier(), location.getLocation());
muleNotificationProcessor.getOpenTelemetryConnection().addProcessorSpan(traceComponent,
location.getRootContainerName());
final String transactionId = muleNotificationProcessor.getOpenTelemetryConnection()
.getTransactionStore()
.transactionIdFor(event);
final String transactionId = getEventTransactionId(event);
if (isFlowRef(location)) {
Optional<ComponentLocation> subFlowLocation = findLocation(
traceComponent.getTags().get("mule.app.processor.flowRef.name"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.TypedValue;
Expand All @@ -19,6 +20,7 @@
import java.util.*;

import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.*;
import static com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil.getEventTransactionId;

public abstract class AbstractProcessorComponent implements ProcessorComponent {

Expand Down Expand Up @@ -95,7 +97,7 @@ protected String getDefaultSpanName(Map<String, String> tags) {
}

protected String getTransactionId(EnrichedServerNotification notification) {
return notification.getEvent().getCorrelationId();
return getEventTransactionId(notification.getEvent());
}

protected Map<String, String> getProcessorCommonTags(Component component) {
Expand Down Expand Up @@ -134,33 +136,30 @@ protected <A> Map<String, String> getAttributes(Component component, TypedValue<

@Override
public TraceComponent getStartTraceComponent(EnrichedServerNotification notification) {
return getStartTraceComponent(notification.getComponent(), notification.getEvent().getMessage(),
getTransactionId(notification));
return getStartTraceComponent(notification.getComponent(), notification.getEvent());
}

/**
* Create a start trace component without the notification object. This is
* mostly consumed by interceptors.
*
*
* @param component
* {@link Component}
* @param message
* @param event
* {@link Message}
* @param correlationId
* {@link String}
* @return TraceComponent
*/
public TraceComponent getStartTraceComponent(Component component, Message message, String correlationId) {
public TraceComponent getStartTraceComponent(Component component, Event event) {
Map<String, String> tags = new HashMap<>(getProcessorCommonTags(component));
tags.put(MULE_CORRELATION_ID.getKey(), correlationId);
tags.put(MULE_CORRELATION_ID.getKey(), event.getCorrelationId());
tags.putAll(getAttributes(component,
message.getAttributes()));
event.getMessage().getAttributes()));
return TraceComponent.named(component.getLocation().getLocation())
.withLocation(component.getLocation().getLocation())
.withSpanName(getDefaultSpanName(tags))
.withTags(tags)
.withSpanKind(getSpanKind())
.withTransactionId(correlationId);
.withTransactionId(getEventTransactionId(event));
}

protected void addTagIfPresent(Map<String, String> sourceMap, String sourceKey, Map<String, String> targetMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.EnrichedServerNotification;

Expand Down Expand Up @@ -55,8 +55,8 @@ private String formattedSpanName(String queueName, String operation) {
}

@Override
public TraceComponent getStartTraceComponent(Component component, Message message, String correlationId) {
TraceComponent startTraceComponent = super.getStartTraceComponent(component, message, correlationId);
public TraceComponent getStartTraceComponent(Component component, Event event) {
TraceComponent startTraceComponent = super.getStartTraceComponent(component, event);
if ("consume".equalsIgnoreCase(startTraceComponent.getTags().get(MULE_APP_PROCESSOR_NAME.getKey()))) {
// TODO: Handling a different Parent Span than flow containing Consume
// It may be possible that message was published by a different server flow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.mule.extension.http.api.HttpResponseAttributes;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.ComponentIdentifier;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.MediaType;
Expand Down Expand Up @@ -123,12 +124,12 @@ private StatusCode getSpanStatus(boolean isServer, int statusCode) {
}

@Override
public TraceComponent getStartTraceComponent(Component component, Message message, String correlationId) {
public TraceComponent getStartTraceComponent(Component component, Event event) {

TraceComponent traceComponent = super.getStartTraceComponent(component, message, correlationId);
TraceComponent traceComponent = super.getStartTraceComponent(component, event);

Map<String, String> requesterTags = getAttributes(component,
message.getAttributes());
event.getMessage().getAttributes());
requesterTags.putAll(traceComponent.getTags());

return TraceComponent.named(component.getLocation().getRootContainerName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ public void handleProcessorEndEvent(MessageProcessorNotification notification) {

public void handleFlowStartEvent(PipelineMessageNotification notification) {
try {
logger.trace("Handling '{}' flow start event", notification.getResourceIdentifier());
logger.info("Handling '{}' flow start event context id {} correlation id {} ",
notification.getResourceIdentifier(), notification.getEvent().getContext().getId(),
notification.getEvent().getCorrelationId());
TraceComponent traceComponent = flowProcessorComponent
.getSourceStartTraceComponent(notification, openTelemetryConnection)
.withStartTime(Instant.ofEpochMilli(notification.getTimestamp()));
Expand All @@ -244,7 +246,9 @@ public void handleFlowStartEvent(PipelineMessageNotification notification) {

public void handleFlowEndEvent(PipelineMessageNotification notification) {
try {
logger.trace("Handling '{}' flow end event", notification.getResourceIdentifier());
logger.info("Handling '{}' flow end event context id {} correlation id {} ",
notification.getResourceIdentifier(), notification.getEvent().getContext().getId(),
notification.getEvent().getCorrelationId());
TraceComponent traceComponent = flowProcessorComponent
.getSourceEndTraceComponent(notification, openTelemetryConnection)
.withEndTime(Instant.ofEpochMilli(notification.getTimestamp()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@
import com.avioconsulting.mule.opentelemetry.internal.util.PropertiesUtil;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import static com.avioconsulting.mule.opentelemetry.internal.processor.util.HttpSpanUtil.apiKitRoutePath;

public class FlowSpan implements Serializable {

private static final Logger LOGGER = LoggerFactory.getLogger(FlowSpan.class);

private final String flowName;
private String rootSpanName;
private final Span span;
Expand Down Expand Up @@ -94,8 +100,15 @@ private void extractAPIKitConfigName(TraceComponent traceComponent) {
}

public SpanMeta endProcessorSpan(String location, Consumer<Span> spanUpdater, Instant endTime) {
LOGGER.info("Ending Span at location {} for flow {} trace transaction {} context {}", location,
this.getRootSpanName(),
this.transactionId, this.getSpan().getSpanContext().toString());
if (childSpans.containsKey(location)) {
ProcessorSpan removed = childSpans.remove(location);
ProcessorSpan removed = Objects.requireNonNull(childSpans.remove(location),
"Missing child span at location " + location + " for flow " + getRootSpanName()
+ " trace transaction " + transactionId + " context "
+ getSpan().getSpanContext().toString());

removed.setEndTime(endTime);
if (spanUpdater != null)
spanUpdater.accept(removed.getSpan());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.avioconsulting.mule.opentelemetry.internal.util;

import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.event.EventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;

public class OpenTelemetryUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(OpenTelemetryUtil.class);

/**
* <pre>
* Extract any attributes defined via system properties (see {@link System#getProperties()}) for provided <code>configName</code>.
Expand Down Expand Up @@ -34,4 +41,18 @@ public static void addGlobalConfigSystemAttributes(String configName, Map<String
});
}

/**
* This method uses {@link EventContext#getId()} for extracting the unique id
* for current event processing.
*
* @param event
* {@link Event} to extract id from
* @return String id for the current event
*/
public static String getEventTransactionId(Event event) {
// For child contexts, the primary id is appended with "_{timeInMillis}".
// We remove time part to get a unique id across the event processing.
return event.getContext().getId().split("_")[0];
}

}
Loading

0 comments on commit cb1d4a1

Please sign in to comment.