Skip to content

Conversation

guillotjulien
Copy link
Contributor

@guillotjulien guillotjulien commented Feb 14, 2025

The new implementation stops relying on the storageStats property that is not being recognized as a valid property when using the $collStats aggregation operation with a Data Federation endpoint.
This end up making it impossible to use the SamplePartitioner, PaginateBySizePartitioner and AutoBucketPartitioner in that situation.

Example:

Caused by: com.mongodb.MongoCommandException: Command failed with error 9 (FailedToParse): '$collStats param 'storageStats' is not valid for Atlas Data Federation, correlationID = 182415f1c5629184818f0150' on server <REDACTED>. The full response is {"ok": 0, "errmsg": "$collStats param 'storageStats' is not valid for Atlas Data Federation, correlationID = 182415f1c5629184818f0150", "code": 9, "codeName": "FailedToParse"}
        at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
        at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:416)
        at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:340)
        at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
        at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
        at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
        at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:206)
        at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:119)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:85)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:75)
        at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:293)
        at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:233)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:215)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:356)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:381)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:355)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:381)
        at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:354)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:213)
        at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:218)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:199)
        at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:194)
        at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:150)
        at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:44)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
        at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:133)
        at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:90)
        at com.mongodb.client.internal.MongoIterableImpl.first(MongoIterableImpl.java:101)
        at com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.lambda$storageStats$0(PartitionerHelper.java:103)
        at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.withCollection(AbstractMongoConfig.java:164)
        at com.mongodb.spark.sql.connector.config.ReadConfig.withCollection(ReadConfig.java:45)
        at com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.storageStats(PartitionerHelper.java:99)
        ... 85 more

From what I could see, the storageStats property was only used to access avgObjSize, which can be computed from the size and number of documents of a collection.

When connected to a federated Mongo instance, stats are retrieved via the collStats command, whereas the $collStats aggregation operator is used for standard Mongo instances. This difference is due to the collStats command being faster, but deprecated starting from Mongo 6.2. However it doesn't seem to be deprecated for Data Federation as far as I can tell (from https://www.mongodb.com/docs/atlas/data-federation/supported-unsupported/diagnostic-commands/#collstats).

@rozza rozza self-requested a review February 25, 2025 16:36
@rozza
Copy link
Member

rozza commented Feb 25, 2025

Hi @guillotjulien,

Thanks for the PR. I have added a ticket SPARK-442 to track calculating the {{avgObjSize}} if not available.

@rozza rozza added the external label Mar 4, 2025
@rozza rozza marked this pull request as draft March 4, 2025 10:18
@rozza rozza marked this pull request as ready for review March 4, 2025 17:30
Copy link
Member

@rozza rozza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks worthwhile. I would recommend getting avgObjSize if available else calculating it.

@guillotjulien guillotjulien force-pushed the fix/data-federation-support branch from 453aa29 to bc92d09 Compare March 6, 2025 10:36
@guillotjulien guillotjulien requested a review from rozza March 6, 2025 10:36
@guillotjulien
Copy link
Contributor Author

guillotjulien commented Mar 6, 2025

I extracted the logic to get the average object size in PartitionerHelper so that all partitioners use the same logic.
We're using avgObjSize when available now, otherwise we compute it.

I retested locally, and SamplePartitioner can be used for both standard replicaset, and data federation endpoints.

@guillotjulien
Copy link
Contributor Author

Hi @rozza, any chance you'd have time to look at this again?

@rozza
Copy link
Member

rozza commented Apr 23, 2025

@guillotjulien apologies for the delay. Just an update I aim to do a patch release on the 6th of May and that will incorporate this.

At the moment there looks to be a compile error which I'll make sure is fixed before release.

@guillotjulien
Copy link
Contributor Author

@rozza yes sorry, I missed an import for BsonInt32 when applying the code on that repo. I had trouble running gradlew on my machine, but that's solved now.

After running it locally as per the README, I was able to build the connector and run the tests to completion. I'll push the fixed version (also had a couple of places in my code that Spotless reformatted).

…a Federation

The new implementation stops relying on the storageStats property not being recognized as a valid property when using the $collStats aggregation operation when using a Data Federation endpoint.
This end up making it impossible to use the SamplePartitioner, PaginateBySizePartitioner and AutoBucketPartitioner when using a Data Federation endpoint.

From what I could see, the storageStats property was only used to access avgObjSize, which can be computed from the size and number of documents of a collection.

When connected to a federated Mongo instance, stats are retrieved via the collStats command, whereas the $collStats aggregation operator is used for standard Mongo instances.
This difference is due to the collStats command being faster, but deprecated starting from Mongo 6.2. However it doesn't seem to be deprecated for Data Federation as far as I can tell.
@guillotjulien guillotjulien force-pushed the fix/data-federation-support branch from bc92d09 to adf635e Compare April 23, 2025 11:26
@rozza rozza mentioned this pull request Apr 24, 2025
@rozza
Copy link
Member

rozza commented Apr 24, 2025

Hi @guillotjulien I've opened #133 building upon this approach and hopefully simplifying the logic.

Ross

@rozza rozza closed this Apr 30, 2025
@rozza
Copy link
Member

rozza commented Apr 30, 2025

Closed as this work was the basis of #133

Many thanks @guillotjulien for getting this across the line

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants