Skip to content

Commit

Permalink
fix race condition when buffers are full (faust-streaming#237)
Browse files Browse the repository at this point in the history
* fix race condition when buffers are full

* fix race condition when buffers are full

* Fix error messages in faust app faust-streaming#166
  • Loading branch information
patkivikram authored Dec 13, 2021
1 parent 9a298ac commit 7d861dc
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
8 changes: 6 additions & 2 deletions faust/_cython/streams.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,17 @@ cdef class StreamIterator:
offset = message.offset
consumer = self.consumer

if message.generation_id != self.app.consumer_generation_id:
if (
not self.app.flow_control.is_active()
or message.generation_id != self.app.consumer_generation_id
):
self.app.log.dev(
"Skipping message %r with generation_id %r because "
"app generation_id is %r",
"app generation_id is %r flow control.is_active %r",
message,
message.generation_id,
self.app.consumer_generation_id,
self.app.flow_control.is_active()
)
return None, self._skipped_value, stream_state

Expand Down
8 changes: 6 additions & 2 deletions faust/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,14 +974,18 @@ async def _py_aiter(self) -> AsyncIterator[T_co]:
tp = message.tp
offset = message.offset

if message.generation_id != self.app.consumer_generation_id:
if (
not self.app.flow_control.is_active()
or message.generation_id != self.app.consumer_generation_id
):
value = skipped_value
self.log.dev(
"Skipping message %r with generation_id %r because "
"app generation_id is %r",
"app generation_id is %r flow_control.is_active %r",
message,
message.generation_id,
self.app.consumer_generation_id,
self.app.flow_control.is_active(),
)
break
if topic in acking_topics and not message.tracked:
Expand Down

0 comments on commit 7d861dc

Please sign in to comment.