Skip to content

Commit 52ee505

Browse files
committed
Responded to Doug's comments
1 parent 7202443 commit 52ee505

File tree

2 files changed

+44
-38
lines changed

2 files changed

+44
-38
lines changed

stats/opentelemetry/client_metrics.go

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import (
2727
"google.golang.org/grpc/stats"
2828
"google.golang.org/grpc/status"
2929

30-
"go.opentelemetry.io/otel/attribute"
31-
"go.opentelemetry.io/otel/metric"
30+
otelattribute "go.opentelemetry.io/otel/attribute"
31+
otelmetric "go.opentelemetry.io/otel/metric"
3232
)
3333

3434
type clientStatsHandler struct {
@@ -51,11 +51,11 @@ func (csh *clientStatsHandler) initializeMetrics() {
5151

5252
setOfMetrics := csh.o.MetricsOptions.Metrics.metrics
5353

54-
csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, metric.WithUnit("attempt"), metric.WithDescription("Number of client call attempts started."))
55-
csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a client call attempt."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
56-
csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes sent per client call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
57-
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, metric.WithUnit("By"), metric.WithDescription("Compressed message bytes received per call attempt."), metric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
58-
csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
54+
csh.clientMetrics.attemptStarted = createInt64Counter(setOfMetrics, "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started."))
55+
csh.clientMetrics.attemptDuration = createFloat64Histogram(setOfMetrics, "grpc.client.attempt.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
56+
csh.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
57+
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(setOfMetrics, "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...))
58+
csh.clientMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
5959
}
6060

6161
func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -67,9 +67,10 @@ func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method stri
6767

6868
if csh.o.MetricsOptions.pluginOption != nil {
6969
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
70-
val := md.Get("x-envoy-peer-metadata")
71-
if len(val) == 1 {
72-
ctx = metadata.AppendToOutgoingContext(ctx, metadataExchangeKey, val[0])
70+
for k, v := range md {
71+
if len(v) == 1 {
72+
ctx = metadata.AppendToOutgoingContext(ctx, k, v[0])
73+
}
7374
}
7475
}
7576

@@ -108,6 +109,16 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
108109
method: csh.determineMethod(method, opts...),
109110
}
110111
ctx = setCallInfo(ctx, ci)
112+
113+
if csh.o.MetricsOptions.pluginOption != nil {
114+
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
115+
for k, v := range md {
116+
if len(v) == 1 {
117+
ctx = metadata.AppendToOutgoingContext(ctx, k, v[0])
118+
}
119+
}
120+
}
121+
111122
startTime := time.Now()
112123

113124
callback := func(err error) {
@@ -120,7 +131,7 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
120131
func (csh *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) {
121132
s := status.Convert(err)
122133
callLatency := float64(time.Since(startTime)) / float64(time.Second)
123-
csh.clientMetrics.callDuration.Record(ctx, callLatency, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", canonicalString(s.Code()))))
134+
csh.clientMetrics.callDuration.Record(ctx, callLatency, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.status", canonicalString(s.Code()))))
124135
}
125136

126137
// TagConn exists to satisfy stats.Handler.
@@ -141,12 +152,12 @@ func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
141152
if labels = istats.GetLabels(ctx); labels == nil {
142153
labels = &istats.Labels{
143154
TelemetryLabels: make(map[string]string),
144-
} // Create optional labels map only if first stats handler in possible chain for a channel.
155+
}
156+
ctx = istats.SetLabels(ctx, labels)
145157
}
146-
ctx = istats.SetLabels(ctx, labels)
147158
mi := &metricsInfo{ // populates information about RPC start.
148159
startTime: time.Now(),
149-
xDSLabels: labels.TelemetryLabels,
160+
xdsLabels: labels.TelemetryLabels,
150161
method: info.FullMethodName,
151162
}
152163
ri := &rpcInfo{
@@ -173,25 +184,24 @@ func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCS
173184
return
174185
}
175186

176-
csh.clientMetrics.attemptStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target)))
187+
csh.clientMetrics.attemptStarted.Add(ctx, 1, otelmetric.WithAttributes(otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.target", ci.target)))
177188
case *stats.OutPayload:
178189
atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
179190
case *stats.InPayload:
180191
atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
181192
case *stats.InHeader:
182-
csh.getLabelsFromPluginOption(mi, st.Header)
193+
csh.setLabelsFromPluginOption(mi, st.Header)
183194
case *stats.InTrailer:
184-
csh.getLabelsFromPluginOption(mi, st.Trailer)
195+
csh.setLabelsFromPluginOption(mi, st.Trailer)
185196
case *stats.End:
186197
csh.processRPCEnd(ctx, mi, st)
187198
default:
188199
}
189200
}
190201

