Background:
I have a consumer (RdKafka.KafkaConsumer) that can take up to half an hour to process a single message, though the average is much lower (around 150ms per bulk). I set up my consumer with:
"max.poll.interval.ms": 86400000, // (24 hours),
"session.timeout.ms": 45000,
"enable.auto.offset.store": false,
"enable.auto.commit": true,
"partition.assignment.strategy": "cooperative-sticky", // I tried roundrobin as well,
"rebalance_cb": true
I manually store offsets when the processing of a message bulk is completed.
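For illustration, the manual offset store after a bulk completes looks roughly like this. The helper name is hypothetical; it assumes node-rdkafka's offsetsStore, which (like librdkafka's offsets_store) expects the next offset to consume, i.e. last processed + 1.

```javascript
// Hypothetical helper: store offsets for a finished bulk so that
// enable.auto.commit can later commit them. Keeps only the highest
// offset per topic-partition and stores offset + 1 (the next offset
// to consume), per librdkafka convention.
function storeBulkOffsets(consumer, messages) {
  const latest = new Map();
  for (const m of messages) {
    const key = `${m.topic}:${m.partition}`;
    const prev = latest.get(key);
    if (!prev || m.offset > prev.offset) latest.set(key, m);
  }
  const toStore = [...latest.values()].map((m) => ({
    topic: m.topic,
    partition: m.partition,
    offset: m.offset + 1, // next offset to consume, not the last one processed
  }));
  consumer.offsetsStore(toStore);
  return toStore;
}
```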
I'm using this.client.setDefaultConsumeTimeout(0); and I call client.consume(1, (err, messages) => {...}) in a loop until a set bulk size (1,000) is reached (due to #262).
When the bulk size is reached I await the processing of the bulk before entering the consume loop again. This is why I use such a high value for max.poll.interval.ms; otherwise my consumer would hit Local: Maximum application poll interval (max.poll.interval.ms) exceeded while it was still legitimately processing messages.
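The consume loop described above has roughly this shape (a simplified sketch, not my exact code; consumeBulk is a hypothetical name):

```javascript
// Sketch of the bulk-consume loop: poll one message at a time until the
// bulk is full, then hand the bulk to the caller for processing.
// `consumer` is anything with consume(n, cb), e.g. an RdKafka.KafkaConsumer
// after setDefaultConsumeTimeout(0).
function consumeBulk(consumer, bulkSize, done) {
  const bulk = [];
  function poll() {
    consumer.consume(1, (err, messages) => {
      if (err) return done(err, bulk);
      bulk.push(...messages);
      if (bulk.length >= bulkSize) return done(null, bulk);
      // With a 0 timeout an empty reply is normal; yield before polling
      // again so an idle partition doesn't spin the event loop.
      if (messages.length === 0) return setImmediate(poll);
      poll();
    });
  }
  poll();
}
```

Once done fires, the caller awaits the processing of the bulk, stores its offsets, and calls consumeBulk again.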
I have 30 processes consuming from a topic with 30 partitions.
Problem:
I was deploying new code (unrelated to this problem) to my consumers using a rolling deployment strategy. During the deployment, two of the new consumers (running the new code) were assigned one partition each and started working on messages that took a long time to process (220s and 430s). They seem to have missed a rebalance event and didn't rejoin the group like the other 28 new consumers. After they finished processing those messages they rejoined the group, but between them were assigned all of the partitions (15 partitions each). I hit this problem on roughly every other deploy, likely depending on whether a message with a long processing time was being consumed at the time.
Is the rebalance process dependent on calling consume? Is this a bug in the library, or a necessary evil of setting max.poll.interval.ms to a high value? I can provide debug logs if necessary.