From c5ec7ae97b81d503598bba0bb7ba4aa4e79640c5 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Wed, 16 Nov 2022 13:28:06 +0800 Subject: [PATCH] [otelgrpc] refactor otelgrpc to use grpc.StatsHandler Signed-off-by: Ziqi Zhao --- CHANGELOG.md | 1 + .../grpc/otelgrpc/stats_handler.go | 188 ++++++ .../otelgrpc/test/grpc_stats_handler_test.go | 581 ++++++++++++++++++ .../grpc/otelgrpc/test/stats_handler_test.go | 131 ++++ 4 files changed, 901 insertions(+) create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3de2bfa817b..035f64cc725 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added - Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108) +- [otelgrpc] refactor otelgrpc to use grpc.StatsHandler. (#3002) ## [1.17.0-rc.1/0.42.0-rc.1/0.11.0-rc.1] - 2023-05-17 diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go new file mode 100644 index 00000000000..30f4c4926f3 --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -0,0 +1,188 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + +import ( + "context" + "sync/atomic" + + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.12.0" + "go.opentelemetry.io/otel/trace" +) + +type gRPCContext struct { + messagesReceived int64 + messagesSent int64 +} + +// NewServerHandler creates a stats.Handler for gRPC server. +func NewServerHandler(opts ...Option) stats.Handler { + h := &serverHandler{ + config: newConfig(opts), + } + + h.tracer = h.config.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + return h +} + +type serverHandler struct { + *config + tracer trace.Tracer +} + +// TagRPC can attach some information to the given context. +func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + ctx = extract(ctx, h.config.Propagators) + + attrs := []attribute.KeyValue{RPCSystemGRPC} + name, mAttrs := internal.ParseFullMethod(info.FullMethodName) + attrs = append(attrs, mAttrs...) + ctx, _ = h.tracer.Start( + trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attrs...), + ) + + gctx := gRPCContext{} + return context.WithValue(ctx, gRPCContext{}, &gctx) +} + +// HandleRPC processes the RPC stats. +func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + span := trace.SpanFromContext(ctx) + attrs := peerAttr(peerFromCtx(ctx)) + span.SetAttributes(attrs...) + return ctx +} + +// HandleConn processes the Conn stats. +func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { +} + +// NewClientHandler creates a stats.Handler for gRPC client. +func NewClientHandler(opts ...Option) stats.Handler { + h := &clientHandler{ + config: newConfig(opts), + } + + h.tracer = h.config.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + + return h +} + +type clientHandler struct { + *config + tracer trace.Tracer +} + +// TagRPC can attach some information to the given context. +func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + attrs := []attribute.KeyValue{RPCSystemGRPC} + name, mAttrs := internal.ParseFullMethod(info.FullMethodName) + attrs = append(attrs, mAttrs...) + ctx, _ = h.tracer.Start( + ctx, + name, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attrs...), + ) + + gctx := gRPCContext{} + + return inject(context.WithValue(ctx, gRPCContext{}, &gctx), h.config.Propagators) +} + +// HandleRPC processes the RPC stats. +func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { + span := trace.SpanFromContext(ctx) + attrs := peerAttr(cti.RemoteAddr.String()) + span.SetAttributes(attrs...) + return ctx +} + +// HandleConn processes the Conn stats. +func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { + // no-op +} + +func handleRPC(ctx context.Context, rs stats.RPCStats) { + span := trace.SpanFromContext(ctx) + gctx, _ := ctx.Value(gRPCContext{}).(*gRPCContext) + var messageId int64 + + switch rs := rs.(type) { + case *stats.Begin: + case *stats.InPayload: + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + } + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeReceived, + semconv.MessageIDKey.Int64(messageId), + semconv.MessageCompressedSizeKey.Int(rs.CompressedLength), + semconv.MessageUncompressedSizeKey.Int(rs.Length), + ), + ) + case *stats.OutPayload: + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesSent, 1) + } + + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeSent, + semconv.MessageIDKey.Int64(messageId), + semconv.MessageCompressedSizeKey.Int(rs.CompressedLength), + semconv.MessageUncompressedSizeKey.Int(rs.Length), + ), + ) + case *stats.End: + if rs.Error != nil { + s, _ := status.FromError(rs.Error) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(statusCodeAttr(s.Code())) + } else { + span.SetAttributes(statusCodeAttr(grpc_codes.OK)) + } + span.End() + default: + return + } +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go new file mode 100644 index 00000000000..b6afd1a118d --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -0,0 +1,581 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +func TestStatsHandler(t *testing.T) { + clientSR := tracetest.NewSpanRecorder() + clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR)) + + serverSR := tracetest.NewSpanRecorder() + serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR)) + + assert.NoError(t, doCalls( + []grpc.DialOption{ + grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))), + }, + []grpc.ServerOption{ + grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))), + }, + )) + + t.Run("ClientSpans", func(t *testing.T) { + checkClientSpans(t, clientSR.Ended()) + }) + + t.Run("ServerSpans", func(t *testing.T) { + checkServerSpans(t, serverSR.Ended()) + }) +} + +func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { + require.Len(t, spans, 5) + + emptySpan := spans[0] + assert.False(t, emptySpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + }, emptySpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("EmptyCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, emptySpan.Attributes()) + + largeSpan := spans[1] + assert.False(t, largeSpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/UnaryCall", largeSpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(271840), + otelgrpc.RPCMessageUncompressedSizeKey.Int(271840), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(314167), + otelgrpc.RPCMessageUncompressedSizeKey.Int(314167), + }, + }, + }, largeSpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("UnaryCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, largeSpan.Attributes()) + + streamInput := spans[2] + assert.False(t, streamInput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27190), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27190), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(12), + otelgrpc.RPCMessageUncompressedSizeKey.Int(12), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1834), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1834), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45912), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45912), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(4), + otelgrpc.RPCMessageUncompressedSizeKey.Int(4), + }, + }, + // client does not record an event for the server response. + }, streamInput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingInputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamInput.Attributes()) + + streamOutput := spans[3] + assert.False(t, streamOutput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(21), + otelgrpc.RPCMessageUncompressedSizeKey.Int(21), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, streamOutput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingOutputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamOutput.Attributes()) + + pingPong := spans[4] + assert.False(t, pingPong.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27196), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27196), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(16), + otelgrpc.RPCMessageUncompressedSizeKey.Int(16), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1839), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1839), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45918), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45918), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, pingPong.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("FullDuplexCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, pingPong.Attributes()) +} + +func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { + require.Len(t, spans, 5) + + emptySpan := spans[0] + assert.False(t, emptySpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + }, emptySpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("EmptyCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, emptySpan.Attributes()) + + largeSpan := spans[1] + assert.False(t, largeSpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/UnaryCall", largeSpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageCompressedSizeKey.Int(271840), + otelgrpc.RPCMessageUncompressedSizeKey.Int(271840), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageCompressedSizeKey.Int(314167), + otelgrpc.RPCMessageUncompressedSizeKey.Int(314167), + }, + }, + }, largeSpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("UnaryCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, largeSpan.Attributes()) + + streamInput := spans[2] + assert.False(t, streamInput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27190), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27190), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(12), + otelgrpc.RPCMessageUncompressedSizeKey.Int(12), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1834), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1834), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45912), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45912), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(4), + otelgrpc.RPCMessageUncompressedSizeKey.Int(4), + }, + }, + // client does not record an event for the server response. + }, streamInput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingInputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamInput.Attributes()) + + streamOutput := spans[3] + assert.False(t, streamOutput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(21), + otelgrpc.RPCMessageUncompressedSizeKey.Int(21), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, streamOutput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingOutputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamOutput.Attributes()) + + pingPong := spans[4] + assert.False(t, pingPong.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27196), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27196), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(16), + otelgrpc.RPCMessageUncompressedSizeKey.Int(16), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1839), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1839), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45918), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45918), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, pingPong.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("FullDuplexCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, pingPong.Attributes()) +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go new file mode 100644 index 00000000000..16c6dbd7906 --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/stats" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +func TestClientStatsHandler(t *testing.T) { + type rpc struct { + tagInfo *stats.RPCTagInfo + connInfo *stats.ConnTagInfo + inPayloads []*stats.InPayload + outPayloads []*stats.OutPayload + end *stats.End + } + + type span struct { + name string + code codes.Code + attrs []attribute.KeyValue + eventsAttr []map[attribute.Key]attribute.Value + } + + sr := tracetest.NewSpanRecorder() + tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr)) + remoteAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8888") + + tcs := []struct { + rpcs []*rpc + span *span + }{ + { + rpcs: []*rpc{ + { + tagInfo: &stats.RPCTagInfo{ + FullMethodName: "/github.com.serviceName/bar", + }, + connInfo: &stats.ConnTagInfo{ + RemoteAddr: remoteAddr, + }, + inPayloads: []*stats.InPayload{ + {Length: 10}, + }, + outPayloads: []*stats.OutPayload{ + + {Length: 10}, + }, + end: &stats.End{Error: nil}, + }, + }, + span: &span{ + name: "github.com.serviceName/bar", + attrs: []attribute.KeyValue{ + semconv.RPCSystemKey.String("grpc"), + semconv.RPCServiceKey.String("github.com.serviceName"), + semconv.RPCMethodKey.String("bar"), + semconv.NetSockPeerAddrKey.String("127.0.0.1"), + semconv.NetSockPeerPortKey.Int(8888), + otelgrpc.GRPCStatusCodeKey.Int64(0), + }, + eventsAttr: []map[attribute.Key]attribute.Value{ + { + otelgrpc.RPCMessageTypeKey: attribute.StringValue("SENT"), + otelgrpc.RPCMessageIDKey: attribute.IntValue(1), + otelgrpc.RPCMessageCompressedSizeKey: attribute.IntValue(0), + otelgrpc.RPCMessageUncompressedSizeKey: attribute.IntValue(10), + }, + { + otelgrpc.RPCMessageTypeKey: attribute.StringValue("RECEIVED"), + otelgrpc.RPCMessageIDKey: attribute.IntValue(1), + otelgrpc.RPCMessageCompressedSizeKey: attribute.IntValue(0), + otelgrpc.RPCMessageUncompressedSizeKey: attribute.IntValue(10), + }, + }, + }, + }, + } + + for _, tc := range tcs { + h := otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tp)) + + for _, rpc := range tc.rpcs { + ctx := h.TagRPC(context.Background(), rpc.tagInfo) + ctx = h.TagConn(ctx, rpc.connInfo) + + for _, out := range rpc.outPayloads { + out.Client = true + h.HandleRPC(ctx, out) + } + for _, in := range rpc.inPayloads { + in.Client = true + h.HandleRPC(ctx, in) + } + rpc.end.Client = true + h.HandleRPC(ctx, rpc.end) + } + + span, ok := getSpanFromRecorder(sr, tc.span.name) + if !assert.True(t, ok, "missing span %q", tc.span.name) { + continue + } + assert.Equal(t, tc.span.code, span.Status().Code) + assert.ElementsMatch(t, tc.span.attrs, span.Attributes()) + assert.Equal(t, tc.span.eventsAttr, eventAttrMap(span.Events())) + } +}