Skip to content

Commit

Permalink
Merge pull request #582 from tulios/fix-race-condition-when-resetting…
Browse files Browse the repository at this point in the history
…-offsets-v2

Only fetch for partitions with initialized offsets
  • Loading branch information
tulios authored Dec 9, 2019
2 parents 87c21ef + d33fc00 commit 6002347
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,34 @@ module.exports = class ConsumerGroup {
)

const leaders = keys(partitionsPerLeader)
const committedOffsets = this.offsetManager.committedOffsets()

for (const leader of leaders) {
const partitions = partitionsPerLeader[leader].map(partition => ({
partition,
fetchOffset: this.offsetManager.nextOffset(topicPartition.topic, partition).toString(),
maxBytes: maxBytesPerPartition,
}))
const partitions = partitionsPerLeader[leader]
.filter(partition => {
/**
* When recovering from OffsetOutOfRange, each partition can recover
* concurrently, which invalidates resolved and committed offsets as part
* of the recovery mechanism (see OffsetManager.clearOffsets). In concurrent
* scenarios this can initiate a new fetch with invalid offsets.
*
* This was further highlighted by https://github.com/tulios/kafkajs/pull/570,
* which increased concurrency, making this more likely to happen.
*
* This is solved by only making requests for partitions with initialized offsets.
*
* See the following pull request which explains the context of the problem:
* @issue https://github.com/tulios/kafkajs/pull/578
*/
return committedOffsets[topicPartition.topic][partition] != null
})
.map(partition => ({
partition,
fetchOffset: this.offsetManager
.nextOffset(topicPartition.topic, partition)
.toString(),
maxBytes: maxBytesPerPartition,
}))

requestsPerLeader[leader] = requestsPerLeader[leader] || []
requestsPerLeader[leader].push({ topic: topicPartition.topic, partitions })
Expand Down

0 comments on commit 6002347

Please sign in to comment.