Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert cancel getmany and add extra seek #256

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Modifications for tests
  • Loading branch information
Eric Kerstens committed Jan 11, 2022
commit 66edcb3dd3a381942507e6f5ff9318c01cb6a4a7
6 changes: 1 addition & 5 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,11 +742,7 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]:
async def _wait_next_records(
self, timeout: float
) -> Tuple[Optional[RecordMap], Optional[Set[TP]]]:
while True:
# can_resume_flow may return even if flow went inactive -> active ->
# inactive. Check that flow is actually active after can_resume_flow is set.
if self.flow_active:
break
if not self.flow_active:
await self.wait(self.can_resume_flow)

# Implementation for the Fetcher service.
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ def _to_message(self, *args, **kwargs):
...

async def seek_to_committed(self, *args, **kwargs):
...
return {}

async def seek_wait(self, *args, **kwargs):
...
Expand Down Expand Up @@ -544,6 +544,7 @@ def to_message(tp, record):
assert not consumer.should_stop
consumer.flow_active = False
consumer.can_resume_flow.set()
# Test is hanging here
assert [a async for a in consumer.getmany(1.0)] == []
assert not consumer.should_stop
consumer.flow_active = True
Expand Down