Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 13 additions & 25 deletions base/dcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ func TestOneShotDCP(t *testing.T) {
return false
}

// start one shot feed
feedID := t.Name()

collection, err := AsCollection(dataStore)
require.NoError(t, err)
var collectionIDs []uint32
Expand All @@ -72,7 +69,7 @@ func TestOneShotDCP(t *testing.T) {

gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
require.NoError(t, err)
dcpClient, err := NewDCPClient(TestCtx(t), feedID, counterCallback, clientOptions, gocbv2Bucket)
dcpClient, err := NewDCPClient(TestCtx(t), counterCallback, clientOptions, gocbv2Bucket)
require.NoError(t, err)

doneChan, startErr := dcpClient.Start()
Expand Down Expand Up @@ -130,14 +127,13 @@ func TestTerminateDCPFeed(t *testing.T) {
}

// start continuous feed with terminator
feedID := t.Name()

gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
require.NoError(t, err)
options := DCPClientOptions{
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
}
dcpClient, err := NewDCPClient(TestCtx(t), feedID, counterCallback, options, gocbv2Bucket)
dcpClient, err := NewDCPClient(TestCtx(t), counterCallback, options, gocbv2Bucket)
require.NoError(t, err)

// Add documents in a separate goroutine
Expand Down Expand Up @@ -224,8 +220,6 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) {
return false
}

feedID := t.Name()

// Add documents
updatedBody := map[string]any{"foo": "bar"}
for i := range 10000 {
Expand All @@ -250,7 +244,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) {

gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
require.NoError(t, err)
dcpClient, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket)
dcpClient, err := NewDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

doneChan, startErr := dcpClient.Start()
Expand Down Expand Up @@ -284,7 +278,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) {
CollectionIDs: collectionIDs,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
}
dcpClient2, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket)
dcpClient2, err := NewDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

doneChan2, startErr2 := dcpClient2.Start()
Expand All @@ -303,7 +297,7 @@ func TestDCPClientMultiFeedConsistency(t *testing.T) {
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
}

dcpClient3, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket)
dcpClient3, err := NewDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

doneChan3, startErr3 := dcpClient3.Start()
Expand Down Expand Up @@ -352,7 +346,6 @@ func TestContinuousDCPRollback(t *testing.T) {
return false
}

feedID := t.Name()
gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
require.NoError(t, err)

Expand All @@ -375,7 +368,7 @@ func TestContinuousDCPRollback(t *testing.T) {
// timeout for feed to complete
timeout := time.After(20 * time.Second)

dcpClient, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket)
dcpClient, err := NewDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

_, startErr := dcpClient.Start()
Expand Down Expand Up @@ -410,7 +403,7 @@ func TestContinuousDCPRollback(t *testing.T) {
}
require.NoError(t, dcpClient.Close())

dcpClient1, err := NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket)
dcpClient1, err := NewDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)
// function to force the rollback of some vBuckets
dcpClient1.forceRollbackvBucket(vbUUID)
Expand Down Expand Up @@ -473,8 +466,6 @@ func TestResumeStoppedFeed(t *testing.T) {
return false
}

feedID := t.Name()

