From 654b648873467eb8ab7104a03e3e74ac9df079c6 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 26 Oct 2021 23:22:28 +0600 Subject: [PATCH] Memory store repartitioning behavior (#199) * Fix memory store with changelog topic repartition * Added test for multi partitions on one worker Co-authored-by: Roman Kornev Co-authored-by: Taybin Rutkin Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com> --- faust/stores/memory.py | 21 ++++++++++- tests/unit/stores/test_memory.py | 64 ++++++++++++++++++++++++++++++-- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/faust/stores/memory.py b/faust/stores/memory.py index 5f34c5005..d0aefc6b7 100644 --- a/faust/stores/memory.py +++ b/faust/stores/memory.py @@ -2,15 +2,17 @@ from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple from faust.types import TP, EventT +from faust.types.stores import KT, VT from . import base -class Store(base.Store): +class Store(base.Store, base.StoreT[KT, VT]): """Table storage using an in-memory dictionary.""" def __post_init__(self) -> None: self.data: MutableMapping = {} + self._key_partition: MutableMapping[KT, int] = {} def _clear(self) -> None: self.data.clear() @@ -34,6 +36,7 @@ def apply_changelog_batch( # If the key was assigned a value again, it will not be deleted. if not self.data[key]: delete_key(key, None) + self._key_partition.pop(key, None) def _create_batch_iterator( self, @@ -47,8 +50,24 @@ def _create_batch_iterator( # to delete keys in the table we set the raw value to None if event.message.value is None: mark_as_delete(key) + self._key_partition[key] = event.message.partition yield key, to_value(event.value) + async def on_recovery_completed( + self, active_tps: Set[TP], standby_tps: Set[TP] + ) -> None: + partitions = {partition for _, partition in active_tps} + + delete_keys = [] + + for k, p in self._key_partition.items(): + if p not in partitions: + delete_keys.append(k) + + for k in delete_keys: + self.data.pop(k, None) + self._key_partition.pop(k, None) + def persisted_offset(self, tp: TP) -> Optional[int]: """Return the persisted offset. diff --git a/tests/unit/stores/test_memory.py b/tests/unit/stores/test_memory.py index 5b38575f7..a32f99f31 100644 --- a/tests/unit/stores/test_memory.py +++ b/tests/unit/stores/test_memory.py @@ -42,17 +42,68 @@ def test_apply_changelog_batch__deletes_key_and_reassign_it(self, *, store): store.apply_changelog_batch(events, to_key=to_key, to_value=to_value) assert to_key() in store.data - def mock_event_to_key_value(self, key=b"key", value=b"value"): - event = self.mock_event(key=key, value=value) + def test_apply_changelog_batch__different_partitions(self, *, store): + events = [ + self.mock_event(key=f"key-{i}".encode(), partition=i) for i in range(2) + ] + to_key, to_value = self.mock_to_key_value_multi(events) + + store.apply_changelog_batch(events, to_key=to_key, to_value=to_value) + + assert to_key.call_args_list[0][0][0] in store.data + assert to_key.call_args_list[1][0][0] in store.data + + assert store._key_partition.get(to_key.call_args_list[0][0][0]) == 0 + assert store._key_partition.get(to_key.call_args_list[1][0][0]) == 1 + + def test_apply_changelog_batch__different_partitions_deletion(self, *, store): + self.test_apply_changelog_batch__different_partitions(store=store) + + events = [ + self.mock_event(key=f"key-{i}".encode(), value=None, partition=i) + for i in range(2) + ] + to_key, to_value = self.mock_to_key_value_multi(events) + + store.apply_changelog_batch(events, to_key=to_key, to_value=to_value) + + assert not store._key_partition + assert not store.data + + @pytest.mark.asyncio + async def test_apply_changelog_batch__different_partitions_repartition_single( + self, *, store + ): + self.test_apply_changelog_batch__different_partitions(store=store) + + await store.on_recovery_completed({TP("foo", 0)}, set()) + + assert len(store.data) == 1 + assert len(store._key_partition) == 1 + + @pytest.mark.asyncio + async def test_apply_changelog_batch__different_partitions_repartition_multi( + self, *, store + ): + self.test_apply_changelog_batch__different_partitions(store=store) + + await store.on_recovery_completed({TP("foo", 0), TP("foo", 1)}, set()) + + assert len(store.data) == 2 + assert len(store._key_partition) == 2 + + def mock_event_to_key_value(self, key=b"key", value=b"value", partition=0): + event = self.mock_event(key=key, value=value, partition=partition) to_key, to_value = self.mock_to_key_value(event) return event, to_key, to_value - def mock_event(self, key=b"key", value=b"value"): + def mock_event(self, key=b"key", value=b"value", partition=0): event = Mock(name="event", autospec=Event) event.key = key event.value = value event.message.key = key event.message.value = value + event.message.partition = partition return event def mock_to_key_value(self, event): @@ -62,6 +113,13 @@ def mock_to_key_value(self, event): to_value.return_value = event.value return to_key, to_value + def mock_to_key_value_multi(self, events): + to_key = Mock(name="to_key") + to_key.side_effect = [e.key for e in events] + to_value = Mock(name="to_value") + to_value.side_effect = [e.value for e in events] + return to_key, to_value + def test_persisted_offset(self, *, store): assert store.persisted_offset(TP("foo", 0)) is None