Bug description
using mongoItemWriter on the sharded cluster throws the following error.
org.springframework.data.mongodb.BulkOperationException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. ; nested exception is com.mongodb.MongoBulkWriteException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}].
at org.springframework.data.mongodb.core.DefaultBulkOperations.bulkWriteTo(DefaultBulkOperations.java:324) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:560) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
at org.springframework.data.mongodb.core.DefaultBulkOperations.execute(DefaultBulkOperations.java:290) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
at org.springframework.batch.item.data.MongoItemWriter.saveOrUpdate(MongoItemWriter.java:169) ~[main/:na]
at org.springframework.batch.item.data.MongoItemWriter.doWrite(MongoItemWriter.java:138) ~[main/:na]
at org.springframework.batch.item.data.MongoItemWriter$1.beforeCommit(MongoItemWriter.java:198) ~[main/:na]
I found the reason is we couldn't specify 'shard key' by using MongoItemWriter because of it's saveOrUpdate.
private void saveOrUpdate(List<? extends T> items) {
BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
MongoConverter mongoConverter = this.template.getConverter();
FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
for (Object item : items) {
Document document = new Document();
mongoConverter.write(item, document);
Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
// here
Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId));
bulkOperations.replaceOne(query, document, upsert);
}
bulkOperations.execute();
}
As far as I know, to use Query in sharded MongoDB Cluster, we should specify shard key by using addCriteria as below.
private void saveOrUpdate(List<? extends T> items) {
BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
MongoConverter mongoConverter = this.template.getConverter();
FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
for (Object item : items) {
Document document = new Document();
mongoConverter.write(item, document);
Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
// here
Query query = new Query()
.addCriteria(Criteria.where(ID_KEY).is(objectId))
.addCriteria(Criteria.where(SHARD_KEY).is(keyvalue));
bulkOperations.replaceOne(query, document, upsert);
}
bulkOperations.execute();
}
However, I'm not sure what is the best way to handle this problem since mongodb's shard key could be multiple.
p.s - for save, just using insertwill be enough if user specify shard key on their @Document by @Sharded annotation,
private void save(List<? extends T> items) {
BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
bulkOperations.insert(items);
bulkOperations.execute();
}
Environment
- spring-batch 4.3 & 5.0
- MongoDB sharded cluster (4.2)
- Kotlin (java 17)
Steps to reproduce
- use mongoItemWriter on the sharded cluster
@Bean
@StepScope
fun mongoItemWriter(mongoOperations: MongoOperations): MongoItemWriter<MongoItem> {
return MongoItemWriterBuilder<MongoItem>()
.collection("mongoItem")
.template(mongoOperations)
.build()
}
@Document("mongoItem")
@Sharded(shardKey = ["shard_key"], shardingStrategy = ShardingStrategy.HASH)
data class MongoItem(
@Id
var id: String? = null,
@Field(name = "shard_key")
val shardKey: Long,
val source: String,
)
Expected behavior
- writes data on MongoDB without any error.
Bug description
using mongoItemWriter on the sharded cluster throws the following error.
I found the reason is we couldn't specify 'shard key' by using
MongoItemWriterbecause of it'ssaveOrUpdate.As far as I know, to use
Queryin sharded MongoDB Cluster, we should specify shard key by usingaddCriteriaas below.However, I'm not sure what is the best way to handle this problem since mongodb's shard key could be multiple.
p.s - for
save, just usinginsertwill be enough if user specify shard key on their@Documentby@Shardedannotation,Environment
Steps to reproduce
@Bean @StepScope fun mongoItemWriter(mongoOperations: MongoOperations): MongoItemWriter<MongoItem> { return MongoItemWriterBuilder<MongoItem>() .collection("mongoItem") .template(mongoOperations) .build() }Expected behavior