2727
2828from synapse .api .constants import EduTypes
2929from synapse .api .errors import CodeMessageException , Codes , NotFoundError , SynapseError
30+ from synapse .handlers .device import DeviceHandler
3031from synapse .logging .context import make_deferred_yieldable , run_in_background
3132from synapse .logging .opentracing import log_kv , set_tag , tag_args , trace
32- from synapse .replication .http .devices import ReplicationUserDevicesResyncRestServlet
3333from synapse .types import (
3434 JsonDict ,
3535 UserID ,
@@ -56,27 +56,23 @@ def __init__(self, hs: "HomeServer"):
5656 self .is_mine = hs .is_mine
5757 self .clock = hs .get_clock ()
5858
59- self ._edu_updater = SigningKeyEduUpdater (hs , self )
60-
6159 federation_registry = hs .get_federation_registry ()
6260
63- self ._is_master = hs .config .worker .worker_app is None
64- if not self ._is_master :
65- self ._user_device_resync_client = (
66- ReplicationUserDevicesResyncRestServlet .make_client (hs )
67- )
68- else :
61+ is_master = hs .config .worker .worker_app is None
62+ if is_master :
63+ edu_updater = SigningKeyEduUpdater (hs )
64+
6965 # Only register this edu handler on master as it requires writing
7066 # device updates to the db
7167 federation_registry .register_edu_handler (
7268 EduTypes .SIGNING_KEY_UPDATE ,
73- self . _edu_updater .incoming_signing_key_update ,
69+ edu_updater .incoming_signing_key_update ,
7470 )
7571 # also handle the unstable version
7672 # FIXME: remove this when enough servers have upgraded
7773 federation_registry .register_edu_handler (
7874 EduTypes .UNSTABLE_SIGNING_KEY_UPDATE ,
79- self . _edu_updater .incoming_signing_key_update ,
75+ edu_updater .incoming_signing_key_update ,
8076 )
8177
8278 # doesn't really work as part of the generic query API, because the
@@ -319,14 +315,13 @@ async def _query_devices_for_destination(
319315 # probably be tracking their device lists. However, we haven't
320316 # done an initial sync on the device list so we do it now.
321317 try :
322- if self . _is_master :
323- resync_results = await self .device_handler .device_list_updater .user_device_resync (
318+ resync_results = (
319+ await self .device_handler .device_list_updater .user_device_resync (
324320 user_id
325321 )
326- else :
327- resync_results = await self ._user_device_resync_client (
328- user_id = user_id
329- )
322+ )
323+ if resync_results is None :
324+ raise ValueError ("Device resync failed" )
330325
331326 # Add the device keys to the results.
332327 user_devices = resync_results ["devices" ]
@@ -605,6 +600,8 @@ async def claim_client_keys(destination: str) -> None:
605600 async def upload_keys_for_user (
606601 self , user_id : str , device_id : str , keys : JsonDict
607602 ) -> JsonDict :
603+ # This can only be called from the main process.
604+ assert isinstance (self .device_handler , DeviceHandler )
608605
609606 time_now = self .clock .time_msec ()
610607
@@ -732,6 +729,8 @@ async def upload_signing_keys_for_user(
732729 user_id: the user uploading the keys
733730 keys: the signing keys
734731 """
732+ # This can only be called from the main process.
733+ assert isinstance (self .device_handler , DeviceHandler )
735734
736735 # if a master key is uploaded, then check it. Otherwise, load the
737736 # stored master key, to check signatures on other keys
@@ -823,6 +822,9 @@ async def upload_signatures_for_device_keys(
823822 Raises:
824823 SynapseError: if the signatures dict is not valid.
825824 """
825+ # This can only be called from the main process.
826+ assert isinstance (self .device_handler , DeviceHandler )
827+
826828 failures = {}
827829
828830 # signatures to be stored. Each item will be a SignatureListItem
@@ -1200,6 +1202,9 @@ async def _retrieve_cross_signing_keys_for_remote_user(
12001202 A tuple of the retrieved key content, the key's ID and the matching VerifyKey.
12011203 If the key cannot be retrieved, all values in the tuple will instead be None.
12021204 """
1205+ # This can only be called from the main process.
1206+ assert isinstance (self .device_handler , DeviceHandler )
1207+
12031208 try :
12041209 remote_result = await self .federation .query_user_devices (
12051210 user .domain , user .to_string ()
@@ -1396,11 +1401,14 @@ class SignatureListItem:
13961401class SigningKeyEduUpdater :
13971402 """Handles incoming signing key updates from federation and updates the DB"""
13981403
1399- def __init__ (self , hs : "HomeServer" , e2e_keys_handler : E2eKeysHandler ):
1404+ def __init__ (self , hs : "HomeServer" ):
14001405 self .store = hs .get_datastores ().main
14011406 self .federation = hs .get_federation_client ()
14021407 self .clock = hs .get_clock ()
1403- self .e2e_keys_handler = e2e_keys_handler
1408+
1409+ device_handler = hs .get_device_handler ()
1410+ assert isinstance (device_handler , DeviceHandler )
1411+ self ._device_handler = device_handler
14041412
14051413 self ._remote_edu_linearizer = Linearizer (name = "remote_signing_key" )
14061414
@@ -1445,9 +1453,6 @@ async def _handle_signing_key_updates(self, user_id: str) -> None:
14451453 user_id: the user whose updates we are processing
14461454 """
14471455
1448- device_handler = self .e2e_keys_handler .device_handler
1449- device_list_updater = device_handler .device_list_updater
1450-
14511456 async with self ._remote_edu_linearizer .queue (user_id ):
14521457 pending_updates = self ._pending_updates .pop (user_id , [])
14531458 if not pending_updates :
@@ -1459,13 +1464,11 @@ async def _handle_signing_key_updates(self, user_id: str) -> None:
14591464 logger .info ("pending updates: %r" , pending_updates )
14601465
14611466 for master_key , self_signing_key in pending_updates :
1462- new_device_ids = (
1463- await device_list_updater .process_cross_signing_key_update (
1464- user_id ,
1465- master_key ,
1466- self_signing_key ,
1467- )
1467+ new_device_ids = await self ._device_handler .device_list_updater .process_cross_signing_key_update (
1468+ user_id ,
1469+ master_key ,
1470+ self_signing_key ,
14681471 )
14691472 device_ids = device_ids + new_device_ids
14701473
1471- await device_handler .notify_device_update (user_id , device_ids )
1474+ await self . _device_handler .notify_device_update (user_id , device_ids )
0 commit comments