Skip to content

Commit

Permalink
fix for reassign Table Keys faust-streaming#171 (faust-streaming#174)
Browse files Browse the repository at this point in the history
* fix for reassign Table Keys faust-streaming#171

* change comment, because line was to long for linter (< 88 characters)

* add unittest to check "insert > delete > insert" for a key in table
  • Loading branch information
PJ-Schulz authored Jul 23, 2021
1 parent fe343c0 commit c1a6b0e
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
5 changes: 3 additions & 2 deletions faust/stores/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def apply_changelog_batch(
self._create_batch_iterator(to_delete.add, to_key, to_value, batch)
)
for key in to_delete:
delete_key(key, None)
# If the key was assigned a value again, it will not be deleted.
if not self.data[key]:
delete_key(key, None)

def _create_batch_iterator(
self,
Expand All @@ -45,7 +47,6 @@ def _create_batch_iterator(
# to delete keys in the table we set the raw value to None
if event.message.value is None:
mark_as_delete(key)
continue
yield key, to_value(event.value)

def persisted_offset(self, tp: TP) -> Optional[int]:
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/stores/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ def test_apply_changelog_batch__deletes_key_for_None_value(self, *, store):

assert to_key() not in store.data

def test_apply_changelog_batch__deletes_key_and_reassign_it(self, *, store):
self.test_apply_changelog_batch__deletes_key_for_None_value(store=store)

events = [self.mock_event(value=value) for value in ("v1", None, "v2")]
to_key, to_value = self.mock_to_key_value(events[0])

store.apply_changelog_batch(events, to_key=to_key, to_value=to_value)
assert to_key() in store.data

def mock_event_to_key_value(self, key=b"key", value=b"value"):
event = self.mock_event(key=key, value=value)
to_key, to_value = self.mock_to_key_value(event)
Expand Down

0 comments on commit c1a6b0e

Please sign in to comment.