
Consumer.seek commits an offset, causing possible data and metadata loss #395

Closed
JaapRood opened this issue Jun 14, 2019 · 16 comments · Fixed by #1012

Comments

@JaapRood
Collaborator

JaapRood commented Jun 14, 2019

In debugging one of our stream processing tasks we noticed that consumer.seek commits an offset in order to affect the consumer's fetching position. This mixes controlling the consumer position with recording progress and can lead to offset-metadata getting lost.

Offset metadata can be very useful when working with replicating to and from local stores, especially for achieving exactly-once processing, and is used by KafkaStreams for exactly this purpose (emphasis mine):

  • When committing a task, after the state stores have all been flushed and before we write the committed offset, we can choose to trigger the persistent state stores' checkpoint mechanism to make a checkpoint for each persistent store.
  • When writing the committed offset into the input-offset-topic, also pass in the reference (e.g. local state directory / file locations) of all the persistent store checkpoints into the offset metadata as part of the state triplet.
  • Then upon resuming, if the fetched last transaction contains the state store checkpoints information, we can skip the restoration step of the state stores from changelog but use the checkpointed image directly.

When seek commits an offset, this metadata is subsequently lost.
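To make the failure mode concrete, here is a toy in-memory model (not the kafkajs implementation) of a consumer that tracks a fetch position separately from a committed `{ offset, metadata }` entry. It shows how a seek that also commits overwrites the committed entry and drops its metadata, while a seek that only moves the position leaves the checkpoint reference intact:

```javascript
// Toy model illustrating the issue: `seekWithCommit` mimics the current
// behaviour (seeking also commits, clobbering metadata), while `seek`
// mimics the proposed behaviour (only the fetch position moves).
class ToyConsumer {
  constructor() {
    this.position = 0;     // where the next fetch happens
    this.committed = null; // { offset, metadata } recorded as progress
  }

  // Recording progress: an offset plus arbitrary metadata,
  // e.g. a reference to a local state store checkpoint.
  commit(offset, metadata) {
    this.committed = { offset, metadata };
  }

  // Current behaviour described in this issue: the seek commits the
  // offset as a side effect, with no metadata attached.
  seekWithCommit(offset) {
    this.position = offset;
    this.committed = { offset, metadata: null };
  }

  // Proposed behaviour: seeking only moves the fetch position.
  seek(offset) {
    this.position = offset;
  }
}

const consumer = new ToyConsumer();
consumer.commit(42, '/state/checkpoint-42');
consumer.seekWithCommit(10);
console.log(consumer.committed.metadata); // null: checkpoint reference lost

const consumer2 = new ToyConsumer();
consumer2.commit(42, '/state/checkpoint-42');
consumer2.seek(10);
console.log(consumer2.committed.metadata); // '/state/checkpoint-42': preserved
```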

Another example is where you produce to the same topic that you're consuming (eating your own tail), which is useful for things like implementing uniqueness constraints (described well here). There it can often be useful to seek back 1 from the last committed offset to kickstart the process (if you don't, no messages are ever produced for you to continue from), especially when doing things like replicating from a store to a Kafka topic.

As far as I can tell, the Java KafkaConsumer does not commit the offset, but only changes the offset in memory so the next fetch happens from there.

Proposed solution

Eventually, I think we'd want to be in a situation where:

  • consumer.seek only affects what message is fetched next, not how progress has been made thus far.
  • consumer.commit can be used to record progress, possibly after a seek, if that's required. See Add way to commit offset with Consumer #378.

However, this would constitute a breaking change, even though committing the offset is currently a bit of a side effect. So perhaps a flag passed to consumer.seek would be a way to opt in to the new behaviour, as a way to verify the approach in real applications. That way, if the API is to be broken in a new major version, we avoid the need for a revert.
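In calling-code terms, the proposed split might look like the sketch below. A real kafkajs consumer needs a broker connection, so a minimal stub stands in for it here; the `metadata` field and the exact `commitOffsets` payload shape are assumptions for illustration, not the final API.

```javascript
// Stub consumer recording calls, standing in for a connected kafkajs consumer.
const calls = [];
const consumer = {
  seek: ({ topic, partition, offset }) =>
    calls.push(['seek', topic, partition, offset]),
  commitOffsets: (offsets) =>
    calls.push(['commit', offsets]),
};

// Proposed: seek only moves the fetch position...
consumer.seek({ topic: 'example', partition: 0, offset: '12384' });

// ...and progress is recorded explicitly, only when the caller wants it,
// optionally carrying metadata (e.g. a checkpoint reference) that a
// seek-triggered commit would otherwise wipe out.
consumer.commitOffsets([
  { topic: 'example', partition: 0, offset: '12384', metadata: '/state/checkpoint' },
]);
```

Keeping the two calls separate means a caller who wants the old behaviour can simply issue both, while everyone else gets a seek with no side effects.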

@JaapRood
Collaborator Author

While working on Frolyk (JaapRood/frolyk#2), I ran into an additional bug in seeking that is probably a good one to tackle as part of this issue. Because seek commits the offset, the logical offsets -1 and -2 don't behave as expected, despite being accepted. Depending on how fromBeginning is set, seeking to either logical offset resets the position to the start or the end of the partition.

@ThisIsMissEm

Would it perhaps be an idea to put a message out to all users asking if anyone currently uses consumer.seek, and if this would be a hard breaking change for them?

(That said, I'd be totally on board with this being a semver major change)

@ThisIsMissEm

For v1, could it be possible to add a consumer.seekWithoutCommit such that we can gain this behavior without breaking changes? And then for v2 change how consumer.seek works?

@JaapRood
Collaborator Author

For v1, could it be possible to add a consumer.seekWithoutCommit such that we can gain this behavior without breaking changes? And then for v2 change how consumer.seek works?

Totally! Fortunately, the plan is for the next version to be a major, so we might get away without the extra method. With a migration instruction, we can point users who relied on the committing behaviour to the fact that they can simply commit separately from seeking.

@ThisIsMissEm

I'm not sure how you like to do versioning, but it seems like a pretty important missing feature.

@rcbjBlueMars

We ran into the same issue originally reported in this thread on a project that I am working on. Is there a planned date for when an updated consumer.seek() implementation is going to be available?

Thanks!

RCBJ

@ankon
Contributor

ankon commented Jun 19, 2020

I don't think there's active work going on for this issue right now.

@angelozerr

angelozerr commented Jan 28, 2021

Is there any chance to fix this bug? We need this feature in vscode-kafka to give users the ability to start a consumer at a given offset. Thanks for your help!

@Nevon
Collaborator

Nevon commented Jan 28, 2021

As Ankon said, no one is actively working on this right now, and no one has volunteered to take on the work.

Adding an optional flag to seek that controls whether or not it commits the seeked offset would be my preferred solution:

await consumer.seek({ topic: 'example', partition: 0, offset: 12384, commit: false })

Where the default behavior is commit: true. In a future breaking change we could change this default.

@ThisIsMissEm

@Nevon would the kafkajs project take sponsorships for this development work?

@Nevon
Collaborator

Nevon commented Jan 28, 2021

@Nevon would the kafkajs project take sponsorships for this development work?

Thank you very much for offering! I'm personally not set up to take on contract work, but I'll do you one better and put up a free PR: #1012

If you want to support my work, consider becoming a sponsor.

@ThisIsMissEm

@Nevon oh, cool! Have forwarded to the powers that be, as I'm sure this'll help us, thanks!

@JaapRood
Collaborator Author

@Nevon you absolute champ!

Have forwarded to the powers that be, as I'm sure this'll help us, thanks!

@ThisIsMissEm good one, I'll do the same!

@angelozerr

@Nevon thanks so much for your current work. I don't have the capacity to review your PR, but I will try to give you feedback via vscode-kafka. Thanks again!

@Nevon
Collaborator

Nevon commented Feb 1, 2021

The fix for this will be out in 1.16.0-beta.7 in a few minutes. Let me know if it solves your problem, @angelozerr.

@angelozerr

The fix for this will be out in 1.16.0-beta.7 in a few minutes. Let me know if it solves your problem, @angelozerr.

Thanks so much, @Nevon I will give you feedback ASAP.
