From 5ee04253c1e2d33e3bad9d8c74422cf468b304b0 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Fri, 9 Dec 2022 11:19:47 +0000 Subject: [PATCH] Add optional stream ID generator to the `StreamChangeCache` This will assert that any changes being flagged have tokens ahead of the current known position. If the change is at or behind the current token, there is a race condition during which changes may be incorrectly missed; this guards against that. --- synapse/storage/databases/main/events_worker.py | 5 ++++- synapse/util/caches/stream_change_cache.py | 9 +++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index cb281eccdc1c..b0ee2073bab7 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -250,9 +250,12 @@ def __init__( "EventsRoomStreamChangeCache", min_event_val, prefilled_cache=event_cache_prefill, + stream_id_gen=self._stream_id_gen, ) self._membership_stream_cache = StreamChangeCache( - "MembershipStreamChangeCache", events_max + "MembershipStreamChangeCache", + events_max, + stream_id_gen=self._stream_id_gen, ) if hs.config.worker.run_background_tasks: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 165745954990..00d799c489da 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -20,6 +20,7 @@ from sortedcontainers import SortedDict from synapse.util import caches +from synapse.storage.util.id_generators import AbstractStreamIdTracker logger = logging.getLogger(__name__) @@ -70,6 +71,7 @@ def __init__( current_stream_pos: int, max_size: int = 10000, prefilled_cache: Optional[Mapping[EntityType, int]] = None, + stream_id_gen: Optional[AbstractStreamIdTracker] = None, ) -> None: self._original_max_size: int = max_size self._max_size = math.floor(max_size) @@ -92,6 +94,8 @@
def __init__( "cache", self.name, self._cache, resize_callback=self.set_cache_factor ) + self.stream_id_gen = stream_id_gen + if prefilled_cache: for entity, stream_pos in prefilled_cache.items(): self.entity_has_changed(entity, stream_pos) @@ -271,6 +275,11 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: if stream_pos <= self._earliest_known_stream_pos: return + # Any change being flagged must be ahead of any current token, otherwise + # we have a race condition between token position and stream change cache. + if self.stream_id_gen: + assert stream_pos > self.stream_id_gen.get_current_token() + old_pos = self._entity_to_key.get(entity, None) if old_pos is not None: if old_pos >= stream_pos: