GOCBC-1657: Update couchbase2 tracing to send trace over grpc
Motivation
----------
CNG allows us to tie the SDK trace and the CNG trace together via the
relevant grpc property. We should wire this up.

Changes
--------
Add a new interface for extracting the underlying otel span from
the span provided to the SDK, when it is an otel span.
Update the context passed into the grpc layer to include span info.

Change-Id: I66cdb4bb8b936d0f2c4c7dfc01f78bfc48b28d3c
Reviewed-on: https://review.couchbase.org/c/gocb/+/214329
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Build Bot <build@couchbase.com>
chvck committed Aug 14, 2024
1 parent 029cd49 commit fcf55fc
Showing 10 changed files with 181 additions and 104 deletions.
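The "new interface" from the commit message shows up below as a pair of optional capabilities. Only the meter-side interface (OtelAwareMeter, in metrics.go) is visible in this truncated diff; the tracer-side OtelAwareRequestTracer consumed in client_ps.go presumably looks like the sketch below. Provider() is confirmed by its call site; Wrapped() is an assumption by symmetry with OtelAwareMeter, and the span-unwrapping interface the commit message mentions lives in a file not shown here.

type OtelAwareRequestTracer interface {
	// Provider is confirmed by its use in buildConfig: it hands the otel
	// TracerProvider to the grpc layer so SDK and CNG spans share a trace.
	Provider() trace.TracerProvider
	// Wrapped is an assumption, mirroring OtelAwareMeter below.
	Wrapped() trace.Tracer
}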
38 changes: 30 additions & 8 deletions client_ps.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"fmt"
 	"github.com/couchbaselabs/gocbconnstr/v2"
+	"go.opentelemetry.io/otel/metric"
+	"go.opentelemetry.io/otel/trace"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -63,12 +65,27 @@ func (c *psConnectionMgr) buildConfig(cluster *Cluster) error {
 
 	logger := newZapLogger()
 
+	var tp trace.TracerProvider
+	if c.tracer != nil {
+		if tracer, ok := c.tracer.(OtelAwareRequestTracer); ok {
+			tp = tracer.Provider()
+		}
+	}
+	var mp metric.MeterProvider
+	if c.meter != nil {
+		if meter, ok := c.meter.meter.(OtelAwareMeter); ok {
+			mp = meter.Provider()
+		}
+	}
+
 	c.config = &gocbcoreps.DialOptions{
 		Username:           creds[0].Username,
 		Password:           creds[0].Password,
 		InsecureSkipVerify: cluster.securityConfig.TLSSkipVerify,
 		ClientCertificate:  cluster.securityConfig.TLSRootCAs,
 		Logger:             logger,
+		TracerProvider:     tp,
+		MeterProvider:      mp,
 	}
 
 	return nil
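The two type assertions above are an optional-capability upgrade: any RequestTracer or Meter keeps working, but otel-aware implementations additionally surface their providers to the grpc dial options, which is what ties the SDK trace to the CNG trace. The same unwrap logic as a standalone helper, purely illustrative (tracerProviderFor is a hypothetical name, not part of this change):

func tracerProviderFor(t RequestTracer) trace.TracerProvider {
	// Otel-aware tracers expose their provider; anything else falls back
	// to nil, in which case gocbcoreps dials without trace propagation.
	if aware, ok := t.(OtelAwareRequestTracer); ok {
		return aware.Provider()
	}
	return nil
}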
@@ -332,7 +349,7 @@ func (p *psOpManagerProvider) NewManager(parentSpan RequestSpan, opName string,
 
 type psOpManager interface {
 	IsIdempotent() bool
-	TraceSpanContext() RequestSpanContext
+	TraceSpan() RequestSpan
 	OperationID() string
 	RetryStrategy() RetryStrategy
 	OpName() string
@@ -398,15 +415,17 @@ func (m *psOpManagerDefault) NewSpan(name string) RequestSpan {
 }
 
 func (m *psOpManagerDefault) Finish(noMetrics bool) {
+	retries := m.RetryInfo().RetryAttempts()
+	m.span.SetAttribute(spanAttribRetries, retries)
 	m.span.End()
 
 	if !noMetrics {
 		m.meter.ValueRecord(m.service, m.opName, m.createdTime)
 	}
 }
 
-func (m *psOpManagerDefault) TraceSpanContext() RequestSpanContext {
-	return m.span.Context()
+func (m *psOpManagerDefault) ValueRecord() {
+	m.meter.ValueRecord(m.service, m.opName, m.createdTime)
 }
 
 func (m *psOpManagerDefault) TraceSpan() RequestSpan {
@@ -481,6 +500,12 @@ func (m *psOpManagerDefault) Context() context.Context {
 	return m.ctx
 }
 
+func wrapPSOpCtx[ReqT any, RespT any](ctx context.Context, m psOpManager,
+	req ReqT,
+	fn func(context.Context, ReqT, ...grpc.CallOption) (RespT, error)) (RespT, error) {
+	return wrapPSOpCtxWithPeek(ctx, m, req, m.TraceSpan(), fn, nil)
+}
+
 func wrapPSOp[ReqT any, RespT any](m psOpManager, req ReqT,
 	fn func(context.Context, ReqT, ...grpc.CallOption) (RespT, error)) (RespT, error) {
 	ctx, cancel := context.WithTimeout(m.Context(), m.Timeout())
@@ -489,16 +514,13 @@ func wrapPSOp[ReqT any, RespT any](m psOpManager, req ReqT,
 	return wrapPSOpCtx(ctx, m, req, fn)
 }
 
-func wrapPSOpCtx[ReqT any, RespT any](ctx context.Context, m psOpManager, req ReqT, fn func(context.Context, ReqT, ...grpc.CallOption) (RespT, error)) (RespT, error) {
-	return wrapPSOpCtxWithPeek(ctx, m, req, fn, nil)
-}
-
 func wrapPSOpCtxWithPeek[ReqT any, RespT any](ctx context.Context,
 	m psOpManager,
 	req ReqT,
+	parentSpan RequestSpan,
 	fn func(context.Context, ReqT, ...grpc.CallOption) (RespT, error),
 	peekResult func(RespT) error) (RespT, error) {
-	retryReq := newRetriableRequestPS(m.OpName(), m.IsIdempotent(), m.TraceSpanContext(), m.OperationID(), m.RetryStrategy())
+	retryReq := newRetriableRequestPS(m.OpName(), m.IsIdempotent(), parentSpan, m.OperationID(), m.RetryStrategy())
 	m.SetRetryRequest(retryReq)
 
 	res, err := handleRetriableRequest(ctx, m.CreatedAt(), m.Tracer(), req, retryReq, fn, m.RetryReasonFor, peekResult)
1 change: 1 addition & 0 deletions constants.go
@@ -334,6 +334,7 @@ const (
 	spanAttribDBSystemValue  = "couchbase"
 	spanAttribOperationIDKey = "db.couchbase.operation_id"
 	spanAttribOperationKey   = "db.operation"
+	spanAttribRetries        = "db.couchbase.retries"
 	spanAttribLocalIDKey     = "db.couchbase.local_id"
 	spanAttribNetTransport   = "net.transport"
 	spanAttribNetHostNameKey = "net.host.name"
4 changes: 2 additions & 2 deletions go.mod
@@ -9,6 +9,8 @@ require (
 	github.com/golang/snappy v0.0.4
 	github.com/google/uuid v1.6.0
 	github.com/stretchr/testify v1.9.0
+	go.opentelemetry.io/otel/metric v1.24.0
+	go.opentelemetry.io/otel/trace v1.24.0
 	go.uber.org/zap v1.27.0
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda
 	google.golang.org/grpc v1.63.2
@@ -24,8 +26,6 @@ require (
 	github.com/stretchr/objx v0.5.2 // indirect
 	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
 	go.opentelemetry.io/otel v1.24.0 // indirect
-	go.opentelemetry.io/otel/metric v1.24.0 // indirect
-	go.opentelemetry.io/otel/trace v1.24.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/net v0.24.0 // indirect
 	golang.org/x/sys v0.19.0 // indirect
8 changes: 6 additions & 2 deletions kvopmanager_ps.go
@@ -35,6 +35,8 @@ type kvOpManagerPs struct {
 	createdTime time.Time
 	meter       *meterWrapper
 	compressor  *compressor
+
+	req *retriableRequestPs
 }
 
 func (m *kvOpManagerPs) getTimeout() time.Duration {
@@ -134,6 +136,8 @@ func (m *kvOpManagerPs) SetIsIdempotent(idempotent bool) {
 }
 
 func (m *kvOpManagerPs) Finish(noMetrics bool) {
+	retries := m.RetryInfo().RetryAttempts()
+	m.span.SetAttribute(spanAttribRetries, retries)
 	m.span.End()
 
 	if !noMetrics {
@@ -228,11 +232,11 @@ func (m *kvOpManagerPs) Tracer() RequestTracer {
 }
 
 func (m *kvOpManagerPs) RetryInfo() retriedRequestInfo {
-	return nil
+	return m.req
 }
 
 func (m *kvOpManagerPs) SetRetryRequest(req *retriableRequestPs) {
-	// we don't store this as we don't need it.
+	m.req = req
 }
 
 func (m *kvOpManagerPs) OperationID() string {
6 changes: 6 additions & 0 deletions metrics.go
@@ -2,6 +2,7 @@ package gocb
 
 import (
 	"github.com/couchbase/gocbcore/v10"
+	"go.opentelemetry.io/otel/metric"
 	"sync"
 	"time"
 )
@@ -12,6 +13,11 @@ type Meter interface {
 	ValueRecorder(name string, tags map[string]string) (ValueRecorder, error)
 }
 
+type OtelAwareMeter interface {
+	Wrapped() metric.Meter
+	Provider() metric.MeterProvider
+}
+
 // Counter is used for incrementing a synchronous count metric.
 type Counter interface {
 	IncrementBy(num uint64)
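An implementation of OtelAwareMeter would be a gocb Meter backed by an otel meter; a minimal sketch under that assumption (otelBackedMeter is a hypothetical name, and the Counter/ValueRecorder methods required by the Meter interface are elided):

type otelBackedMeter struct {
	provider metric.MeterProvider
	wrapped  metric.Meter
	// Counter/ValueRecorder plumbing elided.
}

func (m *otelBackedMeter) Wrapped() metric.Meter          { return m.wrapped }
func (m *otelBackedMeter) Provider() metric.MeterProvider { return m.provider }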
151 changes: 76 additions & 75 deletions queryindexprovider_ps.go
@@ -1,8 +1,10 @@
 package gocb
 
 import (
+	"context"
 	"errors"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/couchbase/goprotostellar/genproto/admin_query_v1"
@@ -205,11 +207,19 @@ func (qpc *queryIndexProviderPs) GetAllIndexes(c *Collection, bucketName string,
 		return nil, err
 	}
 
-	return qpc.getAllIndexes(c, bucketName, manager, opts)
+	return qpc.getAllIndexes(c, bucketName, manager, manager.TraceSpan(), &getAllIndexesOptions{
+		ScopeName:      opts.ScopeName,
+		CollectionName: opts.CollectionName,
+	})
 }
 
+type getAllIndexesOptions struct {
+	ScopeName      string
+	CollectionName string
+}
+
-func (qpc *queryIndexProviderPs) getAllIndexes(c *Collection, bucketName string, manager *psOpManagerDefault,
-	opts *GetAllQueryIndexesOptions) ([]QueryIndex, error) {
+func (qpc *queryIndexProviderPs) getAllIndexes(c *Collection, bucketName string, manager *psOpManagerDefault, parentSpan RequestSpan,
+	opts *getAllIndexesOptions) ([]QueryIndex, error) {
 	bucket, scope, collection := qpc.makeKeyspace(c, bucketName, opts.ScopeName, opts.CollectionName)
 
 	req := &admin_query_v1.GetAllIndexesRequest{
@@ -218,7 +228,10 @@ func (qpc *queryIndexProviderPs) getAllIndexes(c *Collection, bucketName string,
 		CollectionName: collection,
 	}
 
-	resp, err := wrapPSOp(manager, req, qpc.provider.GetAllIndexes)
+	ctx, cancel := context.WithTimeout(manager.Context(), manager.Timeout())
+	defer cancel()
+
+	resp, err := wrapPSOpCtxWithPeek(ctx, manager, req, parentSpan, qpc.provider.GetAllIndexes, nil)
 	if err != nil {
 		return nil, qpc.handleError(err)
 	}
@@ -324,38 +337,58 @@ func (qpc *queryIndexProviderPs) BuildDeferredIndexes(c *Collection, bucketName
 	return indexNames, nil
 }
 
-func checkIndexesActivePs(indexes []QueryIndex, checkList []string) (bool, error) {
-	var checkIndexes []QueryIndex
-	for i := 0; i < len(checkList); i++ {
-		indexName := checkList[i]
-
-		for j := 0; j < len(indexes); j++ {
-			if indexes[j].Name == indexName {
-				checkIndexes = append(checkIndexes, indexes[j])
-				break
-			}
-		}
-	}
-
-	if len(checkIndexes) != len(checkList) {
-		return false, ErrIndexNotFound
-	}
-
-	for i := 0; i < len(checkIndexes); i++ {
-		if checkIndexes[i].State != string(queryIndexStateOnline) {
-			logDebugf("Index not online: %s is in state %s", checkIndexes[i].Name, checkIndexes[i].State)
-			return false, nil
-		}
-	}
-
-	return true, nil
+type waitForIndexOnlineOptions struct {
+	ScopeName      string
+	CollectionName string
+}
+
+func (qpc *queryIndexProviderPs) waitForIndexOnline(c *Collection, indexName, bucketName string, manager *psOpManagerDefault, opts *waitForIndexOnlineOptions) error {
+	span := manager.NewSpan("manager_query_wait_for_index_online")
+	span.SetAttribute("db.operation", "WaitForIndexOnline")
+	defer span.End()
+
+	bucket, scope, collection := qpc.makeKeyspace(c, bucketName, opts.ScopeName, opts.CollectionName)
+	scopeName := ""
+	if scope != nil {
+		scopeName = *scope
+	}
+	collectionName := ""
+	if collection != nil {
+		collectionName = *collection
+	}
+
+	req := &admin_query_v1.WaitForIndexOnlineRequest{
+		BucketName:     bucket,
+		ScopeName:      scopeName,
+		CollectionName: collectionName,
+		Name:           indexName,
+	}
+
+	ctx, cancel := context.WithTimeout(manager.Context(), manager.Timeout())
+	defer cancel()
+
+	_, err := wrapPSOpCtxWithPeek(ctx, manager, req, span, qpc.provider.WaitForIndexOnline, nil)
+	if err != nil {
+		return qpc.handleError(err)
+	}
+
+	return nil
 }
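waitForIndexOnline replaces the client-side polling that WatchIndexes used to do (removed below) with a single server-side admin_query_v1.WaitForIndexOnline RPC, bounded by the manager's timeout through the context deadline. A usage sketch with hypothetical keyspace values:

// Wait for the primary index on travel-sample.inventory.airline to come online.
err := qpc.waitForIndexOnline(coll, "#primary", "travel-sample", manager, &waitForIndexOnlineOptions{
	ScopeName:      "inventory",
	CollectionName: "airline",
})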

 func (qpc *queryIndexProviderPs) WatchIndexes(c *Collection, bucketName string, watchList []string, timeout time.Duration, opts *WatchQueryIndexOptions,
 ) error {
 	manager := qpc.newOpManager(opts.ParentSpan, "manager_query_watch_indexes", map[string]interface{}{})
 	defer manager.Finish(false)
 
-	manager.SetContext(opts.Context)
+	ctx := opts.Context
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	manager.SetContext(ctx)
 	manager.SetIsIdempotent(true)
 	manager.SetRetryStrategy(opts.RetryStrategy)
 	manager.SetTimeout(timeout)
@@ -368,63 +401,31 @@ func (qpc *queryIndexProviderPs) WatchIndexes(c *Collection, bucketName string,
 		watchList = append(watchList, "#primary")
 	}
 
-	start := time.Now()
-	deadline := start.Add(timeout)
-
-	curInterval := 50 * time.Millisecond
-	for {
-		if deadline.Before(time.Now()) {
-			return &TimeoutError{
-				InnerError:   ErrUnambiguousTimeout,
-				TimeObserved: time.Since(start),
-			}
-		}
-
-		span := manager.NewSpan("manager_query_get_all_indexes")
-
-		indexes, err := qpc.getAllIndexes(
-			c,
-			bucketName,
-			manager,
-			&GetAllQueryIndexesOptions{
-				Timeout:        time.Until(deadline),
-				RetryStrategy:  opts.RetryStrategy,
-				ParentSpan:     span,
-				ScopeName:      opts.ScopeName,
-				CollectionName: opts.CollectionName,
-				Context:        opts.Context,
-			})
-		span.End()
-		if err != nil {
-			return err
-		}
-
-		allOnline, err := checkIndexesActivePs(indexes, watchList)
-		if err != nil {
-			return err
-		}
-
-		if allOnline {
-			break
-		}
-
-		curInterval += 500 * time.Millisecond
-		if curInterval > 1000 {
-			curInterval = 1000
-		}
-
-		// Make sure we don't sleep past our overall deadline, if we adjust the
-		// deadline then it will be caught at the top of this loop as a timeout.
-		sleepDeadline := time.Now().Add(curInterval)
-		if sleepDeadline.After(deadline) {
-			sleepDeadline = deadline
-		}
-
-		// wait till our next poll interval
-		time.Sleep(time.Until(sleepDeadline))
-	}
+	var firstErr error
+	var errLock sync.Mutex
+
+	var wg sync.WaitGroup
+	for _, index := range watchList {
+		wg.Add(1)
+		go func(indexName string) {
+			err := qpc.waitForIndexOnline(c, indexName, bucketName, manager, &waitForIndexOnlineOptions{
+				ScopeName:      opts.ScopeName,
+				CollectionName: opts.CollectionName,
+			})
+			if err != nil {
+				errLock.Lock()
+				if firstErr == nil {
+					firstErr = err
+				}
+				errLock.Unlock()
+				cancel()
+			}
+			wg.Done()
+		}(index)
+	}
+	wg.Wait()
 
-	return nil
+	return firstErr
 }
 
 func (qpc *queryIndexProviderPs) normaliseCollectionKeyspace(c *Collection) (string, string) {
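The rewritten WatchIndexes fans out one goroutine per watched index, records the first error under a mutex, and cancels the shared context so the remaining WaitForIndexOnline calls abort early. The same pattern can be expressed with golang.org/x/sync/errgroup, which is not in this module's dependency set; a sketch under that assumption:

// errgroup.WithContext cancels gctx on the first error and g.Wait() returns
// that error, matching the WaitGroup/mutex/cancel logic above.
g, gctx := errgroup.WithContext(context.Background())
manager.SetContext(gctx) // cancellation flows through the manager, as in the hand-written version
for _, index := range watchList {
	index := index // capture the loop variable (pre-Go 1.22 semantics)
	g.Go(func() error {
		return qpc.waitForIndexOnline(c, index, bucketName, manager, &waitForIndexOnlineOptions{
			ScopeName:      opts.ScopeName,
			CollectionName: opts.CollectionName,
		})
	})
}
return g.Wait()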