-
-
Notifications
You must be signed in to change notification settings - Fork 527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat: add option for admin.fetchOffsets to resolve the offsets #895
Conversation
Hi @artysidorenko, thanks for the PR. Adding this flag is something that I wanted for a while but end up forgetting about 😅, so it's definitely something we would like to have. We never had a need for |
* http://kafka.apache.org/protocol.html#The_Messages_DeleteRecords | ||
* Only available from Kafka 0.11 upwards | ||
*/ | ||
const deleteRecords = async ({ cluster, topicName, topicDetails }) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I definitely think we should move this into the supported protocols rather than having it here, but we can certainly do that in a separate PR rather than bloating this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great contribution, thanks a lot! Don't forget to update the type definition for Admin.fetchOffsets
in types/index.d.ts
Co-authored-by: Tommy Brunn <tommy.brunn@gmail.com>
Cool thanks :) I'll have a go at building |
src/admin/index.js
Outdated
partitions: partitions.map(({ offset, partition, ...props }) => ({ | ||
partition, | ||
offset: | ||
Number(offset) === EARLIEST_OFFSET |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you change this to not be a double ternary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, nice to finally have this in. Can you just fix the double ternary and we are ready to merge.
Co-authored-by: Arty <26905074+artysidorenko@users.noreply.github.com>
Closes #205
Had a stab at issue #205 as I don't think anyone was working on it any more.
Tweaking the admin to resolve the offsets I think was fairly quick, but let me know if you had other ideas for the implementation.
I might have gone a little OTT with some of the tests 🙈. Wanted to include test cases for:
(a) what happens when you fetch/resolve without having reset the consumer group offsets: it should return the consumer's latest offsets rather than the topic's highwatermark. That's where I'm sending two separate message batches and fetching the offsets before the second batch, and
(b) what happens when low watermark is greater than 0. Am simulating this by deleting some messages from the topic. I couldn't find that the kafkajs client implemented
deleteRecords
yet? (not sure if it's on the roadmap for the future, or a decision to leave it out, or I might have missed it), so I wrote a helper for the test to do it.Let me know if you want me to simplify any of these.. (also, if you don't have deleteRecords but do want to implement it in the main client I'm happy to look into that as well)