Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(spanner): add support of client side native metrics collection and export #10419

Merged
merged 25 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2744ff7
chore(spanner): add support of client side native metrics collection …
rahul2393 Aug 6, 2024
ed7ca5e
fix build
rahul2393 Aug 22, 2024
c8ae705
fix header issue
rahul2393 Aug 23, 2024
4770944
fix tests
rahul2393 Aug 23, 2024
bff82e2
fix client_uid, client_name and method signature.
rahul2393 Aug 23, 2024
0511472
fix directpath_used var to correctly extract peerInfo, and capture str…
rahul2393 Aug 24, 2024
a47ffb0
remove dep on grpc_go_middleware
rahul2393 Aug 27, 2024
14bb8bf
use grpc connection target to check if direct path is enabled.
rahul2393 Aug 27, 2024
3a8e96a
refactor as per suggestions
rahul2393 Sep 2, 2024
8f3dbc3
fix header
rahul2393 Sep 2, 2024
4b050ea
fix tests
rahul2393 Sep 2, 2024
cbafe39
return wrappedStream from stream interceptor to fix peerInfo not avai…
rahul2393 Sep 4, 2024
9d8ee5e
fix tests
rahul2393 Sep 4, 2024
da6b8ce
fix tests
rahul2393 Sep 4, 2024
1dceb29
skip holding client connection on wrapper
rahul2393 Sep 4, 2024
de67be7
Merge branch 'main' into client_side_metrics
rahul2393 Sep 4, 2024
956a510
Merge branch 'main' into client_side_metrics
rahul2393 Sep 5, 2024
a7cf8eb
Merge remote-tracking branch 'origin' into client_side_metrics
rahul2393 Sep 20, 2024
d432e67
do not emit metrics if env is not set
rahul2393 Sep 20, 2024
5a61105
fix tests
rahul2393 Sep 20, 2024
cba2d24
mark env to enable native metrics as experimental
rahul2393 Sep 20, 2024
cd61438
Merge branch 'main' into client_side_metrics
rahul2393 Sep 24, 2024
78cd730
add resource label client_hash and update export duration to 1 minute
rahul2393 Sep 25, 2024
8d9db29
fix tests
rahul2393 Sep 25, 2024
a988837
Merge branch 'main' into client_side_metrics
rahul2393 Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix header
  • Loading branch information
rahul2393 committed Sep 2, 2024
commit 8f3dbc309cd04e9dd37322c523d5afb1ee9b655e
1 change: 1 addition & 0 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
return stream(
contextWithOutgoingMetadata(ctx, sh.getMetadata(), t.disableRouteToLeader),
sh.session.logger,
t.sp.sc.metricsTracerFactory,
rpc,
t.setTimestamp,
t.release)
Expand Down
78 changes: 34 additions & 44 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,15 +591,14 @@ func metricsInterceptor() grpc.UnaryClientInterceptor {

mt.method = method
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = attemptTracer{}
mt.currOp.currAttempt = &attemptTracer{}
mt.currOp.currAttempt.setStartTime(time.Now())
if strings.HasPrefix(cc.Target(), "google-c2p") {
mt.currOp.setDirectPathEnabled(true)
}

peerInfo := &peer.Peer{}
ctx = peer.NewContext(ctx, peerInfo)

opts = append(opts, grpc.Peer(peerInfo))
err := invoker(ctx, method, req, reply, cc, opts...)

statusCode, _ := status.FromError(err)
Expand All @@ -612,6 +611,7 @@ func metricsInterceptor() grpc.UnaryClientInterceptor {
isDirectPathUsed = true
}
}

mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed)
recordAttemptCompletion(mt)
return err
Expand All @@ -635,33 +635,27 @@ func metricsStreamInterceptor() grpc.StreamClientInterceptor {

mt.method = method
mt.currOp.incrementAttemptCount()
mt.currOp.currAttempt = attemptTracer{}
mt.currOp.currAttempt = &attemptTracer{}
mt.currOp.currAttempt.setStartTime(time.Now())
if strings.HasPrefix(cc.Target(), "google-c2p") {
mt.currOp.setDirectPathEnabled(true)
}

peerInfo := &peer.Peer{}
ctx = peer.NewContext(ctx, peerInfo)

clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
statusCode, _ := status.FromError(err)
mt.currOp.currAttempt.setStatus(statusCode.Code().String())
recordAttemptCompletion(mt)
return nil, err
}

