Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(spanner): add support of client side native metrics collection and export #10419

Merged
merged 25 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2744ff7
chore(spanner): add support of client side native metrics collection …
rahul2393 Aug 6, 2024
ed7ca5e
fix build
rahul2393 Aug 22, 2024
c8ae705
fix header issue
rahul2393 Aug 23, 2024
4770944
fix tests
rahul2393 Aug 23, 2024
bff82e2
fix client_uid, client_name and method signature.
rahul2393 Aug 23, 2024
0511472
fix directpath_used var too correct extract peerInfo, and capture str…
rahul2393 Aug 24, 2024
a47ffb0
remove dep on grpc_go_middleware
rahul2393 Aug 27, 2024
14bb8bf
use grpc connection target to check if direct path is enabled.
rahul2393 Aug 27, 2024
3a8e96a
refactor as per suggestions
rahul2393 Sep 2, 2024
8f3dbc3
fix header
rahul2393 Sep 2, 2024
4b050ea
fix tests
rahul2393 Sep 2, 2024
cbafe39
return wrappedStream from stream interceptor to fix peerInfo not avai…
rahul2393 Sep 4, 2024
9d8ee5e
fix tests
rahul2393 Sep 4, 2024
da6b8ce
fix tests
rahul2393 Sep 4, 2024
1dceb29
skip holding client connection on wrapper
rahul2393 Sep 4, 2024
de67be7
Merge branch 'main' into client_side_metrics
rahul2393 Sep 4, 2024
956a510
Merge branch 'main' into client_side_metrics
rahul2393 Sep 5, 2024
a7cf8eb
Merge remote-tracking branch 'origin' into client_side_metrics
rahul2393 Sep 20, 2024
d432e67
do not emil metrics if env is not set
rahul2393 Sep 20, 2024
5a61105
fix tests
rahul2393 Sep 20, 2024
cba2d24
mark env to enable native metrics as experimental
rahul2393 Sep 20, 2024
cd61438
Merge branch 'main' into client_side_metrics
rahul2393 Sep 24, 2024
78cd730
add resource label client_hash and updated export duration to 1 minutes
rahul2393 Sep 25, 2024
8d9db29
fix tests
rahul2393 Sep 25, 2024
a988837
Merge branch 'main' into client_side_metrics
rahul2393 Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix client_uid, client_name and method signature.
  • Loading branch information
