Skip to content

Comments

CBG-5153: Implement resync using CBGT for a single node#8057

Draft
RIT3shSapata wants to merge 7 commits intomainfrom
CBG-5153
Draft

CBG-5153: Implement resync using CBGT for a single node#8057
RIT3shSapata wants to merge 7 commits intomainfrom
CBG-5153

Conversation

@RIT3shSapata
Copy link
Contributor

CBG-5153

DRAFT PR

Describe your PR here...

  • Use bullet points if there's more than one thing changed

Pre-review checklist

  • Removed debug logging (fmt.Print, log.Print, ...)
  • Logging sensitive data? Make sure it's tagged (e.g. base.UD(docID), base.MD(dbName))
  • Updated relevant information in the API specifications (such as endpoint descriptions, schemas, ...) in docs/api

Dependencies (if applicable)

  • Link upstream PRs
  • Update Go module dependencies when merged

Integration Tests

@RIT3shSapata RIT3shSapata self-assigned this Feb 5, 2026
Copy link
Collaborator

@torcolvin torcolvin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few comments before I look more, I think the test is passing right now because it starts the non distributed resync.

Copy link
Collaborator

@torcolvin torcolvin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave a non exhaustive review for any cases where import was used for code, but I don't think I found all cases. Definitely give a look through the code pathways to see if you can find other locations.

// StartShardedDCPFeed initializes and starts a CBGT Manager targeting the provided bucket.
// dbName is used to define a unique path name for local file storage of pindex files
func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg) (*CbgtContext, error) {
func StartShardedDCPFeed(ctx context.Context, dbName string, configGroup string, uuid string, heartbeater Heartbeater, bucket Bucket, spec BucketSpec, scope string, collections []string, numPartitions uint16, cfg cbgt.Cfg, resyncIndex bool) (*CbgtContext, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could take DestType instead of a boolean parameter, which would be generally preferred since it would be more reasonable. The name DestType might not be appropriate, but it could be ShardedDCPFeedType instead?

It might be worth figuring out below what is actually needed to do to split on the feed type and whether to pass these arguments into this function or pass a type parameter. Review this comment after looking through the rest of the code and the comments.

@torcolvin torcolvin mentioned this pull request Feb 18, 2026
4 tasks

// Given a dbName, generate a unique and length-constrained index name for CBGT to use as part of their DCP name.
func GenerateIndexName(dbName string) string {
func GenerateIndexName(dbName string, feedID string) string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can pull this into a separate ticket but I think that this value needs to be <200 characters and we need to shorten this.

We can't break code that might work for import but as we make the index name longer, we want to make sure that this code works if there is a long database name.

// standardMetadataID returns either the dbName or a base64 encoded SHA256 hash of the dbName, whichever is shorter.
is an example of how we handle this for a metadataID

I think creating a separate ticket is a good plan for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I mentioned in my comment earlier, I don't think it will ever exceed 34:
#8057 (comment)
we can pair up and discuss it further

// the only index defined, and the name is safe. In that case, continue using legacy index name
// to avoid restarting the import processing from zero
func dcpSafeIndexName(ctx context.Context, c *CbgtContext, dbName string) (safeIndexName, previousUUID string) {
func dcpSafeIndexName(ctx context.Context, c *CbgtContext, dbName string, feedType ShardedDCPFeedType, feedID string) (safeIndexName, previousUUID string) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can pass feedType to both functions here, and through the callstack.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not understand this, could you please elaborate more?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you pass just feedType ShardedDCPFeedType? I think that the feedID should actually always be the same for resync, and doesn't need to be unique per run.

That was my mistake in the earlier specification.


base.StoreDestFactory(loggingCtx, resyncDestKey, resyncDestFunc)

base.InfofCtx(loggingCtx, base.KeyJavascript, "ResyncID: %s Starting DCP resync for bucket: %q ", resyncLoggingID, base.UD(bucket.GetName()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
base.InfofCtx(loggingCtx, base.KeyJavascript, "ResyncID: %s Starting DCP resync for bucket: %q ", resyncLoggingID, base.UD(bucket.GetName()))
base.InfofCtx(loggingCtx, base.KeyAll, "Resync: Starting DCP resync for bucket: %q ", base.UD(bucket.GetName()))

I believe that the loggingCtx should already contain the CorrelationID and so it doesn't need to be duplicated. You should make sure that is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loggingCtx does have correlationID field but it is empty in this scenario. On checking the code, UserLogCtx does not populate the correlationID. So I think this is necessary here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, this is out of scope for fix in this PR, let me see about fixing this separately. We should still change the log key to be KeyAll and Resync: is redundant.

Comment on lines +249 to +250
// format: _sync:{m_$}:cfg[groupID:] (collections)
// format: _sync:cfg:[groupID:] (default)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// format: _sync:{m_$}:cfg[groupID:] (collections)
// format: _sync:cfg:[groupID:] (default)
// format: _sync:{m_$}:resync_cfg[groupID:] (collections)
// format: _sync:resync_cfg:[groupID:] (default)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants