Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Producer hangs on send pending check with thread producer enabled #270

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions faust/transport/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ async def flush_atmost(self, max_messages: Optional[int]) -> int:
(max_messages is None or flushed_messages < max_messages)
):
self.message_sent.clear()
await self.message_sent.wait()
flushed_messages += 1
try:
await asyncio.wait_for(self.message_sent.wait(), timeout=0.1)
flushed_messages += 1
except asyncio.TimeoutError:
return flushed_messages
else:
return flushed_messages

Expand Down
51 changes: 51 additions & 0 deletions tests/unit/transport/test_producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from typing import Any
from unittest.mock import PropertyMock

import pytest
from mode.utils.mocks import AsyncMock, Mock, call
Expand Down Expand Up @@ -146,6 +148,55 @@ async def _inner():
assert (await buf.flush_atmost(6)) == 6
assert not buf.size

@pytest.mark.asyncio
async def test_flush_atmost_with_simulated_threaded_behavior(self, *, buf):
def create_send_pending_mock(max_messages):
sent_messages = 0

async def _inner(*args: Any):
nonlocal sent_messages
if sent_messages < max_messages:
sent_messages += 1
return
else:
await asyncio.Future()

return _inner

buf._send_pending = create_send_pending_mock(10)

class WaitForEverEvent(asyncio.Event):
test_stopped: bool = False

def stop_test(self) -> None:
self.test_stopped = True

async def wait(self) -> None:
while not self.test_stopped:
await asyncio.sleep(1.0)

wait_for_event = buf.message_sent = WaitForEverEvent()
await buf.start()

try:
original_size_property = buf.__class__.size
buf.__class__.size = PropertyMock(return_value=10)
waiting = buf.flush_atmost(10)
loop = asyncio.get_event_loop()
task = loop.create_task(waiting)
await asyncio.sleep(0)
assert (
not task.done()
), "Task has completed even though not all events have been issued"
buf.__class__.size = PropertyMock(return_value=0)
await asyncio.sleep(0.2)
assert task.done(), "Task has not been completed even though queue is None"
assert isinstance(task.result(), int)
finally:
wait_for_event.stop_test()
buf.__class__.size = original_size_property
await buf.stop()


class ProducerTests:
@pytest.mark.asyncio
Expand Down