Checklist
I have included information about relevant versions.
I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
Running the following code processes only some of the messages generated by the task. The stream buffer fills up, which removes the topic from the active partitions. When Consumer.getmany() then iterates over the list of already-received messages and sees that the topic is no longer in the list of active partitions, it drops the remaining messages.
The fix is to include self._buffered_partitions in the check in Consumer.getmany() so that it continues to process the already-received messages even when the Consumer is no longer actively fetching new messages from the topic because the buffer is full. When the buffer-full condition clears, new messages are processed as usual.
import faust

app = faust.App(
    'hello-world3',
    broker='kafka://localhost:29092',
    value_serializer='raw',
    # stream_buffer_maxsize=1000000,
)

greetings_topic = app.topic('greetings')


@app.agent(greetings_topic)
async def greet(greetings):
    # Count every message the agent actually receives.
    count = 0
    async for greeting in greetings:
        count += 1
        print(count)


@app.task()
async def say_hello():
    # Produce 40000 messages to the topic at startup.
    count = 0
    for i in range(0, 40000):
        count += 1
        await greetings_topic.send(key='key', value=f'hello{count}')
Expected behavior
The agent should continue to receive the already-fetched messages when the Consumer buffer is considered "full" and backpressure has been applied.
Actual behavior
The Consumer stops sending the already-received messages to the agents/Stream when backpressure is being applied to the topic.
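To illustrate the difference between the current check and the proposed one, here is a minimal, self-contained sketch. It is not Faust's actual implementation: the function `drain`, its parameters, and the `include_buffered` flag are hypothetical names made up for this example, and the `(topic, partition)` tuple is only a stand-in for Faust's topic-partition type. It models the state after backpressure has been applied, where the partition is no longer "active" but is tracked as "buffered".

```python
from typing import Dict, Iterator, List, Set, Tuple

# Stand-in for a topic-partition identifier (hypothetical, not Faust's type).
TP = Tuple[str, int]


def drain(
    records: Dict[TP, List[str]],
    active: Set[TP],
    buffered: Set[TP],
    include_buffered: bool,
) -> Iterator[Tuple[TP, str]]:
    """Yield already-fetched messages whose partition passes the check.

    With include_buffered=False only active partitions pass, which models
    the behavior reported in this issue. With include_buffered=True,
    partitions paused by backpressure also pass, which models the
    proposed fix.
    """
    for tp, messages in records.items():
        for message in messages:
            allowed = tp in active or (include_buffered and tp in buffered)
            if not allowed:
                # Remaining messages for this partition are dropped.
                break
            yield tp, message


tp = ("greetings", 0)
records = {tp: ["hello1", "hello2", "hello3"]}

# Backpressure applied: the partition is buffered, not active.
dropped = list(drain(records, set(), {tp}, include_buffered=False))
kept = list(drain(records, set(), {tp}, include_buffered=True))
print(len(dropped), len(kept))
```

In this simplified model the first call delivers nothing and the second delivers all three messages, which mirrors the expected versus actual behavior described here.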
Full traceback
Versions
Python version - 3.8
Faust version - 0.4.3
Operating system - Linux
Kafka version - N/A
RocksDB version (if applicable) - N/A
bobh66 added a commit to bobh66/faust-1 that referenced this issue Jan 28, 2021