Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get correct next offset for compacted topics #577

Merged
merged 8 commits into from
Dec 12, 2019

Conversation

Nevon
Copy link
Collaborator

@Nevon Nevon commented Dec 4, 2019

FYI @jlek

The intention is that if you get a batch consisting only of messages from before the requested offset (n), we resolve offset n, which means that on next fetch you'll get n+1, allowing you to move forward.

We can't just resolve highwaterMark + 1, because then you'd skip past any messages after the current batch.

I haven't been able to reproduce #562 locally, and I don't have a Fetch buffer fixture to write a unit test from, so @jlek is going to try to validate this against his production workload.

…re the requested offset

Apparently this can happen when you get compacted topics, which can
cause you to get stuck, because there's no way to resolve any offsets.
What we do now in that case is to resolve the fetched offset, which will
advance by one.

We can't just use the highwatermark because you can get a batch
consisting only of these "old" compacted records while still having lots
of valid messages afterwards (so you don't want to skip to the end of
the partition).

Hopefully fixes #562
@Nevon Nevon added the bug label Dec 4, 2019
@Nevon Nevon requested a review from tulios December 4, 2019 10:51
@Nevon
Copy link
Collaborator Author

Nevon commented Dec 5, 2019

@jlek verified that the latest fix (stupid mistake on my part, assuming that longFetchedOffset was an instance variable, when it was local to the constructor) works for him.

@tulios
Copy link
Owner

tulios commented Dec 5, 2019

There is a broker test failing; I think it's a new default configuration from the Kafka 2.3 update

@Nevon
Copy link
Collaborator Author

Nevon commented Dec 5, 2019

There is a broker test failing; I think it's a new default configuration from the Kafka 2.3 update

It was just that I hadn't update the azure pipeline conf to use Kafka 2.3. Fixed in 21b8264

@Nevon
Copy link
Collaborator Author

Nevon commented Dec 5, 2019

So this works, but if you have N compacted messages to get through, it'll take you N fetches because we only move forward one offset at a time (we resolve the requested offset after all).

It would be better if we calculated the offset to resolve by taking the firstOffset + lastOffsetDelta on the batch level. A naive solution would be something like:

class Batch {
  lastOffset() {
    if (this.isEmptyDueToLogCompactedMessages()) {
      // We know there's at least one message there, because we check it in
      // `isEmptyDueToLogCompactedMessages`
      const { firstOffset, lastOffsetDelta } = this.rawMessages[0].batchContext
      return firstOffset + lastOffsetDelta
    }

    if (this.isEmptyIncludingFiltered()) {
      return Long.fromValue(this.highWatermark)
        .add(-1)
        .toString()
    }

    return this.messagesWithinOffset[this.messagesWithinOffset.length - 1].offset
  }
}

But I'm not sure how to make that work with MessageSets rather than RecordBatches

@Nevon Nevon merged commit b15e0f1 into master Dec 12, 2019
@Nevon Nevon deleted the get-correct-next-offset-for-compacted-topics branch December 12, 2019 15:17
@Nevon
Copy link
Collaborator Author

Nevon commented Dec 12, 2019

This is out in the pre-release version 1.12.0-beta.12.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants