Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 9b3dc01

Browse files
committed
Merge pull request #5559 from matrix-org/erikj/refactor_changed_devices
2 parents 619ccfd + e79ec03 commit 9b3dc01

File tree

4 files changed

+98
-37
lines changed

4 files changed

+98
-37
lines changed

changelog.d/5559.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Optimise devices changed query to not pull unnecessary rows from the database, reducing database load.

synapse/handlers/device.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,13 @@ def get_user_ids_changed(self, user_id, from_token):
101101

102102
room_ids = yield self.store.get_rooms_for_user(user_id)
103103

104-
# First we check if any devices have changed
105-
changed = yield self.store.get_user_whose_devices_changed(
106-
from_token.device_list_key
104+
# First we check if any devices have changed for users that we share
105+
# rooms with.
106+
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
107+
user_id
108+
)
109+
changed = yield self.store.get_users_whose_devices_changed(
110+
from_token.device_list_key, users_who_share_room
107111
)
108112

109113
# Then work out if any users have since joined
@@ -188,10 +192,6 @@ def get_user_ids_changed(self, user_id, from_token):
188192
break
189193

190194
if possibly_changed or possibly_left:
191-
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
192-
user_id
193-
)
194-
195195
# Take the intersection of the users whose devices may have changed
196196
# and those that actually still share a room with the user
197197
possibly_joined = possibly_changed & users_who_share_room

synapse/handlers/sync.py

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,40 +1058,74 @@ def _generate_sync_entry_for_device_list(
10581058
newly_left_rooms,
10591059
newly_left_users,
10601060
):
1061+
"""Generate the DeviceLists section of sync
1062+
1063+
Args:
1064+
sync_result_builder (SyncResultBuilder)
1065+
newly_joined_rooms (set[str]): Set of rooms user has joined since
1066+
previous sync
1067+
newly_joined_or_invited_users (set[str]): Set of users that have
1068+
joined or been invited to a room since previous sync.
1069+
newly_left_rooms (set[str]): Set of rooms user has left since
1070+
previous sync
1071+
newly_left_users (set[str]): Set of users that have left a room
1072+
we're in since previous sync
1073+
1074+
Returns:
1075+
Deferred[DeviceLists]
1076+
"""
1077+
10611078
user_id = sync_result_builder.sync_config.user.to_string()
10621079
since_token = sync_result_builder.since_token
10631080

1081+
# We're going to mutate these fields, so let's copy them rather than
1082+
# assume they won't get used later.
1083+
newly_joined_or_invited_users = set(newly_joined_or_invited_users)
1084+
newly_left_users = set(newly_left_users)
1085+
10641086
if since_token and since_token.device_list_key:
1065-
changed = yield self.store.get_user_whose_devices_changed(
1066-
since_token.device_list_key
1087+
# We want to figure out what user IDs the client should refetch
1088+
# device keys for, and which users we aren't going to track changes
1089+
# for anymore.
1090+
#
1091+
# For the first step we check:
1092+
# a. if any users we share a room with have updated their devices,
1093+
# and
1094+
# b. we also check if we've joined any new rooms, or if a user has
1095+
# joined a room we're in.
1096+
#
1097+
# For the second step we just find any users we no longer share a
1098+
# room with by looking at all users that have left a room plus users
1099+
# that were in a room we've left.
1100+
1101+
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
1102+
user_id
1103+
)
1104+
1105+
# Step 1a, check for changes in devices of users we share a room with
1106+
users_that_have_changed = yield self.store.get_users_whose_devices_changed(
1107+
since_token.device_list_key, users_who_share_room
10671108
)
10681109

1069-
# TODO: Be more clever than this, i.e. remove users who we already
1070-
# share a room with?
1110+
# Step 1b, check for newly joined rooms
10711111
for room_id in newly_joined_rooms:
10721112
joined_users = yield self.state.get_current_users_in_room(room_id)
10731113
newly_joined_or_invited_users.update(joined_users)
10741114

1075-
for room_id in newly_left_rooms:
1076-
left_users = yield self.state.get_current_users_in_room(room_id)
1077-
newly_left_users.update(left_users)
1078-
10791115
# TODO: Check that these users are actually new, i.e. either they
10801116
# weren't in the previous sync *or* they left and rejoined.
1081-
changed.update(newly_joined_or_invited_users)
1117+
users_that_have_changed.update(newly_joined_or_invited_users)
10821118

1083-
if not changed and not newly_left_users:
1084-
defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
1119+
# Now find users that we no longer track
1120+
for room_id in newly_left_rooms:
1121+
left_users = yield self.state.get_current_users_in_room(room_id)
1122+
newly_left_users.update(left_users)
10851123

1086-
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
1087-
user_id
1088-
)
1124+
# Remove any users that we still share a room with.
1125+
newly_left_users -= users_who_share_room
10891126

10901127
defer.returnValue(
1091-
DeviceLists(
1092-
changed=users_who_share_room & changed,
1093-
left=set(newly_left_users) - users_who_share_room,
1094-
)
1128+
DeviceLists(changed=users_that_have_changed, left=newly_left_users)
10951129
)
10961130
else:
10971131
defer.returnValue(DeviceLists(changed=[], left=[]))

synapse/storage/devices.py

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from synapse.metrics.background_process_metrics import run_as_background_process
2525
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
2626
from synapse.storage.background_updates import BackgroundUpdateStore
27+
from synapse.util import batch_iter
2728
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
2829

2930
logger = logging.getLogger(__name__)
@@ -391,22 +392,47 @@ def _get_devices_with_keys_by_user_txn(self, txn, user_id):
391392

392393
return now_stream_id, []
393394

394-
@defer.inlineCallbacks
395-
def get_user_whose_devices_changed(self, from_key):
396-
"""Get set of users whose devices have changed since `from_key`.
395+
def get_users_whose_devices_changed(self, from_key, user_ids):
396+
"""Get set of users whose devices have changed since `from_key` that
397+
are in the given list of user_ids.
398+
399+
Args:
400+
from_key (str): The device lists stream token
401+
user_ids (Iterable[str])
402+
403+
Returns:
404+
Deferred[set[str]]: The set of user_ids whose devices have changed
405+
since `from_key`
397406
"""
398407
from_key = int(from_key)
399-
changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
400-
if changed is not None:
401-
defer.returnValue(set(changed))
402408

403-
sql = """
404-
SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
405-
"""
406-
rows = yield self._execute(
407-
"get_user_whose_devices_changed", None, sql, from_key
409+
# Get set of users who *may* have changed. Users not in the returned
410+
# list have definitely not changed.
411+
to_check = list(
412+
self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
413+
)
414+
415+
if not to_check:
416+
return defer.succeed(set())
417+
418+
def _get_users_whose_devices_changed_txn(txn):
419+
changes = set()
420+
421+
sql = """
422+
SELECT DISTINCT user_id FROM device_lists_stream
423+
WHERE stream_id > ?
424+
AND user_id IN (%s)
425+
"""
426+
427+
for chunk in batch_iter(to_check, 100):
428+
txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
429+
changes.update(user_id for user_id, in txn)
430+
431+
return changes
432+
433+
return self.runInteraction(
434+
"get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
408435
)
409-
defer.returnValue(set(row[0] for row in rows))
410436

411437
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
412438
"""Return a list of `(stream_id, user_id, destination)` which is the

0 commit comments

Comments
 (0)