191-
func (csh *clientStatsHandler) getLabelsFromPluginOption(mi *metricsInfo, incomingMetadata metadata.MD) {
192-
if !mi.labelsReceived && csh.o.MetricsOptions.pluginOption != nil {
193-
mi.labels = csh.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
194-
mi.labelsReceived = true
202+
func (csh *clientStatsHandler) setLabelsFromPluginOption(mi *metricsInfo, incomingMetadata metadata.MD) {
203+
if mi.pluginOptionLabels != nil && csh.o.MetricsOptions.pluginOption != nil {
204+
mi.pluginOptionLabels = csh.o.MetricsOptions.pluginOption.GetLabels(incomingMetadata)
195205
}
196206
}
197207

@@ -208,23 +218,23 @@ func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInf
208218
st = canonicalString(s.Code())
209219
}
210220

211-
attributes := []attribute.KeyValue{
212-
attribute.String("grpc.method", ci.method),
213-
attribute.String("grpc.target", ci.target),
214-
attribute.String("grpc.status", st),
221+
attributes := []otelattribute.KeyValue{
222+
otelattribute.String("grpc.method", ci.method),
223+
otelattribute.String("grpc.target", ci.target),
224+
otelattribute.String("grpc.status", st),
215225
}
216226

217-
for k, v := range mi.labels {
218-
attributes = append(attributes, attribute.String(k, v))
227+
for k, v := range mi.pluginOptionLabels {
228+
attributes = append(attributes, otelattribute.String(k, v))
219229
}
220230

221231
for _, o := range csh.o.MetricsOptions.OptionalLabels {
222-
if val, ok := mi.xDSLabels[o]; ok {
223-
attributes = append(attributes, attribute.String(o, val))
232+
if val, ok := mi.xdsLabels[o]; ok {
233+
attributes = append(attributes, otelattribute.String(o, val))
224234
}
225235
}
226236

227-
clientAttributeOption := metric.WithAttributes(attributes...)
237+
clientAttributeOption := otelmetric.WithAttributes(attributes...)
228238
csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
229239
csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
230240
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)

stats/opentelemetry/opentelemetry.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ import (
3333
"go.opentelemetry.io/otel/metric/noop"
3434
)
3535

36-
// metadataExchangeKey is the key for HTTP metadata exchange.
37-
const metadataExchangeKey = "x-envoy-peer-metadata"
38-
3936
var logger = grpclog.Component("otel-plugin")
4037

4138
var canonicalString = internal.CanonicalString.(func(codes.Code) string)
@@ -131,8 +128,8 @@ type MetricsOptions struct {
131128
// This only applies for server side metrics.
132129
MethodAttributeFilter func(string) bool
133130

134-
// OptionalLabels are labels received from xDS that this component should
135-
// add to metrics that record after receiving incoming metadata.
131+
// OptionalLabels are labels received from LB Policies that this component
132+
// should add to metrics that record after receiving incoming metadata.
136133
OptionalLabels []string
137134

138135
// pluginOption is used to get labels to attach to certain metrics, if set.
@@ -232,9 +229,8 @@ type metricsInfo struct {
232229
startTime time.Time
233230
method string
234231

235-
labelsReceived bool
236-
labels map[string]string // labels to attach to metrics emitted
237-
xDSLabels map[string]string
232+
pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted
233+
xdsLabels map[string]string
238234
}
239235

240236
type clientMetrics struct {

0 commit comments

Comments
 (0)