Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Fixes #216
During a rebalance, faust implements multiple branches for recovery to resume processing such as in
Recovery.on_recovery_completed
andRecovery._restart_recovery
which roughly follow the pattern of:self.log.info("Seek stream partitions to committed offsets.")
consumer.perform_seek()
self.log.dev("Resume stream partitions")
consumer.resume_partitions(
consumer.resume_flow()
self.app.flow_control.resume()
However, when the application doesn't use any tables,
_restart_recovery
calls_resume_streams
which follows the pattern:consumer.resume_flow()
app.flow_control.resume()
self.log.info("Seek stream partitions to committed offsets.")
consumer.perform_seek()
self.log.dev("Resume stream partitions")
consumer.resume_partitions(
In this case,
app.flow_control.resume()
is called beforeconsumer.perform_seek()
. This can cause some race conditions where the consumer starts fetching messages from where it left off before queues were cleared byapp.flow_control.suspend()
. Sincesuspend
clears queues, if the consumer starts fetching again before seeking back to the last committed offset, a gap appears in the processing.This change brings the order of calls in
_resume_streams
in line with elsewhere in the class.