Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Avoid checking the event cache when backfilling events #14164

Merged
merged 6 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14164.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
46 changes: 33 additions & 13 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,9 +792,41 @@ async def _process_pulled_events(
],
)

# Check if we already any of these have these events.
# Note: we currently make a lookup in the database directly here rather than
# checking the event cache, due to:
# https://github.com/matrix-org/synapse/issues/13476
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add an additional comment about potentially adding a cached check back when "something like #13916 comes along and correctly invalidates the event cache."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and added a comment to the referenced bug instead, which should serve the same purpose and tie things together a bit better on GitHub.

existing_events_map = await self._store._get_events_from_db(
[event.event_id for event in events]
)

new_events = []
for event in events:
event_id = event.event_id

# If we've already seen this event ID...
if event_id in existing_events_map:
existing_event = existing_events_map[event_id]

# ...and the event itself was not previously stored as an outlier...
if not existing_event.event.internal_metadata.is_outlier():
# ...then there's no need to persist it. We have it already.
logger.info(
"_process_pulled_event: Ignoring received event %s which we "
"have already seen",
event.event_id,
)

# While we have seen this event before, it was stored as an outlier.
# We'll now persist it as a non-outlier.
logger.info("De-outliering event %s", event_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This control flow is only doing logging! I think you wanted a continue in the inner if block, right? :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Maybe this indicates that a test is needed...)

Copy link
Member Author

@anoadragon453 anoadragon453 Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serves me right for not manually testing again before putting this up!

I'll write up a unit test for this function, but otherwise I find this behaviour a little hard to test...

Copy link
Contributor

@MadLittleMods MadLittleMods Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A complement test seems like it would fit #14161 (comment) well

If you're looking for a test that works with _process_pulled_events, I wrote one in #13864, see (still a bit messy in scratch state)

def test_process_pulled_events_asdf(self) -> None:
main_store = self.hs.get_datastores().main
state_storage_controller = self.hs.get_storage_controllers().state
def _debug_event_string(event: EventBase) -> str:
debug_body = event.content.get("body", event.type)
maybe_state_key = getattr(event, "state_key", None)
return f"event_id={event.event_id},depth={event.depth},body={debug_body}({maybe_state_key}),prevs={event.prev_event_ids()}"
known_event_dict: Dict[str, Tuple[EventBase, List[EventBase]]] = {}
def _add_to_known_event_list(
event: EventBase, state_events: Optional[List[EventBase]] = None
) -> None:
if state_events is None:
state_map = self.get_success(
state_storage_controller.get_state_for_event(event.event_id)
)
state_events = list(state_map.values())
known_event_dict[event.event_id] = (event, state_events)
async def get_room_state_ids(
destination: str, room_id: str, event_id: str
) -> JsonDict:
self.assertEqual(destination, self.OTHER_SERVER_NAME)
known_event_info = known_event_dict.get(event_id)
if known_event_info is None:
self.fail(
f"stubbed get_room_state_ids: Event ({event_id}) not part of our known events list"
)
known_event, known_event_state_list = known_event_info
logger.info(
"stubbed get_room_state_ids: destination=%s event_id=%s auth_event_ids=%s",
destination,
event_id,
known_event.auth_event_ids(),
)
# self.assertEqual(event_id, missing_event.event_id)
return {
"pdu_ids": [
state_event.event_id for state_event in known_event_state_list
],
"auth_chain_ids": known_event.auth_event_ids(),
}
async def get_room_state(
room_version: RoomVersion, destination: str, room_id: str, event_id: str
) -> StateRequestResponse:
self.assertEqual(destination, self.OTHER_SERVER_NAME)
known_event_info = known_event_dict.get(event_id)
if known_event_info is None:
self.fail(
f"stubbed get_room_state: Event ({event_id}) not part of our known events list"
)
known_event, known_event_state_list = known_event_info
logger.info(
"stubbed get_room_state: destination=%s event_id=%s auth_event_ids=%s",
destination,
event_id,
known_event.auth_event_ids(),
)
auth_event_ids = known_event.auth_event_ids()
auth_events = []
for auth_event_id in auth_event_ids:
known_event_info = known_event_dict.get(event_id)
if known_event_info is None:
self.fail(
f"stubbed get_room_state: Auth event ({auth_event_id}) is not part of our known events list"
)
known_auth_event, _ = known_event_info
auth_events.append(known_auth_event)
return StateRequestResponse(
state=known_event_state_list,
auth_events=auth_events,
)
async def get_event(destination: str, event_id: str, timeout=None):
self.assertEqual(destination, self.OTHER_SERVER_NAME)
known_event_info = known_event_dict.get(event_id)
if known_event_info is None:
self.fail(
f"stubbed get_event: Event ({event_id}) not part of our known events list"
)
known_event, _ = known_event_info
return {"pdus": [known_event.get_pdu_json()]}
self.mock_federation_transport_client.get_room_state_ids.side_effect = (
get_room_state_ids
)
self.mock_federation_transport_client.get_room_state.side_effect = (
get_room_state
)
self.mock_federation_transport_client.get_event.side_effect = get_event
# create the room
room_creator = self.appservice.sender
room_id = self.helper.create_room_as(
room_creator=self.appservice.sender, tok=self.appservice.token
)
room_version = self.get_success(main_store.get_room_version(room_id))
event_before = self.get_success(
inject_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.Message,
content={"body": "eventBefore0", "msgtype": "m.text"},
)
)
_add_to_known_event_list(event_before)
event_after = self.get_success(
inject_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.Message,
content={"body": "eventAfter0", "msgtype": "m.text"},
)
)
_add_to_known_event_list(event_after)
state_map = self.get_success(
state_storage_controller.get_state_for_event(event_before.event_id)
)
room_create_event = state_map.get((EventTypes.Create, ""))
pl_event = state_map.get((EventTypes.PowerLevels, ""))
as_membership_event = state_map.get((EventTypes.Member, room_creator))
assert room_create_event is not None
assert pl_event is not None
assert as_membership_event is not None
for state_event in state_map.values():
_add_to_known_event_list(state_event)
# This should be the successor of the event we want to insert next to
# (the successor of event_before is event_after).
inherited_depth = event_after.depth
historical_base_auth_event_ids = [
room_create_event.event_id,
pl_event.event_id,
]
historical_state_events = list(state_map.values())
historical_state_event_ids = [
state_event.event_id for state_event in historical_state_events
]
maria_mxid = "@maria:test"
maria_membership_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=maria_mxid,
state_key=maria_mxid,
type=EventTypes.Member,
content={
"membership": "join",
},
# It all works when I add a prev_event for the floating
# insertion event but the event no longer floats.
# It's able to resolve state at the prev_events though.
prev_event_ids=[event_before.event_id],
# allow_no_prev_events=True,
# prev_event_ids=[],
# auth_event_ids=historical_base_auth_event_ids,
#
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events. For member events, we do it this way.
state_event_ids=historical_state_event_ids,
depth=inherited_depth,
)
)
_add_to_known_event_list(maria_membership_event, historical_state_events)
historical_state_events.append(maria_membership_event)
historical_state_event_ids.append(maria_membership_event.event_id)
batch_id = random_string(8)
next_batch_id = random_string(8)
insertion_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.MSC2716_INSERTION,
content={
EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
# The difference from the actual room /batch_send is that this is normally
# floating as well. But seems to work once we connect it to the
# floating historical state chain.
prev_event_ids=[maria_membership_event.event_id],
# allow_no_prev_events=True,
# prev_event_ids=[],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
as_membership_event.event_id,
],
# state_event_ids=historical_state_event_ids,
depth=inherited_depth,
)
)
_add_to_known_event_list(insertion_event, historical_state_events)
historical_message_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=maria_mxid,
type=EventTypes.Message,
content={"body": "Historical message", "msgtype": "m.text"},
prev_event_ids=[insertion_event.event_id],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
maria_membership_event.event_id,
],
depth=inherited_depth,
)
)
_add_to_known_event_list(historical_message_event, historical_state_events)
batch_event, _ = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.MSC2716_BATCH,
content={
EventContentFields.MSC2716_BATCH_ID: batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
prev_event_ids=[historical_message_event.event_id],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
as_membership_event.event_id,
],
depth=inherited_depth,
)
)
_add_to_known_event_list(batch_event, historical_state_events)
base_insertion_event, base_insertion_event_context = self.get_success(
create_event(
self.hs,
room_id=room_id,
sender=room_creator,
type=EventTypes.MSC2716_INSERTION,
content={
EventContentFields.MSC2716_NEXT_BATCH_ID: batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
prev_event_ids=[event_before.event_id],
# Because we're creating all of these events without persisting them yet,
# we have to explicitly provide some auth_events
auth_event_ids=[
*historical_base_auth_event_ids,
as_membership_event.event_id,
],
# state_event_ids=historical_state_event_ids,
depth=inherited_depth,
)
)
_add_to_known_event_list(base_insertion_event, historical_state_events)
# Chronological
pulled_events: List[EventBase] = [
# Beginning of room (oldest messages)
# *list(state_map.values()),
room_create_event,
pl_event,
as_membership_event,
state_map.get((EventTypes.JoinRules, "")),
state_map.get((EventTypes.RoomHistoryVisibility, "")),
event_before,
# HISTORICAL MESSAGE END
insertion_event,
historical_message_event,
batch_event,
base_insertion_event,
# HISTORICAL MESSAGE START
event_after,
# Latest in the room (newest messages)
]
# pulled_events: List[EventBase] = [
# # Beginning of room (oldest messages)
# # *list(state_map.values()),
# room_create_event,
# pl_event,
# as_membership_event,
# state_map.get((EventTypes.JoinRules, "")),
# state_map.get((EventTypes.RoomHistoryVisibility, "")),
# event_before,
# # HISTORICAL MESSAGE END
# insertion_event,
# historical_message_event,
# batch_event,
# base_insertion_event,
# # HISTORICAL MESSAGE START
# event_after,
# # Latest in the room (newest messages)
# ]
# The order that we get after passing reverse chronological events in
# that mostly passes. Only the insertion event is rejected but the
# historical messages appear /messages scrollback.
# pulled_events: List[EventBase] = [
# # Beginning of room (oldest messages)
# # *list(state_map.values()),
# room_create_event,
# pl_event,
# as_membership_event,
# state_map.get((EventTypes.JoinRules, "")),
# state_map.get((EventTypes.RoomHistoryVisibility, "")),
# event_before,
# event_after,
# base_insertion_event,
# batch_event,
# historical_message_event,
# insertion_event,
# # Latest in the room (newest messages)
# ]
import logging
logger = logging.getLogger(__name__)
logger.info(
"pulled_events=%s",
json.dumps(
[_debug_event_string(event) for event in pulled_events],
indent=4,
),
)
for event, _ in known_event_dict.values():
if event.internal_metadata.outlier:
self.fail("Our pristine events should not be marked as an outlier")
self.get_success(
self.hs.get_federation_event_handler()._process_pulled_events(
self.OTHER_SERVER_NAME,
[
# Make copies of events since Synapse modifies the
# internal_metadata in place and we want to keep our
# pristine copies
make_event_from_dict(pulled_event.get_pdu_json(), room_version)
for pulled_event in pulled_events
],
backfilled=True,
)
)
from_token = self.get_success(
self.hs.get_event_sources().get_current_token_for_pagination(room_id)
)
actual_events_in_room_reverse_chronological, _ = self.get_success(
main_store.paginate_room_events(
room_id, from_key=from_token.room_key, limit=100, direction="b"
)
)
# We have to reverse the list to make it chronological.
actual_events_in_room_chronological = list(
reversed(actual_events_in_room_reverse_chronological)
)
expected_event_order = [
# Beginning of room (oldest messages)
# *list(state_map.values()),
room_create_event,
as_membership_event,
pl_event,
state_map.get((EventTypes.JoinRules, "")),
state_map.get((EventTypes.RoomHistoryVisibility, "")),
event_before,
# HISTORICAL MESSAGE END
insertion_event,
historical_message_event,
batch_event,
base_insertion_event,
# HISTORICAL MESSAGE START
event_after,
# Latest in the room (newest messages)
]
event_id_diff = {event.event_id for event in expected_event_order} - {
event.event_id for event in actual_events_in_room_chronological
}
event_diff_ordered = [
event for event in expected_event_order if event.event_id in event_id_diff
]
event_id_extra = {
event.event_id for event in actual_events_in_room_chronological
} - {event.event_id for event in expected_event_order}
event_extra_ordered = [
event
for event in actual_events_in_room_chronological
if event.event_id in event_id_extra
]
assertion_message = (
"Debug info:\nActual events missing from expected list: %s\nActual events contain %d additional events compared to expected: %s\nExpected event order: %s\nActual event order: %s"
% (
json.dumps(
[_debug_event_string(event) for event in event_diff_ordered],
indent=4,
),
len(event_extra_ordered),
json.dumps(
[_debug_event_string(event) for event in event_extra_ordered],
indent=4,
),
json.dumps(
[_debug_event_string(event) for event in expected_event_order],
indent=4,
),
json.dumps(
[
_debug_event_string(event)
for event in actual_events_in_room_chronological
],
indent=4,
),
)
)
# assert (
# actual_events_in_room_chronological == expected_event_order
# ), assertion_message
self.assertEqual(
[event.event_id for event in actual_events_in_room_chronological],
[event.event_id for event in expected_event_order],
assertion_message,
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with a Complement test is that we'd need to call Synapse's Delete Room Admin API, which would only be specific to Synapse. I could write this test, but I wouldn't be able to publish it anywhere permanent, and the steps in it would be no different than manually testing.

Manually testing again, with the continue backfill doesn't drop any events. Thanks for catching that :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cracked and settled for writing a unit test for backfill, while including a bit that specifically checks that backfill won't rely on the event cache to determine whether to persist an event. The result is a test for the backfill + the specific behaviour that PR removes.

def test_backfill_ignores_known_events(self) -> None:
"""
Tests that events that we already know about are ignored when backfilling.
"""
# Set up users
user_id = self.register_user("kermit", "test")
tok = self.login("kermit", "test")
other_server = "otherserver"
other_user = "@otheruser:" + other_server
# Create a room to backfill events into
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
room_version = self.get_success(self.store.get_room_version(room_id))
# Build an event to backfill
event = event_from_pdu_json(
{
"type": EventTypes.Message,
"content": {"body": "hello world", "msgtype": "m.text"},
"room_id": room_id,
"sender": other_user,
"depth": 32,
"prev_events": [],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
room_version,
)
# Ensure the event is not already in the DB
self.get_failure(
self.store.get_event(event.event_id),
NotFoundError,
)
# Backfill the event and check that it has entered the DB.
# We mock out the FederationClient.backfill method, to pretend that a remote
# server has returned our fake event.
federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
self.hs.get_federation_client().backfill = federation_client_backfill_mock
# We also mock the persist method with a side effect of itself. This allows us
# to track when it has been called while preserving its function.
persist_events_and_notify_mock = Mock(
side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
)
self.hs.get_federation_event_handler().persist_events_and_notify = (
persist_events_and_notify_mock
)
# Small side-tangent. We populate the event cache with the event, even though
# it is not yet in the DB. This is an invalid scenario that can currently occur
# due to not properly invalidating the event cache.
# See https://github.com/matrix-org/synapse/issues/13476.
#
# As a result, backfill should not rely on the event cache to check whether
# we already have an event in the DB.
# TODO: Remove this bit when the event cache is properly invalidated.
cache_entry = EventCacheEntry(
event=event,
redacted_event=None,
)
self.store._get_event_cache.set_local((event.event_id,), cache_entry)
# We now call FederationEventHandler.backfill (a separate method) to trigger
# a backfill request. It should receive the fake event.
self.get_success(
self.hs.get_federation_event_handler().backfill(
other_user,
room_id,
limit=10,
extremities=[],
)
)
# Check that our fake event was persisted.
persist_events_and_notify_mock.assert_called_once()
persist_events_and_notify_mock.reset_mock()
# Now we repeat the backfill, having the homeserver receive the fake event
# again.
self.get_success(
self.hs.get_federation_event_handler().backfill(
other_user,
room_id,
limit=10,
extremities=[],
),
)
# This time, we expect no event persistence to have occurred, as we already
# have this event.
persist_events_and_notify_mock.assert_not_called()


# Continue on with the events that are new to us.
new_events.append(event)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)
sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
Expand Down Expand Up @@ -846,18 +878,6 @@ async def _process_pulled_event(

event_id = event.event_id

existing = await self._store.get_event(
event_id, allow_none=True, allow_rejected=True
)
if existing:
if not existing.internal_metadata.is_outlier():
logger.info(
"_process_pulled_event: Ignoring received event %s which we have already seen",
event_id,
)
return
logger.info("De-outliering event %s", event_id)

try:
self._sanity_check_event(event)
except SynapseError as err:
Expand Down