Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discard messages for toppars that saw a seek operation during a fetch or batch processing #367

Merged
merged 11 commits into from
Jun 6, 2019
Merged
51 changes: 51 additions & 0 deletions src/consumer/__tests__/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,57 @@ describe('Consumer', () => {
expect(calls).toEqual(1)
})

it('skips messages fetched while seek was called', async () => {
consumer = createConsumer({
cluster: createCluster(),
groupId,
maxWaitTimeInMs: 1000,
logger: newLogger(),
})

const messages = Array(10)
.fill()
.map(() => {
const value = secureRandom()
return { key: `key-${value}`, value: `value-${value}` }
})
await producer.connect()
await producer.send({ acks: 1, topic: topicName, messages })

await consumer.connect()

await consumer.subscribe({ topic: topicName, fromBeginning: true })

const sleep = value => waitFor(delay => delay >= value)
let offsetsConsumed = []

const eachBatch = async ({ batch, heartbeat }) => {
for (const message of batch.messages) {
offsetsConsumed.push(message.offset)
}

await heartbeat()
}

consumer.run({
eachBatch,
})

await waitForConsumerToJoinGroup(consumer)

await waitFor(() => offsetsConsumed.length === messages.length, { delay: 50 })
await sleep(50)

// Hope that we're now in an active fetch state? Something like FETCH_START might help
const seekedOffset = offsetsConsumed[Math.floor(messages.length / 2)]
consumer.seek({ topic: topicName, partition: 0, offset: seekedOffset })
await producer.send({ acks: 1, topic: topicName, messages }) // trigger completion of fetch

await waitFor(() => offsetsConsumed.length > messages.length, { delay: 50 })

expect(offsetsConsumed[messages.length]).toEqual(seekedOffset)
})

describe('transactions', () => {
testIfKafka_0_11('accepts messages from an idempotent producer', async () => {
cluster = createCluster({ allowExperimentalV011: true })
Expand Down
20 changes: 13 additions & 7 deletions src/consumer/consumerGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,17 @@ module.exports = class ConsumerGroup {
({ topic }) => topic === topicName
)

return partitions.map(partitionData => {
const partitionRequestData = topicRequestData.partitions.find(
({ partition }) => partition === partitionData.partition
)
return partitions
.filter(partitionData => !this.seekOffset.has(topicName, partitionData.partition))
.map(partitionData => {
const partitionRequestData = topicRequestData.partitions.find(
({ partition }) => partition === partitionData.partition
)

const fetchedOffset = partitionRequestData.fetchOffset
const fetchedOffset = partitionRequestData.fetchOffset

return new Batch(topicName, fetchedOffset, partitionData)
})
return new Batch(topicName, fetchedOffset, partitionData)
})
})

return flatten(batchesPerPartition)
Expand Down Expand Up @@ -484,4 +486,8 @@ module.exports = class ConsumerGroup {
}
}
}

hasSeekOffset({ topic, partition }) {
return this.seekOffset.has(topic, partition)
}
}
3 changes: 2 additions & 1 deletion src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ module.exports = class Runner {
const { topic, partition } = batch

for (let message of batch.messages) {
if (!this.running) {
if (!this.running || this.consumerGroup.hasSeekOffset({ topic, partition })) {
break
}

Expand Down Expand Up @@ -187,6 +187,7 @@ module.exports = class Runner {
},
uncommittedOffsets: () => this.consumerGroup.uncommittedOffsets(),
isRunning: () => this.running,
isStale: ({ topic, partition }) => this.consumerGroup.hasSeekOffset({ topic, partition }),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe isStale isn't clear enough, what do you think? I can't offer a better name now, let me think about this.

Copy link
Collaborator Author

@JaapRood JaapRood Jun 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming is always hard :/ I tried to reason from a person implementing eachBatch and how the name of the method should hint at what they can or cannot do. In this case we'd want to imply that one could continue processing, but that messages have gone outdated. Maybe isCanceled makes more sense?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sitting on isCanceled for a bit, I think it might be confusing in combination with isRunning. What would be the difference between the two? While isStale in isolation is maybe less obvious, I do think it avoids that confusion.

})
} catch (e) {
if (!isKafkaJSError(e)) {
Expand Down
4 changes: 4 additions & 0 deletions src/consumer/seekOffsets.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ module.exports = class SeekOffsets extends Map {
super.set([topic, partition], offset)
}

has(topic, partition) {
return Array.from(this.keys()).some(([t, p]) => t === topic && p === partition)
}

pop() {
if (this.size === 0) {
return
Expand Down