rahul2393 committed Sep 2, 2024
commit bff82e23cb32a76e05de12ebe2d50c89be1dab2f
30 changes: 15 additions & 15 deletions spanner/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (g *grpcSpannerClient) CreateSession(ctx context.Context, req *spannerpb.Cr
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).CreateSession[0:len((*g.raw.CallOptions).CreateSession):len((*g.raw.CallOptions).CreateSession)], opts...)
var resp *spannerpb.Session
err := gaxInvokeWithRecorder(ctx, mt, "CreateSession", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.CreateSession", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.CreateSession(ctx, req, settings.GRPC...)
return err
Expand All @@ -163,7 +163,7 @@ func (g *grpcSpannerClient) BatchCreateSessions(ctx context.Context, req *spanne
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).BatchCreateSessions[0:len((*g.raw.CallOptions).BatchCreateSessions):len((*g.raw.CallOptions).BatchCreateSessions)], opts...)
var resp *spannerpb.BatchCreateSessionsResponse
err := gaxInvokeWithRecorder(ctx, mt, "BatchCreateSessions", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.BatchCreateSessions", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.BatchCreateSessions(ctx, req, settings.GRPC...)
return err
Expand All @@ -185,7 +185,7 @@ func (g *grpcSpannerClient) GetSession(ctx context.Context, req *spannerpb.GetSe
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).GetSession[0:len((*g.raw.CallOptions).GetSession):len((*g.raw.CallOptions).GetSession)], opts...)
var resp *spannerpb.Session
err := gaxInvokeWithRecorder(ctx, mt, "GetSession", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.GetSession", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.GetSession(ctx, req, settings.GRPC...)
return err
Expand All @@ -210,7 +210,7 @@ func (g *grpcSpannerClient) DeleteSession(ctx context.Context, req *spannerpb.De
hds = append(g.xGoogHeaders, hds...)
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).DeleteSession[0:len((*g.raw.CallOptions).DeleteSession):len((*g.raw.CallOptions).DeleteSession)], opts...)
err := gaxInvokeWithRecorder(ctx, mt, "DeleteSession", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.DeleteSession", func(ctx context.Context, settings gax.CallSettings) error {
var err error
_, err = g.client.DeleteSession(ctx, req, settings.GRPC...)
return err
Expand All @@ -229,7 +229,7 @@ func (g *grpcSpannerClient) ExecuteSql(ctx context.Context, req *spannerpb.Execu
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).ExecuteSql[0:len((*g.raw.CallOptions).ExecuteSql):len((*g.raw.CallOptions).ExecuteSql)], opts...)
var resp *spannerpb.ResultSet
err := gaxInvokeWithRecorder(ctx, mt, "ExecuteSql", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.ExecuteSql", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.ExecuteSql(ctx, req, settings.GRPC...)
return err
Expand All @@ -251,7 +251,7 @@ func (g *grpcSpannerClient) ExecuteStreamingSql(ctx context.Context, req *spanne
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).ExecuteStreamingSql[0:len((*g.raw.CallOptions).ExecuteStreamingSql):len((*g.raw.CallOptions).ExecuteStreamingSql)], opts...)
var resp spannerpb.Spanner_ExecuteStreamingSqlClient
err := gaxInvokeWithRecorder(ctx, mt, "ExecuteStreamingSql", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.ExecuteStreamingSql", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.ExecuteStreamingSql(ctx, req, settings.GRPC...)
return err
Expand All @@ -273,7 +273,7 @@ func (g *grpcSpannerClient) ExecuteBatchDml(ctx context.Context, req *spannerpb.
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).ExecuteBatchDml[0:len((*g.raw.CallOptions).ExecuteBatchDml):len((*g.raw.CallOptions).ExecuteBatchDml)], opts...)
var resp *spannerpb.ExecuteBatchDmlResponse
err := gaxInvokeWithRecorder(ctx, mt, "ExecuteBatchDml", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.ExecuteBatchDml", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.ExecuteBatchDml(ctx, req, settings.GRPC...)
return err
Expand All @@ -295,7 +295,7 @@ func (g *grpcSpannerClient) Read(ctx context.Context, req *spannerpb.ReadRequest
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).Read[0:len((*g.raw.CallOptions).Read):len((*g.raw.CallOptions).Read)], opts...)
var resp *spannerpb.ResultSet
err := gaxInvokeWithRecorder(ctx, mt, "Read", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.Read", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.Read(ctx, req, settings.GRPC...)
return err
Expand All @@ -317,7 +317,7 @@ func (g *grpcSpannerClient) StreamingRead(ctx context.Context, req *spannerpb.Re
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).StreamingRead[0:len((*g.raw.CallOptions).StreamingRead):len((*g.raw.CallOptions).StreamingRead)], opts...)
var resp spannerpb.Spanner_StreamingReadClient
err := gaxInvokeWithRecorder(ctx, mt, "StreamingRead", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.StreamingRead", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.StreamingRead(ctx, req, settings.GRPC...)
return err
Expand All @@ -339,7 +339,7 @@ func (g *grpcSpannerClient) BeginTransaction(ctx context.Context, req *spannerpb
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).BeginTransaction[0:len((*g.raw.CallOptions).BeginTransaction):len((*g.raw.CallOptions).BeginTransaction)], opts...)
var resp *spannerpb.Transaction
err := gaxInvokeWithRecorder(ctx, mt, "BeginTransaction", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.BeginTransaction", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.BeginTransaction(ctx, req, settings.GRPC...)
return err
Expand All @@ -361,7 +361,7 @@ func (g *grpcSpannerClient) Commit(ctx context.Context, req *spannerpb.CommitReq
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).Commit[0:len((*g.raw.CallOptions).Commit):len((*g.raw.CallOptions).Commit)], opts...)
var resp *spannerpb.CommitResponse
err := gaxInvokeWithRecorder(ctx, mt, "Commit", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.Commit", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.Commit(ctx, req, settings.GRPC...)
return err
Expand All @@ -382,7 +382,7 @@ func (g *grpcSpannerClient) Rollback(ctx context.Context, req *spannerpb.Rollbac
hds = append(g.xGoogHeaders, hds...)
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).Rollback[0:len((*g.raw.CallOptions).Rollback):len((*g.raw.CallOptions).Rollback)], opts...)
err := gaxInvokeWithRecorder(ctx, mt, "Rollback", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.Rollback", func(ctx context.Context, settings gax.CallSettings) error {
var err error
_, err = g.client.Rollback(ctx, req, settings.GRPC...)
return err
Expand All @@ -401,7 +401,7 @@ func (g *grpcSpannerClient) PartitionQuery(ctx context.Context, req *spannerpb.P
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).PartitionQuery[0:len((*g.raw.CallOptions).PartitionQuery):len((*g.raw.CallOptions).PartitionQuery)], opts...)
var resp *spannerpb.PartitionResponse
err := gaxInvokeWithRecorder(ctx, mt, "PartitionQuery", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.PartitionQuery", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.PartitionQuery(ctx, req, settings.GRPC...)
return err
Expand All @@ -423,7 +423,7 @@ func (g *grpcSpannerClient) PartitionRead(ctx context.Context, req *spannerpb.Pa
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).PartitionRead[0:len((*g.raw.CallOptions).PartitionRead):len((*g.raw.CallOptions).PartitionRead)], opts...)
var resp *spannerpb.PartitionResponse
err := gaxInvokeWithRecorder(ctx, mt, "PartitionRead", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.PartitionRead", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.PartitionRead(ctx, req, settings.GRPC...)
return err
Expand All @@ -445,7 +445,7 @@ func (g *grpcSpannerClient) BatchWrite(ctx context.Context, req *spannerpb.Batch
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
opts = append((*g.raw.CallOptions).BatchWrite[0:len((*g.raw.CallOptions).BatchWrite):len((*g.raw.CallOptions).BatchWrite)], opts...)
var resp spannerpb.Spanner_BatchWriteClient
err := gaxInvokeWithRecorder(ctx, mt, "BatchWrite", func(ctx context.Context, settings gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, mt, "Spanner.BatchWrite", func(ctx context.Context, settings gax.CallSettings) error {
var err error
resp, err = g.client.BatchWrite(ctx, req, settings.GRPC...)
return err
Expand Down
14 changes: 10 additions & 4 deletions spanner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var (
// duration between two metric exports
defaultSamplePeriod = 5 * time.Minute

clientName = fmt.Sprintf("go-spanner v%v", internal.Version)
clientName = fmt.Sprintf("spanner-go/%v", internal.Version)

bucketBounds = []float64{0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, 16.0, 20.0, 25.0, 30.0, 40.0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See internal discussion - please sync with Surbhi on the default buckets we use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0, 650.0,
Expand All @@ -86,6 +86,12 @@ var (
// All the built-in metrics have same attributes except 'status' and 'streaming'
// These attributes need to be added to only few of the metrics
metricsDetails = map[string]metricInfo{
metricNameOperationCount: {
additionalAttrs: []string{
metricLabelKeyStatus,
},
recordedPerAttempt: false,
},
metricNameOperationLatencies: {
additionalAttrs: []string{
metricLabelKeyStatus,
Expand Down Expand Up @@ -113,7 +119,7 @@ var (
if err != nil {
return "", err
}
return "go-" + uuid.NewString() + "@" + hostname, nil
return uuid.NewString() + "@" + strconv.FormatInt(int64(os.Getpid()), 10) + "@" + hostname, nil
}

exporterOpts = []option.ClientOption{}
Expand Down Expand Up @@ -382,8 +388,8 @@ func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) ([]attribut
return attrKeyValues, fmt.Errorf("unable to create attributes list for unknown metric: %v", metricName)
}

attrKeyValues = append(attrKeyValues, attribute.Bool(metricLabelKeyDirectPathEnabled, mt.currOp.directPathEnabled))
attrKeyValues = append(attrKeyValues, attribute.Bool(metricLabelKeyDirectPathUsed, mt.currOp.currAttempt.directPathUsed))
attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyDirectPathEnabled, strconv.FormatBool(mt.currOp.directPathEnabled)))
attrKeyValues = append(attrKeyValues, attribute.String(metricLabelKeyDirectPathUsed, strconv.FormatBool(mt.currOp.currAttempt.directPathUsed)))

rpcStatus := mt.currOp.status
if mDetails.recordedPerAttempt {
Expand Down