Skip to content

Commit

Permalink
Allow setting per-request custom metrics in gRPC stats handler.
Browse files Browse the repository at this point in the history
Similar to #5876 but for gRPC.
  • Loading branch information
dhowden committed Sep 5, 2024
1 parent 96fdd2e commit 02f9e15
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
35 changes: 27 additions & 8 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

import (
"context"

"google.golang.org/grpc/stats"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -36,14 +38,15 @@ type Filter func(*stats.RPCTagInfo) bool

// config is a group of options for this instrumentation.
type config struct {
Filter Filter
InterceptorFilter InterceptorFilter
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider
SpanStartOptions []trace.SpanStartOption
SpanAttributes []attribute.KeyValue
MetricAttributes []attribute.KeyValue
Filter Filter
InterceptorFilter InterceptorFilter
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider
SpanStartOptions []trace.SpanStartOption
SpanAttributes []attribute.KeyValue
MetricAttributes []attribute.KeyValue
MetricAttributesFn func(ctx context.Context, payload any) []attribute.KeyValue

ReceivedEvent bool
SentEvent bool
Expand Down Expand Up @@ -285,3 +288,19 @@ func (o metricAttributesOption) apply(c *config) {
func WithMetricAttributes(a ...attribute.KeyValue) Option {
return metricAttributesOption{a: a}
}

type metricAttributesFnOption struct {
f func(ctx context.Context, payload any) []attribute.KeyValue
}

func (o metricAttributesFnOption) apply(c *config) {
if o.f != nil {
c.MetricAttributesFn = o.f
}
}

// WithMetricAttributesFn returns an Option to add custom attributes to the metrics
// based on the incoming request.
func WithMetricAttributesFn(f func(ctx context.Context, payload any) []attribute.KeyValue) Option {
return metricAttributesFnOption{f: f}
}
18 changes: 15 additions & 3 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,19 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
}

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
// Run this once on InPayload and record the attributes for the entire RPC.
if c.MetricAttributesFn != nil {
fnMetricAttrs := c.MetricAttributesFn(ctx, rs.Payload)

// Record them for the entire RPC.
gctx.metricAttrs = append(gctx.metricAttrs, fnMetricAttrs...)

// For this run we need to manually add them to the metricAttrs slice.
metricAttrs = append(metricAttrs, fnMetricAttrs...)
}

c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

Expand All @@ -164,6 +173,7 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
),
)
}

case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
Expand All @@ -180,11 +190,12 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
),
)
}
case *stats.OutTrailer:

case *stats.OutHeader:
if p, ok := peer.FromContext(ctx); ok {
span.SetAttributes(peerAttr(p.Addr.String())...)
}

case *stats.End:
var rpcStatusAttr attribute.KeyValue

Expand Down Expand Up @@ -216,6 +227,7 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...)
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...)
}

default:
return
}
Expand Down

0 comments on commit 02f9e15

Please sign in to comment.