Skip to content

[mongo] parallel snapshot for ObjectID key #3293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 6, 2025

Conversation

heavycrystal
Copy link
Contributor

@heavycrystal heavycrystal commented Jul 30, 2025

Need to give WatermarkColumn as input, otherwise falls back to full table partition.
Only ObjectID partitioning is supported, so it is intended for use with _id but should be general.

Also added "adjusted partitions" logic to MySQL

TODO testing

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements parallel snapshot support for MongoDB ObjectID keys by adding partitioning functionality to the MongoDB connector. The implementation allows for parallel data extraction when a WatermarkColumn is specified, falling back to full table partition mode otherwise.

  • Adds ObjectID partition range comparison support in the utils package
  • Implements MongoDB-specific partitioning using MongoDB's $bucketAuto aggregation
  • Updates logging consistency across PostgreSQL and MySQL connectors

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
flow/connectors/utils/partition.go Adds ObjectID comparison logic for partition ranges
flow/connectors/postgres/qrep.go Updates log message to include "[postgres]" prefix
flow/connectors/mysql/qrep.go Standardizes partition calculation and fixes typo
flow/connectors/mongo/qrep.go Implements parallel partitioning with ObjectID support

@@ -98,6 +99,30 @@ func comparePartitionRanges(
return c
}
return cmp.Compare(prevTuple.OffsetNumber, currTuple.OffsetNumber)
// we can compare ObjectIDs, but not sure if doing this is correct
Copy link
Preview

Copilot AI Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment expresses uncertainty about the correctness of the implementation. Either remove the comment if the implementation is correct, or address the uncertainty with proper validation.

Suggested change
// we can compare ObjectIDs, but not sure if doing this is correct
// Comparing ObjectIDs using bytes.Compare is valid because ObjectIDs are 12-byte values
// that can be compared lexicographically to determine their order.

Copilot uses AI. Check for mistakes.

@heavycrystal heavycrystal force-pushed the mongo-parallel-snapshot branch from c7c9c60 to 1916bc8 Compare July 31, 2025 15:59
@heavycrystal heavycrystal marked this pull request as ready for review August 1, 2025 17:16
if totalRows%numRowsPerPartition != 0 {
numPartitions++
}
adjustedPartitions := shared.AdjustNumPartitions(totalRows, numRowsPerPartition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooc, is the main difference here limiting maxPartitions to 1000?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, and adjusting numRowsPerPartition accordingly

@@ -356,8 +341,6 @@ func (p *PartitionHelper) getPartitionForStartAndEnd(start any, end any) (*proto
return createTimePartition(v, end.(time.Time)), nil
case pgtype.TID:
return createTIDPartition(v, end.(pgtype.TID)), nil
case bson.ObjectID:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not want to use partition helper for Mongo for consistency across sources? also thinking if we ever want to partition on other fields other than _id (i.e. with flattened mode in the future), but not a big deal, we can add later too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partitionHelper can be used; it's just a bit cludgy to use it here (bson.ObjectID is not the type stored in protobufs and the helper tends to use types to figure out the type of range, which is string)

It doesn't give us any advantage here since we don't coalesce partitions together based on ObjectID range (could be done down the line)

Copy link
Contributor

@jgao54 jgao54 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall, few nits

@heavycrystal heavycrystal force-pushed the mongo-parallel-snapshot branch from b62157a to 33a9a53 Compare August 5, 2025 19:11
@heavycrystal heavycrystal merged commit 2158aaa into main Aug 6, 2025
13 of 17 checks passed
@heavycrystal heavycrystal deleted the mongo-parallel-snapshot branch August 6, 2025 19:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants