Skip to content

Sliding Sync: Make sure we get up-to-date information from get_sliding_sync_rooms_for_user(...) #17692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17692.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make sure we get up-to-date state information when using the new Sliding Sync tables to derive room membership.
1 change: 1 addition & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)

def _invalidate_state_caches_all(self, room_id: str) -> None:
"""Invalidates caches that are based on the current state, but does
Expand Down
14 changes: 14 additions & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.events import SLIDING_SYNC_RELEVANT_STATE_SET
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.util.caches.descriptors import CachedFunction
Expand Down Expand Up @@ -271,12 +272,20 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._attempt_to_invalidate_cache(
"get_rooms_for_user", (data.state_key,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
)
elif data.type == EventTypes.RoomEncryption:
self._attempt_to_invalidate_cache(
"get_room_encryption", (data.room_id,)
)
elif data.type == EventTypes.Create:
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))

if (data.type, data.state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busting the cache wherever we bust it for get_room_encryption already

elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
# Similar to the above, but the entire caches are invalidated. This is
Expand All @@ -285,6 +294,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand Down Expand Up @@ -365,6 +375,9 @@ def _invalidate_caches_for_event(
elif etype == EventTypes.RoomEncryption:
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))

if (etype, state_key) in SLIDING_SYNC_RELEVANT_STATE_SET:
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)

if relates_to:
self._attempt_to_invalidate_cache(
"get_relations_for_event",
Expand Down Expand Up @@ -477,6 +490,7 @@ def _invalidate_caches_for_room(self, room_id: str) -> None:
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
Expand Down
9 changes: 8 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,9 @@ def f(txn: LoggingTransaction) -> None:
self._invalidate_cache_and_stream(
txn, self.get_forgotten_rooms_for_user, (user_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_sliding_sync_rooms_for_user, (user_id,)
)
Copy link
Contributor Author

@MadLittleMods MadLittleMods Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Busting the cache whenever we bust the cache for did_forget/get_forgotten_rooms_for_user


await self.db_pool.runInteraction("forget_membership", f)

Expand Down Expand Up @@ -1393,6 +1396,10 @@ async def get_sliding_sync_rooms_for_user(
def get_sliding_sync_rooms_for_user_txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
# XXX: If you use any new columns that can change (like from
# `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the
# `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add
# tests).
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
Expand All @@ -1415,7 +1422,7 @@ def get_sliding_sync_rooms_for_user_txn(
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
room_type=row[7],
is_encrypted=row[8],
is_encrypted=bool(row[8]),
)
for row in txn
}
Expand Down
191 changes: 136 additions & 55 deletions tests/rest/client/sliding_sync/test_sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,43 +722,37 @@ def test_filter_regardless_of_membership_server_left_room(self) -> None:
self.helper.join(space_room_id, user1_id, tok=user1_tok)

# Make an initial Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint,
{
"lists": {
"all-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
"filters": {},
},
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 1,
"filters": {
"is_encrypted": True,
"room_types": [RoomTypes.SPACE],
},
sync_body = {
"lists": {
"all-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
"filters": {},
},
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 1,
"filters": {
"is_encrypted": True,
"room_types": [RoomTypes.SPACE],
},
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
from_token = channel.json_body["pos"]
},
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, just updating to use the do_sync helper


# Make sure the response has the lists we requested
self.assertListEqual(
list(channel.json_body["lists"].keys()),
list(response_body["lists"].keys()),
["all-list", "foo-list"],
channel.json_body["lists"].keys(),
response_body["lists"].keys(),
)

# Make sure the lists have the correct rooms
self.assertListEqual(
list(channel.json_body["lists"]["all-list"]["ops"]),
list(response_body["lists"]["all-list"]["ops"]),
[
{
"op": "SYNC",
Expand All @@ -768,7 +762,7 @@ def test_filter_regardless_of_membership_server_left_room(self) -> None:
],
)
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
Expand All @@ -783,35 +777,30 @@ def test_filter_regardless_of_membership_server_left_room(self) -> None:
self.helper.leave(space_room_id, user2_id, tok=user2_tok)

# Make an incremental Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint + f"?pos={from_token}",
{
"lists": {
"all-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
"filters": {},
},
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 1,
"filters": {
"is_encrypted": True,
"room_types": [RoomTypes.SPACE],
},
sync_body = {
"lists": {
"all-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
"filters": {},
},
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 1,
"filters": {
"is_encrypted": True,
"room_types": [RoomTypes.SPACE],
},
}
},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
},
}
}
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)

# Make sure the lists have the correct rooms even though we `newly_left`
self.assertListEqual(
list(channel.json_body["lists"]["all-list"]["ops"]),
list(response_body["lists"]["all-list"]["ops"]),
[
{
"op": "SYNC",
Expand All @@ -821,7 +810,7 @@ def test_filter_regardless_of_membership_server_left_room(self) -> None:
],
)
self.assertListEqual(
list(channel.json_body["lists"]["foo-list"]["ops"]),
list(response_body["lists"]["foo-list"]["ops"]),
[
{
"op": "SYNC",
Expand All @@ -831,6 +820,98 @@ def test_filter_regardless_of_membership_server_left_room(self) -> None:
],
)

def test_filter_is_encrypted_up_to_date(self) -> None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New tests

"""
Make sure we get up-to-date `is_encrypted` status for a joined room
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

room_id = self.helper.create_room_as(user1_id, tok=user1_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
"filters": {
"is_encrypted": True,
},
},
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
set(),
exact=True,
)

# Update the encryption status
self.helper.send_state(
room_id,
EventTypes.RoomEncryption,
{EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
tok=user1_tok,
)

# We should see the room now because it's encrypted
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
{room_id},
exact=True,
)

def test_forgotten_up_to_date(self) -> None:
"""
Make sure we get up-to-date `forgotten` status for rooms
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")

room_id = self.helper.create_room_as(user2_id, tok=user2_tok)

# User1 is banned from the room (was never in the room)
self.helper.ban(room_id, src=user2_id, targ=user1_id, tok=user2_tok)

sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
"filters": {},
},
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
{room_id},
exact=True,
)

# User1 forgets the room
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{room_id}/forget",
content={},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)

# We should no longer see the forgotten room
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
set(),
exact=True,
)

def test_sort_list(self) -> None:
"""
Test that the `lists` are sorted by `stream_ordering`
Expand Down
Loading