Get correct next offset for compacted topics #577
Conversation
…re the requested offset

Apparently this can happen with compacted topics, and it can cause you to get stuck, because there's no way to resolve any offsets. What we do now in that case is resolve the fetched offset, which will advance the consumer by one. We can't just use the high watermark, because you can get a batch consisting only of these "old" compacted records while still having lots of valid messages afterwards (so you don't want to skip to the end of the partition).

Hopefully fixes #562
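The resolution strategy described above can be sketched roughly as follows. This is an illustration only: the function name and the batch shape are hypothetical, not the actual kafkajs internals.

```javascript
// Hypothetical sketch of the fix described above. When every record in a
// fetched batch is older than the requested offset (possible on compacted
// topics), resolve the requested offset itself so the next fetch advances
// to requestedOffset + 1 instead of stalling forever.
function resolveOffset(requestedOffset, batch) {
  const onlyCompactedLeftovers =
    batch.messages.length > 0 &&
    batch.messages.every(message => message.offset < requestedOffset)

  if (onlyCompactedLeftovers) {
    // Resolving the fetched offset means the next fetch starts one past it,
    // without jumping to the high watermark and skipping later messages
    return requestedOffset
  }

  // Normal case: resolve the offset of the last message in the batch
  return batch.messages[batch.messages.length - 1].offset
}
```

The key point is the first branch: it resolves an offset even though the batch contained nothing usable, which is what un-sticks the consumer.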
@jlek verified that the latest fix (stupid mistake on my part, assuming that
There is a broker test failing; I think it's a new default configuration from the Kafka 2.3 update
….com:tulios/kafkajs into get-correct-next-offset-for-compacted-topics
It was just that I hadn't updated the Azure Pipelines config to use Kafka 2.3. Fixed in 21b8264
So this works, but if you have

It would be better if we calculated the offset to resolve by taking the

```js
class Batch {
  lastOffset() {
    if (this.isEmptyDueToLogCompactedMessages()) {
      // We know there's at least one message there, because we check it in
      // `isEmptyDueToLogCompactedMessages`
      const { firstOffset, lastOffsetDelta } = this.rawMessages[0].batchContext
      return firstOffset + lastOffsetDelta
    }

    if (this.isEmptyIncludingFiltered()) {
      return Long.fromValue(this.highWatermark)
        .add(-1)
        .toString()
    }

    return this.messagesWithinOffset[this.messagesWithinOffset.length - 1].offset
  }
}
```

But I'm not sure how to make that work with MessageSets rather than RecordBatches
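The `batchContext` arithmetic in that snippet relies on the RecordBatch header carrying `firstOffset` and `lastOffsetDelta`, which survive even when compaction removes every record inside the batch. A minimal illustration, using plain numbers for simplicity (kafkajs itself goes through `Long` for 64-bit offsets):

```javascript
// Illustrative only: recover the last offset of a batch from its header
// fields, even when the batch body is empty due to log compaction.
function lastOffsetFromBatchContext({ firstOffset, lastOffsetDelta }) {
  return firstOffset + lastOffsetDelta
}
```

This is why the proposal can return a meaningful offset from an "empty" batch, whereas MessageSets (the pre-0.11 wire format) have no such header to fall back on.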
This is out in the pre-release version
FYI @jlek
The intention is that if you get a batch consisting only of messages from before the requested offset (n), we resolve offset n, which means that on the next fetch you'll get n+1, allowing you to move forward.
We can't just resolve `highwaterMark + 1`, because then you'd skip past any messages after the current batch.

I haven't been able to reproduce #562 locally, and I don't have a Fetch buffer fixture to write a unit test from, so @jlek is going to try to validate this against his production workload.