// Add documents
updatedBody := map[string]any{"foo": "bar"}
for i := range 10000 {
Expand Down Expand Up @@ -505,7 +496,7 @@ func TestResumeStoppedFeed(t *testing.T) {
gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
require.NoError(t, err)

dcpClient, err = NewDCPClient(ctx, feedID, counterCallback, dcpClientOpts, gocbv2Bucket)
dcpClient, err = NewDCPClient(ctx, counterCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

doneChan, startErr := dcpClient.Start()
Expand Down Expand Up @@ -539,7 +530,7 @@ func TestResumeStoppedFeed(t *testing.T) {
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
}

dcpClient2, err := NewDCPClient(ctx, feedID, secondCallback, dcpClientOpts, gocbv2Bucket)
dcpClient2, err := NewDCPClient(ctx, secondCallback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

doneChan2, startErr2 := dcpClient2.Start()
Expand Down Expand Up @@ -571,7 +562,6 @@ func TestBadAgentPriority(t *testing.T) {
bucket := GetTestBucket(t)
defer bucket.Close(ctx)

feedID := "fakeID"
panicCallback := func(event sgbucket.FeedEvent) bool {
t.Error(t, "Should not hit this callback")
return false
Expand All @@ -583,7 +573,7 @@ func TestBadAgentPriority(t *testing.T) {
gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
require.NoError(t, err)

dcpClient, err := NewDCPClient(TestCtx(t), feedID, panicCallback, dcpClientOpts, gocbv2Bucket)
dcpClient, err := NewDCPClient(TestCtx(t), panicCallback, dcpClientOpts, gocbv2Bucket)
require.Error(t, err)
require.Nil(t, dcpClient)
}
Expand All @@ -604,7 +594,6 @@ func TestDCPOutOfRangeSequence(t *testing.T) {
return false
}

feedID := t.Name()
dcpClientOpts := DCPClientOptions{
FailOnRollback: false,
OneShot: true,
Expand All @@ -619,7 +608,7 @@ func TestDCPOutOfRangeSequence(t *testing.T) {
gocbv2Bucket, err := AsGocbV2Bucket(bucket)
require.NoError(t, err)

dcpClient, err := NewDCPClient(ctx, feedID, callback, dcpClientOpts, gocbv2Bucket)
dcpClient, err := NewDCPClient(ctx, callback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

doneChan, startErr := dcpClient.Start()
Expand All @@ -646,7 +635,7 @@ func TestDCPOutOfRangeSequence(t *testing.T) {
InitialMetadata: metadata,
}

dcpClient, err = NewDCPClient(ctx, feedID, callback, dcpClientOpts, gocbv2Bucket)
dcpClient, err = NewDCPClient(ctx, callback, dcpClientOpts, gocbv2Bucket)
require.NoError(t, err)

_, startErr = dcpClient.Start()
Expand Down Expand Up @@ -722,7 +711,7 @@ func TestDCPFeedEventTypes(t *testing.T) {
return true
}

dcpClient, err := NewDCPClient(ctx, t.Name(), callback, clientOptions, gocbv2Bucket)
dcpClient, err := NewDCPClient(ctx, callback, clientOptions, gocbv2Bucket)
require.NoError(t, err)

doneChan, startErr := dcpClient.Start()
Expand Down Expand Up @@ -808,7 +797,6 @@ func TestDCPClientAgentConfig(t *testing.T) {
defer func() { gocbv2Bucket.Spec.Server = oldBucketSpecServer }()
gocbv2Bucket.Spec.Server += tc.serverSuffix
dcpClient, err := NewDCPClient(ctx,
"fakeFeedID",
func(sgbucket.FeedEvent) bool { return true },
DCPClientOptions{MetadataStoreType: DCPMetadataStoreInMemory},
gocbv2Bucket)
Expand Down
11 changes: 7 additions & 4 deletions base/dcp_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func makeFeedEvent(key []byte, value []byte, dataType uint8, cas uint64, expiry
return event
}

// Create a prefix that will be used to create the dcp stream name, which must be globally unique
// GenerateDcpStreamName creates a prefix that will be used to create the dcp stream name, which must be globally unique
// in order to avoid https://issues.couchbase.com/browse/MB-24237. It's also useful to have the Sync Gateway
// version number / commit for debugging purposes
func GenerateDcpStreamName(feedID string) (string, error) {
Expand All @@ -270,12 +270,15 @@ func GenerateDcpStreamName(feedID string) (string, error) {

commitTruncated := StringPrefix(GitCommit, 7)

return fmt.Sprintf(
feedName := fmt.Sprintf(
"%v-v-%v-commit-%v-uuid-%v",
feedID,
ProductAPIVersion,
commitTruncated,
u.String(),
), nil

)
if len(feedName) > 200 {
return "", fmt.Errorf("Generated DCP feed name is too long: %d characters. Max length is 200 characters. Generated name: %s", len(feedName), feedName)
}
return feedName, nil
}
24 changes: 14 additions & 10 deletions base/gocb_dcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var ErrVbUUIDMismatch = errors.New("VbUUID mismatch when failOnRollback set")

type GoCBDCPClient struct {
ctx context.Context
ID string // unique ID for DCPClient - used for DCP stream name, must be unique
dcpStreamName string // DCP stream name, must be unique
agent *gocbcore.DCPAgent // SDK DCP agent, manages connections and calls back to DCPClient stream observer implementation
callback sgbucket.FeedEventCallbackFunc // Callback invoked on DCP mutations/deletions
workers []*DCPWorker // Workers for concurrent processing of incoming mutations and callback. vbuckets are partitioned across workers
Expand All @@ -68,6 +68,7 @@ type GoCBDCPClient struct {
}

type DCPClientOptions struct {
FeedID string // Optional description for a DCP feed
NumWorkers int
OneShot bool
FailOnRollback bool // When true, the DCP client will terminate on DCP rollback
Expand All @@ -80,17 +81,17 @@ type DCPClientOptions struct {
CheckpointPrefix string
}

func NewDCPClient(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*GoCBDCPClient, error) {
func NewDCPClient(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket) (*GoCBDCPClient, error) {

numVbuckets, err := bucket.GetMaxVbno()
if err != nil {
return nil, fmt.Errorf("Unable to determine maxVbNo when creating DCP client: %w", err)
}

return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets)
return newDCPClientWithForBuckets(ctx, callback, options, bucket, numVbuckets)
}

func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {
func newDCPClientWithForBuckets(ctx context.Context, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {

numWorkers := DefaultNumWorkers
if options.NumWorkers > 0 {
Expand All @@ -105,12 +106,16 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke
return nil, fmt.Errorf("callers must specify a checkpoint prefix when persisting metadata")
}
}
dcpStreamName, err := GenerateDcpStreamName(options.FeedID)
if err != nil {
return nil, fmt.Errorf("error generating DCP stream name: %w", err)
}
client := &GoCBDCPClient{
ctx: ctx,
dcpStreamName: dcpStreamName,
workers: make([]*DCPWorker, numWorkers),
numVbuckets: numVbuckets,
callback: callback,
ID: ID,
spec: bucket.GetSpec(),
supportsCollections: bucket.IsSupported(sgbucket.BucketStoreFeatureCollections),
terminator: make(chan bool),
Expand All @@ -128,12 +133,11 @@ func newDCPClientWithForBuckets(ctx context.Context, ID string, callback sgbucke
client.activeVbuckets[vbNo] = struct{}{}
}

checkpointPrefix := fmt.Sprintf("%s:%v", client.checkpointPrefix, ID)
switch options.MetadataStoreType {
case DCPMetadataStoreCS:
// TODO: Change GetSingleDataStore to a metadata Store?
metadataStore := bucket.DefaultDataStore()
client.metadata = NewDCPMetadataCS(ctx, metadataStore, numVbuckets, numWorkers, checkpointPrefix)
client.metadata = NewDCPMetadataCS(ctx, metadataStore, numVbuckets, numWorkers, options.CheckpointPrefix)
case DCPMetadataStoreInMemory:
client.metadata = NewDCPMetadataMem(numVbuckets)
default:
Expand Down Expand Up @@ -367,7 +371,7 @@ func (dc *GoCBDCPClient) initAgent(spec BucketSpec) error {
flags := memd.DcpOpenFlagProducer
flags |= memd.DcpOpenFlagIncludeXattrs
var agentErr error
dc.agent, agentErr = gocbcore.CreateDcpAgent(agentConfig, dc.ID, flags)
dc.agent, agentErr = gocbcore.CreateDcpAgent(agentConfig, dc.dcpStreamName, flags)
if agentErr != nil {
return fmt.Errorf("Unable to start DCP client - error creating agent: %w", agentErr)
}
Expand Down Expand Up @@ -670,8 +674,8 @@ func (dc *GoCBDCPClient) StartWorkersForTest(t *testing.T) {
}

// NewDCPClientForTest is a test-only function to create a DCP client with a specific number of vbuckets.
func NewDCPClientForTest(ctx context.Context, t *testing.T, ID string, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {
return newDCPClientWithForBuckets(ctx, ID, callback, options, bucket, numVbuckets)
func NewDCPClientForTest(ctx context.Context, t *testing.T, callback sgbucket.FeedEventCallbackFunc, options DCPClientOptions, bucket *GocbV2Bucket, numVbuckets uint16) (*GoCBDCPClient, error) {
return newDCPClientWithForBuckets(ctx, callback, options, bucket, numVbuckets)
}

var _ gocbcore.StreamObserver = &GoCBDCPClient{}
2 changes: 1 addition & 1 deletion base/gocb_dcp_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName stri
CollectionIDs: collectionIDs,
AgentPriority: gocbcore.DcpAgentPriorityMed,
CheckpointPrefix: args.CheckpointPrefix,
FeedID: args.ID,
}

if args.Backfill == sgbucket.FeedNoBackfill {
Expand All @@ -113,7 +114,6 @@ func StartGocbDCPFeed(ctx context.Context, bucket *GocbV2Bucket, bucketName stri

dcpClient, err := NewDCPClient(
ctx,
feedName,
callback,
options,
bucket)
Expand Down
Loading
Loading