Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelgrpc: Add metrics support to NewServerHandler and NewClientHandler #4356

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add metrics support (No-op, OTLP and Prometheus) to `go.opentelemetry.io/contrib/exporters/autoexport`. (#4229, #4479)
- Add support for `console` span exporter and metrics exporter in `go.opentelemetry.io/contrib/exporters/autoexport`. (#4486)
- Set unit and description on all instruments in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#4500)
- Add metric support for `grpc.StatsHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356)

### Changed

Expand Down
48 changes: 44 additions & 4 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@
ReceivedEvent bool
SentEvent bool

meter metric.Meter
rpcServerDuration metric.Int64Histogram
tracer trace.Tracer
meter metric.Meter

rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}

// Option applies an option value for a config.
Expand All @@ -56,7 +62,7 @@
}

// newConfig returns a config configured with all the passed Options.
func newConfig(opts []Option) *config {
func newConfig(opts []Option, role string) *config {
c := &config{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
Expand All @@ -66,19 +72,53 @@
o.apply(c)
}

c.tracer = c.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

c.meter = c.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

var err error
c.rpcServerDuration, err = c.meter.Int64Histogram("rpc.server.duration",
c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 99 in instrumentation/google.golang.org/grpc/otelgrpc/config.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/config.go#L98-L99

Added lines #L98 - L99 were not covered by tests

c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 106 in instrumentation/google.golang.org/grpc/otelgrpc/config.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/config.go#L105-L106

Added lines #L105 - L106 were not covered by tests

c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 113 in instrumentation/google.golang.org/grpc/otelgrpc/config.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/config.go#L112-L113

Added lines #L112 - L113 were not covered by tests

c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 120 in instrumentation/google.golang.org/grpc/otelgrpc/config.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/config.go#L119-L120

Added lines #L119 - L120 were not covered by tests

return c
}

Expand Down
10 changes: 5 additions & 5 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call.
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -255,7 +255,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call.
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -325,7 +325,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -387,7 +387,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {

elapsedTime := time.Since(before).Milliseconds()
attr = append(attr, grpcStatusCodeAttr)
cfg.rpcServerDuration.Record(ctx, elapsedTime, metric.WithAttributes(attr...))
cfg.rpcDuration.Record(ctx, float64(elapsedTime), metric.WithAttributes(attr...))

return resp, err
}
Expand Down Expand Up @@ -446,7 +446,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *s
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
// requests.
// Deprecated: Unnecessary public func.
func Inject(ctx context.Context, md *metadata.MD, opts ...Option) {
c := newConfig(opts)
c := newConfig(opts, "")

Check warning on line 59 in instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go#L59

Added line #L59 was not covered by tests
c.Propagators.Inject(ctx, &metadataSupplier{
metadata: md,
})
Expand All @@ -78,7 +78,7 @@
// This function is meant to be used on incoming requests.
// Deprecated: Unnecessary public func.
func Extract(ctx context.Context, md *metadata.MD, opts ...Option) (baggage.Baggage, trace.SpanContext) {
c := newConfig(opts)
c := newConfig(opts, "")

Check warning on line 81 in instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go#L81

Added line #L81 was not covered by tests
ctx = c.Propagators.Extract(ctx, &metadataSupplier{
metadata: md,
})
Expand Down
116 changes: 80 additions & 36 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
import (
"context"
"sync/atomic"
"time"

grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -33,24 +36,32 @@
type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
}

type serverHandler struct {
*config
}

// NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts),
config: newConfig(opts, "server"),
}

h.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)
return h
}

type serverHandler struct {
*config
tracer trace.Tracer
// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)
return ctx
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
}

// TagRPC can attach some information to the given context.
Expand All @@ -66,46 +77,30 @@
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)
return ctx
h.handleRPC(ctx, rs)
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
type clientHandler struct {
*config
}

// NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts),
config: newConfig(opts, "client"),
}

h.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

return h
}

type clientHandler struct {
*config
tracer trace.Tracer
}

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
Expand All @@ -117,14 +112,16 @@
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
}

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
h.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -140,17 +137,22 @@
// no-op
}

func handleRPC(ctx context.Context, rs stats.RPCStats) {
func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageId int64
metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
wctx := withoutCancel(ctx)

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeReceived,
Expand All @@ -162,6 +164,7 @@
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

span.AddEvent("message",
Expand All @@ -172,16 +175,57 @@
semconv.MessageUncompressedSizeKey.Int(rs.Length),
),
)
case *stats.OutTrailer:
case *stats.End:
var rpcStatusAttr attribute.KeyValue

if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))

Check warning on line 185 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L185

Added line #L185 was not covered by tests
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
}
span.SetAttributes(rpcStatusAttr)
span.End()

metricAttrs = append(metricAttrs, rpcStatusAttr)
pellared marked this conversation as resolved.
Show resolved Hide resolved
c.rpcDuration.Record(wctx, float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
c.rpcRequestsPerRPC.Record(wctx, gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(wctx, gctx.messagesSent, metric.WithAttributes(metricAttrs...))

default:
return
}
}

func withoutCancel(parent context.Context) context.Context {
if parent == nil {
panic("cannot create context from nil parent")

Check warning on line 204 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L204

Added line #L204 was not covered by tests
}
return withoutCancelCtx{parent}
}

type withoutCancelCtx struct {
c context.Context
}

func (withoutCancelCtx) Deadline() (deadline time.Time, ok bool) {
return

Check warning on line 214 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L213-L214

Added lines #L213 - L214 were not covered by tests
}

func (withoutCancelCtx) Done() <-chan struct{} {
return nil

Check warning on line 218 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L217-L218

Added lines #L217 - L218 were not covered by tests
}

func (withoutCancelCtx) Err() error {
return nil
}

func (w withoutCancelCtx) Value(key any) any {
return w.c.Value(key)

Check warning on line 226 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L225-L226

Added lines #L225 - L226 were not covered by tests
}

func (w withoutCancelCtx) String() string {
return "withoutCancel"

Check warning on line 230 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L229-L230

Added lines #L229 - L230 were not covered by tests
}
Loading