Add support for sharding keys in MongoItemWriter #4282

Open
@soohyun0131-lee


Bug description

Using MongoItemWriter on a sharded cluster throws the following error:

org.springframework.data.mongodb.BulkOperationException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. ; nested exception is com.mongodb.MongoBulkWriteException: Bulk write operation error on server x.x.x.x:50011. Write errors: [BulkWriteError{index=0, code=61, message='Failed to target upsert by query :: could not extract exact shard key', details={}}]. 
	at org.springframework.data.mongodb.core.DefaultBulkOperations.bulkWriteTo(DefaultBulkOperations.java:324) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
	at org.springframework.data.mongodb.core.MongoTemplate.execute(MongoTemplate.java:560) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
	at org.springframework.data.mongodb.core.DefaultBulkOperations.execute(DefaultBulkOperations.java:290) ~[spring-data-mongodb-3.3.3.jar:3.3.3]
	at org.springframework.batch.item.data.MongoItemWriter.saveOrUpdate(MongoItemWriter.java:169) ~[main/:na]
	at org.springframework.batch.item.data.MongoItemWriter.doWrite(MongoItemWriter.java:138) ~[main/:na]
	at org.springframework.batch.item.data.MongoItemWriter$1.beforeCommit(MongoItemWriter.java:198) ~[main/:na]

The cause is that MongoItemWriter's saveOrUpdate method builds each upsert query from the _id field only, so there is no way to include the shard key in the query filter:

private void saveOrUpdate(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    MongoConverter mongoConverter = this.template.getConverter();
    FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
    for (Object item : items) {
        Document document = new Document();
        mongoConverter.write(item, document);
        Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
        // the upsert filter matches on _id only; the shard key is never included
        Query query = new Query().addCriteria(Criteria.where(ID_KEY).is(objectId));
        bulkOperations.replaceOne(query, document, upsert);
    }
    bulkOperations.execute();
}

As far as I know, an upsert on a sharded MongoDB cluster must include the shard key in its query filter, so the query needs an additional addCriteria call for the shard key, as below:

private void saveOrUpdate(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    MongoConverter mongoConverter = this.template.getConverter();
    FindAndReplaceOptions upsert = new FindAndReplaceOptions().upsert();
    for (Object item : items) {
        Document document = new Document();
        mongoConverter.write(item, document);
        Object objectId = document.get(ID_KEY) != null ? document.get(ID_KEY) : new ObjectId();
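        // SHARD_KEY is a placeholder for the shard key field name (e.g. "shard_key");
        // its value is read from the converted document
        Query query = new Query()
            .addCriteria(Criteria.where(ID_KEY).is(objectId))
            .addCriteria(Criteria.where(SHARD_KEY).is(document.get(SHARD_KEY)));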
        bulkOperations.replaceOne(query, document, upsert);
    }
    bulkOperations.execute();
}

However, I'm not sure what the best way to handle this is, since a MongoDB shard key can consist of multiple fields (a compound shard key).
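One possible direction for compound shard keys is to add one equality criterion per shard key field, in addition to _id. The following is a minimal sketch in plain Java, without Spring dependencies, that models the converted document as a Map. buildUpsertFilter and the shardKeyFields list are hypothetical names, and the shard key fields are assumed to be top-level keys of the stored document (nested, dotted paths are not handled). In MongoItemWriter, each filter entry would become a Criteria.where(field).is(value) added to the Query. The field list might come from the entity's @Sharded metadata (for example MongoPersistentEntity#getShardKey() in Spring Data MongoDB 3.x), but that wiring is an untested assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardKeyFilter {

    // Assumed to mirror ID_KEY ("_id") used in MongoItemWriter.
    static final String ID_KEY = "_id";

    /**
     * Hypothetical helper: builds an equality filter containing the id plus
     * every field of a (possibly compound) shard key. shardKeyFields is
     * assumed to list top-level field names as stored in the document.
     */
    public static Map<String, Object> buildUpsertFilter(Object id,
                                                        Map<String, Object> document,
                                                        List<String> shardKeyFields) {
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put(ID_KEY, id);
        for (String field : shardKeyFields) {
            if (!document.containsKey(field)) {
                // Without the full shard key, the upsert cannot be routed to a single shard.
                throw new IllegalArgumentException("Missing shard key field: " + field);
            }
            filter.put(field, document.get(field));
        }
        return filter;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", "abc");
        doc.put("shard_key", 42L);
        doc.put("region", "eu");
        doc.put("source", "batch");
        // Compound shard key { shard_key, region }; "source" is not part of the filter.
        System.out.println(buildUpsertFilter("abc", doc, List.of("shard_key", "region")));
    }
}
```

Running main prints {_id=abc, shard_key=42, region=eu}. Failing fast on a missing shard key field would surface the problem before the bulk write, instead of as a code 61 error from the server.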

P.S. For save, just using insert should be enough if the user specifies the shard key on their @Document class with the @Sharded annotation:

private void save(List<? extends T> items) {
    BulkOperations bulkOperations = initBulkOperations(BulkMode.ORDERED, items.get(0));
    bulkOperations.insert(items);
    bulkOperations.execute();
}

Environment

  • spring-batch 4.3 & 5.0
  • MongoDB sharded cluster (4.2)
  • Kotlin (Java 17)

Steps to reproduce

  • Use MongoItemWriter on a sharded cluster with the following writer bean and document class:
    @Bean
    @StepScope
    fun mongoItemWriter(mongoOperations: MongoOperations): MongoItemWriter<MongoItem> {
        return MongoItemWriterBuilder<MongoItem>()
            .collection("mongoItem")
            .template(mongoOperations)
            .build()
    }
@Document("mongoItem")
@Sharded(shardKey = ["shard_key"], shardingStrategy = ShardingStrategy.HASH)
data class MongoItem(
    @Id
    var id: String? = null,
    @Field(name = "shard_key")
    val shardKey: Long,
    val source: String,
)

Expected behavior

  • Data is written to MongoDB without errors.
