Skip to content

Commit

Permalink
spanner: implement generation and propagation of the "x-goog-spanner-request-id" header
Browse files Browse the repository at this point in the history

This change adds sending over the "x-goog-spanner-request-id" header
for every unary and streaming call, in the form:

	<processId>.<clientId>.<requestCountForClient>.<channelId>.<rpcCountForRequest>

where:
* processId is a randomly generated uint32 singleton for the lifetime of a process
* clientId is the monotonically increasing id/number of gRPC Spanner clients created
* requestCountForClient is the monotonically increasing number of requests made by the client
* channelId, which currently starts at 1, is the id of the gRPC channel/client for Go
* rpcCountForRequest is the number of RPCs/retries within a specific request

This header is sent on both unary and streaming calls and will help
debug latencies for customers. After this change, the next phase will
provide a mechanism for customers to consume the request id and log it,
along with documentation for how to accomplish that.

Updates #11073
  • Loading branch information
odeke-em committed Nov 20, 2024
1 parent 627c69a commit 9a9f77c
Show file tree
Hide file tree
Showing 6 changed files with 1,353 additions and 20 deletions.
4 changes: 4 additions & 0 deletions spanner/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func (e *Error) decorate(info string) {
// APIError error having given error code as its status.
// spannerErrorf formats the arguments per format and returns the resulting
// message as a Spanner error carrying the given status code.
func spannerErrorf(code codes.Code, format string, args ...interface{}) error {
	return spannerError(code, fmt.Sprintf(format, args...))
}

func spannerError(code codes.Code, msg string) error {
wrapped, _ := apierror.FromError(status.Error(code, msg))
return &Error{
Code: code,
Expand Down
137 changes: 121 additions & 16 deletions spanner/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ package spanner

import (
"context"
"fmt"
"math/rand"
"strings"
"sync/atomic"
"time"

vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/internal"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -65,8 +70,26 @@ type spannerClient interface {
// grpcSpannerClient is the gRPC API implementation of the transport-agnostic
// spannerClient interface.
type grpcSpannerClient struct {
// id is the ordinal of this Spanner client within the process; it is used
// as the clientId component of the x-goog-spanner-request-id header.
id int
// raw is the underlying generated gRPC Spanner client.
raw *vkit.Client
// metricsTracerFactory builds the per-operation built-in metrics tracers.
metricsTracerFactory *builtinMetricsTracerFactory

// These fields are used to uniquely track x-goog-spanner-request-id where:
// raw(*vkit.Client) is the channel, and channelID is derived from the ordinal
// count of unique *vkit.Client as retrieved from the session pool.
channelID uint64
// nthRequest shall always be incremented on every fresh request.
nthRequest *atomic.Uint32
// This id uniquely defines the RPC being issued and in
// the case of retries it should be incremented.
// NOTE(review): the visible call sites track the per-request RPC count in a
// local variable (nthRPC in wrapRetryer.Resolve) rather than through this
// field — confirm whether rpcID is actually needed.
rpcID *atomic.Uint64
}

// prepareRequestIDTrackers initializes the per-client state used to compose
// x-goog-spanner-request-id values: the client id, the channel id, and fresh
// atomic counters for the request and RPC ordinals.
func (g *grpcSpannerClient) prepareRequestIDTrackers(clientID int, channelID uint64) {
	// The ID derived from the SpannerClient.
	g.id = clientID
	g.channelID = channelID
	g.nthRequest = &atomic.Uint32{}
	g.rpcID = &atomic.Uint64{}
}

var (
Expand All @@ -82,7 +105,22 @@ func newGRPCSpannerClient(ctx context.Context, sc *sessionClient, opts ...option
return nil, err
}

// Retrieve the channelID for each spannerClient.
// It is assumed that this method is invoked
// under a lock already.
channelID, ok := sc.channelIDMap[raw]
if !ok {
if sc.channelIDMap == nil {
sc.channelIDMap = make(map[*vkit.Client]uint64)
}
channelID = uint64(len(sc.channelIDMap)) + 1
sc.channelIDMap[raw] = channelID
}

g := &grpcSpannerClient{raw: raw, metricsTracerFactory: sc.metricsTracerFactory}
clientID := sc.nthClient
g.prepareRequestIDTrackers(clientID, channelID)

clientInfo := []string{"gccl", internal.Version}
if sc.userAgent != "" {
agentWithVersion := strings.SplitN(sc.userAgent, "/", 2)
Expand Down Expand Up @@ -118,7 +156,7 @@ func (g *grpcSpannerClient) CreateSession(ctx context.Context, req *spannerpb.Cr
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.CreateSession(ctx, req, opts...)
resp, err := g.raw.CreateSession(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -128,7 +166,7 @@ func (g *grpcSpannerClient) BatchCreateSessions(ctx context.Context, req *spanne
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.BatchCreateSessions(ctx, req, opts...)
resp, err := g.raw.BatchCreateSessions(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -138,45 +176,112 @@ func (g *grpcSpannerClient) GetSession(ctx context.Context, req *spannerpb.GetSe
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.GetSession(ctx, req, opts...)
resp, err := g.raw.GetSession(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

// ListSessions returns an iterator over the sessions visible to this client,
// attaching the x-goog-spanner-request-id header to the underlying RPC.
func (g *grpcSpannerClient) ListSessions(ctx context.Context, req *spannerpb.ListSessionsRequest, opts ...gax.CallOption) *vkit.SessionIterator {
	allOpts := g.optsWithNextRequestID(opts)
	return g.raw.ListSessions(ctx, req, allOpts...)
}

// DeleteSession removes the given session, recording operation metrics and
// attaching the x-goog-spanner-request-id header to the RPC.
func (g *grpcSpannerClient) DeleteSession(ctx context.Context, req *spannerpb.DeleteSessionRequest, opts ...gax.CallOption) error {
	tracer := g.newBuiltinMetricsTracer(ctx)
	defer recordOperationCompletion(tracer)
	ctx = context.WithValue(ctx, metricsTracerKey, tracer)
	err := g.raw.DeleteSession(ctx, req, g.optsWithNextRequestID(opts)...)
	st, _ := status.FromError(err)
	tracer.currOp.setStatus(st.Code().String())
	return err
}

// randIdForProcess is the processId component of every
// x-goog-spanner-request-id value: a random uint32 generated once for the
// lifetime of the process.
var randIdForProcess uint32

func init() {
	seededRng := rand.New(rand.NewSource(time.Now().UnixNano()))
	randIdForProcess = seededRng.Uint32()
}

// xSpannerRequestIDHeader is the gRPC metadata key under which the request
// identity is transmitted.
const xSpannerRequestIDHeader = "x-goog-spanner-request-id"

// optsWithNextRequestID returns priors plus a CallOption that stamps the
// "x-goog-spanner-request-id" header onto the call and its retries.
func (g *grpcSpannerClient) optsWithNextRequestID(priors []gax.CallOption) []gax.CallOption {
	combined := make([]gax.CallOption, 0, len(priors)+1)
	combined = append(combined, priors...)
	return append(combined, &wrapRetryer{g})
}

// wrapRetryer is a gax.CallOption that injects the x-goog-spanner-request-id
// header for a call and keeps the header's rpc count in step with retries.
type wrapRetryer struct {
// gsc supplies the client/channel identity and the request counter.
gsc *grpcSpannerClient
}

// Compile-time check that wrapRetryer satisfies gax.CallOption.
var _ gax.CallOption = (*wrapRetryer)(nil)

// appendRequestIDToGRPCOptions formats the request id as
// "<processId>.<clientId>.<requestCountForClient>.<channelId>.<rpcCountForRequest>"
// and appends a gRPC call option carrying it to priors.
//
// NOTE(review): grpc.Header(&md) designates md as the destination into which
// the call's *response* headers are written; it does not attach md as
// outgoing request metadata. If the intent is to send this header to the
// server, outgoing metadata is normally added via
// metadata.AppendToOutgoingContext / metadata.NewOutgoingContext — TODO
// confirm how this header actually reaches the wire.
func (g *grpcSpannerClient) appendRequestIDToGRPCOptions(priors []grpc.CallOption, nthRequest, nthRPC uint32) []grpc.CallOption {
// TODO: Decide if each integer header field should be padded? To what width?
// Should we just let fields fill up naturally in a bid to reduce bandwidth?
// Should we have a standardized endianness: Little Endian or Big Endian?
requestID := fmt.Sprintf("%d.%d.%d.%d.%d", randIdForProcess, g.id, nthRequest, g.channelID, nthRPC)
md := metadata.MD{xSpannerRequestIDHeader: []string{requestID}}
return append(priors, grpc.Header(&md))
}

// Resolve implements gax.CallOption. It assigns this call its request ordinal,
// injects the x-goog-spanner-request-id header into the call's gRPC options
// (rpcCountForRequest starts at 1), and, when a retryer is configured, wraps
// it so every retry re-injects the header with an incremented RPC count.
func (wr *wrapRetryer) Resolve(cs *gax.CallSettings) {
nthRequest := wr.gsc.nextNthRequest()
nthRPC := uint32(1)
originalGRPCOptions := cs.GRPC
// Create the first request header.
cs.GRPC = wr.gsc.appendRequestIDToGRPCOptions(originalGRPCOptions, nthRequest, nthRPC)

if cs.Retry == nil {
// No retryer configured: the header for the single attempt was already
// inserted above, so there is nothing further to wrap.
return
}

// Otherwise in this case for each retry, we need to increment nthRPC on every
// retry and re-insert the requestID header to the original cs.GRPC callOptions.
originalRetryer := cs.Retry()
newRetryer := func() gax.Retryer {
return (wrapRetryFn)(func(err error) (pause time.Duration, shouldRetry bool) {
// NOTE(review): nthRPC is captured by this closure; if gax were to call
// cs.Retry() more than once for the same call, the attempts would share
// one counter — confirm gax invokes Retry() exactly once per RPC.
nthRPC++
cs.GRPC = wr.gsc.appendRequestIDToGRPCOptions(originalGRPCOptions, nthRequest, nthRPC)
return originalRetryer.Retry(err)
})
}
cs.Retry = newRetryer
}

// wrapRetryFn adapts a plain function to the gax.Retryer interface so a
// closure can be used wherever a Retryer is expected.
type wrapRetryFn func(err error) (time.Duration, bool)

// Compile-time check that wrapRetryFn satisfies gax.Retryer.
var _ gax.Retryer = (wrapRetryFn)(nil)

// Retry delegates directly to the wrapped function.
func (fn wrapRetryFn) Retry(err error) (pause time.Duration, shouldRetry bool) {
	pause, shouldRetry = fn(err)
	return pause, shouldRetry
}

// nextNthRequest atomically increments and returns the client's request
// ordinal (the requestCountForClient component of the request-id header).
func (g *grpcSpannerClient) nextNthRequest() uint32 {
	n := g.nthRequest.Add(1)
	return n
}

// ExecuteSql runs a SQL statement over the raw client, recording operation
// metrics and attaching the x-goog-spanner-request-id header to the RPC.
func (g *grpcSpannerClient) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (*spannerpb.ResultSet, error) {
	tracer := g.newBuiltinMetricsTracer(ctx)
	defer recordOperationCompletion(tracer)
	ctx = context.WithValue(ctx, metricsTracerKey, tracer)
	resp, err := g.raw.ExecuteSql(ctx, req, g.optsWithNextRequestID(opts)...)
	st, _ := status.FromError(err)
	tracer.currOp.setStatus(st.Code().String())
	return resp, err
}

// ExecuteStreamingSql starts a streaming SQL call on the raw client, seeding
// the context with an empty peer and attaching the x-goog-spanner-request-id
// header.
func (g *grpcSpannerClient) ExecuteStreamingSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest, opts ...gax.CallOption) (spannerpb.Spanner_ExecuteStreamingSqlClient, error) {
	peerCtx := peer.NewContext(ctx, &peer.Peer{})
	return g.raw.ExecuteStreamingSql(peerCtx, req, g.optsWithNextRequestID(opts)...)
}

func (g *grpcSpannerClient) ExecuteBatchDml(ctx context.Context, req *spannerpb.ExecuteBatchDmlRequest, opts ...gax.CallOption) (*spannerpb.ExecuteBatchDmlResponse, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.ExecuteBatchDml(ctx, req, opts...)
resp, err := g.raw.ExecuteBatchDml(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -186,21 +291,21 @@ func (g *grpcSpannerClient) Read(ctx context.Context, req *spannerpb.ReadRequest
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.Read(ctx, req, opts...)
resp, err := g.raw.Read(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

// StreamingRead starts a streaming read on the raw client, seeding the
// context with an empty peer and attaching the x-goog-spanner-request-id
// header.
func (g *grpcSpannerClient) StreamingRead(ctx context.Context, req *spannerpb.ReadRequest, opts ...gax.CallOption) (spannerpb.Spanner_StreamingReadClient, error) {
	peerCtx := peer.NewContext(ctx, &peer.Peer{})
	return g.raw.StreamingRead(peerCtx, req, g.optsWithNextRequestID(opts)...)
}

func (g *grpcSpannerClient) BeginTransaction(ctx context.Context, req *spannerpb.BeginTransactionRequest, opts ...gax.CallOption) (*spannerpb.Transaction, error) {
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.BeginTransaction(ctx, req, opts...)
resp, err := g.raw.BeginTransaction(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -210,7 +315,7 @@ func (g *grpcSpannerClient) Commit(ctx context.Context, req *spannerpb.CommitReq
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.Commit(ctx, req, opts...)
resp, err := g.raw.Commit(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -220,7 +325,7 @@ func (g *grpcSpannerClient) Rollback(ctx context.Context, req *spannerpb.Rollbac
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
err := g.raw.Rollback(ctx, req, opts...)
err := g.raw.Rollback(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return err
Expand All @@ -230,7 +335,7 @@ func (g *grpcSpannerClient) PartitionQuery(ctx context.Context, req *spannerpb.P
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.PartitionQuery(ctx, req, opts...)
resp, err := g.raw.PartitionQuery(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
Expand All @@ -240,12 +345,12 @@ func (g *grpcSpannerClient) PartitionRead(ctx context.Context, req *spannerpb.Pa
mt := g.newBuiltinMetricsTracer(ctx)
defer recordOperationCompletion(mt)
ctx = context.WithValue(ctx, metricsTracerKey, mt)
resp, err := g.raw.PartitionRead(ctx, req, opts...)
resp, err := g.raw.PartitionRead(ctx, req, g.optsWithNextRequestID(opts)...)
statusCode, _ := status.FromError(err)
mt.currOp.setStatus(statusCode.Code().String())
return resp, err
}

// BatchWrite starts a streaming BatchWrite RPC on the raw client, seeding the
// context with an empty peer and attaching the x-goog-spanner-request-id
// header.
func (g *grpcSpannerClient) BatchWrite(ctx context.Context, req *spannerpb.BatchWriteRequest, opts ...gax.CallOption) (spannerpb.Spanner_BatchWriteClient, error) {
	peerCtx := peer.NewContext(ctx, &peer.Peer{})
	return g.raw.BatchWrite(peerCtx, req, g.optsWithNextRequestID(opts)...)
}
4 changes: 4 additions & 0 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
MethodExecuteBatchDml string = "EXECUTE_BATCH_DML"
MethodStreamingRead string = "EXECUTE_STREAMING_READ"
MethodBatchWrite string = "BATCH_WRITE"
MethodPartitionQuery string = "PARTITION_QUERY"
)

// StatementResult represents a mocked result on the test server. The result is
Expand Down Expand Up @@ -1107,6 +1108,9 @@ func (s *inMemSpannerServer) Rollback(ctx context.Context, req *spannerpb.Rollba
}

func (s *inMemSpannerServer) PartitionQuery(ctx context.Context, req *spannerpb.PartitionQueryRequest) (*spannerpb.PartitionResponse, error) {
if err := s.simulateExecutionTime(MethodPartitionQuery, req); err != nil {
return nil, err
}
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
Expand Down
Loading

0 comments on commit 9a9f77c

Please sign in to comment.