Auto Bucket Partitioner #120
Merged
A new `$sample` based partitioner that supports all collection types. Supports partitioning across single or multiple fields, including nested fields.

The logic for the partitioner is as follows:

- Calculates the number of documents per partition. Runs a `$collStats` aggregation to get the average document size.
- Determines the total count of documents. Uses the `$collStats` count, or runs a `countDocuments` query if the user supplies their own `aggregation.pipeline` configuration.
- Determines the number of partitions. Calculated as: `count / number of documents per partition`.
- Determines the number of documents to `$sample`. Calculated as: `samples per partition * number of partitions`.
- Creates the aggregation pipeline to generate the partitions:

```
[
  {$match: <the $match stage of the user's aggregation pipeline - iff the first stage is a $match>},
  {$sample: <number of documents to $sample>},
  {$addFields: {<partition key projection field>: {<'i': '$fieldList[i]' ...>}}}, // Only added iff fieldList.size() > 1
  {$bucketAuto: {
      groupBy: <partition key projection field>,
      buckets: <number of partitions>
  }}
]
```

Configurations:

- `fieldList`: The field list used for partitioning. Either a single field name or a comma-separated list of fields. Defaults to: `"_id"`.
- `chunkSize`: The average size (MB) for each partition. Note: uses the average document size to determine the number of documents per partition, so partitions may not be even. Defaults to: `64`.
- `samplesPerPartition`: The number of samples to take per partition. Defaults to: `10`.
- `partitionKeyProjectionField`: The field name used for a projected field that contains all the fields used to partition the collection. Defaults to: `"__idx"`. Recommended to change only if the documents already contain a `"__idx"` field.

Partitions are calculated as logical ranges. When using sharded clusters these will map closely to ranged chunks.
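The sizing steps above can be sketched as plain arithmetic. This is an illustrative sketch only, not the connector's actual code; the class and method names are hypothetical, and the use of ceiling division is an assumption:

```java
// Hypothetical sketch of the partition sizing math described above.
// Assumptions: chunkSize is in MiB, division rounds up so a remainder
// of documents still gets its own partition.
public final class AutoBucketMath {

    // Step 1: documents per partition from chunk size and average doc size
    static long documentsPerPartition(long chunkSizeMB, long avgDocSizeBytes) {
        return Math.max(1, (chunkSizeMB * 1024 * 1024) / avgDocSizeBytes);
    }

    // Step 3: number of partitions, using ceiling division
    static long numberOfPartitions(long count, long docsPerPartition) {
        return (count + docsPerPartition - 1) / docsPerPartition;
    }

    // Step 4: how many documents the $sample stage should request
    static long sampleSize(long samplesPerPartition, long numberOfPartitions) {
        return samplesPerPartition * numberOfPartitions;
    }

    public static void main(String[] args) {
        long avgDocSize = 1024;      // 1 KiB average document size (from $collStats)
        long count = 1_000_000;      // total document count
        long chunkSizeMB = 64;       // default chunkSize

        long docsPerPartition = documentsPerPartition(chunkSizeMB, avgDocSize); // 65536
        long partitions = numberOfPartitions(count, docsPerPartition);          // 16
        long samples = sampleSize(10, partitions);                              // 160
        System.out.println(docsPerPartition + " " + partitions + " " + samples);
    }
}
```

With the default 64 MB `chunkSize` and 1 KiB documents, each partition holds 65,536 documents, so a million-document collection yields 16 partitions and a 160-document `$sample`.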
When used with hashed shard keys, these logical ranges require broadcast operations.

Similar to the `SamplePartitioner`, but uses the `$bucketAuto` aggregation stage to generate the partition bounds.

SPARK-356
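Putting the configurations together, a Spark read enabling this partitioner might look like the following sketch. The `partitioner` / `partitioner.options.*` key convention and the example field names are assumptions based on the connector's existing partitioner options, not confirmed by this PR:

```
# Hypothetical read options; key names follow the connector's
# partitioner.options.* convention and may differ in the released version.
partitioner=com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner
partitioner.options.fieldList=user.id,timestamp
partitioner.options.chunkSize=64
partitioner.options.samplesPerPartition=10
partitioner.options.partitionKeyProjectionField=__idx
```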
stIncMale requested changes on Jul 8, 2024
…/AutoBucketPartitioner.java Co-authored-by: Valentin Kovalenko <valentin.male.kovalenko@gmail.com>
stIncMale requested changes on Jul 9, 2024
stIncMale approved these changes on Jul 10, 2024