Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions base/collection_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package base

import (
"errors"
"slices"

sgbucket "github.com/couchbase/sg-bucket"
)
Expand All @@ -20,6 +21,9 @@ var ErrCollectionsUnsupported = errors.New("collections not supported")

type ScopeAndCollectionName = sgbucket.DataStoreNameImpl

// CollectionNames represents a map of scope names to collection names.
type CollectionNames map[string][]string

// DefaultScopeAndCollectionName returns the name pair identifying the default
// scope and default collection.
func DefaultScopeAndCollectionName() ScopeAndCollectionName {
	name := ScopeAndCollectionName{
		Scope:      DefaultScope,
		Collection: DefaultCollection,
	}
	return name
}
Expand All @@ -45,3 +49,24 @@ func (s ScopeAndCollectionNames) ScopeAndCollectionNames() []string {
// FullyQualifiedCollectionName returns the dotted "bucket.scope.collection"
// form of the given name components.
func FullyQualifiedCollectionName(bucketName, scopeName, collectionName string) string {
	const sep = "."
	qualified := bucketName + sep + scopeName
	return qualified + sep + collectionName
}

// Add adds the provided collections to the map, keyed by scope name. This does
// not deduplicate collections.
func (c CollectionNames) Add(ds ...sgbucket.DataStoreName) {
	for _, d := range ds {
		// append on a missing key appends to a nil slice, which is valid Go,
		// so no explicit existence check is needed.
		c[d.ScopeName()] = append(c[d.ScopeName()], d.CollectionName())
	}
}

// NewCollectionNames builds a CollectionNames map from the specified
// collections, sorting each scope's collection list. Duplicate collections are
// not removed.
func NewCollectionNames(ds ...sgbucket.DataStoreName) CollectionNames {
	names := make(CollectionNames, 1)
	names.Add(ds...)
	for scope := range names {
		slices.Sort(names[scope])
	}
	return names
}
14 changes: 8 additions & 6 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type CbgtContext struct {

// StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket.
// dbName is used to define a unique path name for local file storage of pindex files
func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg) (*CbgtContext, error) {
func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg) (*CbgtContext, error) {
// Ensure we don't try to start collections-enabled feed if there are any pre-collection SG nodes in the cluster.
minVersion, err := getMinNodeVersion(cfg)
if err != nil {
Expand All @@ -76,13 +76,15 @@ func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string,
}
}

cbgtContext, err := initCBGTManager(ctx, bucket, spec, cfg, uuid, dbName)
b, err := AsGocbV2Bucket(bucket)
if err != nil {
return nil, err
return nil, fmt.Errorf("error asserting bucket as gocb v2 bucket: %w", err)
}

// Add logging info before passing ctx down
ctx = CorrelationIDLogCtx(ctx, DCPImportFeedID)
cbgtContext, err := initCBGTManager(ctx, bucket, b.GetSpec(), cfg, uuid, dbName)
if err != nil {
return nil, err
}

// Start Manager. Registers this node in the cfg
err = cbgtContext.StartManager(ctx, dbName, configGroup, bucket, scope, collections, numPartitions)
Expand Down Expand Up @@ -457,7 +459,7 @@ func (c *CbgtContext) RemoveFeedCredentials(dbName string) {
// CBG-4394: removing root certs for the bucket should be done, but it is keyed based on the bucket UUID, and multiple dbs can use the same bucket
}

// Format of dest key for retrieval of import dest from cbgtDestFactories
// ImportDestKey is used for retrieval of import dest from cbgtDestFactories
func ImportDestKey(dbName string, scope string, collections []string) string {
sort.Strings(collections)
collectionString := ""
Expand Down
7 changes: 5 additions & 2 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,11 @@ func CreateBucketScopesAndCollections(ctx context.Context, bucketSpec BucketSpec
cm := cluster.Bucket(bucketSpec.BucketName).Collections()

for scopeName, collections := range scopes {
if err := cm.CreateScope(scopeName, nil); err != nil && !errors.Is(err, gocb.ErrScopeExists) {
return fmt.Errorf("failed to create scope %s: %w", scopeName, err)
// CreateScope will return an ambiguous error if the _default scope already exists, expect it always exists
if scopeName != DefaultScope {
if err := cm.CreateScope(scopeName, nil); err != nil && !errors.Is(err, gocb.ErrScopeExists) {
return fmt.Errorf("failed to create scope %s: %w", scopeName, err)
}
}
DebugfCtx(ctx, KeySGTest, "Created scope %s", scopeName)
for _, collectionName := range collections {
Expand Down
75 changes: 39 additions & 36 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ResyncManagerDCP struct {
ResyncID string
VBUUIDs []uint64
useXattrs bool
ResyncedCollections map[string][]string
ResyncedCollections base.CollectionNames
resyncCollectionInfo
lock sync.RWMutex
}
Expand All @@ -42,9 +42,6 @@ type resyncCollectionInfo struct {
collectionIDs []uint32
}

// ResyncCollections contains map of scope names with collection names against which resync needs to run
type ResyncCollections map[string][]string

var _ BackgroundManagerProcessI = &ResyncManagerDCP{}

func NewResyncManagerDCP(metadataStore base.DataStore, useXattrs bool, metaKeys *base.MetadataKeys) *BackgroundManager {
Expand All @@ -60,9 +57,16 @@ func NewResyncManagerDCP(metadataStore base.DataStore, useXattrs bool, metaKeys
}
}

// Init processes the options to start a resync process and sets them as struct members.
func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clusterStatus []byte) error {
db := options["database"].(*Database)
resyncCollections := options["collections"].(ResyncCollections)
db, ok := options["database"].(*Database)
if !ok {
return errors.New("database option is required and must be of type *Database")
}
resyncCollections, ok := options["collections"].(base.CollectionNames)
if !ok {
return errors.New("collections option is required and must be of type base.CollectionNames")
}

// Get collectionIds and store in manager for use in DCP client later
collectionIDs, hasAllCollections, collectionNames, err := getCollectionIdsAndNames(db, resyncCollections)
Expand Down Expand Up @@ -105,10 +109,20 @@ func (r *ResyncManagerDCP) Init(ctx context.Context, options map[string]any, clu
return nil
}

// Run starts a DCP feed to process documents for resync.
func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, persistClusterStatusCallback updateStatusCallbackFunc, terminator *base.SafeTerminator) error {
db := options["database"].(*Database)
regenerateSequences := options["regenerateSequences"].(bool)
resyncCollections := options["collections"].(ResyncCollections)
db, ok := options["database"].(*Database)
if !ok {
return errors.New("database option is required and must be of type *Database")
}
regenerateSequences, ok := options["regenerateSequences"].(bool)
if !ok {
return errors.New("regenerateSequences option is required and must be of type bool")
}
resyncCollections, ok := options["collections"].(base.CollectionNames)
if !ok {
return errors.New("collections option is required and must be of type CollectionNames")
}

resyncLoggingID := "Resync: " + r.ResyncID

Expand Down Expand Up @@ -286,40 +300,28 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers
return nil
}

func getCollectionIdsAndNames(db *Database, resyncCollections ResyncCollections) ([]uint32, bool, map[string][]string, error) {
collectionIDs := make([]uint32, 0)
var hasAllCollections bool
scopeAndCollection := make(map[string][]string)

// getCollectionIdsAndNames returns collection names. If no collections are specified, it returns all collections. The
// ids for all collections are returned.
func getCollectionIdsAndNames(db *Database, resyncCollections base.CollectionNames) (collectionIDs []uint32, hasAllCollections bool, collectionNames base.CollectionNames, err error) {
if len(resyncCollections) == 0 {
hasAllCollections = true
for collectionID := range db.CollectionByID {
collectionIDs = append(collectionIDs, collectionID)
}
for scopeName, collectionNames := range db.CollectionNames {
var resyncCollectionNames []string
for collName := range collectionNames {
resyncCollectionNames = append(resyncCollectionNames, collName)
}
scopeAndCollection[scopeName] = resyncCollectionNames
}
} else {
hasAllCollections = false

for scopeName, collectionsName := range resyncCollections {
var resyncCollectionNames []string
for _, collectionName := range collectionsName {
collection, err := db.GetDatabaseCollection(scopeName, collectionName)
if err != nil {
return nil, hasAllCollections, nil, fmt.Errorf("failed to find ID for collection %s.%s", base.MD(scopeName).Redact(), base.MD(collectionName).Redact())
}
collectionIDs = append(collectionIDs, collection.GetCollectionID())
resyncCollectionNames = append(resyncCollectionNames, collectionName)
return collectionIDs, hasAllCollections, db.collectionNames(), nil
}
hasAllCollections = false

for scopeName, collectionsName := range resyncCollections {
for _, collectionName := range collectionsName {
collection, err := db.GetDatabaseCollection(scopeName, collectionName)
if err != nil {
return nil, hasAllCollections, nil, fmt.Errorf("failed to find ID for collection %s.%s", base.MD(scopeName).Redact(), base.MD(collectionName).Redact())
}
scopeAndCollection[scopeName] = resyncCollectionNames
collectionIDs = append(collectionIDs, collection.GetCollectionID())
}
}
return collectionIDs, hasAllCollections, scopeAndCollection, nil
return collectionIDs, hasAllCollections, resyncCollections, nil
}

func (r *ResyncManagerDCP) ResetStatus() {
Expand All @@ -339,7 +341,8 @@ func (r *ResyncManagerDCP) SetStatus(docChanged, docProcessed int64) {
r.DocsProcessed.Set(docProcessed)
}

func (r *ResyncManagerDCP) SetCollectionStatus(collectionNames map[string][]string) {
// SetCollectionStatus sets the active collection names being resynced.
func (r *ResyncManagerDCP) SetCollectionStatus(collectionNames base.CollectionNames) {
r.lock.Lock()
defer r.lock.Unlock()

Expand Down
27 changes: 9 additions & 18 deletions db/background_mgr_resync_dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestResyncDCPInit(t *testing.T) {

options := make(map[string]any)
options["database"] = db
options["collections"] = ResyncCollections{}
options["collections"] = base.NewCollectionNames()
if testCase.forceReset {
options["reset"] = true
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestResyncManagerDCPStopInMidWay(t *testing.T) {
options := map[string]any{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"collections": base.NewCollectionNames(),
}

err := db.ResyncManager.Start(ctx, options)
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestResyncManagerDCPStart(t *testing.T) {
options := map[string]any{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"collections": base.NewCollectionNames(),
}
require.NoError(t, db.ResyncManager.Start(ctx, options))
stats := waitForResyncState(t, db, BackgroundProcessStateCompleted)
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestResyncManagerDCPStart(t *testing.T) {
options := map[string]any{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"collections": base.NewCollectionNames(),
}

err := db.ResyncManager.Start(ctx, options)
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestResyncManagerDCPRunTwice(t *testing.T) {
options := map[string]any{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"collections": base.NewCollectionNames(),
}

err := db.ResyncManager.Start(ctx, options)
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) {
options := map[string]any{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"collections": base.NewCollectionNames(),
}

err := db.ResyncManager.Start(ctx, options)
Expand Down Expand Up @@ -419,11 +419,7 @@ func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) {
options := map[string]any{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{
dbCollections[0].ScopeName: []string{
dbCollections[0].Name,
},
},
"collections": base.NewCollectionNames(dbCollections[0].dataStore),
}

err := db.ResyncManager.Start(ctx, options)
Expand Down Expand Up @@ -453,12 +449,7 @@ func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) {
firstDocsChanged := stats.DocsChanged

require.GreaterOrEqual(t, len(dbCollections), 2)
options["collections"] = ResyncCollections{
dbCollections[0].ScopeName: []string{
dbCollections[0].Name,
dbCollections[1].Name,
},
}
options["collections"] = db.collectionNames()

// Resume process
err = db.ResyncManager.Start(ctx, options)
Expand Down Expand Up @@ -606,7 +597,7 @@ func runResync(t *testing.T, ctx context.Context, db *Database, collection *Data
options := map[string]any{
"database": db,
"regenerateSequences": false,
"collections": ResyncCollections{},
"collections": base.NewCollectionNames(),
}

require.NoError(t, db.ResyncManager.Start(ctx, options))
Expand Down
20 changes: 20 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type DatabaseContext struct {
CachedCCVEnabled atomic.Bool // If set, the cached value of the CCV Enabled flag (this is not expected to transition from true->false, but could go false->true)
numVBuckets uint16 // Number of vbuckets in the bucket
SameSiteCookieMode http.SameSite

scopeName string // name of the single scope for the database
}

type Scope struct {
Expand Down Expand Up @@ -534,7 +536,11 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
syncFunctionsChanged := false
// Create new backing store map to map from collection ID's to their associated rev cache backing stores for rev cache document loads
collectionIDToRevCacheBackingStore := make(map[uint32]RevisionCacheBackingStore)
if len(options.Scopes) > 1 {
return nil, fmt.Errorf("Multiple scopes %v are not supported on a single database", maps.Keys(options.Scopes))
}
for scopeName, scope := range options.Scopes {
dbContext.scopeName = scopeName
dbContext.Scopes[scopeName] = Scope{
Collections: make(map[string]*DatabaseCollection, len(scope.Collections)),
}
Expand Down Expand Up @@ -2557,6 +2563,20 @@ func (db *DatabaseContext) EnableAllowConflicts(tb testing.TB) {
db.Options.AllowConflicts = base.Ptr(true)
}

// useShardedDCP returns true if the database supports sharded DCP feeds.
func (db *DatabaseContext) useShardedDCP() bool {
	// Sharded DCP requires Enterprise Edition; check that first to preserve
	// the original short-circuit order.
	if !base.IsEnterpriseEdition() {
		return false
	}
	return !db.usingRosmar()
}

// collectionNames returns the names of the collections on this database.
//
// NOTE(review): iteration over the CollectionByID map is in random order and
// the per-scope lists are not re-sorted after Add, so the ordering of each
// returned collection list is nondeterministic — confirm callers do not rely
// on a stable order (compare NewCollectionNames, which sorts).
func (db *DatabaseContext) collectionNames() base.CollectionNames {
	names := base.NewCollectionNames()
	for _, col := range db.CollectionByID {
		names.Add(col.dataStore)
	}
	return names
}

// GetSameSiteCookieMode returns the http.SameSite mode based on the unsupported database options. Returns an error if
// an invalid string is set.
func (o *UnsupportedOptions) GetSameSiteCookieMode() (http.SameSite, error) {
Expand Down
17 changes: 8 additions & 9 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4303,9 +4303,12 @@ func TestGetDatabaseCollectionWithUserDefaultCollection(t *testing.T) {
bucket := base.GetTestBucket(t)
defer bucket.Close(base.TestCtx(t))

ds, err := bucket.GetNamedDataStore(0)
require.NoError(t, err)
require.NotNil(t, ds)
ctx := base.TestCtx(t)
customCollection := base.NewScopeAndCollectionName(base.DefaultScope, "customCollection")
if !base.UnitTestUrlIsWalrus() {
require.NoError(t, base.CreateBucketScopesAndCollections(ctx, bucket.BucketSpec, base.NewCollectionNames(customCollection)))
defer assert.NoError(t, bucket.DropDataStore(customCollection))
}

testCases := []struct {
name string
Expand All @@ -4321,14 +4324,10 @@ func TestGetDatabaseCollectionWithUserDefaultCollection(t *testing.T) {
err: false,
options: DatabaseContextOptions{
Scopes: map[string]ScopeOptions{
ds.ScopeName(): ScopeOptions{
Collections: map[string]CollectionOptions{
ds.CollectionName(): {},
},
},
base.DefaultScope: ScopeOptions{
Collections: map[string]CollectionOptions{
base.DefaultCollection: {},
base.DefaultCollection: {},
customCollection.CollectionName(): {},
},
},
},
Expand Down
Loading
Loading