-
Notifications
You must be signed in to change notification settings - Fork 127
[mongo] parallel snapshot for ObjectID key #3293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements parallel snapshot support for MongoDB ObjectID keys by adding partitioning functionality to the MongoDB connector. The implementation allows for parallel data extraction when a WatermarkColumn is specified, falling back to full table partition mode otherwise.
- Adds ObjectID partition range comparison support in the utils package
- Implements MongoDB-specific partitioning using MongoDB's
$bucketAuto
aggregation - Updates logging consistency across PostgreSQL and MySQL connectors
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
File | Description |
---|---|
flow/connectors/utils/partition.go | Adds ObjectID comparison logic for partition ranges |
flow/connectors/postgres/qrep.go | Updates log message to include "[postgres]" prefix |
flow/connectors/mysql/qrep.go | Standardizes partition calculation and fixes typo |
flow/connectors/mongo/qrep.go | Implements parallel partitioning with ObjectID support |
flow/connectors/utils/partition.go
Outdated
@@ -98,6 +99,30 @@ func comparePartitionRanges( | |||
return c | |||
} | |||
return cmp.Compare(prevTuple.OffsetNumber, currTuple.OffsetNumber) | |||
// we can compare ObjectIDs, but not sure if doing this is correct |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment expresses uncertainty about the correctness of the implementation. Either remove the comment if the implementation is correct, or address the uncertainty with proper validation.
// we can compare ObjectIDs, but not sure if doing this is correct | |
// Comparing ObjectIDs using bytes.Compare is valid because ObjectIDs are 12-byte values | |
// that can be compared lexicographically to determine their order. |
Copilot uses AI. Check for mistakes.
c7c9c60
to
1916bc8
Compare
flow/connectors/mysql/qrep.go
Outdated
if totalRows%numRowsPerPartition != 0 { | ||
numPartitions++ | ||
} | ||
adjustedPartitions := shared.AdjustNumPartitions(totalRows, numRowsPerPartition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ooc, is the main difference here limiting maxPartitions
to 1000?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, and adjusting numRowsPerPartition accordingly
@@ -356,8 +341,6 @@ func (p *PartitionHelper) getPartitionForStartAndEnd(start any, end any) (*proto | |||
return createTimePartition(v, end.(time.Time)), nil | |||
case pgtype.TID: | |||
return createTIDPartition(v, end.(pgtype.TID)), nil | |||
case bson.ObjectID: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we not want to use partition helper for Mongo for consistency across sources? also thinking if we ever want to partition on other fields other than _id (i.e. with flattened mode in the future), but not a big deal, we can add later too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionHelper
can be used; it's just a bit cludgy to use it here (bson.ObjectID
is not the type stored in protobufs and the helper tends to use types to figure out the type of range, which is string
)
It doesn't give us any advantage here since we don't coalesce partitions together based on ObjectID range (could be done down the line)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall, few nits
b62157a
to
33a9a53
Compare
Need to give
WatermarkColumn
as input, otherwise falls back to full table partition.Only
ObjectID
partitioning is supported, so it is intended for use with_id
but should be general.Also added "adjusted partitions" logic to MySQL
TODO testing