diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java new file mode 100644 index 00000000000..abe70c8806a --- /dev/null +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java @@ -0,0 +1,408 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.opentelemetry; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientStreamTracer; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerStreamTracer; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * Provides factories for {@link io.grpc.StreamTracer} that records tracing to OpenTelemetry. + */ +final class OpenTelemetryTracingModule { + private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName()); + + @VisibleForTesting + static final String OTEL_TRACING_SCOPE_NAME = "grpc-opentelemetry-tracing"; + @Nullable + private static final AtomicIntegerFieldUpdater callEndedUpdater; + @Nullable + private static final AtomicIntegerFieldUpdater streamClosedUpdater; + + /* + * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK + * reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to + * (potentially racy) direct updates of the volatile variables. + */ + static { + AtomicIntegerFieldUpdater tmpCallEndedUpdater; + AtomicIntegerFieldUpdater tmpStreamClosedUpdater; + try { + tmpCallEndedUpdater = + AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded"); + tmpStreamClosedUpdater = + AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed"); + } catch (Throwable t) { + logger.log(Level.SEVERE, "Creating atomic field updaters failed", t); + tmpCallEndedUpdater = null; + tmpStreamClosedUpdater = null; + } + callEndedUpdater = tmpCallEndedUpdater; + streamClosedUpdater = tmpStreamClosedUpdater; + } + + private final Tracer otelTracer; + private final ContextPropagators contextPropagators; + private final MetadataGetter metadataGetter = MetadataGetter.getInstance(); + private final MetadataSetter metadataSetter = MetadataSetter.getInstance(); + private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor(); + private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); + + OpenTelemetryTracingModule(OpenTelemetry openTelemetry) { + this.otelTracer = checkNotNull(openTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME), "otelTracer"); + this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators"); + } + + /** + * Creates a {@link CallAttemptsTracerFactory} for a new call. + */ + @VisibleForTesting + CallAttemptsTracerFactory newClientCallTracer(Span clientSpan, MethodDescriptor method) { + return new CallAttemptsTracerFactory(clientSpan, method); + } + + /** + * Returns the server tracer factory. + */ + ServerStreamTracer.Factory getServerTracerFactory() { + return serverTracerFactory; + } + + /** + * Returns the client interceptor that facilitates otel tracing reporting. + */ + ClientInterceptor getClientInterceptor() { + return clientInterceptor; + } + + @VisibleForTesting + final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory { + volatile int callEnded; + private final Span clientSpan; + private final String fullMethodName; + + CallAttemptsTracerFactory(Span clientSpan, MethodDescriptor method) { + checkNotNull(method, "method"); + this.fullMethodName = checkNotNull(method.getFullMethodName(), "fullMethodName"); + this.clientSpan = checkNotNull(clientSpan, "clientSpan"); + } + + @Override + public ClientStreamTracer newClientStreamTracer( + ClientStreamTracer.StreamInfo info, Metadata headers) { + Span attemptSpan = otelTracer.spanBuilder( + "Attempt." + fullMethodName.replace('/', '.')) + .setParent(Context.current().with(clientSpan)) + .startSpan(); + attemptSpan.setAttribute( + "previous-rpc-attempts", info.getPreviousAttempts()); + attemptSpan.setAttribute( + "transparent-retry",info.isTransparentRetry()); + if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED) != null) { + clientSpan.addEvent("Delayed name resolution complete"); + } + return new ClientTracer(attemptSpan, clientSpan); + } + + /** + * Record a finished call and mark the current time as the end time. + * + *

Can be called from any thread without synchronization. Calling it the second time or more + * is a no-op. + */ + void callEnded(io.grpc.Status status) { + if (callEndedUpdater != null) { + if (callEndedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (callEnded != 0) { + return; + } + callEnded = 1; + } + endSpanWithStatus(clientSpan, status); + } + } + + private final class ClientTracer extends ClientStreamTracer { + private final Span span; + private final Span parentSpan; + volatile int seqNo; + boolean isPendingStream; + + ClientTracer(Span span, Span parentSpan) { + this.span = checkNotNull(span, "span"); + this.parentSpan = checkNotNull(parentSpan, "parent span"); + } + + @Override + public void streamCreated(Attributes transportAtts, Metadata headers) { + contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers, + metadataSetter); + if (isPendingStream) { + span.addEvent("Delayed LB pick complete"); + } + } + + @Override + public void createPendingStream() { + isPendingStream = true; + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + //TODO(yifeizhuang): needs support from message deframer. + if (optionalWireSize != optionalUncompressedSize) { + recordInboundCompressedMessage(span, seqNo, optionalWireSize); + } + } + + @Override + public void inboundMessage(int seqNo) { + this.seqNo = seqNo; + } + + @Override + public void inboundUncompressedSize(long bytes) { + recordInboundMessageSize(parentSpan, seqNo, bytes); + } + + @Override + public void streamClosed(io.grpc.Status status) { + endSpanWithStatus(span, status); + } + } + + private final class ServerTracer extends ServerStreamTracer { + private final Span span; + volatile int streamClosed; + private int seqNo; + + ServerTracer(String fullMethodName, @Nullable Span remoteSpan) { + checkNotNull(fullMethodName, "fullMethodName"); + this.span = + otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName)) + .setParent(remoteSpan == null ? null : Context.current().with(remoteSpan)) + .startSpan(); + } + + /** + * Record a finished stream and mark the current time as the end time. + * + *

Can be called from any thread without synchronization. Calling it the second time or more + * is a no-op. + */ + @Override + public void streamClosed(io.grpc.Status status) { + if (streamClosedUpdater != null) { + if (streamClosedUpdater.getAndSet(this, 1) != 0) { + return; + } + } else { + if (streamClosed != 0) { + return; + } + streamClosed = 1; + } + endSpanWithStatus(span, status); + } + + @Override + public void outboundMessageSent( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize); + } + + @Override + public void inboundMessageRead( + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + if (optionalWireSize != optionalUncompressedSize) { + recordInboundCompressedMessage(span, seqNo, optionalWireSize); + } + } + + @Override + public void inboundMessage(int seqNo) { + this.seqNo = seqNo; + } + + @Override + public void inboundUncompressedSize(long bytes) { + recordInboundMessageSize(span, seqNo, bytes); + } + } + + @VisibleForTesting + final class ServerTracerFactory extends ServerStreamTracer.Factory { + @SuppressWarnings("ReferenceEquality") + @Override + public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { + Context context = contextPropagators.getTextMapPropagator().extract( + Context.current(), headers, metadataGetter + ); + Span remoteSpan = Span.fromContext(context); + if (remoteSpan == Span.getInvalid()) { + remoteSpan = null; + } + return new ServerTracer(fullMethodName, remoteSpan); + } + } + + @VisibleForTesting + final class TracingClientInterceptor implements ClientInterceptor { + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + Span clientSpan = otelTracer.spanBuilder( + generateTraceSpanName(false, method.getFullMethodName())) + .startSpan(); + + final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method); + ClientCall call = + next.newCall( + method, + callOptions.withStreamTracerFactory(tracerFactory)); + return new SimpleForwardingClientCall(call) { + @Override + public void start(Listener responseListener, Metadata headers) { + delegate().start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onClose(io.grpc.Status status, Metadata trailers) { + tracerFactory.callEnded(status); + super.onClose(status, trailers); + } + }, + headers); + } + }; + } + } + + // Attribute named "message-size" always means the message size the application sees. + // If there was compression, additional event reports "message-size-compressed". + // + // An example trace with message compression: + // + // Sending: + // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854, + // 'message-size-compressed' = 5493) ----| + // + // Receiving: + // |-- Event 'Inbound compressed message', attributes('sequence-numer' = 0, + // 'message-size-compressed' = 5493 ) ----| + // |-- Event 'Inbound message received', attributes('sequence-numer' = 0, + // 'message-size' = 7854) ----| + // + // An example trace with no message compression: + // + // Sending: + // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854) ---| + // + // Receiving: + // |-- Event 'Inbound message received', attributes('sequence-numer' = 0, + // 'message-size' = 7854) ----| + private void recordOutboundMessageSentEvent(Span span, + int seqNo, long optionalWireSize, long optionalUncompressedSize) { + AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder(); + attributesBuilder.put("sequence-number", seqNo); + if (optionalUncompressedSize != -1) { + attributesBuilder.put("message-size", optionalUncompressedSize); + } + if (optionalWireSize != -1 && optionalWireSize != optionalUncompressedSize) { + attributesBuilder.put("message-size-compressed", optionalWireSize); + } + span.addEvent("Outbound message sent", attributesBuilder.build()); + } + + private void recordInboundCompressedMessage(Span span, int seqNo, long optionalWireSize) { + AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder(); + attributesBuilder.put("sequence-number", seqNo); + attributesBuilder.put("message-size-compressed", optionalWireSize); + span.addEvent("Inbound compressed message", attributesBuilder.build()); + } + + private void recordInboundMessageSize(Span span, int seqNo, long bytes) { + AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder(); + attributesBuilder.put("sequence-number", seqNo); + attributesBuilder.put("message-size", bytes); + span.addEvent("Inbound message received", attributesBuilder.build()); + } + + private String generateErrorStatusDescription(io.grpc.Status status) { + if (status.getDescription() != null) { + return status.getCode() + ". " + status.getDescription(); + } else { + return status.getCode().toString(); + } + } + + private void endSpanWithStatus(Span span, io.grpc.Status status) { + if (status.isOk()) { + span.setStatus(StatusCode.OK); + } else { + span.setStatus(StatusCode.ERROR, generateErrorStatusDescription(status)); + } + span.end(); + } + + /** + * Convert a full method name to a tracing span name. + * + * @param isServer {@code false} if the span is on the client-side, {@code true} if on the + * server-side + * @param fullMethodName the method name as returned by + * {@link MethodDescriptor#getFullMethodName}. + */ + @VisibleForTesting + static String generateTraceSpanName(boolean isServer, String fullMethodName) { + String prefix = isServer ? "Recv" : "Sent"; + return prefix + "." + fullMethodName.replace('/', '.'); + } +} diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java new file mode 100644 index 00000000000..248e1531990 --- /dev/null +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryTracingModuleTest.java @@ -0,0 +1,582 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.opentelemetry; + +import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; +import static io.grpc.opentelemetry.OpenTelemetryTracingModule.OTEL_TRACING_SCOPE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ClientStreamTracer; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerServiceDefinition; +import io.grpc.ServerStreamTracer; +import io.grpc.Status; +import io.grpc.opentelemetry.OpenTelemetryTracingModule.CallAttemptsTracerFactory; +import io.grpc.testing.GrpcServerRule; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class OpenTelemetryTracingModuleTest { + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + + private static final ClientStreamTracer.StreamInfo STREAM_INFO = + ClientStreamTracer.StreamInfo.newBuilder() + .setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build(); + private static final CallOptions.Key CUSTOM_OPTION = + CallOptions.Key.createWithDefault("option1", "default"); + private static final CallOptions CALL_OPTIONS = + CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue"); + + private static class StringInputStream extends InputStream { + final String string; + + StringInputStream(String string) { + this.string = string; + } + + @Override + public int read() { + // InProcessTransport doesn't actually read bytes from the InputStream. The InputStream is + // passed to the InProcess server and consumed by MARSHALLER.parse(). + throw new UnsupportedOperationException("Should not be called"); + } + } + + private static final MethodDescriptor.Marshaller MARSHALLER = + new MethodDescriptor.Marshaller() { + @Override + public InputStream stream(String value) { + return new StringInputStream(value); + } + + @Override + public String parse(InputStream stream) { + return ((StringInputStream) stream).string; + } + }; + + private final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNKNOWN) + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .setFullMethodName("package1.service2/method3") + .build(); + + @Rule + public final OpenTelemetryRule openTelemetryRule = OpenTelemetryRule.create(); + @Rule + public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor(); + private Tracer tracerRule; + @Mock + private Tracer mockTracer; + @Mock + TextMapPropagator mockPropagator; + @Mock + private Span mockClientSpan; + @Mock + private Span mockAttempSpan; + @Mock + private ServerCall.Listener mockServerCallListener; + @Mock + private ClientCall.Listener mockClientCallListener; + @Mock + private SpanBuilder mockSpanBuilder; + @Mock + private OpenTelemetry mockOpenTelemetry; + @Captor + private ArgumentCaptor eventNameCaptor; + @Captor + private ArgumentCaptor attributesCaptor; + @Captor + private ArgumentCaptor statusCaptor; + + @Before + public void setUp() { + tracerRule = openTelemetryRule.getOpenTelemetry().getTracer(OTEL_TRACING_SCOPE_NAME); + when(mockOpenTelemetry.getTracer(OTEL_TRACING_SCOPE_NAME)).thenReturn(mockTracer); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(mockPropagator)); + when(mockSpanBuilder.startSpan()).thenReturn(mockAttempSpan); + when(mockSpanBuilder.setParent(any())).thenReturn(mockSpanBuilder); + when(mockTracer.spanBuilder(any())).thenReturn(mockSpanBuilder); + } + + // Use mock instead of OpenTelemetryRule to verify inOrder and propagator. + @Test + public void clientBasicTracingMocking() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(mockClientSpan, method); + Metadata headers = new Metadata(); + ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); + clientStreamTracer.createPendingStream(); + clientStreamTracer.streamCreated(Attributes.EMPTY, headers); + + verify(mockTracer).spanBuilder(eq("Attempt.package1.service2.method3")); + verify(mockPropagator).inject(any(), eq(headers), eq(MetadataSetter.getInstance())); + verify(mockClientSpan, never()).end(); + verify(mockAttempSpan, never()).end(); + + clientStreamTracer.outboundMessage(0); + clientStreamTracer.outboundMessageSent(0, 882, -1); + clientStreamTracer.inboundMessage(0); + clientStreamTracer.outboundMessage(1); + clientStreamTracer.outboundMessageSent(1, -1, 27); + clientStreamTracer.inboundMessageRead(0, 255, 90); + + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + + InOrder inOrder = inOrder(mockClientSpan, mockAttempSpan); + inOrder.verify(mockAttempSpan) + .setAttribute("previous-rpc-attempts", 0); + inOrder.verify(mockAttempSpan) + .setAttribute("transparent-retry", false); + inOrder.verify(mockClientSpan).addEvent("Delayed name resolution complete"); + inOrder.verify(mockAttempSpan).addEvent("Delayed LB pick complete"); + inOrder.verify(mockAttempSpan, times(3)).addEvent( + eventNameCaptor.capture(), attributesCaptor.capture() + ); + List events = eventNameCaptor.getAllValues(); + List attributes = attributesCaptor.getAllValues(); + assertEquals( + "Outbound message sent" , + events.get(0)); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 882) + .build(), + attributes.get(0)); + + assertEquals( + "Outbound message sent" , + events.get(1)); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 27) + .build(), + attributes.get(1)); + + assertEquals( + "Inbound compressed message" , + events.get(2)); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 255) + .build(), + attributes.get(2)); + + inOrder.verify(mockAttempSpan).setStatus(StatusCode.OK); + inOrder.verify(mockAttempSpan).end(); + inOrder.verify(mockClientSpan).setStatus(StatusCode.OK); + inOrder.verify(mockClientSpan).end(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void clientBasicTracingRule() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Span clientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(clientSpan, method); + Metadata headers = new Metadata(); + ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); + clientStreamTracer.createPendingStream(); + clientStreamTracer.streamCreated(Attributes.EMPTY, headers); + clientStreamTracer.outboundMessage(0); + clientStreamTracer.outboundMessageSent(0, 882, -1); + clientStreamTracer.inboundMessage(0); + clientStreamTracer.outboundMessage(1); + clientStreamTracer.outboundMessageSent(1, -1, 27); + clientStreamTracer.inboundMessageRead(0, 255, -1); + clientStreamTracer.inboundUncompressedSize(288); + clientStreamTracer.inboundMessageRead(1, 128, 128); + clientStreamTracer.inboundMessage(1); + clientStreamTracer.inboundUncompressedSize(128); + + clientStreamTracer.streamClosed(Status.OK); + callTracer.callEnded(Status.OK); + + List spans = openTelemetryRule.getSpans(); + assertEquals(spans.size(), 2); + SpanData attemptSpanData = spans.get(0); + SpanData clientSpanData = spans.get(1); + assertEquals(attemptSpanData.getName(), "Attempt.package1.service2.method3"); + assertEquals(clientSpanData.getName(), "test-client-span"); + assertEquals(headers.keys(), ImmutableSet.of("traceparent")); + String spanContext = headers.get( + Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER)); + assertEquals(spanContext.substring(3, 3 + TraceId.getLength()), + spans.get(1).getSpanContext().getTraceId()); + + // parent(client) span data + List clientSpanEvents = clientSpanData.getEvents(); + assertEquals(clientSpanEvents.size(), 3); + assertEquals( + "Delayed name resolution complete", + clientSpanEvents.get(0).getName()); + assertTrue(clientSpanEvents.get(0).getAttributes().isEmpty()); + + assertEquals( + "Inbound message received" , + clientSpanEvents.get(1).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size", 288) + .build(), + clientSpanEvents.get(1).getAttributes()); + + assertEquals( + "Inbound message received" , + clientSpanEvents.get(2).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 128) + .build(), + clientSpanEvents.get(2).getAttributes()); + assertEquals(clientSpanData.hasEnded(), true); + + // child(attempt) span data + List attemptSpanEvents = attemptSpanData.getEvents(); + assertEquals(clientSpanEvents.size(), 3); + assertEquals( + "Delayed LB pick complete", + attemptSpanEvents.get(0).getName()); + assertTrue(clientSpanEvents.get(0).getAttributes().isEmpty()); + + assertEquals( + "Outbound message sent" , + attemptSpanEvents.get(1).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 882) + .build(), + attemptSpanEvents.get(1).getAttributes()); + + assertEquals( + "Outbound message sent" , + attemptSpanEvents.get(2).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 27) + .build(), + attemptSpanEvents.get(2).getAttributes()); + + assertEquals( + "Inbound compressed message" , + attemptSpanEvents.get(3).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 255) + .build(), + attemptSpanEvents.get(3).getAttributes()); + + assertEquals(attemptSpanData.hasEnded(), true); + } + + @Test + public void clientInterceptor() { + testClientInterceptors(false); + } + + @Test + public void clientInterceptorNonDefaultOtelContext() { + testClientInterceptors(true); + } + + private void testClientInterceptors(boolean nonDefaultOtelContext) { + final AtomicReference capturedMetadata = new AtomicReference<>(); + grpcServerRule.getServiceRegistry().addService( + ServerServiceDefinition.builder("package1.service2").addMethod( + method, new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + capturedMetadata.set(headers); + call.sendHeaders(new Metadata()); + call.sendMessage("Hello"); + call.close( + Status.PERMISSION_DENIED.withDescription("No you don't"), new Metadata()); + return mockServerCallListener; + } + }).build()); + + final AtomicReference capturedCallOptions = new AtomicReference<>(); + ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + capturedCallOptions.set(callOptions); + return next.newCall(method, callOptions); + } + }; + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + Channel interceptedChannel = + ClientInterceptors.intercept( + grpcServerRule.getChannel(), callOptionsCaptureInterceptor, + tracingModule.getClientInterceptor()); + Span parentSpan = tracerRule.spanBuilder("test-parent-span").startSpan(); + ClientCall call; + + if (nonDefaultOtelContext) { + try (Scope scope = io.opentelemetry.context.Context.current().with(parentSpan) + .makeCurrent()) { + call = interceptedChannel.newCall(method, CALL_OPTIONS); + } + } else { + call = interceptedChannel.newCall(method, CALL_OPTIONS); + } + assertEquals("customvalue", capturedCallOptions.get().getOption(CUSTOM_OPTION)); + assertEquals(1, capturedCallOptions.get().getStreamTracerFactories().size()); + assertTrue( + capturedCallOptions.get().getStreamTracerFactories().get(0) + instanceof CallAttemptsTracerFactory); + + // Make the call + Metadata headers = new Metadata(); + call.start(mockClientCallListener, headers); + + // End the call + call.halfClose(); + call.request(1); + parentSpan.end(); + + verify(mockClientCallListener).onClose(statusCaptor.capture(), any(Metadata.class)); + Status status = statusCaptor.getValue(); + assertEquals(Status.Code.PERMISSION_DENIED, status.getCode()); + assertEquals("No you don't", status.getDescription()); + + List spans = openTelemetryRule.getSpans(); + assertEquals(spans.size(), 3); + + SpanData clientSpan = spans.get(1); + SpanData attemptSpan = spans.get(0); + if (nonDefaultOtelContext) { + assertEquals(clientSpan.getParentSpanContext(), parentSpan.getSpanContext()); + } else { + assertEquals(clientSpan.getParentSpanContext(), + Span.fromContext(Context.root()).getSpanContext()); + } + String spanContext = capturedMetadata.get().get( + Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER)); + // W3C format: 00--- + assertEquals(spanContext.substring(3, 3 + TraceId.getLength()), + attemptSpan.getSpanContext().getTraceId()); + assertEquals(spanContext.substring(3 + TraceId.getLength() + 1, + 3 + TraceId.getLength() + 1 + SpanId.getLength()), + attemptSpan.getSpanContext().getSpanId()); + + assertEquals(attemptSpan.getParentSpanContext(), clientSpan.getSpanContext()); + assertTrue(clientSpan.hasEnded()); + assertEquals(clientSpan.getStatus().getStatusCode(), StatusCode.ERROR); + assertEquals(clientSpan.getStatus().getDescription(), "PERMISSION_DENIED. No you don't"); + assertTrue(attemptSpan.hasEnded()); + assertTrue(attemptSpan.hasEnded()); + assertEquals(attemptSpan.getStatus().getStatusCode(), StatusCode.ERROR); + assertEquals(attemptSpan.getStatus().getDescription(), "PERMISSION_DENIED. No you don't"); + } + + @Test + public void clientStreamNeverCreatedStillRecordTracing() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(mockClientSpan, method); + + callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); + verify(mockClientSpan).end(); + verify(mockClientSpan).setStatus(eq(StatusCode.ERROR), + eq("DEADLINE_EXCEEDED. 3 seconds")); + verifyNoMoreInteractions(mockClientSpan); + } + + @Test + public void serverBasicTracingNoHeaders() { + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule( + openTelemetryRule.getOpenTelemetry()); + ServerStreamTracer.Factory tracerFactory = tracingModule.getServerTracerFactory(); + ServerStreamTracer serverStreamTracer = + tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); + assertSame(Span.fromContext(Context.current()), Span.getInvalid()); + + serverStreamTracer.outboundMessage(0); + serverStreamTracer.outboundMessageSent(0, 882, 998); + serverStreamTracer.inboundMessage(0); + serverStreamTracer.outboundMessage(1); + serverStreamTracer.outboundMessageSent(1, -1, 27); + serverStreamTracer.inboundMessageRead(0, 90, -1); + serverStreamTracer.inboundUncompressedSize(255); + + serverStreamTracer.streamClosed(Status.CANCELLED); + + List spans = openTelemetryRule.getSpans(); + assertEquals(spans.size(), 1); + assertEquals(spans.get(0).getName(), "Recv.package1.service2.method3"); + assertEquals(spans.get(0).getParentSpanContext(), Span.getInvalid().getSpanContext()); + + List events = spans.get(0).getEvents(); + assertEquals(events.size(), 4); + assertEquals( + "Outbound message sent" , + events.get(0).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 882) + .put("message-size", 998) + .build(), + events.get(0).getAttributes()); + + assertEquals( + "Outbound message sent" , + events.get(1).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 1) + .put("message-size", 27) + .build(), + events.get(1).getAttributes()); + + assertEquals( + "Inbound compressed message" , + events.get(2).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size-compressed", 90) + .build(), + events.get(2).getAttributes()); + + assertEquals( + "Inbound message received" , + events.get(3).getName()); + assertEquals( + io.opentelemetry.api.common.Attributes.builder() + .put("sequence-number", 0) + .put("message-size", 255) + .build(), + events.get(3).getAttributes()); + + assertEquals(spans.get(0).hasEnded(), true); + } + + @Test + public void grpcTraceBinPropagator() { + when(mockOpenTelemetry.getPropagators()).thenReturn( + ContextPropagators.create(GrpcTraceBinContextPropagator.defaultInstance())); + ArgumentCaptor contextArgumentCaptor = ArgumentCaptor.forClass(Context.class); + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(mockOpenTelemetry); + Span testClientSpan = tracerRule.spanBuilder("test-client-span").startSpan(); + CallAttemptsTracerFactory callTracer = + tracingModule.newClientCallTracer(testClientSpan, method); + Span testAttemptSpan = tracerRule.spanBuilder("test-attempt-span").startSpan(); + when(mockSpanBuilder.startSpan()).thenReturn(testAttemptSpan); + + Metadata headers = new Metadata(); + ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers); + clientStreamTracer.streamCreated(Attributes.EMPTY, headers); + clientStreamTracer.streamClosed(Status.CANCELLED); + + Metadata.Key key = Metadata.Key.of( + GrpcTraceBinContextPropagator.GRPC_TRACE_BIN_HEADER, Metadata.BINARY_BYTE_MARSHALLER); + assertTrue(Arrays.equals(BinaryFormat.getInstance().toBytes(testAttemptSpan.getSpanContext()), + headers.get(key) + )); + verify(mockSpanBuilder).setParent(contextArgumentCaptor.capture()); + assertEquals(testClientSpan, Span.fromContext(contextArgumentCaptor.getValue())); + + Span serverSpan = tracerRule.spanBuilder("test-server-span").startSpan(); + when(mockSpanBuilder.startSpan()).thenReturn(serverSpan); + ServerStreamTracer.Factory tracerFactory = tracingModule.getServerTracerFactory(); + ServerStreamTracer serverStreamTracer = + tracerFactory.newServerStreamTracer(method.getFullMethodName(), headers); + serverStreamTracer.streamClosed(Status.CANCELLED); + + verify(mockSpanBuilder, times(2)) + .setParent(contextArgumentCaptor.capture()); + assertEquals(testAttemptSpan.getSpanContext(), + Span.fromContext(contextArgumentCaptor.getValue()).getSpanContext()); + } + + @Test + public void generateTraceSpanName() { + assertEquals( + "Sent.io.grpc.Foo", OpenTelemetryTracingModule.generateTraceSpanName( + false, "io.grpc/Foo")); + assertEquals( + "Recv.io.grpc.Bar", OpenTelemetryTracingModule.generateTraceSpanName( + true, "io.grpc/Bar")); + } +}