Skip to content

Commit aa6468c

Browse files
committed
Server Side CSM Observability Changes
1 parent 7202443 commit aa6468c

File tree

2 files changed

+128
-2
lines changed

2 files changed

+128
-2
lines changed

stats/opentelemetry/opentelemetry.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ func DialOption(o Options) grpc.DialOption {
157157
return joinDialOptions(grpc.WithChainUnaryInterceptor(csh.unaryInterceptor), grpc.WithChainStreamInterceptor(csh.streamInterceptor), grpc.WithStatsHandler(csh))
158158
}
159159

160+
var joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
161+
160162
// ServerOption returns a server option which enables OpenTelemetry
161163
// instrumentation code for a grpc.Server.
162164
//
@@ -172,7 +174,7 @@ func DialOption(o Options) grpc.DialOption {
172174
func ServerOption(o Options) grpc.ServerOption {
173175
ssh := &serverStatsHandler{o: o}
174176
ssh.initializeMetrics()
175-
return grpc.StatsHandler(ssh)
177+
return joinServerOptions(grpc.ChainUnaryInterceptor(ssh.unaryInterceptor), grpc.ChainStreamInterceptor(ssh.streamInterceptor), grpc.StatsHandler(ssh))
176178
}
177179

178180
// callInfo is information pertaining to the lifespan of the RPC client side.

stats/opentelemetry/server_metrics.go

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"google.golang.org/grpc"
2525
"google.golang.org/grpc/internal"
26+
"google.golang.org/grpc/metadata"
2627
"google.golang.org/grpc/stats"
2728
"google.golang.org/grpc/status"
2829

@@ -55,6 +56,118 @@ func (ssh *serverStatsHandler) initializeMetrics() {
5556
ssh.serverMetrics.callDuration = createFloat64Histogram(setOfMetrics, "grpc.server.call.duration", meter, metric.WithUnit("s"), metric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), metric.WithExplicitBucketBoundaries(DefaultLatencyBounds...))
5657
}
5758

