Parallel message handling #320
Comments
Hi @yonigoyhman, answering your questions:
No. If it crashes, the resolved offsets will be committed, and the unresolved ones will come back again on the next batch. You should only resolve the offsets once they have been processed.
Yes. Rebalances won't affect the paused state, so it will remain paused, but if the consumer crashes for some reason (usually connectivity issues, or issues in your callbacks), the consumer will be resumed. I think we can improve this and keep the consumer paused, but this is the current state.

We have a pattern for processing messages in parallel in our systems; we usually use a custom function called promiseAllInBatches:

```js
consumer.run({
  eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, heartbeat, isRunning }) => {
    // one thunk per message; processMessage is the application-level handler
    const promises = batch.messages.map(message => () => processMessage(batch.topic, batch.partition, message))

    // called after each chunk: resolve the last processed offset, commit, heartbeat,
    // and keep going only while the consumer is still running
    const shouldContinue = async (processedMessages, chunk, numberOfChunks) => {
      const lastMessage = processedMessages.pop()
      await resolveOffset(lastMessage.offset)
      await commitOffsetsIfNecessary()
      await heartbeat()
      return isRunning()
    }

    await promiseAllInBatches({ batchSize: 20, shouldContinue }, promises)
  }
})
```
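promiseAllInBatches is a custom helper that isn't shown in the thread. A minimal sketch of what it might look like, assuming it takes { batchSize, shouldContinue } plus an array of thunks, runs them in chunks of batchSize, and that processMessage resolves to the message it handled (so shouldContinue can read its offset):

```js
// Hypothetical implementation matching the call site above; not from the thread.
const promiseAllInBatches = async ({ batchSize, shouldContinue }, thunks) => {
  const numberOfChunks = Math.ceil(thunks.length / batchSize)

  for (let chunk = 0; chunk < numberOfChunks; chunk++) {
    const slice = thunks.slice(chunk * batchSize, (chunk + 1) * batchSize)

    // run one chunk of handlers in parallel and wait for all of them
    const processedMessages = await Promise.all(slice.map(fn => fn()))

    // let the caller resolve offsets, commit, heartbeat, and decide whether to go on
    const keepGoing = await shouldContinue(processedMessages, chunk, numberOfChunks)
    if (!keepGoing) return
  }
}
```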
Thanks for the super quick response @tulios! We have a similar solution. We do, however, manage offsets manually (tracking each successfully handled message, and periodically committing the offset of the first unhandled one). Looks like we will need to keep this mechanism even with kafka.js. (We are currently using node-rdkafka, but experiencing some difficulties.) Thanks.
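For illustration, that kind of manual bookkeeping could look roughly like the sketch below. It is not from the thread: the `pending` map, the `track` helper, and the 5-second interval are made up, while `consumer.commitOffsets` and the `autoCommit: false` run option are the documented KafkaJS pieces it relies on.

```js
// Sketch: remember every dispatched message in offset order, mark completions,
// and periodically commit up to (but not including) the first unfinished one.
const pending = new Map() // `${topic}:${partition}` -> [{ offset, done }] in offset order

const track = (topic, partition, offset) => {
  const key = `${topic}:${partition}`
  if (!pending.has(key)) pending.set(key, [])
  const entry = { offset: Number(offset), done: false }
  pending.get(key).push(entry)
  return () => { entry.done = true } // call once the message has been handled
}

setInterval(async () => {
  const toCommit = []
  for (const [key, queue] of pending) {
    let last = null
    // pop the contiguous prefix of finished messages
    while (queue.length && queue[0].done) last = queue.shift()
    if (last) {
      const [topic, partition] = key.split(':')
      // the committed offset is the next one to read, hence + 1
      toCommit.push({ topic, partition: Number(partition), offset: String(last.offset + 1) })
    }
  }
  if (toCommit.length) await consumer.commitOffsets(toCommit)
}, 5000)
```

With `consumer.run({ autoCommit: false, eachBatch: ... })`, each message would be registered via `track(...)` before being handed to the parallel handler, and the returned callback invoked once that handler succeeds.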
I think this request is related to #122 (comment)... we have a very similar use case where we process messages outside the original batch callback and need to manage which offsets get committed manually. Currently we hold on to the commitOffsetsIfNecessary callback and invoke it outside the batch - that works most of the time, but has some race conditions (especially during rebalances it starts throwing errors). In the next couple of weeks, we are planning to take a stab at submitting a PR that allows committing offsets outside the runner loop - unless any of you does it before us or disagrees with that change :)
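Roughly, the pattern described above (and its hazard) looks like this; `dispatchToWorker` and `onAllProcessed` are hypothetical stand-ins for the external processing pipeline:

```js
// Sketch of "hold on to commitOffsetsIfNecessary and invoke it outside the batch".
let pendingCommit = null

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
    for (const message of batch.messages) {
      dispatchToWorker(batch.topic, batch.partition, message) // processed elsewhere
      resolveOffset(message.offset)
    }
    // keep the per-batch commit callback around for later
    pendingCommit = commitOffsetsIfNecessary
  },
})

onAllProcessed(async () => {
  // called outside the runner loop; as noted above, this can start throwing
  // if a rebalance happens in between
  if (pendingCommit) await pendingCommit()
})
```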
Not sure if this would be applicable to the problem discussed here. We have implemented a variation on this directly on our kafkajs fork that allows batches to be processed in parallel rather than in series, as they are now.
@rdbenoit: This is a great idea, and something I think we should enable both through |
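For reference, current KafkaJS releases expose a partitionsConsumedConcurrently option on consumer.run that processes batches from several partitions concurrently; the thread itself doesn't mention it, but a usage sketch looks like this:

```js
// Usage sketch: handle batches from up to three partitions at the same time.
// Messages within a single partition are still processed in order.
await consumer.run({
  partitionsConsumedConcurrently: 3,
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break
      await processMessage(batch.topic, batch.partition, message) // same handler as above
      resolveOffset(message.offset)
      await heartbeat()
    }
  },
})
```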
Hey,
we are using 3rd-party multi-threaded libraries and would like to be able to handle messages in parallel and only commit once a message has been handled successfully.
These parallel handlers might have different running times.
Going over your example, I have a few questions:
1. If a higher offset finishes processing before a lower one, and we resolveOffset, will this offset be committed immediately (causing all lower offsets that might not have finished processing to be committed as well)?
2. If we fail processing a message and use the retry mechanism, will the message be committed before it is actually processed successfully? What happens if the process crashes before a successful retry?
3. If we pause a consumer due to back-pressure, and a rebalance occurs before we resume, will the consumer maintain its paused state?
Thanks.
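For context on question 3, pausing on back-pressure typically looks something like the sketch below; `workQueue`, its threshold, and 'my-topic' are illustrative, while consumer.pause and consumer.resume are the documented KafkaJS calls:

```js
// Sketch: stop fetching when a local work queue backs up, resume once it drains.
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    workQueue.push({ topic, partition, message }) // handled elsewhere, in parallel

    if (workQueue.length > 1000) {
      consumer.pause([{ topic }]) // back-pressure: stop fetching from this topic
    }
  },
})

// later, once the queue has drained:
consumer.resume([{ topic: 'my-topic' }])
// question 3 asks whether the paused state survives a rebalance
// that happens before this resume call runs
```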