Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise get_rooms_for_user (drop with_stream_ordering) #13787

Merged
merged 17 commits into from
Sep 29, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Linting fixes
  • Loading branch information
Fizzadar committed Sep 13, 2022
commit b745518c475060f223769665b787932642eae07b
4 changes: 1 addition & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,7 @@ async def get_user_ids_changed(
possibly_left = possibly_changed | possibly_left

# Double check if we still share rooms with the given user.
users_rooms = await self.store.get_rooms_for_users(
possibly_left
)
users_rooms = await self.store.get_rooms_for_users(possibly_left)
for changed_user_id, entries in users_rooms.items():
if any(room_id in room_ids for room_id in entries):
possibly_left.discard(changed_user_id)
Expand Down
10 changes: 2 additions & 8 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1474,9 +1474,7 @@ async def _generate_sync_entry_for_device_list(
since_token.device_list_key
)
if changed_users is not None:
result = await self.store.get_rooms_for_users(
changed_users
)
result = await self.store.get_rooms_for_users(changed_users)

for changed_user_id, entries in result.items():
# Check if the changed user shares any rooms with the user,
Expand Down Expand Up @@ -1517,11 +1515,7 @@ async def _generate_sync_entry_for_device_list(
newly_left_users.update(left_users)

# Remove any users that we still share a room with.
left_users_rooms = (
await self.store.get_rooms_for_users(
newly_left_users
)
)
left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
for user_id, entries in left_users_rooms.items():
if any(rid in joined_rooms for rid in entries):
newly_left_users.discard(user_id)
Expand Down
4 changes: 1 addition & 3 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
self.get_rooms_for_user.invalidate(
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,))
else:
raise Exception("Unknown events stream row type %s" % (row.type,))

Expand Down
16 changes: 7 additions & 9 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,15 +617,15 @@ def _get_users_server_still_shares_room_with_txn(
)

@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user(
self, user_id: str
) -> FrozenSet[str]:
async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
"""Returns a set of room_ids the user is currently joined to.

If a remote user only returns rooms this server is currently
participating in.
"""
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(user_id)
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
user_id
)
if rooms:
return frozenset(r.room_id for r in rooms)

Expand All @@ -648,15 +648,15 @@ def _get_rooms_for_user_txn(
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may as well just use simple_select_onecol here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(txn)
return frozenset(row[0] for row in txn)

@cachedList(
cached_method_name="get_rooms_for_user",
list_name="user_ids",
)
async def get_rooms_for_users(
self, user_ids: Collection[str]
) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
) -> Dict[str, FrozenSet[str]]:
"""A batched version of `get_rooms_for_user`.

Returns:
Expand Down Expand Up @@ -689,9 +689,7 @@ def _get_rooms_for_users_txn(

txn.execute(sql, [Membership.JOIN] + args)

result: Dict[str, Set[str]] = {
user_id: set() for user_id in user_ids
}
result: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids}
for user_id, room_id in txn:
result[user_id].add(room_id)

Expand Down