Bug description
Using `MongoItemWriter` on a sharded cluster throws the following error:
```
org.springframework.data.mongodb.BulkOperationException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. ; nested exception is com.mongodb.MongoBulkWriteException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}].
at org.springframework.data.mongodb.core.DefaultBulkOperations.bulkWriteTo(DefaultBulkOperations.java:324) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:560) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
at org.springframework.data.mongodb.core.DefaultBulkOperations.execute(DefaultBulkOperations.java:290) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
at org.springframework.batch.item.data.MongoItemWriter.saveOrUpdate(MongoItemWriter.java:169) ~[main/:na]
at org.springframework.batch.item.data.MongoItemWriter.doWrite(MongoItemWriter.java:138) ~[main/:na]
at org.springframework.batch.item.data.MongoItemWriter$1.beforeCommit(MongoItemWriter.java:198) ~[main/:na]
```
I found that the cause is `MongoItemWriter`'s `saveOrUpdate` method: the query it builds for the upsert matches only on `_id`, and there is no way to add the shard key to it.
```java
private void saveOrUpdate(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    MongoConverter mongoConverter = this.template.getConverter();
    FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
    for (Object item : items) {
        Document document = new Document();
        mongoConverter.write(item, document);
        Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
        // here: the upsert filter contains only _id, never the shard key
        Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId));
        bulkOperations.replaceOne(query, document, upsert);
    }
    bulkOperations.execute();
}
```
As far as I know, an upsert on a sharded MongoDB cluster must include the full shard key in its query filter, so the shard key has to be added with `addCriteria`, as below:
```java
private void saveOrUpdate(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    MongoConverter mongoConverter = this.template.getConverter();
    FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
    for (Object item : items) {
        Document document = new Document();
        mongoConverter.write(item, document);
        Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
        // here: also match on the shard key so mongos can target a single shard
        // (SHARD_KEY and keyvalue are placeholders; the writer currently has no way to know them)
        Query query = new Query()
                .addCriteria(Criteria.where(ID_KEY).is(objectId))
                .addCriteria(Criteria.where(SHARD_KEY).is(keyvalue));
        bulkOperations.replaceOne(query, document, upsert);
    }
    bulkOperations.execute();
}
```
However, I'm not sure what the best way to handle this is, since a MongoDB shard key can be compound (made of multiple fields).
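A minimal sketch of one way to cope with compound shard keys, assuming the writer could be given the stored shard key field names (for example taken from `@Sharded(shardKey = ...)`; that wiring is not shown): build the filter from `_id` plus every shard key field copied out of the already-converted document. `ShardKeyFilter` and `build` are hypothetical names, and plain `Map`s stand in for `org.bson.Document`, which implements `Map<String, Object>`. Nested (dotted) shard key paths are not handled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spring Batch or Spring Data MongoDB.
final class ShardKeyFilter {

    private ShardKeyFilter() {
    }

    /**
     * Builds the equality criteria for an upsert on a sharded collection.
     *
     * @param document       the converted entity (org.bson.Document implements Map<String, Object>)
     * @param idValue        the _id to match on (existing or freshly generated)
     * @param shardKeyFields stored shard key field names, e.g. ["shard_key"] or ["region", "shard_key"]
     * @return field name -> value pairs: _id first, then each shard key field in declaration order
     */
    static Map<String, Object> build(Map<String, Object> document, Object idValue, List<String> shardKeyFields) {
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put("_id", idValue);
        for (String field : shardKeyFields) {
            if ("_id".equals(field)) {
                continue; // already matched above, possibly with a generated id
            }
            // Top-level fields only; dotted paths such as "a.b" are not resolved here
            if (!document.containsKey(field)) {
                // Without the full shard key, mongos cannot target the upsert to a single shard
                throw new IllegalArgumentException("document has no value for shard key field '" + field + "'");
            }
            filter.put(field, document.get(field));
        }
        return filter;
    }
}
```

Each entry of the returned map would then become one `Criteria.where(field).is(value)` added to the `Query` passed to `bulkOperations.replaceOne`, which works the same way for one shard key field or several.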
P.S. For a plain save, just using `insert` should be enough if the user declares the shard key on their `@Document` class with the `@Sharded` annotation:
```java
private void save(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    bulkOperations.insert(items);
    bulkOperations.execute();
}
```
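A minimal sketch of how the two paths could be combined, assuming the writer may pick a path per item: `saveOrUpdate` generates a fresh `ObjectId` for items that have no `_id`, so their upsert can never match an existing document and they end up as new documents either way. A plain `insert` (routed by the shard key inside the document) should therefore suffice for them, and only items that already carry an `_id` need a shard-key-aware `replaceOne`. `WritePlan` and `split` are hypothetical names, and `Map`s stand in for converted `org.bson.Document`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spring Batch: partitions converted documents by write path.
final class WritePlan {

    final List<Map<String, Object>> inserts = new ArrayList<>(); // no _id: plain insert
    final List<Map<String, Object>> upserts = new ArrayList<>(); // has _id: replaceOne on _id + shard key

    static WritePlan split(List<Map<String, Object>> documents) {
        WritePlan plan = new WritePlan();
        for (Map<String, Object> document : documents) {
            if (document.get("_id") == null) {
                // A freshly generated id could never match an existing document
                plan.inserts.add(document);
            } else {
                plan.upserts.add(document);
            }
        }
        return plan;
    }
}
```

Splitting this way changes the relative order of writes within a chunk compared with a single `BulkMode.ORDERED` batch, which only matters if two items in the same chunk can target the same document.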
Environment
- Spring Batch 4.3 and 5.0
- MongoDB sharded cluster (4.2)
- Kotlin (Java 17)
Steps to reproduce
- Use `MongoItemWriter` to write to a sharded collection, configured as follows:
```kotlin
@Bean
@StepScope
fun mongoItemWriter(mongoOperations: MongoOperations): MongoItemWriter<MongoItem> {
    return MongoItemWriterBuilder<MongoItem>()
        .collection("mongoItem")
        .template(mongoOperations)
        .build()
}

@Document("mongoItem")
@Sharded(shardKey = ["shard_key"], shardingStrategy = ShardingStrategy.HASH)
data class MongoItem(
    @Id
    var id: String? = null,
    @Field(name = "shard_key")
    val shardKey: Long,
    val source: String,
)
```
Expected behavior
- Data is written to MongoDB without any error.