Skip to content

Commit 9634185

Browse files
committed
Introduce grpc interceptors for basic metrics
These will surface the following: - H: grpc_server_unary_request_duraion - C: grpc_server_unary_requests_received - C: grpc_server_unary_requests_completed - H: grpc_server_stream_request_duration - C: grpc_server_stream_requests_received - C: grpc_server_stream_requests_completed - C: grpc_server_stream_messages_sent - C: grpc_server_stream_messages_received All metrics include service and method labels/dimensions and metrics recorded at the end of a request/stream include the status code at completion. FAB-12710 #done Change-Id: I2bd8bbbd9f50d50340659cded25e89d85890b5e7 Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
1 parent 6054cdd commit 9634185

File tree

8 files changed

+950
-0
lines changed

8 files changed

+950
-0
lines changed

common/grpcmetrics/fakes/echo_service.go

Lines changed: 165 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package grpcmetrics_test
8+
9+
import (
10+
"testing"
11+
12+
"github.com/hyperledger/fabric/common/grpcmetrics/testpb"
13+
. "github.com/onsi/ginkgo"
14+
. "github.com/onsi/gomega"
15+
)
16+
17+
//go:generate protoc --go_out=plugins=grpc:. testpb/echo.proto
18+
19+
func TestGrpcmetrics(t *testing.T) {
20+
RegisterFailHandler(Fail)
21+
RunSpecs(t, "Grpcmetrics Suite")
22+
}
23+
24+
//go:generate counterfeiter -o fakes/echo_service.go --fake-name EchoServiceServer . echoServiceServer
25+
26+
type echoServiceServer interface {
27+
testpb.EchoServiceServer
28+
}

common/grpcmetrics/interceptor.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package grpcmetrics
8+
9+
import (
10+
"context"
11+
"strings"
12+
"time"
13+
14+
"github.com/hyperledger/fabric/common/metrics"
15+
"google.golang.org/grpc"
16+
)
17+
18+
type UnaryMetrics struct {
19+
RequestDuration metrics.Histogram
20+
RequestsReceived metrics.Counter
21+
RequestsCompleted metrics.Counter
22+
}
23+
24+
func UnaryServerInterceptor(um *UnaryMetrics) grpc.UnaryServerInterceptor {
25+
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
26+
service, method := serviceMethod(info.FullMethod)
27+
um.RequestsReceived.With("service", service, "method", method).Add(1)
28+
29+
startTime := time.Now()
30+
resp, err := handler(ctx, req)
31+
duration := time.Since(startTime)
32+
33+
um.RequestDuration.With(
34+
"service", service, "method", method, "code", grpc.Code(err).String(),
35+
).Observe(float64(duration) / float64(time.Second))
36+
um.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1)
37+
38+
return resp, err
39+
}
40+
}
41+
42+
type StreamMetrics struct {
43+
RequestDuration metrics.Histogram
44+
RequestsReceived metrics.Counter
45+
RequestsCompleted metrics.Counter
46+
MessagesSent metrics.Counter
47+
MessagesReceived metrics.Counter
48+
}
49+
50+
func StreamServerInterceptor(sm *StreamMetrics) grpc.StreamServerInterceptor {
51+
return func(svc interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
52+
sm := sm
53+
service, method := serviceMethod(info.FullMethod)
54+
sm.RequestsReceived.With("service", service, "method", method).Add(1)
55+
56+
wrappedStream := &serverStream{
57+
ServerStream: stream,
58+
messagesSent: sm.MessagesSent.With("service", service, "method", method),
59+
messagesReceived: sm.MessagesReceived.With("service", service, "method", method),
60+
}
61+
62+
startTime := time.Now()
63+
err := handler(svc, wrappedStream)
64+
duration := time.Since(startTime)
65+
66+
sm.RequestDuration.With(
67+
"service", service, "method", method, "code", grpc.Code(err).String(),
68+
).Observe(float64(duration) / float64(time.Second))
69+
sm.RequestsCompleted.With("service", service, "method", method, "code", grpc.Code(err).String()).Add(1)
70+
71+
return err
72+
}
73+
}
74+
75+
func serviceMethod(fullMethod string) (service, method string) {
76+
normalizedMethod := strings.Replace(fullMethod, ".", "_", -1)
77+
parts := strings.SplitN(normalizedMethod[1:], "/", 2)
78+
return parts[0], parts[1]
79+
}
80+
81+
type serverStream struct {
82+
grpc.ServerStream
83+
messagesSent metrics.Counter
84+
messagesReceived metrics.Counter
85+
}
86+
87+
func (ss *serverStream) SendMsg(msg interface{}) error {
88+
ss.messagesSent.Add(1)
89+
return ss.ServerStream.SendMsg(msg)
90+
}
91+
92+
func (ss *serverStream) RecvMsg(msg interface{}) error {
93+
err := ss.ServerStream.RecvMsg(msg)
94+
if err == nil {
95+
ss.messagesReceived.Add(1)
96+
}
97+
return err
98+
}

0 commit comments

Comments
 (0)