Agents are cancelled if a rebalance occurs whilst waiting for a Channel #678

Open · 1 of 2 tasks

jacksmith15 opened this issue Oct 29, 2020 · 6 comments
@jacksmith15

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

I have not been able to build against master of Faust due to an aiokafka version conflict, but I have checked the relevant code in master (see below).

Steps to reproduce

  1. Set up an agent which consumes from a Topic and writes to a Channel, like so:

app.py

import asyncio
import random

import faust


app = faust.App("lockup-demo", broker="localhost:29092")

channel = app.channel(maxsize=1)
topic = app.topic("my-topic", partitions=12, internal=True)


@app.timer(interval=2)
async def produce_ids():
    event = random.randint(0, 10000)
    print(f"Producing event {event}")
    await topic.send(value=event)


@app.agent(topic)
async def topic_consumer_agent(stream):
    print("Start agent")
    async for event in stream:
        print(f"Consumed event {event} from topic, sending to channel.")
        try:
            await channel.send(value=event)
        except BaseException as exc:
            print(f"Got exception {repr(exc)}")
            raise exc


@app.agent(channel, concurrency=1)
async def channel_consumer_agent(stream):
    async for event in stream:
        await asyncio.sleep(4)
        print(f"Consumed event {event} from channel.")
  2. Start a single worker (assumes Kafka is available on port 29092):
faust --datadir=.worker-data-1 -A src.app worker -l info --web-port=6066
  3. Wait for items to begin processing:
[2020-10-29 16:13:40,407] [644661] [INFO] Joined group 'lockup-demo' (generation 24) with member_id faust-1.10.4-18873a85-fcfc-42f3-b516-f1eff23377fe 
[2020-10-29 16:13:40,408] [644661] [INFO] Elected group leader -- performing partition assignments using faust 
[2020-10-29 16:13:40,413] [644661] [INFO] Successfully synced group lockup-demo with generation 24 
[2020-10-29 16:13:40,414] [644661] [INFO] Setting newly assigned partitions 
┌Topic Partition Set──────────────┬────────────┐
│ topic                           │ partitions │
├─────────────────────────────────┼────────────┤
│ lockup-demo-__assignor-__leader │ {0}        │
│ my-topic                        │ {0-11}     │
└─────────────────────────────────┴────────────┘ for group lockup-demo 
[2020-10-29 16:13:40,421] [644661] [INFO] [^---Recovery]: Resuming flow... 
[2020-10-29 16:13:40,422] [644661] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-10-29 16:13:40,438] [644661] [INFO] [^---Fetcher]: Starting... 
[2020-10-29 16:13:40,440] [644661] [INFO] [^---Recovery]: Worker ready 
[2020-10-29 16:13:40,441] [644661] [INFO] [^Worker]: Ready 
[2020-10-29 16:13:40,451] [644661] [WARNING] Consumed event 5898 from topic, sending to channel. 
[2020-10-29 16:13:40,452] [644661] [WARNING] Consumed event 6665 from topic, sending to channel. 
[2020-10-29 16:13:40,453] [644661] [WARNING] Consumed event 9476 from topic, sending to channel. 
[2020-10-29 16:13:42,443] [644661] [WARNING] Producing event 6993 
[2020-10-29 16:13:44,444] [644661] [WARNING] Producing event 5768 
[2020-10-29 16:13:44,453] [644661] [WARNING] Consumed event 5898 from channel. 
[2020-10-29 16:13:44,454] [644661] [WARNING] Consumed event 5234 from topic, sending to channel. 
[2020-10-29 16:13:46,446] [644661] [WARNING] Producing event 8827 
[2020-10-29 16:13:48,448] [644661] [WARNING] Producing event 5499 
[2020-10-29 16:13:48,454] [644661] [WARNING] Consumed event 6665 from channel. 
[2020-10-29 16:13:48,455] [644661] [WARNING] Consumed event 7133 from topic, sending to channel. 
  4. Start a second worker:
faust --datadir=.worker-data-2 -A src.app worker -l info --web-port=6067

Expected behavior

Each worker takes a share of partitions on rebalance, and continues processing.

Actual behavior

Each worker logs its share of partitions, but the first worker's topic_consumer_agent does not continue processing. For example:

Worker 1 Log

[2020-10-29 16:15:07,450] [644661] [INFO] Revoking previously assigned partitions 
┌Topic Partition Set──────────────┬────────────┐
│ topic                           │ partitions │
├─────────────────────────────────┼────────────┤
│ lockup-demo-__assignor-__leader │ {0}        │
│ my-topic                        │ {0-11}     │
└─────────────────────────────────┴────────────┘ for group lockup-demo 
[2020-10-29 16:15:07,484] [644661] [WARNING] Got exception CancelledError() 
[2020-10-29 16:15:07,492] [644661] [INFO] (Re-)joining group lockup-demo 
[2020-10-29 16:15:07,496] [644661] [INFO] Joined group 'lockup-demo' (generation 25) with member_id faust-1.10.4-18873a85-fcfc-42f3-b516-f1eff23377fe 
[2020-10-29 16:15:07,496] [644661] [INFO] Elected group leader -- performing partition assignments using faust 
[2020-10-29 16:15:07,502] [644661] [INFO] Successfully synced group lockup-demo with generation 25 
[2020-10-29 16:15:07,504] [644661] [INFO] Setting newly assigned partitions 
┌Topic Partition Set──────────────┬────────────┐
│ topic                           │ partitions │
├─────────────────────────────────┼────────────┤
│ lockup-demo-__assignor-__leader │ {0}        │
│ my-topic                        │ {6-11}     │
└─────────────────────────────────┴────────────┘ for group lockup-demo 
[2020-10-29 16:15:07,510] [644661] [INFO] [^---Recovery]: Resuming flow... 
[2020-10-29 16:15:07,510] [644661] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-10-29 16:15:07,526] [644661] [INFO] [^---Recovery]: Worker ready 
[2020-10-29 16:15:08,493] [644661] [WARNING] Consumed event 8089 from channel. 
[2020-10-29 16:15:08,508] [644661] [WARNING] Producing event 1343 
[2020-10-29 16:15:10,511] [644661] [WARNING] Producing event 7120 
[2020-10-29 16:15:12,513] [644661] [WARNING] Producing event 5203 
[2020-10-29 16:15:14,515] [644661] [WARNING] Producing event 6326 
[2020-10-29 16:15:16,516] [644661] [WARNING] Producing event 4353 
[2020-10-29 16:15:18,518] [644661] [WARNING] Producing event 1766 
[2020-10-29 16:15:20,519] [644661] [WARNING] Producing event 7995 
[2020-10-29 16:15:22,522] [644661] [WARNING] Producing event 3025 
[2020-10-29 16:15:24,522] [644661] [WARNING] Producing event 5574 
[2020-10-29 16:15:26,525] [644661] [WARNING] Producing event 1634 
[2020-10-29 16:15:28,525] [644661] [WARNING] Producing event 3433 
[2020-10-29 16:15:30,529] [644661] [WARNING] Producing event 3449 

Worker 2 Log

[2020-10-29 16:15:07,496] [644921] [INFO] Joined group 'lockup-demo' (generation 25) with member_id faust-1.10.4-651fd5fc-1c02-4981-80ce-057040f754c4 
[2020-10-29 16:15:07,502] [644921] [INFO] Successfully synced group lockup-demo with generation 25 
[2020-10-29 16:15:07,503] [644921] [INFO] Setting newly assigned partitions 
┌Topic Partition Set────┐
│ topic    │ partitions │
├──────────┼────────────┤
│ my-topic │ {0-5}      │
└──────────┴────────────┘ for group lockup-demo 
[2020-10-29 16:15:07,510] [644921] [INFO] [^---Recovery]: Resuming flow... 
[2020-10-29 16:15:07,511] [644921] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-10-29 16:15:07,526] [644921] [INFO] [^---Fetcher]: Starting... 
[2020-10-29 16:15:07,526] [644921] [INFO] [^---Recovery]: Worker ready 
[2020-10-29 16:15:07,530] [644921] [INFO] [^Worker]: Ready 
[2020-10-29 16:15:07,532] [644921] [WARNING] Consumed event 8099 from topic, sending to channel. 
[2020-10-29 16:15:07,533] [644921] [WARNING] Consumed event 5499 from topic, sending to channel. 
[2020-10-29 16:15:07,534] [644921] [WARNING] Consumed event 1545 from topic, sending to channel. 
[2020-10-29 16:15:09,532] [644921] [WARNING] Producing event 800 
[2020-10-29 16:15:11,532] [644921] [WARNING] Producing event 3290 
[2020-10-29 16:15:11,533] [644921] [WARNING] Consumed event 8099 from channel. 
[2020-10-29 16:15:11,534] [644921] [WARNING] Consumed event 1604 from topic, sending to channel. 
[2020-10-29 16:15:13,534] [644921] [WARNING] Producing event 7142 
[2020-10-29 16:15:15,535] [644921] [WARNING] Consumed event 5499 from channel. 
[2020-10-29 16:15:15,536] [644921] [WARNING] Producing event 3639 

Similarly, if worker one is then terminated, the second worker's topic_consumer_agent is cancelled after the rebalance:

[2020-10-29 16:18:01,574] [644921] [INFO] Revoking previously assigned partitions 
┌Topic Partition Set────┐
│ topic    │ partitions │
├──────────┼────────────┤
│ my-topic │ {0-5}      │
└──────────┴────────────┘ for group lockup-demo 
[2020-10-29 16:18:01,611] [644921] [WARNING] Got exception CancelledError() 
[2020-10-29 16:18:01,617] [644921] [INFO] (Re-)joining group lockup-demo 
[2020-10-29 16:18:01,620] [644921] [INFO] Joined group 'lockup-demo' (generation 26) with member_id faust-1.10.4-651fd5fc-1c02-4981-80ce-057040f754c4 
[2020-10-29 16:18:01,621] [644921] [INFO] Elected group leader -- performing partition assignments using faust 
[2020-10-29 16:18:01,628] [644921] [INFO] Successfully synced group lockup-demo with generation 26 
[2020-10-29 16:18:01,630] [644921] [INFO] Setting newly assigned partitions 
┌Topic Partition Set──────────────┬────────────┐
│ topic                           │ partitions │
├─────────────────────────────────┼────────────┤
│ lockup-demo-__assignor-__leader │ {0}        │
│ my-topic                        │ {0-11}     │
└─────────────────────────────────┴────────────┘ for group lockup-demo 
[2020-10-29 16:18:01,638] [644921] [INFO] [^---Recovery]: Resuming flow... 
[2020-10-29 16:18:01,640] [644921] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-10-29 16:18:01,660] [644921] [INFO] [^---Recovery]: Worker ready 
[2020-10-29 16:18:01,663] [644921] [WARNING] Producing event 2324 
[2020-10-29 16:18:03,615] [644921] [WARNING] Consumed event 8648 from channel. 
[2020-10-29 16:18:03,659] [644921] [WARNING] Producing event 2405 
[2020-10-29 16:18:05,668] [644921] [WARNING] Producing event 3834 
[2020-10-29 16:18:07,663] [644921] [WARNING] Producing event 1638 
[2020-10-29 16:18:09,670] [644921] [WARNING] Producing event 9907 
[2020-10-29 16:18:11,666] [644921] [WARNING] Producing event 7274 
[2020-10-29 16:18:13,673] [644921] [WARNING] Producing event 8325 
[2020-10-29 16:18:15,668] [644921] [WARNING] Producing event 2437 
[2020-10-29 16:18:17,676] [644921] [WARNING] Producing event 5504 

Notes

This appears to be caused by the underlying FlowControlQueue, which raises asyncio.CancelledError in any task currently waiting to write to the queue (specifically this block in mode).

When this happens, the topic_consumer_agent receives an asyncio.CancelledError and ceases processing.

In turn, this happens because clear_on_resume is passed as True to FlowControlQueue in Faust's Channel, as seen in the source code.
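
The underlying mechanism can be illustrated with plain asyncio, no Faust involved (the function names below are illustrative, not Faust APIs): cancelling a task that is blocked inside Queue.put raises CancelledError at the await point, which is analogous to what the stream writer sees when FlowControlQueue clears its waiters on rebalance.

```python
import asyncio


async def writer(queue: asyncio.Queue) -> str:
    try:
        # The queue is already full, so this put() blocks.
        await queue.put("second")
    except asyncio.CancelledError:
        # This is the error the agent's channel.send() receives.
        return "cancelled while waiting to put"
    return "put succeeded"


async def main() -> str:
    queue = asyncio.Queue(maxsize=1)
    await queue.put("first")            # fill the queue to capacity
    task = asyncio.create_task(writer(queue))
    await asyncio.sleep(0)              # let the writer block on put()
    task.cancel()                       # analogous to the queue clearing its waiters
    return await task


print(asyncio.run(main()))  # → cancelled while waiting to put
```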

A workaround is to subclass Channel, overriding the clear_on_resume argument to False, and to set acks=False on the Topic.

I'm not entirely sure why acks needs to be False, but modifying clear_on_resume alone appears to cause rebalance loops.

Versions

  • Python version: 3.8.5
  • Faust version: 1.10.4
  • Mode version: 4.3.2
  • Operating system: Ubuntu 20.04
  • Kafka version: confluentinc/cp-kafka:5.5.1 docker image
  • RocksDB version: not applicable
@benhowes

Something that made this problem hard to track down is that this block https://github.com/robinhood/faust/blob/master/faust/agents/agent.py#L650-L652 catches the CancelledError raised via mode. I don't have a proposed solution at the moment; my experiments have simply confirmed that it's difficult to know whether a CancelledError is recoverable.

@jacksmith15
Author

It's hard to see the benefit of ever swallowing that exception, regardless of should_stop; either way the coroutine will be dead.
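
The futility of swallowing the exception can be shown with a minimal supervisor sketch (plain asyncio; agent_coro and supervisor are hypothetical names for illustration, not Faust APIs): once the coroutine has received the CancelledError, catching it in the caller does not revive anything, because the coroutine has already exited.

```python
import asyncio


async def agent_coro() -> None:
    try:
        await asyncio.sleep(3600)       # stand-in for `async for event in stream`
    except asyncio.CancelledError:
        # Re-raise, as the agent in the reproduction does.
        raise


async def supervisor() -> str:
    task = asyncio.create_task(agent_coro())
    await asyncio.sleep(0)              # let the agent reach its await point
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Swallowing the error here does not resurrect the coroutine:
        # it has already finished unwinding.
        pass
    return "done" if task.done() else "alive"


print(asyncio.run(supervisor()))  # → done
```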

@forsberg

I've experienced something very similar, but only when running the Cython-optimized build of Faust.

@forsberg

This is a long-shot guess without looking at the source code, but could this be related to #571?

@patkivikram
Collaborator

Please take a look at https://github.com/faust-streaming/faust/pull/42/files

@patkivikram
Collaborator

I have created a release, https://pypi.org/project/faust-streaming/0.2.1rc2/. Can you test whether it works for you?
