
Update events stream cache before ID generator #14648

Closed
changelog.d/14648.misc: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Fix events stream change cache and stream ID update order. Contributed by Nick @ Beeper (@fizzadar).
synapse/storage/databases/main/event_federation.py: 1 addition & 1 deletion
@@ -1187,7 +1187,7 @@ async def get_forward_extremities_for_room_at_stream_ordering(
        """
        # We want to make the cache more effective, so we clamp to the last
        # change before the given ordering.
-        last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)  # type: ignore[attr-defined]
+        last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)

        # We don't always have a full stream_to_exterm_id table, e.g. after
        # the upgrade that introduced it, so we make sure we never ask for a
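For illustration, a minimal sketch of the clamping idea above (the helper name is hypothetical; `get_max_pos_of_last_change` is the real method used in the diff):

```python
def clamped_ordering(cache, room_id: str, stream_ordering: int) -> int:
    """Clamp a stream ordering to the room's last known change so that many
    distinct orderings share a single entry in the @cached lookup."""
    last_change = cache.get_max_pos_of_last_change(room_id)
    # Every ordering at or after the room's last change sees the same forward
    # extremities, so they can all be served by the entry at `last_change`.
    return min(stream_ordering, last_change)
```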
synapse/storage/databases/main/events_worker.py: 32 additions & 4 deletions
@@ -208,10 +208,7 @@ def __init__(
        # We shouldn't be running in worker mode with SQLite, but it's useful
        # to support it for unit tests.
        #
-        # If this process is the writer then we need to use
-        # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
-        # updated over replication. (Multiple writers are not supported for
-        # SQLite).
+        # SQLite/StreamIdGenerator only supports a single writer instance (is_writer).
        self._stream_id_gen = StreamIdGenerator(
            db_conn,
            "events",
@@ -242,6 +239,25 @@ def __init__(
            prefilled_cache=curr_state_delta_prefill,
        )

+        event_cache_prefill, min_event_val = self.db_pool.get_cache_dict(
+            db_conn,
+            "events",
+            entity_column="room_id",
+            stream_column="stream_ordering",
+            max_value=events_max,
+        )
+        self._events_stream_cache = StreamChangeCache(
+            "EventsRoomStreamChangeCache",
+            min_event_val,
+            prefilled_cache=event_cache_prefill,
+            stream_id_gen=self._stream_id_gen,
+        )
+        self._membership_stream_cache = StreamChangeCache(
+            "MembershipStreamChangeCache",
+            events_max,
+            stream_id_gen=self._stream_id_gen,
+        )
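For context, a rough sketch of how a prefilled StreamChangeCache answers queries (illustrative room ID and positions; assumes the usual semantics that an entity "has changed" only strictly after the queried position):

```python
# Cache whose earliest known position is 100, seeded with one room changing
# at position 105. Seeding skips the new position check (check_pos=False),
# since prefilled positions may lie behind any live token.
cache = StreamChangeCache(
    "ExampleCache",
    current_stream_pos=100,
    prefilled_cache={"!room:example.org": 105},
)

assert cache.has_entity_changed("!room:example.org", stream_pos=104)      # changed after 104
assert not cache.has_entity_changed("!room:example.org", stream_pos=105)  # nothing after 105
```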

        if hs.config.worker.run_background_tasks:
            # We periodically clean out old transaction ID mappings
            self._clock.looping_call(
@@ -298,8 +314,20 @@ def process_replication_rows(
        token: int,
        rows: Iterable[Any],
    ) -> None:
+        # Process event stream replication rows, handling both the ID generators from the events
+        # worker store and the stream change caches in this store, as the two are interlinked.
        if stream_name == EventsStream.NAME:
+            for row in rows:
+                data = row.data
+                self._events_stream_cache.entity_has_changed(data.room_id, token)
+                if data.type == EventTypes.Member:
+                    self._membership_stream_cache.entity_has_changed(
+                        data.state_key, token
+                    )
+            # NOTE: this must be updated *after* the stream change cache, so other
+            # threads don't see a token ahead of the cache.
            self._stream_id_gen.advance(instance_name, token)

        elif stream_name == BackfillStream.NAME:
            self._backfill_id_gen.advance(instance_name, -token)
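
The NOTE above is the heart of this PR. A sketch of the interleaving the old order allowed, and the fixed helper shape (hypothetical names, not Synapse APIs):

```python
# Old order (broken), with writer W and a concurrent reader R:
#
#   W: id_gen.advance(instance, 105)           # token is now 105
#   R: token = id_gen.get_current_token()      # observes 105
#   R: cache.has_entity_changed(room, 100)     # cache still at 104 -> False!
#   W: cache.entity_has_changed(room, 105)     # too late, R already decided
#
# New order: record changes in the stream change cache first, then advance
# the token, so any token a reader can observe already has its changes in
# the cache. At worst a reader sees a stale (lower) token, which is safe.
def handle_events_rows(cache, id_gen, instance_name, token, rows):
    for row in rows:
        cache.entity_has_changed(row.data.room_id, token)
    id_gen.advance(instance_name, token)
```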

synapse/storage/databases/main/stream.py: 0 additions & 18 deletions
@@ -71,7 +71,6 @@
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken
from synapse.util.caches.descriptors import cached
-from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable

if TYPE_CHECKING:
@@ -397,23 +396,6 @@ def __init__(
        # during startup which would cause one to die.
        self._need_to_reset_federation_stream_positions = self._send_federation

-        events_max = self.get_room_max_stream_ordering()
-        event_cache_prefill, min_event_val = self.db_pool.get_cache_dict(
-            db_conn,
-            "events",
-            entity_column="room_id",
-            stream_column="stream_ordering",
-            max_value=events_max,
-        )
-        self._events_stream_cache = StreamChangeCache(
-            "EventsRoomStreamChangeCache",
-            min_event_val,
-            prefilled_cache=event_cache_prefill,
-        )
-        self._membership_stream_cache = StreamChangeCache(
-            "MembershipStreamChangeCache", events_max
-        )

        self._stream_order_on_start = self.get_room_max_stream_ordering()
        self._min_stream_order_on_start = self.get_room_min_stream_ordering()

synapse/util/caches/stream_change_cache.py: 15 additions & 2 deletions
@@ -19,6 +19,7 @@
import attr
from sortedcontainers import SortedDict

+from synapse.storage.util.id_generators import AbstractStreamIdTracker
from synapse.util import caches

logger = logging.getLogger(__name__)
@@ -70,6 +71,7 @@ def __init__(
        current_stream_pos: int,
        max_size: int = 10000,
        prefilled_cache: Optional[Mapping[EntityType, int]] = None,
+        stream_id_gen: Optional[AbstractStreamIdTracker] = None,
    ) -> None:
        self._original_max_size: int = max_size
        self._max_size = math.floor(max_size)
@@ -92,9 +94,11 @@ def __init__(
            "cache", self.name, self._cache, resize_callback=self.set_cache_factor
        )

+        self.stream_id_gen = stream_id_gen
+
        if prefilled_cache:
            for entity, stream_pos in prefilled_cache.items():
-                self.entity_has_changed(entity, stream_pos)
+                self.entity_has_changed(entity, stream_pos, check_pos=False)

    def set_cache_factor(self, factor: float) -> bool:
        """
@@ -256,7 +260,9 @@ def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult:
            changed_entities.extend(self._cache[k])
        return AllEntitiesChangedResult(changed_entities)

-    def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
+    def entity_has_changed(
+        self, entity: EntityType, stream_pos: int, check_pos: bool = True
+    ) -> None:
"""
Informs the cache that the entity has been changed at the given position.

@@ -271,6 +277,13 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
        if stream_pos <= self._earliest_known_stream_pos:
            return

+        # Any change being flagged must be at or ahead of the current token,
+        # otherwise we have a race between the token position and the stream change cache.
+        # NOTE: equality is allowed so that a process persisting an event can
+        # immediately flag the cache, as it cannot know the ID before generating it.
+        if check_pos and self.stream_id_gen:
+            assert stream_pos >= self.stream_id_gen.get_current_token()

Comment from the PR author (@fizzadar):

Not super pleased with this; perhaps a better way would be to add a flag to explicitly enable exact matches for writer instances?
        old_pos = self._entity_to_key.get(entity, None)
        if old_pos is not None:
            if old_pos >= stream_pos:
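A minimal sketch of the flag the author floats above (a hypothetical `is_writer` constructor argument, not part of this diff): the writer keeps the `>=` check, since it flags changes at tokens it has just generated, while non-writers get a strict `>` check because they only learn of tokens over replication.

```python
# Hypothetical variant of the position check, assuming the cache is told at
# construction time whether this process is the single writer.
if check_pos and self.stream_id_gen:
    if self._is_writer:
        # The writer flags a change at the ID it has just generated, so an
        # exact match with the current token is expected.
        assert stream_pos >= self.stream_id_gen.get_current_token()
    else:
        # Non-writers only see tokens via replication, so any flagged change
        # must be strictly ahead of the tokens they have already observed.
        assert stream_pos > self.stream_id_gen.get_current_token()
```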