Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.ArrayList;
import java.util.List;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.conversions.Bson;
import org.jetbrains.annotations.ApiStatus;
Expand Down Expand Up @@ -150,8 +149,7 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
return SINGLE_PARTITIONER.generatePartitions(readConfig);
}

double avgObjSizeInBytes =
storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats);
double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);

BsonDocument usersCollectionFilter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.mongodb.spark.sql.connector.read.MongoInputPartition;
import java.util.List;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.jetbrains.annotations.ApiStatus;

/**
Expand Down Expand Up @@ -55,7 +54,7 @@ public PaginateBySizePartitioner() {}
@Override
public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig) {
MongoConfig partitionerOptions = readConfig.getPartitionerOptions();
int partitionSizeBytes = Assertions.validateConfig(
int partitionSizeInBytes = Assertions.validateConfig(
partitionerOptions.getInt(PARTITION_SIZE_MB_CONFIG, PARTITION_SIZE_MB_DEFAULT),
i -> i > 0,
() ->
Expand All @@ -69,18 +68,17 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
return SINGLE_PARTITIONER.generatePartitions(readConfig);
}

double avgObjSizeInBytes =
storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
if (avgObjSizeInBytes >= partitionSizeBytes) {
double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats);
if (avgObjSizeInBytes >= partitionSizeInBytes) {
LOGGER.warn(
"Average document size `{}` is greater than the partition size `{}`. Please increase the partition size."
+ "Returning a single partition.",
avgObjSizeInBytes,
partitionSizeBytes);
partitionSizeInBytes);
return SINGLE_PARTITIONER.generatePartitions(readConfig);
}

int numDocumentsPerPartition = (int) Math.floor(partitionSizeBytes / avgObjSizeInBytes);
int numDocumentsPerPartition = (int) Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
BsonDocument matchQuery = PartitionerHelper.matchQuery(readConfig.getAggregationPipeline());
long count;
if (matchQuery.isEmpty() && storageStats.containsKey("count")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.mongodb.spark.sql.connector.read.partitioner;

import static com.mongodb.spark.sql.connector.read.partitioner.Partitioner.LOGGER;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

import com.mongodb.MongoCommandException;
Expand All @@ -31,14 +32,19 @@
import java.util.stream.Collectors;
import org.apache.spark.sql.connector.read.Scan;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonType;
import org.bson.BsonValue;

/** Partitioner helper class, contains various utility methods used by the partitioner instances. */
public final class PartitionerHelper {

private static final List<BsonDocument> COLL_STATS_AGGREGATION_PIPELINE =
singletonList(BsonDocument.parse("{'$collStats': {'storageStats': { } } }"));
private static final List<BsonDocument> COLL_STATS_AGGREGATION_PIPELINE = asList(
BsonDocument.parse("{'$collStats': {'storageStats': { } } }"),
BsonDocument.parse(
"{'$project': {'size': '$storageStats.size', 'count': '$storageStats.count' } }"));
private static final List<BsonDocument> COLL_STATS_DATA_FEDERATION_AGGREGATION_PIPELINE =
singletonList(BsonDocument.parse("{'$collStats': {'count': { } } }"));
private static final BsonDocument PING_COMMAND = BsonDocument.parse("{ping: 1}");
public static final Partitioner SINGLE_PARTITIONER = new SinglePartitionPartitioner();

Expand Down Expand Up @@ -101,21 +107,43 @@ public static List<BsonDocument> createPartitionPipeline(
public static BsonDocument storageStats(final ReadConfig readConfig) {
LOGGER.info("Getting collection stats for: {}", readConfig.getNamespace().getFullName());
try {
return readConfig
.withCollection(
coll -> Optional.ofNullable(coll.aggregate(COLL_STATS_AGGREGATION_PIPELINE)
.allowDiskUse(readConfig.getAggregationAllowDiskUse())
.comment(readConfig.getComment())
.first())
.orElseGet(BsonDocument::new))
.getDocument("storageStats", new BsonDocument());
} catch (RuntimeException ex) {
if (ex instanceof MongoCommandException
&& (ex.getMessage().contains("not found.")
|| ((MongoCommandException) ex).getCode() == 26)) {
return readConfig.withCollection(
coll -> Optional.ofNullable(coll.aggregate(COLL_STATS_AGGREGATION_PIPELINE)
.allowDiskUse(readConfig.getAggregationAllowDiskUse())
.comment(readConfig.getComment())
.first())
.orElseGet(BsonDocument::new));
} catch (MongoCommandException ex) {
if (ex.getMessage().contains("not found.") || ex.getCode() == 26) {
LOGGER.info("Could not find collection: {}", readConfig.getCollectionName());
return new BsonDocument();
}

// Atlas Data Federation does not support the storageStats property and requires
// special handling to return the federated collection stats.
if (ex.getMessage().contains("Data Federation") || ex.getCode() == 9) {
return storageStatsDataFederation(readConfig);
}
throw new MongoSparkException("Partitioner calling collStats command failed", ex);
} catch (RuntimeException ex) {
throw new MongoSparkException("Partitioner calling collStats command failed", ex);
}
}

private static BsonDocument storageStatsDataFederation(final ReadConfig readConfig) {
try {
return readConfig.withCollection(
coll -> Optional.ofNullable(coll.aggregate(COLL_STATS_DATA_FEDERATION_AGGREGATION_PIPELINE)
.allowDiskUse(readConfig.getAggregationAllowDiskUse())
.comment(readConfig.getComment())
.map(collStats -> collStats.append(
"size",
collStats
.getDocument("partition", new BsonDocument())
.getNumber("size", new BsonInt32(0))))
.first())
Copy link
Contributor

@guillotjulien guillotjulien Apr 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue with that approach is that when you have multiple partitions (S3 or Mongo clusters) backing your federated collection, you'll only use the info from the first partition, whereas the collStats command gives you the total size of all partitions.

Example:

[
  {
    ns: 'my_db.whatever',
    partition: {
      format: 'PARQUET',
      attributes: { hash: '26146' },
      size: 33190,
      source: 's3://whatever-bucket/_hash=26146/part-00006-f18185ed-bbfe-46d0-9772-d5fe80b2a1e8.c000.snappy.parquet?delimiter=%2F&region=eu-west-1'
    },
    count: 1
  },
  {
    ns: 'my_db.whatever',
    partition: {
      format: 'PARQUET',
      attributes: { hash: '26131' },
      size: 31093,
      source: 's3://whatever-bucket/_hash=26131/part-00005-28509ae5-f592-453b-a2e2-6e64bca01f27.c000.snappy.parquet?delimiter=%2F&region=eu-west-1'
    },
    count: 1
  },
  ...
]

I think this can be mitigated by summing the partition size and count of all partitions, then returning a BsonDocument from that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @guillotjulien - according to the atlas docs

The following example shows $collStats syntax for retrieving the total number of documents in the partitions.

use s3Db
db.abc.aggregate([ {$collStats: {"count" : {} }} ])

Have you found that is not the case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rozza it does indeed, but it gives you one document per partition as shown in the example above. So if you want the total, you'd need to compute the sum of counts of all partitions.

e.g.

[
  { $collStats: { "count" : {} } },
  { $group: { _id: null, totalCount: { $sum: "$count" }, totalSize: { $sum: "$partition.size" } }  }
]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @guillotjulien I've added that to the latest commit.

.orElseGet(BsonDocument::new));
} catch (RuntimeException ex) {
throw new MongoSparkException("Partitioner calling collStats command failed", ex);
}
}
Expand All @@ -138,5 +166,21 @@ public static List<String> getPreferredLocations(final ReadConfig readConfig) {
.collect(Collectors.toList());
}

/**
 * Returns the average document size in bytes for a collection, either using the
 * {@code avgObjSize} field when present, or calculated from the {@code count} and
 * {@code size} fields of the storage stats.
 *
 * @param storageStats the storage stats of a collection; may be empty or missing fields,
 *     in which case the numeric fields default to {@code 0}
 * @return the average document size in bytes; {@code Double.POSITIVE_INFINITY} when the
 *     stats report no documents, so size-based partitioners fall back to a single partition
 */
public static double averageDocumentSize(final BsonDocument storageStats) {
  if (storageStats.containsKey("avgObjSize")) {
    return storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
  }
  double size = storageStats.getNumber("size", new BsonInt32(0)).doubleValue();
  double count = storageStats.getNumber("count", new BsonInt32(0)).doubleValue();
  // Guard the empty-collection case: 0.0 / 0.0 is NaN, which silently poisons the
  // downstream partition-count arithmetic ((int) Math.floor(NaN) == 0). Returning
  // positive infinity matches the existing size > 0, count == 0 result (size / 0.0)
  // and trips the callers' "average size >= partition size -> single partition" guard.
  if (count <= 0) {
    return Double.POSITIVE_INFINITY;
  }
  return Math.floor(size / count);
}

private PartitionerHelper() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.conversions.Bson;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -105,8 +104,7 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
count = readConfig.withCollection(coll ->
coll.countDocuments(matchQuery, new CountOptions().comment(readConfig.getComment())));
}
double avgObjSizeInBytes =
storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats);
double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);

if (numDocumentsPerPartition >= count) {
Expand Down