Skip to content

Commit

Permalink
fix for out of order events (faust-streaming#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram authored Nov 30, 2021
1 parent 02afc3c commit 7e3c60c
Showing 1 changed file with 1 addition and 8 deletions.
9 changes: 1 addition & 8 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,6 @@ class Consumer(Service, ConsumerT):

flow_active: bool = True
can_resume_flow: Event
suspend_flow: Event

def __init__(
self,
Expand Down Expand Up @@ -476,7 +475,6 @@ def __init__(
self._end_offset_monitor_interval = self.commit_interval * 2
self.randomly_assigned_topics = set()
self.can_resume_flow = Event()
self.suspend_flow = Event()
self._reset_state()
super().__init__(loop=loop or self.transport.loop, **kwargs)
self.transactions = self.transport.create_transaction_manager(
Expand All @@ -499,7 +497,6 @@ def _reset_state(self) -> None:
self._paused_partitions = set()
self._buffered_partitions = set()
self.can_resume_flow.clear()
self.suspend_flow.clear()
self.flow_active = True
self._time_start = monotonic()

Expand Down Expand Up @@ -576,13 +573,11 @@ def stop_flow(self) -> None:
"""Block consumer from processing any more messages."""
self.flow_active = False
self.can_resume_flow.clear()
self.suspend_flow.set()

def resume_flow(self) -> None:
"""Allow consumer to process messages."""
self.flow_active = True
self.can_resume_flow.set()
self.suspend_flow.clear()

def pause_partitions(self, tps: Iterable[TP]) -> None:
"""Pause fetching from partitions."""
Expand Down Expand Up @@ -1125,9 +1120,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
if self._n_acked >= commit_every:
self._n_acked = 0
await self.commit()
await self.wait_first(
callback(message), self.suspend_flow.wait()
)
await callback(message)
set_read_offset(tp, offset)
else:
self.log.dev(
Expand Down

0 comments on commit 7e3c60c

Please sign in to comment.