Skip to content

Commit

Permalink
adding intervaltree to manage gaps in topics to prevent OOM
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram committed Mar 9, 2022
1 parent 637979f commit 54fa365
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
7 changes: 5 additions & 2 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1119,14 +1119,17 @@ async def on_task_error(self, exc: BaseException) -> None:
"""Call when processing a message failed."""
await self.commit()

def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
async def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None:
committed = self._committed_offset[tp]
gap_for_tp = self._gap[tp]
if committed is not None:
offset_from = max(offset_from, committed + 1)
# intervaltree intervals exclude the end
if offset_from <= offset_to:
gap_for_tp.addi(offset_from, offset_to + 1)
# sleep 0 to allow other coroutines to get some loop time
# for example, to answer health checks while building the gap
await asyncio.sleep(0)
gap_for_tp.merge_overlaps()

async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
Expand Down Expand Up @@ -1174,7 +1177,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover
if gap > 1 and r_offset:
acks_enabled = acks_enabled_for(message.topic)
if acks_enabled:
self._add_gap(tp, r_offset + 1, offset)
await self._add_gap(tp, r_offset + 1, offset)
if commit_every is not None:
if self._n_acked >= commit_every:
self._n_acked = 0
Expand Down
10 changes: 6 additions & 4 deletions tests/unit/transport/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1098,17 +1098,19 @@ async def test_on_task_error(self, *, consumer):
await consumer.on_task_error(KeyError())
consumer.commit.assert_called_once_with()

def test__add_gap(self, *, consumer):
@pytest.mark.asyncio
async def test__add_gap(self, *, consumer):
tp = TP1
consumer._committed_offset[tp] = 299
consumer._add_gap(TP1, 300, 343)
await consumer._add_gap(TP1, 300, 343)

assert consumer._gap[tp] == IntervalTree([Interval(300, 344)])

def test__add_gap__previous_to_committed(self, *, consumer):
@pytest.mark.asyncio
async def test__add_gap__previous_to_committed(self, *, consumer):
tp = TP1
consumer._committed_offset[tp] = 400
consumer._add_gap(TP1, 300, 343)
await consumer._add_gap(TP1, 300, 343)

assert consumer._gap[tp] == IntervalTree()

Expand Down

0 comments on commit 54fa365

Please sign in to comment.