Hi!
In my company we use aiokafka quite a lot: for basic producer/consumer loops, but we also built our own streaming lib on top of it (with stateful transformers).
We quite often stumble on issues caused by the rebalance logic running in a background coroutine, as explained here: https://aiokafka.readthedocs.io/en/stable/kafka-python_difference.html#rebalances-are-happening-in-the-background
This has a lot of consequences and makes some classic patterns wrong.
The simplest example is the "at least once" consumer loop.
The pattern we started with, as you would in a Java client, was the one from the example https://aiokafka.readthedocs.io/en/stable/examples/manual_commit.html:
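A minimal sketch of that loop (assuming a started AIOKafkaConsumer with enable_auto_commit=False; some_processing stands in for our actual handler):

    while True:
        records = await consumer.getmany(timeout_ms=1000)
        # Handle the whole batch first...
        await some_processing(records)
        # ...then commit, so that a crash before this line means re-delivery
        # (at-least-once) rather than silently lost messages.
        await consumer.commit()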
But this kind of loop will fail if the background group rebalance routine executes during the processing: when reaching the commit, you will get a CommitFailedError. If the error is not caught, the worker dies, which triggers a new rebalance, which in turn causes more failed commits and rebalances, and so on. We are also under the impression that calling commit without arguments could be risky, as the subscription could have changed in between.
We ended up implementing the above pattern like this:

    from aiokafka.errors import CommitFailedError, IllegalStateError

    while True:
        records = await consumer.getmany(timeout_ms=1000)
        # Remember, per partition, the offset to commit once the batch is processed
        offsets = {}
        for tp, messages in records.items():
            offsets[tp] = messages[-1].offset + 1
        await some_processing(records)
        try:
            await consumer.commit(offsets)
        except (CommitFailedError, IllegalStateError) as err:
            logger.warning(
                "Commit failed due to rebalancing, circle back to consume new messages",
                exc_info=err,
            )
Is this pattern correct? If so, should the example in the documentation highlight this behavior? And should aiokafka provide helpers for such a pattern?
The max_poll_interval_ms documentation and the related error messages are also wrong and confusing: the error raised in the scenario above suggests increasing max_poll_interval_ms, but that won't help, since the failure also happens when the processing time is below max_poll_interval_ms.
In the past, we also tried to mutually exclude the rebalance process from the consumer loop with locks, but that blocks the heartbeat as well and makes it complicated to keep a consumer alive.
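That lock-based attempt looked roughly like this (a simplified sketch rather than our exact code; BlockingRebalanceListener and processing_lock are illustrative names):

    import asyncio
    from aiokafka.abc import ConsumerRebalanceListener

    processing_lock = asyncio.Lock()

    class BlockingRebalanceListener(ConsumerRebalanceListener):
        async def on_partitions_revoked(self, revoked):
            # Hold the rebalance back until the in-flight batch is done.
            # The catch: while the rebalance coroutine waits here, it also
            # delays heartbeating/rejoining, so the consumer risks being
            # kicked out of the group anyway.
            async with processing_lock:
                pass

        async def on_partitions_assigned(self, assigned):
            pass

    # consumer.subscribe(["my-topic"], listener=BlockingRebalanceListener())
    # ...and in the consumer loop, hold processing_lock around some_processing().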