
Parallel message handling #320

Closed
yonigoyhman opened this issue Mar 20, 2019 · 6 comments

@yonigoyhman

yonigoyhman commented Mar 20, 2019

Hey,

We are using 3rd-party multi-threaded libraries and would like to handle messages in parallel, committing only once a message has been handled successfully.

These parallel handlers might have different running times.

Going over your example, I have a few questions:

  1. If a higher offset finishes processing before a lower one, and we call resolveOffset, will this offset be committed immediately (causing all lower offsets that might not have finished processing to be committed as well)?

  2. If we fail to process a message and use the retry mechanism, will the message be committed before it is actually processed successfully? What happens if the process crashes before a successful retry?

  3. If we pause a consumer due to back-pressure, and a rebalance occurs before we resume, will the consumer maintain its paused state?

Thanks.

@tulios
Owner

tulios commented Mar 20, 2019

Hi @yonigoyhman, answering your questions:

If a higher offset finishes processing before a lower one, and we call resolveOffset, will this offset be committed immediately (causing all lower offsets that might not have finished processing to be committed as well)?

resolveOffset will never commit; it only keeps a checkpoint of the offsets that should be committed. One thing to keep in mind is that you should never resolve offsets out of order, e.g., resolve 5, then 1, then 3. You can also resolve only the highest offset; you don't have to resolve every offset you process. The advantage of resolving every offset is that you can commit the results frequently, instead of only once. I'm assuming you are using eachBatch, so the best course of action is to resolve only once all processing finishes.
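
A minimal sketch of that approach, assuming eachBatch and a hypothetical processMessage handler: all messages are processed in parallel, and only the highest offset is resolved once everything has finished.

consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    // Process every message in the batch in parallel; completion order doesn't matter
    await Promise.all(
      batch.messages.map(message => processMessage(batch.topic, batch.partition, message))
    )

    // Only after all of them succeeded, checkpoint the highest offset in the batch
    const lastOffset = batch.messages[batch.messages.length - 1].offset
    resolveOffset(lastOffset)
    await heartbeat()
  }
})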

If we fail to process a message and use the retry mechanism, will the message be committed before it is actually processed successfully? What happens if the process crashes before a successful retry?

No. If the process crashes, the resolved offsets will be committed, and the unresolved ones will come back again in the next batch. You should only resolve the offsets once they have been processed.
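
In other words (a sketch, again with a hypothetical processMessage): resolve each offset only after its message has been handled, so anything that failed or was in flight during a crash is redelivered.

consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      // If processMessage throws, this offset is never resolved, so the
      // message comes back in the next batch (or after a restart)
      await processMessage(message)
      resolveOffset(message.offset)
      await heartbeat()
    }
  }
})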

If we pause a consumer due to back-pressure, and a rebalance occurs before we resume, will the consumer maintain its paused state?

Yes. Rebalances won't affect the paused state, so it will remain paused; but if the consumer crashes for some reason (usually connectivity issues, or issues in your callbacks), the consumer will be resumed. I think we can improve this and keep the consumer paused, but this is the current state.
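
For reference, pausing and resuming for back-pressure uses consumer.pause and consumer.resume; a rough sketch (workQueue, MAX_QUEUE_SIZE, and the topic name are hypothetical):

consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    workQueue.push({ topic, partition, message }) // hypothetical in-memory queue

    if (workQueue.length >= MAX_QUEUE_SIZE) {
      // Stop fetching from this topic until the queue has drained
      consumer.pause([{ topic }])
    }
  }
})

// Later, once the queue has drained enough:
// consumer.resume([{ topic: 'my-topic' }])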

We have a pattern for processing messages in parallel in our systems. We usually use a custom function called promiseAllInBatches, and the code looks like this:

consumer.run({
  eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, heartbeat, isRunning }) => {
    // processMessage is assumed to return the processed message, so its offset
    // is available again in shouldContinue below
    const promises = batch.messages.map(message => () => processMessage(batch.topic, batch.partition, message))

    // Called after each chunk of messages has been processed in parallel
    const shouldContinue = async (processedMessages, chunk, numberOfChunks) => {
      const lastMessage = processedMessages.pop()

      // Checkpoint the highest processed offset, commit it, and keep the session alive
      resolveOffset(lastMessage.offset)
      await commitOffsetsIfNecessary()
      await heartbeat()

      return isRunning()
    }

    await promiseAllInBatches({ batchSize: 20, shouldContinue }, promises)
  }
})

promiseAllInBatches processes all messages in parallel in chunks of batchSize (20 in the example above), calling shouldContinue between chunks.
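
promiseAllInBatches itself is not part of kafkajs; a minimal sketch that matches how it is called above could look like this (illustrative only):

const promiseAllInBatches = async ({ batchSize, shouldContinue }, tasks) => {
  const numberOfChunks = Math.ceil(tasks.length / batchSize)

  for (let chunk = 0; chunk < numberOfChunks; chunk++) {
    const thunks = tasks.slice(chunk * batchSize, (chunk + 1) * batchSize)

    // Each task is a thunk (() => Promise), so the work only starts here
    const processedMessages = await Promise.all(thunks.map(task => task()))

    // Let the caller checkpoint offsets, heartbeat, and decide whether to carry on
    if (!(await shouldContinue(processedMessages, chunk, numberOfChunks))) {
      return
    }
  }
}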

@yonigoyhman
Author

Thanks for the super quick response, @tulios!

We have a similar solution using async.queue, where we define a maxParallelHandles and a maxQueueSize, and we pause consumption if we reach maxQueueSize.

We do, however, manage offsets manually (tracking each successfully handled message and periodically committing the first unhandled offset).

Looks like we will need to keep this mechanism even with kafkajs. (We are currently using node-rdkafka, but are experiencing some difficulties...)
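
A minimal, client-agnostic sketch of that bookkeeping (OffsetTracker is hypothetical; offsets are assumed to be numbers handed out in increasing order): track in-flight offsets and periodically commit everything below the lowest one still in flight.

class OffsetTracker {
  constructor() {
    this.inFlight = new Set() // offsets currently being processed
    this.highestSeen = -1
  }

  start(offset) {
    this.inFlight.add(offset)
    this.highestSeen = Math.max(this.highestSeen, offset)
  }

  finish(offset) {
    this.inFlight.delete(offset)
  }

  // Safe commit point: everything strictly below the lowest in-flight offset
  // has finished; if nothing is in flight, everything seen so far has finished.
  safeToCommit() {
    if (this.inFlight.size === 0) return this.highestSeen
    return Math.min(...this.inFlight) - 1
  }
}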

Thanks.

@sklose
Contributor

sklose commented Mar 20, 2019

I think this request is related to #122 (comment)... We have a very similar use case where we process messages outside the original batch callback and need to manually manage which offsets get committed.

Currently we hold on to the commitOffsetsIfNecessary callback and invoke it outside the batch. That works most of the time, but has some race conditions (especially during rebalances, when it starts throwing errors).
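
Roughly, that workaround looks like the sketch below (externalProcessor is hypothetical; as noted, this can race with rebalances):

let commitLater = null

consumer.run({
  eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
    // Hand the messages off to an out-of-band processor that calls
    // resolveOffset as each message finishes, and keep the commit callback around
    externalProcessor.enqueue(batch.messages, resolveOffset)
    commitLater = commitOffsetsIfNecessary
  }
})

// Elsewhere, once the external processing has caught up:
// await commitLater()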

In the next couple of weeks, we are planning to take a stab at submitting a PR that allows committing offsets outside the runner loop, unless any of you does it before us or disagrees with that change :)

@rdbenoit

Not sure if this would be applicable to the problem discussed here. We have implemented a variation on this directly in our kafkajs fork that allows batches to be processed in parallel rather than in series, as they are now with the for loop. This allows the batches (which are tied to partitions) to be processed in parallel while still keeping the messages inside each batch in order.

ambyint@c214feb
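
Conceptually, the idea looks like the sketch below (this is not the code in the commit above): each partition's batch is handled concurrently, while messages within a batch stay sequential.

// Process each partition's batch concurrently, but keep messages
// within a single batch (i.e. a single partition) in order
const processBatchesInParallel = (batches, handleMessage) =>
  Promise.all(
    batches.map(async batch => {
      for (const message of batch.messages) {
        await handleMessage(batch.topic, batch.partition, message)
      }
    })
  )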

@Nevon
Collaborator

Nevon commented Apr 3, 2019

@rdbenoit: This is a great idea, and something I think we should enable through both eachMessage and eachBatch. We have users with hundreds of partitions, so maybe a simple boolean flag is not quite enough, but I think this is something we should definitely do upstream. I will open up an issue to track this.

@Nevon
Collaborator

Nevon commented Apr 8, 2019

@rdbenoit I just created a PR addressing this issue: #332
