Skip to content

Commit

Permalink
Revert cancel getmany and add extra seek (faust-streaming#256)
Browse files Browse the repository at this point in the history
* Revert cancel getmany and add extra seek

* Revert test

* Modifications for tests

Co-authored-by: Eric Kerstens <ekerstens@expediagroup.com>
  • Loading branch information
ekerstens and Eric Kerstens authored Jan 11, 2022
1 parent 84d58fd commit a7256eb
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 46 deletions.
2 changes: 0 additions & 2 deletions faust/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1681,8 +1681,6 @@ async def _on_partitions_revoked(self, revoked: Set[TP]) -> None:
T(self.flow_control.suspend)()
on_timeout.info("consumer.pause_partitions")
T(consumer.pause_partitions)(assignment)
on_timeout.info("consumer.wait_for_stopped_flow")
await T(consumer.wait_for_stopped_flow)()

# Every agent instance has an incoming buffer of messages
# (a asyncio.Queue) -- we clear those to make sure
Expand Down
67 changes: 25 additions & 42 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ class Consumer(Service, ConsumerT):
flow_active: bool = True
can_resume_flow: Event
suspend_flow: Event
not_waiting_next_records: Event

def __init__(
self,
Expand Down Expand Up @@ -478,8 +477,6 @@ def __init__(
self.randomly_assigned_topics = set()
self.can_resume_flow = Event()
self.suspend_flow = Event()
self.not_waiting_next_records = Event()
self.not_waiting_next_records.set()
self._reset_state()
super().__init__(loop=loop or self.transport.loop, **kwargs)
self.transactions = self.transport.create_transaction_manager(
Expand All @@ -503,7 +500,6 @@ def _reset_state(self) -> None:
self._buffered_partitions = set()
self.can_resume_flow.clear()
self.suspend_flow.clear()
self.not_waiting_next_records.set()
self.flow_active = True
self._time_start = monotonic()

Expand Down Expand Up @@ -585,11 +581,6 @@ def stop_flow(self) -> None:
self.can_resume_flow.clear()
self.suspend_flow.set()

async def wait_for_stopped_flow(self) -> None:
"""Wait until the consumer is not waiting on any newly fetched records."""
if not self.not_waiting_next_records.is_set():
await self.not_waiting_next_records.wait()

def resume_flow(self) -> None:
"""Allow consumer to process messages."""
self.flow_active = True
Expand Down Expand Up @@ -742,48 +733,40 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]:
self.app.monitor.track_tp_end_offset(tp, highwater_mark)
# convert timestamp to seconds from int milliseconds.
yield tp, to_message(tp, record)
else:
self.log.dev(
"getmany called while flow not active. Seek back to committed offsets."
)
await self.perform_seek()

async def _wait_next_records(
self, timeout: float
) -> Tuple[Optional[RecordMap], Optional[Set[TP]]]:
if not self.flow_active:
await self.wait(self.can_resume_flow)

# Implementation for the Fetcher service.
try:
self.not_waiting_next_records.clear()
is_client_only = self.app.client_only

is_client_only = self.app.client_only
active_partitions: Optional[Set[TP]]
if is_client_only:
active_partitions = None
else:
active_partitions = self._get_active_partitions()

active_partitions: Optional[Set[TP]]
if is_client_only:
active_partitions = None
else:
active_partitions = self._get_active_partitions()

records: RecordMap = {}
if is_client_only or active_partitions:
# Fetch records only if active partitions to avoid the risk of
# fetching all partitions in the beginning when none of the
# partitions is paused/resumed.
suspend_flow = self.suspend_flow.wait()
getmany = self._getmany(
active_partitions=active_partitions,
timeout=timeout,
)
wait_results = await self.wait_first(getmany, suspend_flow)
for coro, result in zip(wait_results.done, wait_results.results):
# Ignore records fetched while flow was suspended
if coro is suspend_flow:
records = {}
break
if coro is getmany:
records = result
else:
# We should still release to the event loop
await self.sleep(1)
return records, active_partitions
finally:
self.not_waiting_next_records.set()
records: RecordMap = {}
if is_client_only or active_partitions:
# Fetch records only if active partitions to avoid the risk of
# fetching all partitions in the beginning when none of the
# partitions is paused/resumed.
records = await self._getmany(
active_partitions=active_partitions,
timeout=timeout,
)
else:
# We should still release to the event loop
await self.sleep(1)
return records, active_partitions

@abc.abstractmethod
def _to_message(self, tp: TP, record: Any) -> ConsumerMessage:
Expand Down
1 change: 0 additions & 1 deletion tests/unit/app/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ async def test_on_partitions_revoked(self, *, app):
transactions=Mock(
on_partitions_revoked=AsyncMock(),
),
wait_for_stopped_flow=AsyncMock(),
)
app.tables = Mock()
app.flow_control = Mock()
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def _to_message(self, *args, **kwargs):
...

async def seek_to_committed(self, *args, **kwargs):
...
return {}

async def seek_wait(self, *args, **kwargs):
...
Expand Down Expand Up @@ -544,6 +544,7 @@ def to_message(tp, record):
assert not consumer.should_stop
consumer.flow_active = False
consumer.can_resume_flow.set()
# Test is hanging here
assert [a async for a in consumer.getmany(1.0)] == []
assert not consumer.should_stop
consumer.flow_active = True
Expand Down

0 comments on commit a7256eb

Please sign in to comment.