Skip to content

Commit

Permalink
Split out MessagingAttributesGetter (#5626)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored Mar 18, 2022
1 parent a1e45a5 commit f0bdce9
Show file tree
Hide file tree
Showing 34 changed files with 738 additions and 777 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.internal.SpanKey;
import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider;
Expand Down Expand Up @@ -188,9 +188,8 @@ public InstrumenterBuilder<REQUEST, RESPONSE> setDisabled(boolean disabled) {
* <li>SERVER nested spans are always suppressed. If a SERVER span is present in the parent
* context object, new SERVER span will not be started.
* <li>Messaging (PRODUCER and CONSUMER) nested spans are suppressed depending on their
* {@linkplain MessagingAttributesExtractor#operation() operation}. If a span with the same
* operation is present in the parent context object, new span with the same operation will
* not be started.
* {@linkplain MessageOperation operation}. If a span with the same operation is present in
* the parent context object, new span with the same operation will not be started.
* <li>INTERNAL spans are never suppressed.
* </ul>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ public enum MessageOperation {
* Returns the operation name as defined in <a
* href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#operation-names">the
* specification</a>.
*
* @deprecated This method is going to be made non-public in the next release.
*/
@Deprecated
public String operationName() {
return name().toLowerCase(Locale.ROOT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.instrumentation.api.instrumenter.messaging;

import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.PROCESS;
import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.annotations.UnstableApi;
Expand All @@ -17,97 +20,80 @@
/**
* Extractor of <a
* href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md">messaging
* attributes</a>. Instrumentation of messaging frameworks/libraries should extend this class,
* defining {@link REQUEST} and {@link RESPONSE} with the actual request / response types of the
* instrumented library. If an attribute is not available in this library, it is appropriate to
* return {@code null} from the protected attribute methods, but implement as many as possible for
* best compliance with the OpenTelemetry specification.
* attributes</a>.
*
* <p>This class delegates to a type-specific {@link MessagingAttributesGetter} for individual
* attribute extraction from request/response objects.
*/
public abstract class MessagingAttributesExtractor<REQUEST, RESPONSE>
public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
implements AttributesExtractor<REQUEST, RESPONSE>, SpanKeyProvider {
public static final String TEMP_DESTINATION_NAME = "(temporary)";

static final String TEMP_DESTINATION_NAME = "(temporary)";

/**
* Creates the messaging attributes extractor for the given {@link MessageOperation operation}.
*/
public static <REQUEST, RESPONSE> MessagingAttributesExtractor<REQUEST, RESPONSE> create(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return new MessagingAttributesExtractor<>(getter, operation);
}

private final MessagingAttributesGetter<REQUEST, RESPONSE> getter;
private final MessageOperation operation;

private MessagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
this.getter = getter;
this.operation = operation;
}

@SuppressWarnings("deprecation") // operationName
@Override
public final void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
set(attributes, SemanticAttributes.MESSAGING_SYSTEM, system(request));
set(attributes, SemanticAttributes.MESSAGING_DESTINATION_KIND, destinationKind(request));
boolean isTemporaryDestination = temporaryDestination(request);
public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
set(attributes, SemanticAttributes.MESSAGING_SYSTEM, getter.system(request));
set(attributes, SemanticAttributes.MESSAGING_DESTINATION_KIND, getter.destinationKind(request));
boolean isTemporaryDestination = getter.temporaryDestination(request);
if (isTemporaryDestination) {
set(attributes, SemanticAttributes.MESSAGING_TEMP_DESTINATION, true);
set(attributes, SemanticAttributes.MESSAGING_DESTINATION, TEMP_DESTINATION_NAME);
} else {
set(attributes, SemanticAttributes.MESSAGING_DESTINATION, destination(request));
set(attributes, SemanticAttributes.MESSAGING_DESTINATION, getter.destination(request));
}
set(attributes, SemanticAttributes.MESSAGING_PROTOCOL, protocol(request));
set(attributes, SemanticAttributes.MESSAGING_PROTOCOL_VERSION, protocolVersion(request));
set(attributes, SemanticAttributes.MESSAGING_URL, url(request));
set(attributes, SemanticAttributes.MESSAGING_CONVERSATION_ID, conversationId(request));
set(attributes, SemanticAttributes.MESSAGING_PROTOCOL, getter.protocol(request));
set(attributes, SemanticAttributes.MESSAGING_PROTOCOL_VERSION, getter.protocolVersion(request));
set(attributes, SemanticAttributes.MESSAGING_URL, getter.url(request));
set(attributes, SemanticAttributes.MESSAGING_CONVERSATION_ID, getter.conversationId(request));
set(
attributes,
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
messagePayloadSize(request));
getter.messagePayloadSize(request));
set(
attributes,
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_COMPRESSED_SIZE_BYTES,
messagePayloadCompressedSize(request));
MessageOperation operation = operation();
if (operation == MessageOperation.RECEIVE || operation == MessageOperation.PROCESS) {
getter.messagePayloadCompressedSize(request));
if (operation == RECEIVE || operation == PROCESS) {
set(attributes, SemanticAttributes.MESSAGING_OPERATION, operation.operationName());
}
}

@Override
public final void onEnd(
public void onEnd(
AttributesBuilder attributes,
Context context,
REQUEST request,
@Nullable RESPONSE response,
@Nullable Throwable error) {
set(attributes, SemanticAttributes.MESSAGING_MESSAGE_ID, messageId(request, response));
set(attributes, SemanticAttributes.MESSAGING_MESSAGE_ID, getter.messageId(request, response));
}

public abstract MessageOperation operation();

@Nullable
protected abstract String system(REQUEST request);

@Nullable
protected abstract String destinationKind(REQUEST request);

@Nullable
protected abstract String destination(REQUEST request);

protected abstract boolean temporaryDestination(REQUEST request);

@Nullable
protected abstract String protocol(REQUEST request);

@Nullable
protected abstract String protocolVersion(REQUEST request);

@Nullable
protected abstract String url(REQUEST request);

@Nullable
protected abstract String conversationId(REQUEST request);

@Nullable
protected abstract Long messagePayloadSize(REQUEST request);

@Nullable
protected abstract Long messagePayloadCompressedSize(REQUEST request);

@Nullable
protected abstract String messageId(REQUEST request, @Nullable RESPONSE response);

/**
* This method is internal and is hence not for public use. Its API is unstable and can change at
* any time.
*/
@UnstableApi
@Override
public SpanKey internalGetSpanKey() {
switch (operation()) {
switch (operation) {
case SEND:
return SpanKey.PRODUCER;
case RECEIVE:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.instrumenter.messaging;

import javax.annotation.Nullable;

/**
* An interface for getting messaging attributes.
*
* <p>Instrumentation authors will create implementations of this interface for their specific
* library/framework. It will be used by the {@link MessagingAttributesExtractor} to obtain the
* various messaging attributes in a type-generic way.
*/
public interface MessagingAttributesGetter<REQUEST, RESPONSE> {

@Nullable
String system(REQUEST request);

@Nullable
String destinationKind(REQUEST request);

@Nullable
String destination(REQUEST request);

boolean temporaryDestination(REQUEST request);

@Nullable
String protocol(REQUEST request);

@Nullable
String protocolVersion(REQUEST request);

@Nullable
String url(REQUEST request);

@Nullable
String conversationId(REQUEST request);

@Nullable
Long messagePayloadSize(REQUEST request);

@Nullable
Long messagePayloadCompressedSize(REQUEST request);

@Nullable
String messageId(REQUEST request, @Nullable RESPONSE response);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,34 @@ public final class MessagingSpanNameExtractor<REQUEST> implements SpanNameExtrac
* href="https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#span-name">
* messaging semantic conventions</a>: {@code <destination name> <operation name>}.
*
* @see MessagingAttributesExtractor#destination(Object) used to extract {@code <destination
* name>}.
* @see MessagingAttributesExtractor#operation() used to extract {@code <operation name>}.
* @see MessagingAttributesGetter#destination(Object) used to extract {@code <destination name>}.
* @see MessageOperation used to extract {@code <operation name>}.
*/
public static <REQUEST> SpanNameExtractor<REQUEST> create(
MessagingAttributesExtractor<REQUEST, ?> attributesExtractor) {
return new MessagingSpanNameExtractor<>(attributesExtractor);
MessagingAttributesGetter<REQUEST, ?> getter, MessageOperation operation) {
return new MessagingSpanNameExtractor<>(getter, operation);
}

private final MessagingAttributesExtractor<REQUEST, ?> attributesExtractor;
private final MessagingAttributesGetter<REQUEST, ?> getter;
private final MessageOperation operation;

private MessagingSpanNameExtractor(MessagingAttributesExtractor<REQUEST, ?> attributesExtractor) {
this.attributesExtractor = attributesExtractor;
private MessagingSpanNameExtractor(
MessagingAttributesGetter<REQUEST, ?> getter, MessageOperation operation) {
this.getter = getter;
this.operation = operation;
}

@SuppressWarnings("deprecation") // operationName
@Override
public String extract(REQUEST request) {
String destinationName =
attributesExtractor.temporaryDestination(request)
getter.temporaryDestination(request)
? MessagingAttributesExtractor.TEMP_DESTINATION_NAME
: attributesExtractor.destination(request);
: getter.destination(request);
if (destinationName == null) {
destinationName = "unknown";
}

MessageOperation operation = attributesExtractor.operation();
return destinationName + " " + operation.operationName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetServerAttributesGetter;
Expand Down Expand Up @@ -657,8 +656,7 @@ void instrumentationTypeDetected_rpc() {

@Test
void instrumentationTypeDetected_producer() {
when(mockMessagingAttributes.operation()).thenReturn(MessageOperation.SEND);
when(mockMessagingAttributes.internalGetSpanKey()).thenCallRealMethod();
when(mockMessagingAttributes.internalGetSpanKey()).thenReturn(SpanKey.PRODUCER);

Instrumenter<Map<String, String>, Map<String, String>> instrumenter =
getInstrumenterWithType(true, mockMessagingAttributes);
Expand All @@ -671,8 +669,7 @@ void instrumentationTypeDetected_producer() {

@Test
void instrumentationTypeDetected_mix() {
when(mockMessagingAttributes.operation()).thenReturn(MessageOperation.SEND);
when(mockMessagingAttributes.internalGetSpanKey()).thenCallRealMethod();
when(mockMessagingAttributes.internalGetSpanKey()).thenReturn(SpanKey.PRODUCER);

Instrumenter<Map<String, String>, Map<String, String>> instrumenter =
getInstrumenterWithType(
Expand Down
Loading

0 comments on commit f0bdce9

Please sign in to comment.