7070from synapse .util import json_decoder , json_encoder
7171from synapse .util .caches .descriptors import cached , cachedList
7272from synapse .util .caches .lrucache import LruCache
73- from synapse .util .caches .stream_change_cache import (
74- AllEntitiesChangedResult ,
75- StreamChangeCache ,
76- )
73+ from synapse .util .caches .stream_change_cache import StreamChangeCache
7774from synapse .util .cancellation import cancellable
7875from synapse .util .iterutils import batch_iter
7976from synapse .util .stringutils import shortstr
@@ -132,6 +129,20 @@ def __init__(
132129 prefilled_cache = device_list_prefill ,
133130 )
134131
132+ device_list_room_prefill , min_device_list_room_id = self .db_pool .get_cache_dict (
133+ db_conn ,
134+ "device_lists_changes_in_room" ,
135+ entity_column = "room_id" ,
136+ stream_column = "stream_id" ,
137+ max_value = device_list_max ,
138+ limit = 10000 ,
139+ )
140+ self ._device_list_room_stream_cache = StreamChangeCache (
141+ "DeviceListRoomStreamChangeCache" ,
142+ min_device_list_room_id ,
143+ prefilled_cache = device_list_room_prefill ,
144+ )
145+
135146 (
136147 user_signature_stream_prefill ,
137148 user_signature_stream_list_id ,
@@ -209,6 +220,13 @@ def _invalidate_caches_for_devices(
209220 row .entity , token
210221 )
211222
223+ def device_lists_in_rooms_have_changed (
224+ self , room_ids : StrCollection , token : int
225+ ) -> None :
226+ "Record that device lists have changed in rooms"
227+ for room_id in room_ids :
228+ self ._device_list_room_stream_cache .entity_has_changed (room_id , token )
229+
212230 def get_device_stream_token (self ) -> int :
213231 return self ._device_list_id_gen .get_current_token ()
214232
@@ -832,16 +850,6 @@ async def get_cached_devices_for_user(
832850 )
833851 return {device [0 ]: db_to_json (device [1 ]) for device in devices }
834852
835- def get_cached_device_list_changes (
836- self ,
837- from_key : int ,
838- ) -> AllEntitiesChangedResult :
839- """Get set of users whose devices have changed since `from_key`, or None
840- if that information is not in our cache.
841- """
842-
843- return self ._device_list_stream_cache .get_all_entities_changed (from_key )
844-
845853 @cancellable
846854 async def get_all_devices_changed (
847855 self ,
@@ -1457,7 +1465,7 @@ async def _get_min_device_lists_changes_in_room(self) -> int:
14571465
14581466 @cancellable
14591467 async def get_device_list_changes_in_rooms (
1460- self , room_ids : Collection [str ], from_id : int
1468+ self , room_ids : Collection [str ], from_id : int , to_id : int
14611469 ) -> Optional [Set [str ]]:
14621470 """Return the set of users whose devices have changed in the given rooms
14631471 since the given stream ID.
@@ -1473,9 +1481,15 @@ async def get_device_list_changes_in_rooms(
14731481 if min_stream_id > from_id :
14741482 return None
14751483
1484+ changed_room_ids = self ._device_list_room_stream_cache .get_entities_changed (
1485+ room_ids , from_id
1486+ )
1487+ if not changed_room_ids :
1488+ return set ()
1489+
14761490 sql = """
14771491 SELECT DISTINCT user_id FROM device_lists_changes_in_room
1478- WHERE {clause} AND stream_id >= ?
1492+ WHERE {clause} AND stream_id > ? AND stream_id < = ?
14791493 """
14801494
14811495 def _get_device_list_changes_in_rooms_txn (
@@ -1487,11 +1501,12 @@ def _get_device_list_changes_in_rooms_txn(
14871501 return {user_id for user_id , in txn }
14881502
14891503 changes = set ()
1490- for chunk in batch_iter (room_ids , 1000 ):
1504+ for chunk in batch_iter (changed_room_ids , 1000 ):
14911505 clause , args = make_in_list_sql_clause (
14921506 self .database_engine , "room_id" , chunk
14931507 )
14941508 args .append (from_id )
1509+ args .append (to_id )
14951510
14961511 changes |= await self .db_pool .runInteraction (
14971512 "get_device_list_changes_in_rooms" ,
@@ -1502,6 +1517,34 @@ def _get_device_list_changes_in_rooms_txn(
15021517
15031518 return changes
15041519
1520+ async def get_all_device_list_changes (self , from_id : int , to_id : int ) -> Set [str ]:
1521+ """Return the set of rooms where devices have changed since the given
1522+ stream ID.
1523+
1524+ Will raise an exception if the given stream ID is too old.
1525+ """
1526+
1527+ min_stream_id = await self ._get_min_device_lists_changes_in_room ()
1528+
1529+ if min_stream_id > from_id :
1530+ raise Exception ("stream ID is too old" )
1531+
1532+ sql = """
1533+ SELECT DISTINCT room_id FROM device_lists_changes_in_room
1534+ WHERE stream_id > ? AND stream_id <= ?
1535+ """
1536+
1537+ def _get_all_device_list_changes_txn (
1538+ txn : LoggingTransaction ,
1539+ ) -> Set [str ]:
1540+ txn .execute (sql , (from_id , to_id ))
1541+ return {room_id for room_id , in txn }
1542+
1543+ return await self .db_pool .runInteraction (
1544+ "get_all_device_list_changes" ,
1545+ _get_all_device_list_changes_txn ,
1546+ )
1547+
15051548 async def get_device_list_changes_in_room (
15061549 self , room_id : str , min_stream_id : int
15071550 ) -> Collection [Tuple [str , str ]]:
@@ -1962,8 +2005,8 @@ def _update_remote_device_list_cache_txn(
19622005 async def add_device_change_to_streams (
19632006 self ,
19642007 user_id : str ,
1965- device_ids : Collection [ str ] ,
1966- room_ids : Collection [ str ] ,
2008+ device_ids : StrCollection ,
2009+ room_ids : StrCollection ,
19672010 ) -> Optional [int ]:
19682011 """Persist that a user's devices have been updated, and which hosts
19692012 (if any) should be poked.
@@ -2122,8 +2165,8 @@ def _add_device_outbound_room_poke_txn(
21222165 self ,
21232166 txn : LoggingTransaction ,
21242167 user_id : str ,
2125- device_ids : Iterable [ str ] ,
2126- room_ids : Collection [ str ] ,
2168+ device_ids : StrCollection ,
2169+ room_ids : StrCollection ,
21272170 stream_ids : List [int ],
21282171 context : Dict [str , str ],
21292172 ) -> None :
@@ -2161,6 +2204,10 @@ def _add_device_outbound_room_poke_txn(
21612204 ],
21622205 )
21632206
2207+ txn .call_after (
2208+ self .device_lists_in_rooms_have_changed , room_ids , max (stream_ids )
2209+ )
2210+
21642211 async def get_uncoverted_outbound_room_pokes (
21652212 self , start_stream_id : int , start_room_id : str , limit : int = 10
21662213 ) -> List [Tuple [str , str , str , int , Optional [Dict [str , str ]]]]:
0 commit comments