From 10081061d783621f6bd02ed176605a67a388a21e Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Tue, 13 Sep 2022 09:26:46 +0100 Subject: [PATCH] Reimplement `get_rooms_for_user` and `get_rooms_for_users` This avoids the join on `events` to get stream ordering that is mostly unused. --- synapse/storage/databases/main/cache.py | 3 + synapse/storage/databases/main/events.py | 4 + synapse/storage/databases/main/roommember.py | 79 ++++++++++++++++++-- 3 files changed, 79 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 12e9a423826a..7db2071fdaa2 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -205,6 +205,9 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self.get_rooms_for_user_with_stream_ordering.invalidate( (data.state_key,) ) + self.get_rooms_for_user.invalidate( + (data.state_key,) + ) else: raise Exception("Unknown events stream row type %s" % (row.type,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a4010ee28dca..7cce6f279707 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1178,6 +1178,10 @@ def _update_current_state_txn( self.store.get_rooms_for_user_with_stream_ordering.invalidate, (member,), ) + txn.call_after( + self.store.get_rooms_for_user.invalidate, + (member,), + ) self.store._invalidate_state_caches_and_stream( txn, room_id, members_changed diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index fdb4684e128e..260042c741c7 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -15,7 +15,6 @@ import logging from typing import ( TYPE_CHECKING, - Callable, Collection, Dict, FrozenSet, @@ -52,7 +51,6 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList -from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -670,19 +668,86 @@ def _get_users_server_still_shares_room_with_txn( _get_users_server_still_shares_room_with_txn, ) - @cancellable + @cached(max_entries=500000, iterable=True) async def get_rooms_for_user( - self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None + self, user_id: str ) -> FrozenSet[str]: """Returns a set of room_ids the user is currently joined to. If a remote user only returns rooms this server is currently participating in. """ - rooms = await self.get_rooms_for_user_with_stream_ordering( - user_id, on_invalidate=on_invalidate + rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(user_id) + if rooms: + return frozenset(r.room_id for r in rooms) + + return await self.db_pool.runInteraction( + "get_rooms_for_user", + self._get_rooms_for_user_txn, + user_id, ) - return frozenset(r.room_id for r in rooms) + + def _get_rooms_for_user_txn( + self, txn: LoggingTransaction, user_id: str + ) -> FrozenSet[str]: + sql = """ + SELECT room_id + FROM current_state_events AS c + WHERE + c.type = 'm.room.member' + AND c.state_key = ? + AND c.membership = ? + """ + + txn.execute(sql, (user_id, Membership.JOIN)) + return frozenset(txn) + + @cachedList( + cached_method_name="get_rooms_for_user", + list_name="user_ids", + ) + async def get_rooms_for_users( + self, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: + """A batched version of `get_rooms_for_user`. + + Returns: + Map from user_id to set of rooms that is currently in. + """ + return await self.db_pool.runInteraction( + "get_rooms_for_users", + self._get_rooms_for_users_txn, + user_ids, + ) + + def _get_rooms_for_users_txn( + self, txn: LoggingTransaction, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[str]]: + + clause, args = make_in_list_sql_clause( + self.database_engine, + "c.state_key", + user_ids, + ) + + sql = f""" + SELECT c.state_key, room_id + FROM current_state_events AS c + WHERE + c.type = 'm.room.member' + AND c.membership = ? + AND {clause} + """ + + txn.execute(sql, [Membership.JOIN] + args) + + result: Dict[str, Set[str]] = { + user_id: set() for user_id in user_ids + } + for user_id, room_id in txn: + result[user_id].add(room_id) + + return {user_id: frozenset(v) for user_id, v in result.items()} @cached(max_entries=10000) async def does_pair_of_users_share_a_room(