diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index 837d422f5..f567ab81a 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -72,6 +72,7 @@ from weakref import WeakSet from aiokafka.errors import ProducerFenced +from intervaltree import IntervalTree from mode import Service, ServiceT, flight_recorder, get_logger from mode.threads import MethodQueue, QueueServiceThread from mode.utils.futures import notify @@ -392,7 +393,7 @@ class Consumer(Service, ConsumerT): consumer_stopped_errors: ClassVar[Tuple[Type[BaseException], ...]] = () # Mapping of TP to list of gap in offsets. - _gap: MutableMapping[TP, List[int]] + _gap: MutableMapping[TP, IntervalTree] # Mapping of TP to list of acked offsets. _acked: MutableMapping[TP, List[int]] @@ -465,7 +466,7 @@ def __init__( commit_livelock_soft_timeout or self.app.conf.broker_commit_livelock_soft_timeout ) - self._gap = defaultdict(list) + self._gap = defaultdict(IntervalTree) self._acked = defaultdict(list) self._acked_index = defaultdict(set) self._read_offset = defaultdict(lambda: None) @@ -1087,15 +1088,22 @@ def _new_offset(self, tp: TP) -> Optional[int]: # the return value will be: 37 if acked: max_offset = max(acked) - gap_for_tp = self._gap[tp] + gap_for_tp: IntervalTree = self._gap[tp] if gap_for_tp: - gap_index = next( - (i for i, x in enumerate(gap_for_tp) if x > max_offset), - len(gap_for_tp), - ) - gaps = gap_for_tp[:gap_index] - acked.extend(gaps) - gap_for_tp[:gap_index] = [] + # find all the ranges up to the max of acked, add them in to acked, + # and chop them off the gap. + candidates = gap_for_tp.overlap(0, max_offset) + # note: merge_overlaps will sort the intervaltree and will ensure that + # the intervals left over don't overlap each other. So can sort by their + # start without worrying about ends overlapping. + sorted_candidates = sorted(candidates, key=lambda x: x.begin) + if sorted_candidates: + stuff_to_add = [] + for entry in sorted_candidates: + stuff_to_add.extend(range(entry.begin, entry.end)) + new_max_offset = max(stuff_to_add[-1], max_offset + 1) + acked.extend(stuff_to_add) + gap_for_tp.chop(0, new_max_offset) acked.sort() # We iterate over it until we handle gap in the head of acked queue @@ -1123,12 +1131,18 @@ async def on_task_error(self, exc: BaseException) -> None: """Call when processing a message failed.""" await self.commit() - def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None: + async def _add_gap(self, tp: TP, offset_from: int, offset_to: int) -> None: committed = self._committed_offset[tp] gap_for_tp = self._gap[tp] - for offset in range(offset_from, offset_to): - if committed is None or offset > committed: - gap_for_tp.append(offset) + if committed is not None: + offset_from = max(offset_from, committed + 1) + # intervaltree intervals exclude the end + if offset_from <= offset_to: + gap_for_tp.addi(offset_from, offset_to + 1) + # sleep 0 to allow other coroutines to get some loop time + # for example, to answer health checks while building the gap + await asyncio.sleep(0) + gap_for_tp.merge_overlaps() async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover # This is the background thread started by Fetcher, used to @@ -1175,7 +1189,7 @@ async def _drain_messages(self, fetcher: ServiceT) -> None: # pragma: no cover if gap > 1 and r_offset: acks_enabled = acks_enabled_for(message.topic) if acks_enabled: - self._add_gap(tp, r_offset + 1, offset) + await self._add_gap(tp, r_offset + 1, offset) if commit_every is not None: if self._n_acked >= commit_every: self._n_acked = 0 diff --git a/requirements/dist.txt b/requirements/dist.txt index f41b125b9..3a65d05a2 100644 --- a/requirements/dist.txt +++ b/requirements/dist.txt @@ -12,3 +12,4 @@ tox>=2.3.1 twine vulture wheel>=0.29.0 +intervaltree diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 4332e12a9..96cfa9040 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -9,3 +9,4 @@ yarl>=1.0,<2.0 croniter>=0.3.16 mypy_extensions venusian==3.0.0 +intervaltree diff --git a/requirements/test.txt b/requirements/test.txt index 85d8c1412..639b6377c 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -21,6 +21,7 @@ codecov bandit==1.6.2 twine wheel +intervaltree -r requirements.txt -r extras/datadog.txt -r extras/redis.txt diff --git a/tests/unit/transport/test_consumer.py b/tests/unit/transport/test_consumer.py index 2fe4e8d90..9d5439756 100644 --- a/tests/unit/transport/test_consumer.py +++ b/tests/unit/transport/test_consumer.py @@ -1,6 +1,7 @@ import asyncio import pytest +from intervaltree import Interval, IntervalTree from mode import Service from mode.threads import MethodQueue from mode.utils.futures import done_future @@ -1075,13 +1076,15 @@ def test_new_offset(self, tp, acked, expected_offset, expected_acked, *, consume "tp,acked,gaps,expected_offset", [ (TP1, [], [], None), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [], 11), - (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], [9], 11), - (TP1, [1, 2, 3, 4, 6, 7, 8, 10], [5], 9), - (TP1, [1, 3, 4, 6, 7, 8, 10], [2, 5, 9], 11), - (TP1, [3, 4], [], None), - (TP1, [3, 4], [2], None), - (TP1, [3, 4], [1, 2], 5), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], IntervalTree(), 11), + (TP1, [1, 2, 3, 4, 5, 6, 7, 8, 10], IntervalTree([Interval(9, 10)]), 11), + (TP1, [1, 2, 3, 4, 6, 7, 8, 10], IntervalTree([Interval(5, 6)]), 9), + ( + TP1, + [1, 3, 4, 6, 7, 8, 10], + IntervalTree([Interval(2, 3), Interval(5, 6), Interval(9, 10)]), + 11, + ), ], ) def test_new_offset_with_gaps(self, tp, acked, gaps, expected_offset, *, consumer): @@ -1096,19 +1099,21 @@ async def test_on_task_error(self, *, consumer): await consumer.on_task_error(KeyError()) consumer.commit.assert_called_once_with() - def test__add_gap(self, *, consumer): + @pytest.mark.asyncio + async def test__add_gap(self, *, consumer): tp = TP1 consumer._committed_offset[tp] = 299 - consumer._add_gap(TP1, 300, 343) + await consumer._add_gap(TP1, 300, 343) - assert consumer._gap[tp] == list(range(300, 343)) + assert consumer._gap[tp] == IntervalTree([Interval(300, 344)]) - def test__add_gap__previous_to_committed(self, *, consumer): + @pytest.mark.asyncio + async def test__add_gap__previous_to_committed(self, *, consumer): tp = TP1 consumer._committed_offset[tp] = 400 - consumer._add_gap(TP1, 300, 343) + await consumer._add_gap(TP1, 300, 343) - assert consumer._gap[tp] == [] + assert consumer._gap[tp] == IntervalTree() @pytest.mark.asyncio async def test_commit_handler(self, *, consumer):