isDirectPathUsed := false
if peerInfo.Addr != nil {
remoteIP := peerInfo.Addr.String()
if strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) {
isDirectPathUsed = true
peerInfo, ok := peer.FromContext(clientStream.Context())
if ok {
if peerInfo.Addr != nil {
remoteIP := peerInfo.Addr.String()
if strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) {
isDirectPathUsed = true
}
}
}
statusCode, _ := status.FromError(err)
mt.currOp.currAttempt.setStatus(statusCode.Code().String())
mt.currOp.currAttempt.setDirectPathUsed(isDirectPathUsed)
recordAttemptCompletion(mt)
return clientStream, nil
return clientStream, err
}
}

Expand Down Expand Up @@ -1123,26 +1117,27 @@ func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {

// BatchWriteResponseIterator is an iterator over BatchWriteResponse structures returned from BatchWrite RPC.
type BatchWriteResponseIterator struct {
ctx context.Context
stream sppb.Spanner_BatchWriteClient
err error
dataReceived bool
replaceSession func(ctx context.Context) error
rpc func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error)
release func(error)
cancel func()
ctx context.Context
stream sppb.Spanner_BatchWriteClient
err error
dataReceived bool
meterTracerFactory *builtinMetricsTracerFactory
replaceSession func(ctx context.Context) error
rpc func(ctx context.Context) (sppb.Spanner_BatchWriteClient, error)
release func(error)
cancel func()
}

// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns Done, all subsequent calls
// will return Done.
func (r *BatchWriteResponseIterator) Next() (*sppb.BatchWriteResponse, error) {
var mt *builtinMetricsTracer
mt := r.meterTracerFactory.createBuiltinMetricsTracer(r.ctx)
defer func() {
if mt != nil && mt.builtInEnabled && mt.method != "" {
if mt.method != "" {
statusCode, _ := convertToGrpcStatusErr(r.err)
mt.currOp.setStatus(statusCode.String())
recordOperationCompletion(mt)
recordOperationCompletion(&mt)
}
}()
for {
Expand All @@ -1156,12 +1151,6 @@ func (r *BatchWriteResponseIterator) Next() (*sppb.BatchWriteResponse, error) {
r.stream, r.err = r.rpc(r.ctx)
continue
}
ctx := r.stream.Context()
v := ctx.Value(metricsTracerKey)
if v != nil {
mt = v.(*builtinMetricsTracer)
mt.currOp.setStartTime(time.Now())
}

// Read from the stream.
var response *sppb.BatchWriteResponse
Expand Down Expand Up @@ -1266,13 +1255,13 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup

mgsPb, err := mutationGroupsProto(mgs)
if err != nil {
return &BatchWriteResponseIterator{err: err}
return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err}
}

var sh *sessionHandle
sh, err = c.idleSessions.take(ctx)
if err != nil {
return &BatchWriteResponseIterator{err: err}
return &BatchWriteResponseIterator{meterTracerFactory: c.metricsTracerFactory, err: err}
}

rpc := func(ct context.Context) (sppb.Spanner_BatchWriteClient, error) {
Expand Down Expand Up @@ -1318,11 +1307,12 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
ctx, cancel := context.WithCancel(ctx)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchWriteResponseIterator")
return &BatchWriteResponseIterator{
ctx: ctx,
rpc: rpc,
replaceSession: replaceSession,
release: release,
cancel: cancel,
ctx: ctx,
meterTracerFactory: c.metricsTracerFactory,
rpc: rpc,
replaceSession: replaceSession,
release: release,
cancel: cancel,
}
}

Expand Down
74 changes: 53 additions & 21 deletions spanner/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -134,21 +136,30 @@ func (g *grpcSpannerClient) CreateSession(ctx context.Context, req *spannerpb.Cr
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.CreateSession(ctx, req, opts...)
resp, err := g.raw.CreateSession(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) BatchCreateSessions(ctx context.Context, req *spannerpb.BatchCreateSessionsRequest, opts ...gax.CallOption) (*spannerpb.BatchCreateSessionsResponse, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.BatchCreateSessions(ctx, req, opts...)
resp, err := g.raw.BatchCreateSessions(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) GetSession(ctx context.Context, req *spannerpb.GetSessionRequest, opts ...gax.CallOption) (*spannerpb.Session, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.GetSession(ctx, req, opts...)
resp, err := g.raw.GetSession(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) ListSessions(ctx context.Context, req *spannerpb.ListSessionsRequest, opts ...gax.CallOption) *vkit.SessionIterator {
Expand All @@ -159,79 +170,100 @@ func (g *grpcSpannerClient) DeleteSession(ctx context.Context, req *spannerpb.De
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.DeleteSession(ctx, req, opts...)
err := g.raw.DeleteSession(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return err
}

func (g *grpcSpannerClient) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (*spannerpb.ResultSet, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.ExecuteSql(ctx, req, opts...)
resp, err := g.raw.ExecuteSql(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) ExecuteStreamingSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (spannerpb.Spanner_ExecuteStreamingSqlClient, error) {
mt := g.newBuiltinMetricsTracer(ctx)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.ExecuteStreamingSql(ctx, req, opts...)
return g.raw.ExecuteStreamingSql(peer.NewContext(ctx, &peer.Peer{}), req, opts...)
}

func (g *grpcSpannerClient) ExecuteBatchDml(ctx context.Context, req *spannerpb.ExecuteBatchDmlRequest, opts ...gax.CallOption) (*spannerpb.ExecuteBatchDmlResponse, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.ExecuteBatchDml(ctx, req, opts...)
resp, err := g.raw.ExecuteBatchDml(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) Read(ctx context.Context, req *spannerpb.ReadRequest, opts ...gax.CallOption) (*spannerpb.ResultSet, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.Read(ctx, req, opts...)
resp, err := g.raw.Read(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) StreamingRead(ctx context.Context, req *spannerpb.ReadRequest, opts ...gax.CallOption) (spannerpb.Spanner_StreamingReadClient, error) {
mt := g.newBuiltinMetricsTracer(ctx)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.StreamingRead(ctx, req, opts...)
return g.raw.StreamingRead(peer.NewContext(ctx, &peer.Peer{}), req, opts...)
}

func (g *grpcSpannerClient) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest, opts ...gax.CallOption) (*spannerpb.Transaction, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.BeginTransaction(ctx, req, opts...)
resp, err := g.raw.BeginTransaction(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) Commit(ctx context.Context, req *spannerpb.CommitRequest, opts ...gax.CallOption) (*spannerpb.CommitResponse, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.Commit(ctx, req, opts...)
resp, err := g.raw.Commit(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) Rollback(ctx context.Context, req *spannerpb.RollbackRequest, opts ...gax.CallOption) error {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.Rollback(ctx, req, opts...)
err := g.raw.Rollback(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return err
}

func (g *grpcSpannerClient) PartitionQuery(ctx context.Context, req *spannerpb.PartitionQueryRequest, opts ...gax.CallOption) (*spannerpb.PartitionResponse, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.PartitionQuery(ctx, req, opts...)
resp, err := g.raw.PartitionQuery(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) PartitionRead(ctx context.Context, req *spannerpb.PartitionReadRequest, opts ...gax.CallOption) (*spannerpb.PartitionResponse, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.PartitionRead(ctx, req, opts...)
resp, err := g.raw.PartitionRead(ctx, req, opts...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

func (g *grpcSpannerClient) BatchWrite(ctx context.Context, req *spannerpb.BatchWriteRequest, opts ...gax.CallOption) (spannerpb.Spanner_BatchWriteClient, error) {
mt := g.newBuiltinMetricsTracer(ctx)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
return g.raw.BatchWrite(ctx, req, opts...)
return g.raw.BatchWrite(peer.NewContext(ctx, &peer.Peer{}), req, opts...)
}
16 changes: 16 additions & 0 deletions spanner/metric_monitoring_exporter_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
Expand Down
9 changes: 4 additions & 5 deletions spanner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,9 @@ type builtinMetricsTracer struct {
instrumentOperationCount metric.Int64Counter // Counter for the number of operations.
instrumentAttemptCount metric.Int64Counter // Counter for the number of attempts.

method string // The method being traced.
isStreaming bool // Indicates if the operation is a streaming operation.
method string // The method being traced.

currOp opTracer // The current operation tracer.
currOp *opTracer // The current operation tracer.
}

// opTracer is used to record metrics for the entire operation, including retries.
Expand All @@ -304,7 +303,7 @@ type opTracer struct {

directPathEnabled bool // Indicates if DirectPath is enabled for the operation.

currAttempt attemptTracer // The current attempt tracer.
currAttempt *attemptTracer // The current attempt tracer.
}

// attemptTracer is used to record metrics for a single attempt within an operation.
Expand Down Expand Up @@ -361,7 +360,7 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
ctx: ctx,
builtInEnabled: tf.enabled,

currOp: currOpTracer,
currOp: &currOpTracer,
clientAttributes: tf.clientAttributes,

instrumentOperationLatencies: tf.operationLatencies,
Expand Down
Loading
Loading