Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SeekOffsets data structure #1378

Merged
merged 4 commits into from
May 31, 2022
Merged

Fix SeekOffsets data structure #1378

merged 4 commits into from
May 31, 2022

Conversation

bgeisberger
Copy link
Contributor

@bgeisberger bgeisberger commented May 30, 2022

TLDR

In version 2.0.1, most consumer.seek() calls are broken if more than one partition is used. This PR fixes the underlying issue.

Detailed explanation & way to reproduce:

After upgrading kafkajs to version 2.0.1 in our application, a test suddenly failed to pass.

Initially, this test does the following:

  • Start Kafka and create a random topic with three partitions
  • Produce two messages to partition 1
  • Set consumer offset in partition 1 to 1 (i.e., skip the first message) using the admin client method setOffsets(...)
  • Start a consumer and do some follow-up work

However, this fails (in a reproducible way) because the third operation sets the offset not for partition 1, but partition 0 instead. Further investigations of the bug lead me to the conclusion that the issue must lie somewhere within the consumer seek logic, as the internal consumer.seek() call within setOffsets passed the correct partition information. I encountered the updated SeekOffsets data structure eventually and found two issues:

  • The keys are not partition-specific, it stores only one element per topic, because the keys for each partition map to the same. I fixed this by packing the required information into an array in line 3. This directly caused above issue, because pop(TOPIC, 0), which happened before pop(TOPIC, 1), returned the offset for partition 1 instead of 0, and removed it.
  • pop(topic, partition) returns a non-undefined object (with undefined topic and partition) as soon as ANY topic or partition offset information is stored. I didn't investigate in how far this would affect the seek call in consumerGroup.js line 662, but I guess it is not how it should be called.

@Nevon
Copy link
Collaborator

Nevon commented May 30, 2022

The patch makes perfect sense. The second argument to JSON.stringify is a replacer function, not an additional item to serialize. This is clearly a mistake.

Would you be able to write a test that fails without your patch and succeeds with it, so that we don't introduce this same mistake in the future? Or even just modify this test to trigger the issue you found:

it('updates the partition offset to the given offset', async () => {
await consumer.connect()
await producer.connect()
const key1 = secureRandom()
const message1 = { key: `key-${key1}`, value: `value-${key1}` }
const key2 = secureRandom()
const message2 = { key: `key-${key2}`, value: `value-${key2}` }
const key3 = secureRandom()
const message3 = { key: `key-${key3}`, value: `value-${key3}` }
await producer.send({ acks: 1, topic: topicName, messages: [message1, message2, message3] })
await consumer.subscribe({ topic: topicName, fromBeginning: true })
const messagesConsumed = []
// must be called after run because the ConsumerGroup must be initialized
consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
consumer.seek({ topic: topicName, partition: 0, offset: 1 })
await waitForConsumerToJoinGroup(consumer)
await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({ offset: '1' }),
}),
expect.objectContaining({
topic: topicName,
partition: 0,
message: expect.objectContaining({ offset: '2' }),
}),
])
})

@Nevon Nevon merged commit 4fb5978 into tulios:master May 31, 2022
@Nevon
Copy link
Collaborator

Nevon commented May 31, 2022

Thank you for your contribution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants