Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add optional stream ID generator to the StreamChangeCache
Browse files Browse the repository at this point in the history
This will assert that any changes being flagged have tokens ahead of
the current known position. If the change is at or behind the current
token there is a race condition during which changes may be incorrectly
missed, this guards against that.
  • Loading branch information
Fizzadar committed Dec 9, 2022
1 parent 3dea38b commit 5ee0425
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,12 @@ def __init__(
"EventsRoomStreamChangeCache",
min_event_val,
prefilled_cache=event_cache_prefill,
stream_id_gen=self._stream_id_gen,
)
self._membership_stream_cache = StreamChangeCache(
"MembershipStreamChangeCache", events_max
"MembershipStreamChangeCache",
events_max,
stream_id_gen=self._stream_id_gen,
)

if hs.config.worker.run_background_tasks:
Expand Down
9 changes: 9 additions & 0 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from sortedcontainers import SortedDict

from synapse.util import caches
from synapse.storage.util.id_generators import AbstractStreamIdTracker

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,6 +71,7 @@ def __init__(
current_stream_pos: int,
max_size: int = 10000,
prefilled_cache: Optional[Mapping[EntityType, int]] = None,
stream_id_gen: Optional[AbstractStreamIdTracker] = None,
) -> None:
self._original_max_size: int = max_size
self._max_size = math.floor(max_size)
Expand All @@ -92,6 +94,8 @@ def __init__(
"cache", self.name, self._cache, resize_callback=self.set_cache_factor
)

self.stream_id_gen = stream_id_gen

if prefilled_cache:
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
Expand Down Expand Up @@ -271,6 +275,11 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
if stream_pos <= self._earliest_known_stream_pos:
return

# Any change being flagged must be ahead of any current token, otherwise
# we have a race condition between token position and stream change cache.
if self.stream_id_gen:
assert stream_pos > self.stream_id_gen.get_current_token()

old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
if old_pos >= stream_pos:
Expand Down

0 comments on commit 5ee0425

Please sign in to comment.