Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 437d6d1

Browse files
author
Sean Quah
committed
Use (stream id, room id) position to fetch unconverted device changes
Signed-off-by: Sean Quah <seanq@matrix.org>
1 parent 23befb4 commit 437d6d1

File tree

2 files changed

+54
-6
lines changed

2 files changed

+54
-6
lines changed

synapse/handlers/device.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,13 +682,33 @@ async def _handle_new_device_update_async(self) -> None:
682682
hosts_already_sent_to: Set[str] = set()
683683

684684
try:
685+
stream_id, room_id = await self.store.get_device_change_last_converted_pos()
686+
685687
while True:
686688
self._handle_new_device_update_new_data = False
687-
rows = await self.store.get_uncoverted_outbound_room_pokes()
689+
max_stream_id = self.store.get_device_stream_token()
690+
rows = await self.store.get_uncoverted_outbound_room_pokes(
691+
stream_id, room_id
692+
)
688693
if not rows:
689694
# If the DB returned nothing then there is nothing left to
690695
# do, *unless* a new device list update happened during the
691696
# DB query.
697+
698+
# Advance `(stream_id, room_id)`.
699+
# `max_stream_id` comes from *before* the query for unconverted
700+
# rows, which means that any unconverted rows must have a larger
701+
# stream ID.
702+
if max_stream_id > stream_id:
703+
stream_id, room_id = max_stream_id, ""
704+
await self.store.set_device_change_last_converted_pos(
705+
stream_id, room_id
706+
)
707+
else:
708+
assert max_stream_id == stream_id
709+
# Avoid moving `room_id` backwards.
710+
pass
711+
692712
if self._handle_new_device_update_new_data:
693713
continue
694714
else:
@@ -752,6 +772,12 @@ async def _handle_new_device_update_async(self) -> None:
752772
hosts_already_sent_to.update(hosts)
753773
current_stream_id = stream_id
754774

775+
# Advance `(stream_id, room_id)`.
776+
_, _, room_id, stream_id, _ = rows[-1]
777+
await self.store.set_device_change_last_converted_pos(
778+
stream_id, room_id
779+
)
780+
755781
finally:
756782
self._handle_new_device_update_is_processing = False
757783

synapse/storage/databases/main/devices.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2008,27 +2008,49 @@ def _add_device_outbound_room_poke_txn(
20082008
)
20092009

20102010
async def get_uncoverted_outbound_room_pokes(
2011-
self, limit: int = 10
2011+
self, start_stream_id: int, start_room_id: str, limit: int = 10
20122012
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
20132013
"""Get device list changes by room that have not yet been handled and
20142014
written to `device_lists_outbound_pokes`.
20152015
2016+
Args:
2017+
start_stream_id: Together with `start_room_id`, indicates the position after
2018+
which to return device list changes.
2019+
start_room_id: Together with `start_stream_id`, indicates the position after
2020+
which to return device list changes.
2021+
limit: The maximum number of device list changes to return.
2022+
20162023
Returns:
2017-
A list of user ID, device ID, room ID, stream ID and optional opentracing context.
2024+
A list of user ID, device ID, room ID, stream ID and optional opentracing
2025+
context, in order of ascending (stream ID, room ID).
20182026
"""
20192027

20202028
sql = """
20212029
SELECT user_id, device_id, room_id, stream_id, opentracing_context
20222030
FROM device_lists_changes_in_room
2023-
WHERE NOT converted_to_destinations
2024-
ORDER BY stream_id
2031+
WHERE
2032+
(stream_id > ? OR (stream_id = ? AND room_id > ?)) AND
2033+
stream_id <= ? AND
2034+
NOT converted_to_destinations
2035+
ORDER BY stream_id ASC, room_id ASC
20252036
LIMIT ?
20262037
"""
20272038

20282039
def get_uncoverted_outbound_room_pokes_txn(
20292040
txn: LoggingTransaction,
20302041
) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
2031-
txn.execute(sql, (limit,))
2042+
txn.execute(
2043+
sql,
2044+
(
2045+
start_stream_id,
2046+
start_stream_id,
2047+
start_room_id,
2048+
# Avoid returning rows if there may be uncommitted device list
2049+
# changes with smaller stream IDs.
2050+
self._device_list_id_gen.get_current_token(),
2051+
limit,
2052+
),
2053+
)
20322054

20332055
return [
20342056
(

0 commit comments

Comments
 (0)