Skip to content

Commit

Permalink
Memory store repartitioning behavior (faust-streaming#199)
Browse files Browse the repository at this point in the history
* Fix memory store with changelog topic repartition

* Added test for multi partitions on one worker

Co-authored-by: Roman Kornev <me@romanius.me>
Co-authored-by: Taybin Rutkin <taybin@users.noreply.github.com>
Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
  • Loading branch information
4 people authored Oct 26, 2021
1 parent 510051b commit 654b648
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
21 changes: 20 additions & 1 deletion faust/stores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple

from faust.types import TP, EventT
from faust.types.stores import KT, VT

from . import base


class Store(base.Store):
class Store(base.Store, base.StoreT[KT, VT]):
"""Table storage using an in-memory dictionary."""

def __post_init__(self) -> None:
self.data: MutableMapping = {}
self._key_partition: MutableMapping[KT, int] = {}

def _clear(self) -> None:
self.data.clear()
Expand All @@ -34,6 +36,7 @@ def apply_changelog_batch(
# If the key was assigned a value again, it will not be deleted.
if not self.data[key]:
delete_key(key, None)
self._key_partition.pop(key, None)

def _create_batch_iterator(
self,
Expand All @@ -47,8 +50,24 @@ def _create_batch_iterator(
# to delete keys in the table we set the raw value to None
if event.message.value is None:
mark_as_delete(key)
self._key_partition[key] = event.message.partition
yield key, to_value(event.value)

async def on_recovery_completed(
self, active_tps: Set[TP], standby_tps: Set[TP]
) -> None:
partitions = {partition for _, partition in active_tps}

delete_keys = []

for k, p in self._key_partition.items():
if p not in partitions:
delete_keys.append(k)

for k in delete_keys:
self.data.pop(k, None)
self._key_partition.pop(k, None)

def persisted_offset(self, tp: TP) -> Optional[int]:
"""Return the persisted offset.
Expand Down
64 changes: 61 additions & 3 deletions tests/unit/stores/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,68 @@ def test_apply_changelog_batch__deletes_key_and_reassign_it(self, *, store):
store.apply_changelog_batch(events, to_key=to_key, to_value=to_value)
assert to_key() in store.data

def mock_event_to_key_value(self, key=b"key", value=b"value"):
event = self.mock_event(key=key, value=value)
def test_apply_changelog_batch__different_partitions(self, *, store):
events = [
self.mock_event(key=f"key-{i}".encode(), partition=i) for i in range(2)
]
to_key, to_value = self.mock_to_key_value_multi(events)

store.apply_changelog_batch(events, to_key=to_key, to_value=to_value)

assert to_key.call_args_list[0][0][0] in store.data
assert to_key.call_args_list[1][0][0] in store.data

assert store._key_partition.get(to_key.call_args_list[0][0][0]) == 0
assert store._key_partition.get(to_key.call_args_list[1][0][0]) == 1

def test_apply_changelog_batch__different_partitions_deletion(self, *, store):
self.test_apply_changelog_batch__different_partitions(store=store)

events = [
self.mock_event(key=f"key-{i}".encode(), value=None, partition=i)
for i in range(2)
]
to_key, to_value = self.mock_to_key_value_multi(events)

store.apply_changelog_batch(events, to_key=to_key, to_value=to_value)

assert not store._key_partition
assert not store.data

@pytest.mark.asyncio
async def test_apply_changelog_batch__different_partitions_repartition_single(
self, *, store
):
self.test_apply_changelog_batch__different_partitions(store=store)

await store.on_recovery_completed({TP("foo", 0)}, set())

assert len(store.data) == 1
assert len(store._key_partition) == 1

@pytest.mark.asyncio
async def test_apply_changelog_batch__different_partitions_repartition_multi(
self, *, store
):
self.test_apply_changelog_batch__different_partitions(store=store)

await store.on_recovery_completed({TP("foo", 0), TP("foo", 1)}, set())

assert len(store.data) == 2
assert len(store._key_partition) == 2

def mock_event_to_key_value(self, key=b"key", value=b"value", partition=0):
event = self.mock_event(key=key, value=value, partition=partition)
to_key, to_value = self.mock_to_key_value(event)
return event, to_key, to_value

def mock_event(self, key=b"key", value=b"value"):
def mock_event(self, key=b"key", value=b"value", partition=0):
event = Mock(name="event", autospec=Event)
event.key = key
event.value = value
event.message.key = key
event.message.value = value
event.message.partition = partition
return event

def mock_to_key_value(self, event):
Expand All @@ -62,6 +113,13 @@ def mock_to_key_value(self, event):
to_value.return_value = event.value
return to_key, to_value

def mock_to_key_value_multi(self, events):
to_key = Mock(name="to_key")
to_key.side_effect = [e.key for e in events]
to_value = Mock(name="to_value")
to_value.side_effect = [e.value for e in events]
return to_key, to_value

def test_persisted_offset(self, *, store):
assert store.persisted_offset(TP("foo", 0)) is None

Expand Down

0 comments on commit 654b648

Please sign in to comment.