This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Avoid checking the event cache when backfilling events #14164

Merged · 6 commits · Oct 18, 2022

Conversation

anoadragon453 (Member)

Most of the context is in #14161.

This PR adjusts a similar optimisation. In #14161, after purging and rejoining a room, we found that while state was now persisted correctly, non-state events were not. These events are not received when you rejoin a room; they must be backfilled from the remote homeserver after you join.

Backfilling involves another area where we refuse to persist events if they already exist in the event cache:

```python
existing = await self._store.get_event(
    event_id, allow_none=True, allow_rejected=True
)
if existing:
    if not existing.internal_metadata.is_outlier():
        logger.info(
            "_process_pulled_event: Ignoring received event %s which we have already seen",
            event_id,
        )
        return
    logger.info("De-outliering event %s", event_id)
```

Compared to the _have_seen_events_dict case, this check has one extra conditional: if we find the event in the cache but it was stored as an outlier, we still persist it. Of course, most events, including messages, aren't outliers. That is why messages were lost when rejoining a room after purging it in #14161 (comment).

This PR changes this optimisation such that it skips the event cache entirely, and instead just checks the database directly. Again, as with #14161, there's a potential performance degradation here, but it prevents breakage until something like #13916 comes along and correctly invalidates the event cache.
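The shape of that change can be sketched with a small standalone model (hypothetical names; Synapse's real check goes through its storage layer rather than plain sets):

```python
from typing import List, Set, Tuple


def partition_already_persisted(
    pulled_event_ids: List[str],
    db_event_ids: Set[str],
    cached_event_ids: Set[str],
) -> Tuple[List[str], List[str]]:
    """Split pulled events into (to_persist, to_skip).

    The decision deliberately consults only the database set: the cache may
    hold entries for events whose rows were purged (synapse issue #13476),
    so trusting it here would silently drop backfilled events. The cache set
    is accepted but intentionally ignored.
    """
    to_persist = [eid for eid in pulled_event_ids if eid not in db_event_ids]
    to_skip = [eid for eid in pulled_event_ids if eid in db_event_ids]
    return to_persist, to_skip
```

Under this model, an event that lingers in the cache but is missing from the database still ends up in `to_persist`, which is exactly the purge-and-rejoin scenario above.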


The main change here is skipping the event cache during backfill when checking to see if we already have an event before persisting it.

Additionally, there's a potential speedup in this PR. Before, we checked one event ID at a time against the cache (or the DB, on a cache miss) in _process_pulled_event.

This PR moves this checking code outside of _process_pulled_event and excludes events we've already seen before sorting them and passing them one-by-one to _process_pulled_event.

In the case of lots of cache misses, this code should theoretically be faster...
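As a rough standalone sketch of the reordered pipeline (hypothetical names; plain dicts stand in for EventBase objects, and the "seen" set stands in for one bulk DB lookup):

```python
from typing import Callable, List, Set


def process_pulled_events_sketch(
    pulled_events: List[dict],
    already_seen_non_outliers: Set[str],
    process_one: Callable[[dict], None],
) -> int:
    """One bulk 'have we seen these?' lookup up front, then per-event work.

    Events already stored as non-outliers are excluded before sorting, so
    the per-event path no longer pays a cache/DB round-trip for each event.
    Returns the number of events actually processed.
    """
    new_events = [
        e for e in pulled_events if e["event_id"] not in already_seen_non_outliers
    ]
    # Oldest (lowest depth) first, mirroring the sort that happens before
    # events are handed to _process_pulled_event one at a time.
    for event in sorted(new_events, key=lambda e: e["depth"]):
        process_one(event)
    return len(new_events)
```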

@reivilibre (Contributor) previously approved these changes Oct 14, 2022 and left a comment:

Seems reasonable; agreed that getting them in one call to the DB should be more efficient.

Comment on lines 808 to 822
```python
if event_id in existing_events_map:
    existing_event = existing_events_map[event_id]

    # ...and the event itself was not previously stored as an outlier...
    if not existing_event.event.internal_metadata.is_outlier():
        # ...then there's no need to persist it. We have it already.
        logger.info(
            "_process_pulled_event: Ignoring received event %s which we "
            "have already seen",
            event.event_id,
        )

    # While we have seen this event before, it was stored as an outlier.
    # We'll now persist it as a non-outlier.
    logger.info("De-outliering event %s", event_id)
```
Contributor:

This control flow is only doing logging! I think you wanted a continue in the inner if block, right? :-)
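For illustration, a minimal standalone model of the intended control flow with the continue in place (hypothetical names; the real code operates on EventBase objects and persists events as a side effect):

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class StoredEvent:
    event_id: str
    is_outlier: bool


def select_events_to_persist(
    pulled_event_ids: List[str],
    existing_events_map: Dict[str, StoredEvent],
) -> List[str]:
    """Return the event IDs that should still be persisted."""
    to_persist = []
    for event_id in pulled_event_ids:
        existing = existing_events_map.get(event_id)
        if existing is not None:
            if not existing.is_outlier:
                # Already persisted as a non-outlier: skip it entirely.
                # Without this `continue`, control falls through and the
                # event is persisted again, the bug flagged in review.
                continue
            # Previously stored only as an outlier: fall through and
            # persist it as a full event ("de-outliering").
        to_persist.append(event_id)
    return to_persist
```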

Contributor:

(Maybe this indicates that a test is needed...)

@anoadragon453 (Member, author) · Oct 17, 2022:

Serves me right for not manually testing again before putting this up!

I'll write up a unit test for this function, but otherwise I find this behaviour a little hard to test...

@MadLittleMods (Contributor) · Oct 17, 2022:

A Complement test seems like it would fit #14161 (comment) well.

If you're looking for a test that works with _process_pulled_events, I wrote one in #13864 (still a bit messy in scratch state):

```python
def test_process_pulled_events_asdf(self) -> None:
    main_store = self.hs.get_datastores().main
    state_storage_controller = self.hs.get_storage_controllers().state

    def _debug_event_string(event: EventBase) -> str:
        debug_body = event.content.get("body", event.type)
        maybe_state_key = getattr(event, "state_key", None)
        return f"event_id={event.event_id},depth={event.depth},body={debug_body}({maybe_state_key}),prevs={event.prev_event_ids()}"

    known_event_dict: Dict[str, Tuple[EventBase, List[EventBase]]] = {}

    def _add_to_known_event_list(
        event: EventBase, state_events: Optional[List[EventBase]] = None
    ) -> None:
        if state_events is None:
            state_map = self.get_success(
                state_storage_controller.get_state_for_event(event.event_id)
            )
            state_events = list(state_map.values())
        known_event_dict[event.event_id] = (event, state_events)

    async def get_room_state_ids(
        destination: str, room_id: str, event_id: str
    ) -> JsonDict:
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_room_state_ids: Event ({event_id}) not part of our known events list"
            )
        known_event, known_event_state_list = known_event_info
        logger.info(
            "stubbed get_room_state_ids: destination=%s event_id=%s auth_event_ids=%s",
            destination,
            event_id,
            known_event.auth_event_ids(),
        )
        # self.assertEqual(event_id, missing_event.event_id)
        return {
            "pdu_ids": [
                state_event.event_id for state_event in known_event_state_list
            ],
            "auth_chain_ids": known_event.auth_event_ids(),
        }

    async def get_room_state(
        room_version: RoomVersion, destination: str, room_id: str, event_id: str
    ) -> StateRequestResponse:
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_room_state: Event ({event_id}) not part of our known events list"
            )
        known_event, known_event_state_list = known_event_info
        logger.info(
            "stubbed get_room_state: destination=%s event_id=%s auth_event_ids=%s",
            destination,
            event_id,
            known_event.auth_event_ids(),
        )
        auth_event_ids = known_event.auth_event_ids()
        auth_events = []
        for auth_event_id in auth_event_ids:
            # (Fixed from the scratch version, which looked up `event_id` here.)
            known_event_info = known_event_dict.get(auth_event_id)
            if known_event_info is None:
                self.fail(
                    f"stubbed get_room_state: Auth event ({auth_event_id}) is not part of our known events list"
                )
            known_auth_event, _ = known_event_info
            auth_events.append(known_auth_event)
        return StateRequestResponse(
            state=known_event_state_list,
            auth_events=auth_events,
        )

    async def get_event(destination: str, event_id: str, timeout=None):
        self.assertEqual(destination, self.OTHER_SERVER_NAME)
        known_event_info = known_event_dict.get(event_id)
        if known_event_info is None:
            self.fail(
                f"stubbed get_event: Event ({event_id}) not part of our known events list"
            )
        known_event, _ = known_event_info
        return {"pdus": [known_event.get_pdu_json()]}

    self.mock_federation_transport_client.get_room_state_ids.side_effect = (
        get_room_state_ids
    )
    self.mock_federation_transport_client.get_room_state.side_effect = (
        get_room_state
    )
    self.mock_federation_transport_client.get_event.side_effect = get_event

    # create the room
    room_creator = self.appservice.sender
    room_id = self.helper.create_room_as(
        room_creator=self.appservice.sender, tok=self.appservice.token
    )
    room_version = self.get_success(main_store.get_room_version(room_id))

    event_before = self.get_success(
        inject_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.Message,
            content={"body": "eventBefore0", "msgtype": "m.text"},
        )
    )
    _add_to_known_event_list(event_before)

    event_after = self.get_success(
        inject_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.Message,
            content={"body": "eventAfter0", "msgtype": "m.text"},
        )
    )
    _add_to_known_event_list(event_after)

    state_map = self.get_success(
        state_storage_controller.get_state_for_event(event_before.event_id)
    )
    room_create_event = state_map.get((EventTypes.Create, ""))
    pl_event = state_map.get((EventTypes.PowerLevels, ""))
    as_membership_event = state_map.get((EventTypes.Member, room_creator))
    assert room_create_event is not None
    assert pl_event is not None
    assert as_membership_event is not None
    for state_event in state_map.values():
        _add_to_known_event_list(state_event)

    # This should be the successor of the event we want to insert next to
    # (the successor of event_before is event_after).
    inherited_depth = event_after.depth

    historical_base_auth_event_ids = [
        room_create_event.event_id,
        pl_event.event_id,
    ]
    historical_state_events = list(state_map.values())
    historical_state_event_ids = [
        state_event.event_id for state_event in historical_state_events
    ]

    maria_mxid = "@maria:test"
    maria_membership_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=maria_mxid,
            state_key=maria_mxid,
            type=EventTypes.Member,
            content={
                "membership": "join",
            },
            # It all works when I add a prev_event for the floating
            # insertion event but the event no longer floats.
            # It's able to resolve state at the prev_events though.
            prev_event_ids=[event_before.event_id],
            # allow_no_prev_events=True,
            # prev_event_ids=[],
            # auth_event_ids=historical_base_auth_event_ids,
            #
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events. For member events, we do it this way.
            state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(maria_membership_event, historical_state_events)
    historical_state_events.append(maria_membership_event)
    historical_state_event_ids.append(maria_membership_event.event_id)

    batch_id = random_string(8)
    next_batch_id = random_string(8)
    insertion_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_INSERTION,
            content={
                EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            # The difference from the actual room /batch_send is that this is normally
            # floating as well. But seems to work once we connect it to the
            # floating historical state chain.
            prev_event_ids=[maria_membership_event.event_id],
            # allow_no_prev_events=True,
            # prev_event_ids=[],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            # state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(insertion_event, historical_state_events)

    historical_message_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=maria_mxid,
            type=EventTypes.Message,
            content={"body": "Historical message", "msgtype": "m.text"},
            prev_event_ids=[insertion_event.event_id],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                maria_membership_event.event_id,
            ],
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(historical_message_event, historical_state_events)

    batch_event, _ = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_BATCH,
            content={
                EventContentFields.MSC2716_BATCH_ID: batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            prev_event_ids=[historical_message_event.event_id],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(batch_event, historical_state_events)

    base_insertion_event, base_insertion_event_context = self.get_success(
        create_event(
            self.hs,
            room_id=room_id,
            sender=room_creator,
            type=EventTypes.MSC2716_INSERTION,
            content={
                EventContentFields.MSC2716_NEXT_BATCH_ID: batch_id,
                EventContentFields.MSC2716_HISTORICAL: True,
            },
            prev_event_ids=[event_before.event_id],
            # Because we're creating all of these events without persisting them yet,
            # we have to explicitly provide some auth_events
            auth_event_ids=[
                *historical_base_auth_event_ids,
                as_membership_event.event_id,
            ],
            # state_event_ids=historical_state_event_ids,
            depth=inherited_depth,
        )
    )
    _add_to_known_event_list(base_insertion_event, historical_state_events)

    # Chronological
    pulled_events: List[EventBase] = [
        # Beginning of room (oldest messages)
        # *list(state_map.values()),
        room_create_event,
        pl_event,
        as_membership_event,
        state_map.get((EventTypes.JoinRules, "")),
        state_map.get((EventTypes.RoomHistoryVisibility, "")),
        event_before,
        # HISTORICAL MESSAGE END
        insertion_event,
        historical_message_event,
        batch_event,
        base_insertion_event,
        # HISTORICAL MESSAGE START
        event_after,
        # Latest in the room (newest messages)
    ]

    # The order that we get after passing reverse chronological events in
    # that mostly passes. Only the insertion event is rejected but the
    # historical messages appear in /messages scrollback.
    # pulled_events: List[EventBase] = [
    #     # Beginning of room (oldest messages)
    #     # *list(state_map.values()),
    #     room_create_event,
    #     pl_event,
    #     as_membership_event,
    #     state_map.get((EventTypes.JoinRules, "")),
    #     state_map.get((EventTypes.RoomHistoryVisibility, "")),
    #     event_before,
    #     event_after,
    #     base_insertion_event,
    #     batch_event,
    #     historical_message_event,
    #     insertion_event,
    #     # Latest in the room (newest messages)
    # ]

    import logging

    logger = logging.getLogger(__name__)
    logger.info(
        "pulled_events=%s",
        json.dumps(
            [_debug_event_string(event) for event in pulled_events],
            indent=4,
        ),
    )

    for event, _ in known_event_dict.values():
        if event.internal_metadata.outlier:
            self.fail("Our pristine events should not be marked as an outlier")

    self.get_success(
        self.hs.get_federation_event_handler()._process_pulled_events(
            self.OTHER_SERVER_NAME,
            [
                # Make copies of events since Synapse modifies the
                # internal_metadata in place and we want to keep our
                # pristine copies
                make_event_from_dict(pulled_event.get_pdu_json(), room_version)
                for pulled_event in pulled_events
            ],
            backfilled=True,
        )
    )

    from_token = self.get_success(
        self.hs.get_event_sources().get_current_token_for_pagination(room_id)
    )
    actual_events_in_room_reverse_chronological, _ = self.get_success(
        main_store.paginate_room_events(
            room_id, from_key=from_token.room_key, limit=100, direction="b"
        )
    )
    # We have to reverse the list to make it chronological.
    actual_events_in_room_chronological = list(
        reversed(actual_events_in_room_reverse_chronological)
    )

    expected_event_order = [
        # Beginning of room (oldest messages)
        # *list(state_map.values()),
        room_create_event,
        as_membership_event,
        pl_event,
        state_map.get((EventTypes.JoinRules, "")),
        state_map.get((EventTypes.RoomHistoryVisibility, "")),
        event_before,
        # HISTORICAL MESSAGE END
        insertion_event,
        historical_message_event,
        batch_event,
        base_insertion_event,
        # HISTORICAL MESSAGE START
        event_after,
        # Latest in the room (newest messages)
    ]

    event_id_diff = {event.event_id for event in expected_event_order} - {
        event.event_id for event in actual_events_in_room_chronological
    }
    event_diff_ordered = [
        event for event in expected_event_order if event.event_id in event_id_diff
    ]
    event_id_extra = {
        event.event_id for event in actual_events_in_room_chronological
    } - {event.event_id for event in expected_event_order}
    event_extra_ordered = [
        event
        for event in actual_events_in_room_chronological
        if event.event_id in event_id_extra
    ]
    assertion_message = (
        "Debug info:\nActual events missing from expected list: %s\nActual events contain %d additional events compared to expected: %s\nExpected event order: %s\nActual event order: %s"
        % (
            json.dumps(
                [_debug_event_string(event) for event in event_diff_ordered],
                indent=4,
            ),
            len(event_extra_ordered),
            json.dumps(
                [_debug_event_string(event) for event in event_extra_ordered],
                indent=4,
            ),
            json.dumps(
                [_debug_event_string(event) for event in expected_event_order],
                indent=4,
            ),
            json.dumps(
                [
                    _debug_event_string(event)
                    for event in actual_events_in_room_chronological
                ],
                indent=4,
            ),
        )
    )

    # assert (
    #     actual_events_in_room_chronological == expected_event_order
    # ), assertion_message
    self.assertEqual(
        [event.event_id for event in actual_events_in_room_chronological],
        [event.event_id for event in expected_event_order],
        assertion_message,
    )
```

@anoadragon453 (Member, author):

The problem with a Complement test is that we'd need to call Synapse's Delete Room Admin API, which would only be specific to Synapse. I could write this test, but I wouldn't be able to publish it anywhere permanent, and the steps in it would be no different than manually testing.

Manually testing again with the continue in place, backfill doesn't drop any events. Thanks for catching that :)

@anoadragon453 (Member, author):

I cracked and settled for writing a unit test for backfill, including a bit that specifically checks that backfill won't rely on the event cache to determine whether to persist an event. The result is a test for backfill plus the specific behaviour that this PR removes.

```python
def test_backfill_ignores_known_events(self) -> None:
    """
    Tests that events that we already know about are ignored when backfilling.
    """
    # Set up users
    user_id = self.register_user("kermit", "test")
    tok = self.login("kermit", "test")

    other_server = "otherserver"
    other_user = "@otheruser:" + other_server

    # Create a room to backfill events into
    room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
    room_version = self.get_success(self.store.get_room_version(room_id))

    # Build an event to backfill
    event = event_from_pdu_json(
        {
            "type": EventTypes.Message,
            "content": {"body": "hello world", "msgtype": "m.text"},
            "room_id": room_id,
            "sender": other_user,
            "depth": 32,
            "prev_events": [],
            "auth_events": [],
            "origin_server_ts": self.clock.time_msec(),
        },
        room_version,
    )

    # Ensure the event is not already in the DB
    self.get_failure(
        self.store.get_event(event.event_id),
        NotFoundError,
    )

    # Backfill the event and check that it has entered the DB.

    # We mock out the FederationClient.backfill method, to pretend that a remote
    # server has returned our fake event.
    federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
    self.hs.get_federation_client().backfill = federation_client_backfill_mock

    # We also mock the persist method with a side effect of itself. This allows us
    # to track when it has been called while preserving its function.
    persist_events_and_notify_mock = Mock(
        side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
    )
    self.hs.get_federation_event_handler().persist_events_and_notify = (
        persist_events_and_notify_mock
    )

    # Small side-tangent. We populate the event cache with the event, even though
    # it is not yet in the DB. This is an invalid scenario that can currently occur
    # due to not properly invalidating the event cache.
    # See https://github.com/matrix-org/synapse/issues/13476.
    #
    # As a result, backfill should not rely on the event cache to check whether
    # we already have an event in the DB.
    # TODO: Remove this bit when the event cache is properly invalidated.
    cache_entry = EventCacheEntry(
        event=event,
        redacted_event=None,
    )
    self.store._get_event_cache.set_local((event.event_id,), cache_entry)

    # We now call FederationEventHandler.backfill (a separate method) to trigger
    # a backfill request. It should receive the fake event.
    self.get_success(
        self.hs.get_federation_event_handler().backfill(
            other_user,
            room_id,
            limit=10,
            extremities=[],
        )
    )

    # Check that our fake event was persisted.
    persist_events_and_notify_mock.assert_called_once()
    persist_events_and_notify_mock.reset_mock()

    # Now we repeat the backfill, having the homeserver receive the fake event
    # again.
    self.get_success(
        self.hs.get_federation_event_handler().backfill(
            other_user,
            room_id,
            limit=10,
            extremities=[],
        ),
    )

    # This time, we expect no event persistence to have occurred, as we already
    # have this event.
    persist_events_and_notify_mock.assert_not_called()
```

@reivilibre (Contributor) left a comment:

OK with the change suggested

```python
# Check if we already have any of these events.
# Note: we currently make a lookup in the database directly here rather than
# checking the event cache, due to:
# https://github.com/matrix-org/synapse/issues/13476
```
Contributor:

We could add an additional comment about potentially adding a cached check back when "something like #13916 comes along and correctly invalidates the event cache."

@anoadragon453 (Member, author):

I went ahead and added a comment to the referenced bug instead, which should serve the same purpose and tie things together a bit better on GitHub.

@anoadragon453 (Member, author):

I successfully tested the following on my homeserver on develop (2c63cdc) + #14161 + #14164 (this PR):

  • Leaving Synapse Admins (#synapse:matrix.org)
  • Purging the room via the manhole
  • Rejoining the room
  • Sending and receiving messages

Worked as expected 🎉
