Skip to content

Commit

Permalink
Revert "fix for out of order events (faust-streaming#228)" (faust-str…
Browse files Browse the repository at this point in the history
…eaming#232)

This reverts commit 7e3c60c
  • Loading branch information
patkivikram authored Dec 3, 2021
1 parent 03122ca commit 45015a5
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ class Consumer(Service, ConsumerT):

flow_active: bool = True
can_resume_flow: Event
suspend_flow: Event

def __init__(
self,
Expand Down Expand Up @@ -475,6 +476,7 @@ def __init__(
self._end_offset_monitor_interval = self.commit_interval * 2
self.randomly_assigned_topics = set()
self.can_resume_flow = Event()
self.suspend_flow = Event()
self._reset_state()
super().__init__(loop=loop or self.transport.loop, **kwargs)
self.transactions = self.transport.create_transaction_manager(
Expand All @@ -497,6 +499,7 @@ def _reset_state(self) -> None:
self._paused_partitions = set()
self._buffered_partitions = set()
self.can_resume_flow.clear()
self.suspend_flow.clear()
self.flow_active = True
self._time_start = monotonic()

Expand Down Expand Up @@ -573,11 +576,13 @@ def stop_flow(self) -> None:
"""Block consumer from processing any more messages."""
self.flow_active = False
self.can_resume_flow.clear()
self.suspend_flow.set()

def resume_flow(self) -> None:
"""Allow consumer to process messages."""
self.flow_active = True
self.can_resume_flow.set()
self.suspend_flow.clear()

def pause_partitions(self, tps: Iterable[TP]) -> None:
"""Pause fetching from partitions."""
Expand Down Expand Up @@ -1120,7 +1125,9 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
if self._n_acked >= commit_every:
self._n_acked = 0
await self.commit()
await callback(message)
await self.wait_first(
callback(message), self.suspend_flow.wait()
)
set_read_offset(tp, offset)
else:
self.log.dev(
Expand Down

0 comments on commit 45015a5

Please sign in to comment.