Skip to content

Commit

Permalink
fix race conditions on rebalance (faust-streaming#241)
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram authored Dec 15, 2021
1 parent 48c2f0b commit 6f3c783
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,14 +309,14 @@ async def _resume_streams(self, generation_id: int = 0) -> None:
self.log.warning("Recovery rebalancing again")
return
if assignment:
consumer.resume_partitions(
{tp for tp in assignment if not self._is_changelog_tp(tp)}
)
self.log.info("Seek stream partitions to committed offsets.")
await self._wait(
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)
self.log.dev("Resume stream partitions")
consumer.resume_partitions(
{tp for tp in assignment if not self._is_changelog_tp(tp)}
)
else:
self.log.info("Resuming streams with empty assignment")
self.completed.set()
Expand Down Expand Up @@ -615,16 +615,16 @@ async def on_recovery_completed(self, generation_id: int = 0) -> None:
f"{self.app.consumer_generation_id} param {generation_id}"
)
return
consumer.resume_partitions(
{tp for tp in assignment if not self._is_changelog_tp(tp)}
)
if assignment:
self.log.info("Seek stream partitions to committed offsets.")
await self._wait(
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)
self.completed.set()
self.log.dev("Resume stream partitions")
consumer.resume_partitions(
{tp for tp in assignment if not self._is_changelog_tp(tp)}
)
consumer.resume_flow()
self.app.flow_control.resume()
# finally make sure the fetcher is running.
Expand Down

0 comments on commit 6f3c783

Please sign in to comment.