Agents are cancelled if a rebalance occurs whilst waiting for a Channel #678
Comments
Something which played into making this problem hard to track down was that this block https://github.com/robinhood/faust/blob/master/faust/agents/agent.py#L650-L652 catches the `asyncio.CancelledError`.
It's hard to see the benefit of ever swallowing that exception, regardless of …
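For illustration, a minimal sketch (not faust's actual code; `run_agent` is a made-up name) of the pattern being described: catching `asyncio.CancelledError` without re-raising means the caller never sees the cancellation.

```python
import asyncio


async def run_agent() -> None:
    try:
        while True:
            await asyncio.sleep(1)  # stand-in for the agent's processing loop
    except asyncio.CancelledError:
        # Swallowing the cancellation: the task ends "normally" and the
        # caller has no way to tell it was cancelled. Re-raising is the
        # conventional fix.
        pass


async def main() -> None:
    task = asyncio.create_task(run_agent())
    await asyncio.sleep(0.1)
    task.cancel()
    await task  # returns normally because CancelledError was swallowed
    print('cancelled?', task.cancelled())  # prints: cancelled? False


asyncio.run(main())
```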
I've experienced something that seems very similar, but only when running the Cython-optimized version of faust.
This is a long-shot guess without looking at the source code, but could this be related to #571?
Please take a look at https://github.com/faust-streaming/faust/pull/42/files
I have created a release: https://pypi.org/project/faust-streaming/0.2.1rc2/. Can you test if it works for you?
Checklist

- I have verified that the issue persists when using the `master` branch of Faust. I have not been able to build with `master` of faust due to the `aiokafka` version, but I have checked through the relevant code in `master` (see below).

Steps to reproduce
Run two workers with an agent which reads from a `Topic` and writes to a `Channel`, like so (`app.py`):
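A minimal sketch of such an app, assuming a local broker; `my_topic`, `my_channel`, and `channel_processor_agent` are placeholder names introduced here, while `topic_consumer_agent` matches the agent name referenced in the logs below:

```python
import faust

app = faust.App('channel-rebalance-example', broker='kafka://localhost:9092')

# Kafka topic the first agent consumes from.
my_topic = app.topic('my-topic', value_type=str)

# In-memory channel the first agent forwards values into.
my_channel = app.channel(value_type=str)


@app.agent(my_topic)
async def topic_consumer_agent(stream):
    # Read from the topic and forward each value to the channel;
    # this await is the point that ends up blocked on the channel's queue.
    async for value in stream:
        await my_channel.send(value=value)


@app.agent(my_channel)
async def channel_processor_agent(stream):
    # Consume the values forwarded through the channel.
    async for value in stream:
        print(f'processed: {value}')
```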
Expected behavior
Each worker takes a share of partitions on rebalance, and continues processing.
Actual behavior
Each worker logs their share of partitions, but the first worker's `topic_consumer_agent` does not continue processing, e.g.:

Worker 1 Log

Worker 2 Log
Similarly, if worker one is then terminated, the second worker's `topic_processor_agent` is cancelled after a rebalance.

Notes
This appears to be caused by the underlying `FlowControlQueue`, which issues `asyncio.CancelledError`s to any tasks which are currently waiting to write to the queue, specifically this block in `mode`. When this happens, the `topic_consumer_agent` receives an `asyncio.CancelledError` and ceases processing. In turn, this happens because `clear_on_resume` is passed as `True` to `FlowControlQueue` in faust's `Channel`, as seen in the source code.
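A minimal plain-`asyncio` sketch (deliberately not using `mode`'s `FlowControlQueue`) of the underlying behaviour: a task blocked writing to a full queue receives `asyncio.CancelledError` when its wait is cancelled.

```python
import asyncio


async def main() -> None:
    queue = asyncio.Queue(maxsize=1)
    await queue.put('first')  # fill the queue so the next put() blocks

    async def writer() -> None:
        try:
            await queue.put('second')  # blocks here: the queue is full
        except asyncio.CancelledError:
            print('writer was cancelled while waiting to write to the queue')
            raise

    task = asyncio.create_task(writer())
    await asyncio.sleep(0.1)  # give the writer time to block on put()
    task.cancel()             # roughly what clearing waiters on rebalance does
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```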
A workaround for this is to sub-class `Channel` and override the `clear_on_resume` argument to `False`, and to set `acks=False` on the `Topic`. I'm not totally sure why `acks` needs to be `False`, but modifying `clear_on_resume` alone appears to cause rebalance loops.
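A sketch of that workaround, assuming the queue is constructed in the `Channel.queue` property as in the linked source; the subclass below mirrors that property and only flips `clear_on_resume`, and `NoClearOnResumeChannel`, `my_topic`, and `my_channel` are placeholder names:

```python
import faust
from faust.channels import Channel


class NoClearOnResumeChannel(Channel):
    """Channel whose buffer is not cleared when flow control resumes."""

    @property
    def queue(self):
        # Mirrors the queue construction in faust's Channel, but passes
        # clear_on_resume=False so tasks waiting to write are not
        # cancelled when flow control resumes after a rebalance.
        if self._queue is None:
            maxsize = self.maxsize
            if maxsize is None:
                maxsize = self.app.conf.stream_buffer_maxsize
            self._queue = self.app.FlowControlQueue(
                maxsize=maxsize,
                loop=self.loop,
                clear_on_resume=False,
            )
        return self._queue


app = faust.App('channel-rebalance-example', broker='kafka://localhost:9092')

# acks=False on the Topic, as described above.
my_topic = app.topic('my-topic', value_type=str, acks=False)

# Use the subclass in place of app.channel(...).
my_channel = NoClearOnResumeChannel(app, value_type=str)
```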
Versions

`confluentinc/cp-kafka:5.5.1` docker image