59+
// attachLabelsTransport stream intercepts SetHeader and SendHeader calls of the
60+
// underlying ServerTransportStream to attach metadataExchangeLabels.
61+
type attachLabelsTransportStream struct {
62+
grpc.ServerTransportStream
63+
64+
attachedLabels atomic.Bool
65+
metadataExchangeLabels metadata.MD
66+
}
67+
68+
func (alts *attachLabelsTransportStream) SetHeader(md metadata.MD) error {
69+
if !alts.attachedLabels.Swap(true) {
70+
val := alts.metadataExchangeLabels.Get("x-envoy-peer-metadata")
71+
md.Append("x-envoy-peer-metadata", val...)
72+
}
73+
return alts.ServerTransportStream.SetHeader(md)
74+
}
75+
76+
func (alts *attachLabelsTransportStream) SendHeader(md metadata.MD) error {
77+
if !alts.attachedLabels.Swap(true) {
78+
val := alts.metadataExchangeLabels.Get("x-envoy-peer-metadata")
79+
md.Append("x-envoy-peer-metadata", val...)
80+
}
81+
82+
return alts.ServerTransportStream.SendHeader(md)
83+
}
84+
85+
func (ssh *serverStatsHandler) unaryInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
86+
var metadataExchangeLabels metadata.MD
87+
if ssh.o.MetricsOptions.pluginOption != nil {
88+
metadataExchangeLabels = ssh.o.MetricsOptions.pluginOption.GetMetadata()
89+
}
90+
91+
sts := grpc.ServerTransportStreamFromContext(ctx)
92+
93+
alts := &attachLabelsTransportStream{
94+
ServerTransportStream: sts,
95+
metadataExchangeLabels: metadataExchangeLabels,
96+
}
97+
ctx = grpc.NewContextWithServerTransportStream(ctx, alts)
98+
99+
any, err := handler(ctx, req)
100+
if err != nil { // error returned, so trailers only
101+
if !alts.attachedLabels.Swap(true) {
102+
alts.SetTrailer(alts.metadataExchangeLabels)
103+
}
104+
logger.Infof("RPC failed with error: %v", err)
105+
} else { // headers will be written; a message was sent
106+
if !alts.attachedLabels.Swap(true) {
107+
alts.SetHeader(alts.metadataExchangeLabels)
108+
}
109+
}
110+
111+
return any, err
112+
}
113+
114+
// attachLabelsStream embeds a grpc.ServerStream, and intercepts the
115+
// SetHeader/SendHeader/SendMsg/SendTrailer call to attach metadata exchange
116+
// labels.
117+
type attachLabelsStream struct {
118+
grpc.ServerStream
119+
120+
attachedLabels atomic.Bool
121+
metadataExchangeLabels metadata.MD
122+
}
123+
124+
func (als *attachLabelsStream) SetHeader(md metadata.MD) error {
125+
if !als.attachedLabels.Swap(true) {
126+
val := als.metadataExchangeLabels.Get("x-envoy-peer-metadata")
127+
md.Append("x-envoy-peer-metadata", val...)
128+
}
129+
130+
return als.ServerStream.SetHeader(md)
131+
}
132+
133+
func (als *attachLabelsStream) SendHeader(md metadata.MD) error {
134+
if !als.attachedLabels.Swap(true) {
135+
val := als.metadataExchangeLabels.Get("x-envoy-peer-metadata")
136+
md.Append("x-envoy-peer-metadata", val...)
137+
}
138+
139+
return als.ServerStream.SendHeader(md)
140+
}
141+
142+
func (als *attachLabelsStream) SendMsg(m any) error {
143+
if !als.attachedLabels.Swap(true) {
144+
als.ServerStream.SetHeader(als.metadataExchangeLabels)
145+
}
146+
return als.ServerStream.SendMsg(m)
147+
}
148+
149+
func (ssh *serverStatsHandler) streamInterceptor(srv any, ss grpc.ServerStream, ssi *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
150+
var metadataExchangeLabels metadata.MD
151+
if ssh.o.MetricsOptions.pluginOption != nil {
152+
metadataExchangeLabels = ssh.o.MetricsOptions.pluginOption.GetMetadata()
153+
}
154+
als := &attachLabelsStream{
155+
ServerStream: ss,
156+
metadataExchangeLabels: metadataExchangeLabels,
157+
}
158+
err := handler(srv, als)
159+
if err != nil {
160+
logger.Infof("RPC failed with error: %v", err)
161+
}
162+
163+
// Add metadata exchange labels to trailers if never sent in headers,
164+
// irrespective of whether or not RPC failed.
165+
if !als.attachedLabels.Load() {
166+
als.SetTrailer(als.metadataExchangeLabels)
167+
}
168+
return err
169+
}
170+
58171
// TagConn exists to satisfy stats.Handler.
59172
func (ssh *serverStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
60173
return ctx
@@ -105,6 +218,10 @@ func (ssh *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats)
105218
func (ssh *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, mi *metricsInfo) {
106219
switch st := s.(type) {
107220
case *stats.InHeader:
221+
if !mi.labelsReceived && ssh.o.MetricsOptions.pluginOption != nil {
222+
mi.labels = ssh.o.MetricsOptions.pluginOption.GetLabels(st.Header)
223+
mi.labelsReceived = true
224+
}
108225
ssh.serverMetrics.callStarted.Add(ctx, 1, metric.WithAttributes(attribute.String("grpc.method", mi.method)))
109226
case *stats.OutPayload:
110227
atomic.AddInt64(&mi.sentCompressedBytes, int64(st.CompressedLength))
@@ -123,8 +240,15 @@ func (ssh *serverStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInf
123240
s, _ := status.FromError(e.Error)
124241
st = canonicalString(s.Code())
125242
}
126-
serverAttributeOption := metric.WithAttributes(attribute.String("grpc.method", mi.method), attribute.String("grpc.status", st))
243+
attributes := []attribute.KeyValue{
244+
attribute.String("grpc.method", mi.method),
245+
attribute.String("grpc.status", st),
246+
}
247+
for k, v := range mi.labels {
248+
attributes = append(attributes, attribute.String(k, v))
249+
}
127250

251+
serverAttributeOption := metric.WithAttributes(attributes...)
128252
ssh.serverMetrics.callDuration.Record(ctx, latency, serverAttributeOption)
129253
ssh.serverMetrics.callSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), serverAttributeOption)
130254
ssh.serverMetrics.callRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), serverAttributeOption)

0 commit comments

Comments
 (0)