Merged

Changes from 3 commits
@@ -101,6 +101,10 @@ public boolean isAtLeastFiveDotZero() {
return getMaxWireVersion() >= 12;
}

+public boolean isAtLeastSevenDotZero() {
+return getMaxWireVersion() >= 21;
+}

private int getMaxWireVersion() {
MongoClient mongoClient = HELPER.getMongoClient();
ClusterType clusterType = mongoClient.getClusterDescription().getType();
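For context: this helper keys feature checks off the driver-reported max wire version, and wire version 21 corresponds to MongoDB 7.0 (the PR pins this below as SEVEN_DOT_ZERO_WIRE_VERSION). A minimal standalone sketch of the same probe, not part of this PR, assuming a reachable deployment at a placeholder URI:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.connection.ServerDescription;

    // Minimal sketch, not part of this PR: read the cluster's max wire
    // version the way the helper above does. The URI is a placeholder.
    public final class WireVersionProbe {
      public static void main(final String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
          int maxWireVersion = client.getClusterDescription().getServerDescriptions().stream()
              .mapToInt(ServerDescription::getMaxWireVersion)
              .max()
              .orElse(0); // may be 0 until server discovery has completed
          System.out.println("maxWireVersion=" + maxWireVersion
              + ", isAtLeastSevenDotZero=" + (maxWireVersion >= 21));
        }
      }
    }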
@@ -50,10 +50,10 @@ public class AutoBucketPartitionerTest extends PartitionerTestCase {
@Override
List<String> defaultReadConfigOptions() {
return asList(
ReadConfig.PARTITIONER_CONFIG,
PARTITIONER.getClass().getName(),
ReadConfig.PARTITIONER_OPTIONS_PREFIX + PARTITION_CHUNK_SIZE_MB_CONFIG,
"1",
ReadConfig.PARTITIONER_OPTIONS_PREFIX + SamplePartitioner.SAMPLES_PER_PARTITION_CONFIG,
"10");
"1");
}

@Test
@@ -178,6 +178,7 @@ void testCreatesExpectedPartitionsWithUsersPipeline() {

@Test
void testUsingCompoundPartitionFieldThatContainsDuplicates() {
+assumeTrue(isAtLeastSevenDotZero());
ReadConfig readConfig = createReadConfig(
"compound", PARTITIONER_OPTIONS_PREFIX + PARTITION_FIELD_LIST_CONFIG, "pk,dups");
loadSampleData(250, 10, readConfig);
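For reference, the options the test builds above map directly onto Spark read options. A hedged sketch, not part of this PR, of the equivalent user-facing read; it assumes PARTITION_CHUNK_SIZE_MB_CONFIG is the public constant the test statically imports, and the URI, database, and collection names are placeholders:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import com.mongodb.spark.sql.connector.config.ReadConfig;
    import com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner;

    // Hedged sketch: an AutoBucketPartitioner read with a 1 MB chunk size,
    // mirroring defaultReadConfigOptions() above.
    public final class AutoBucketReadExample {
      public static void main(final String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017") // placeholder
            .getOrCreate();

        Dataset<Row> ds = spark.read()
            .format("mongodb")
            .option("database", "test")   // placeholder
            .option("collection", "coll") // placeholder
            .option(ReadConfig.PARTITIONER_CONFIG, AutoBucketPartitioner.class.getName())
            .option(
                ReadConfig.PARTITIONER_OPTIONS_PREFIX
                    + AutoBucketPartitioner.PARTITION_CHUNK_SIZE_MB_CONFIG, // assumed public
                "1")
            .load();

        ds.printSchema();
      }
    }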
@@ -69,7 +69,7 @@ public final class ReadConfig extends AbstractMongoConfig {
* @see #PARTITIONER_CONFIG
*/
public static final String PARTITIONER_DEFAULT =
"com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner";
"com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner";

/**
* The prefix for specific partitioner based configuration.
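Because this hunk flips the out-of-the-box default, existing jobs that depend on SamplePartitioner behaviour can pin it explicitly. A hedged sketch, not part of this PR, reusing the SparkSession from the previous sketch:

    // Hedged sketch: opt back into the previous default rather than relying
    // on PARTITIONER_DEFAULT, which this PR changes to AutoBucketPartitioner.
    Dataset<Row> ds = spark.read()
        .format("mongodb")
        .option(ReadConfig.PARTITIONER_CONFIG,
            "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
        .load();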
@@ -26,6 +26,7 @@
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.Field;
import com.mongodb.client.model.Filters;
+import com.mongodb.connection.ServerDescription;
import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.MongoConfig;
import com.mongodb.spark.sql.connector.config.ReadConfig;
@@ -41,7 +42,7 @@
/**
* Auto Bucket Partitioner
*
-* <p>A $sample based partitioner that provides support for all collection types.
+* <p>A sample based partitioner that provides support for all collection types.
* Supports partitioning across single or multiple fields, including nested fields.
*
* <p>The logic for the partitioner is as follows:</p>
@@ -100,6 +101,7 @@ public final class AutoBucketPartitioner implements Partitioner {
private static final String ID = "_id";
private static final String MIN = "min";
private static final String MAX = "max";
+private static final int SEVEN_DOT_ZERO_WIRE_VERSION = 21;

public static final String PARTITION_FIELD_LIST_CONFIG = "fieldList";
private static final List<String> PARTITION_FIELD_LIST_DEFAULT = singletonList(ID);
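The field list controls which keys the buckets are computed over; "fieldList" is the literal key per the constant above. A hedged sketch, not part of this PR, mirroring the compound-field test earlier in the diff and again reusing the SparkSession from the first sketch:

    // Hedged sketch: bucket over a compound field list ("pk" and "dups" are
    // the field names used by testUsingCompoundPartitionFieldThatContainsDuplicates).
    Dataset<Row> ds = spark.read()
        .format("mongodb")
        .option(ReadConfig.PARTITIONER_CONFIG, AutoBucketPartitioner.class.getName())
        .option(
            ReadConfig.PARTITIONER_OPTIONS_PREFIX
                + AutoBucketPartitioner.PARTITION_FIELD_LIST_CONFIG,
            "pk,dups")
        .load();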
@@ -189,6 +191,16 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
return SINGLE_PARTITIONER.generatePartitions(readConfig);
}

+Integer serverMaxWireVersion =
+readConfig.withClient(c -> c.getClusterDescription().getServerDescriptions().stream()
+.map(ServerDescription::getMaxWireVersion)
+.max(Integer::compare)
+.orElse(0));
+if (serverMaxWireVersion < SEVEN_DOT_ZERO_WIRE_VERSION) {
+LOGGER.warn(
+"Note: The AutoBucketPartitioner requires MongoDB 7.0 or greater if the dataset contains documents with duplicated keys.");
+}

return createMongoInputPartitions(
buckets,
readConfig.getAggregationPipeline(),
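Note the check above only warns; partition generation proceeds either way. A hedged distillation of the guard's intent (the helper name is hypothetical and not in the PR; it would sit alongside the constant in AutoBucketPartitioner):

    // Hypothetical helper, not in the PR: wire version 21 <=> MongoDB 7.0.
    // Per the warning above, older servers are only safe when the partition
    // fields contain no duplicated keys.
    private static boolean supportsDuplicatedPartitionKeys(final int maxWireVersion) {
      return maxWireVersion >= SEVEN_DOT_ZERO_WIRE_VERSION; // 21
    }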