1313# limitations under the License.
1414
1515import logging
16+ from http import HTTPStatus
1617from typing import TYPE_CHECKING , Any , Dict , List , Optional , Set , Tuple
1718
19+ from twisted .internet .interfaces import IDelayedCall
20+
1821import synapse .metrics
1922from synapse .api .constants import EventTypes , HistoryVisibility , JoinRules , Membership
23+ from synapse .api .errors import Codes , SynapseError
2024from synapse .handlers .state_deltas import MatchChange , StateDeltasHandler
2125from synapse .metrics .background_process_metrics import run_as_background_process
2226from synapse .storage .databases .main .user_directory import SearchResult
2327from synapse .storage .roommember import ProfileInfo
28+ from synapse .types import UserID
2429from synapse .util .metrics import Measure
30+ from synapse .util .retryutils import NotRetryingDestination
31+ from synapse .util .stringutils import non_null_str_or_none
2532
2633if TYPE_CHECKING :
2734 from synapse .server import HomeServer
3340# then be coalesced such that only one /profile request is made).
3441USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
3542
43+ # Maximum number of remote servers that we will attempt to refresh profiles for
44+ # in one go.
45+ MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5
46+
47+ # As long as we have servers to refresh (without backoff), keep adding more
48+ # every 15 seconds.
49+ INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
50+
51+
def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
    """
    Calculates the time of the next retry, given `now_ts` in ms and the number
    of failures encountered thus far.

    The delay is one minute multiplied by 5 for every failure so far, with the
    exponent capped at 7. Currently the sequence goes:
        1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week

    Args:
        now_ts: the current time, in milliseconds.
        retry_count: the number of failures encountered so far. Values below
            zero are treated as zero; values above 7 are treated as 7.

    Returns:
        The timestamp, in milliseconds, at which the next retry is due.
    """
    # Clamp on both sides: the upper bound caps the back-off, and the lower
    # bound stops a negative exponent from turning `5 ** n` into a float
    # (which would break the `int` return type and give a sub-minute delay).
    exponent = max(0, min(retry_count, 7))
    return now_ts + 60_000 * (5**exponent)
61+
3662
3763class UserDirectoryHandler (StateDeltasHandler ):
3864 """Handles queries and updates for the user_directory.
@@ -69,19 +95,36 @@ def __init__(self, hs: "HomeServer"):
6995 self .update_user_directory = hs .config .worker .should_update_user_directory
7096 self .search_all_users = hs .config .userdirectory .user_directory_search_all_users
7197 self .spam_checker = hs .get_spam_checker ()
98+ self ._hs = hs
99+
72100 # The current position in the current_state_delta stream
73101 self .pos : Optional [int ] = None
74102
75103 # Guard to ensure we only process deltas one at a time
76104 self ._is_processing = False
77105
106+ # Guard to ensure we only have one process for refreshing remote profiles
107+ self ._is_refreshing_remote_profiles = False
108+ # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
109+ self ._refresh_remote_profiles_call_later : Optional [IDelayedCall ] = None
110+
111+ # Guard to ensure we only have one process for refreshing remote profiles
112+ # for the given servers.
113+ # Set of server names.
114+ self ._is_refreshing_remote_profiles_for_servers : Set [str ] = set ()
115+
78116 if self .update_user_directory :
79117 self .notifier .add_replication_callback (self .notify_new_event )
80118
81119 # We kick this off so that we don't have to wait for a change before
82120 # we start populating the user directory
83121 self .clock .call_later (0 , self .notify_new_event )
84122
123+ # Kick off the profile refresh process on startup
124+ self ._refresh_remote_profiles_call_later = self .clock .call_later (
125+ 10 , self .kick_off_remote_profile_refresh_process
126+ )
127+
85128 async def search_users (
86129 self , user_id : str , search_term : str , limit : int
87130 ) -> SearchResult :
@@ -483,6 +526,20 @@ async def _handle_possible_remote_profile_change(
483526 next_try_at_ms = now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS ,
484527 retry_counter = 0 ,
485528 )
529+ # Schedule a wake-up to refresh the user directory for this server.
530+ # We intentionally wake up this server directly because we don't want
531+ # other servers ahead of it in the queue to get in the way of updating
532+ # the profile if the server only just sent us an event.
533+ self .clock .call_later (
534+ USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1 ,
535+ self .kick_off_remote_profile_refresh_process_for_remote_server ,
536+ UserID .from_string (user_id ).domain ,
537+ )
538+ # Schedule a wake-up to handle any backoffs that may occur in the future.
539+ self .clock .call_later (
540+ 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1 ,
541+ self .kick_off_remote_profile_refresh_process ,
542+ )
486543 return
487544
488545 prev_name = prev_event .content .get ("displayname" )
@@ -505,3 +562,188 @@ async def _handle_possible_remote_profile_change(
505562 # Only update if something has changed, or we didn't have a previous event
506563 # in the first place.
507564 await self .store .update_profile_in_user_dir (user_id , new_name , new_avatar )
565+
def kick_off_remote_profile_refresh_process(self) -> None:
    """Start a background run that refreshes stale remote profiles.

    Does nothing if this process does not update the user directory, or if a
    refresh run is already in progress. Otherwise any wake-up handle stored
    in `_refresh_remote_profiles_call_later` is cancelled (if still active)
    and cleared, and a new run is started in the background.
    """
    if not self.update_user_directory or self._is_refreshing_remote_profiles:
        return

    # We are about to run right now, so a previously scheduled wake-up is
    # no longer needed.
    pending_call = self._refresh_remote_profiles_call_later
    if pending_call:
        if pending_call.active():
            pending_call.cancel()
        self._refresh_remote_profiles_call_later = None

    async def refresh_then_release_guard() -> None:
        try:
            await self._unsafe_refresh_remote_profiles()
        finally:
            # Drop the guard whether or not the refresh succeeded, so that
            # later kick-offs are not blocked.
            self._is_refreshing_remote_profiles = False

    # Take the guard before handing over to the background process.
    self._is_refreshing_remote_profiles = True
    run_as_background_process(
        "user_directory.refresh_remote_profiles", refresh_then_release_guard
    )
587+
async def _unsafe_refresh_remote_profiles(self) -> None:
    """Start per-server refreshes for servers with profiles due a refresh.

    At most `MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO` servers are
    refreshed concurrently. Depending on what the store reports, this either
    starts refreshes and schedules another pass, or schedules a wake-up for
    when the next backing-off user becomes refreshable, or does nothing.

    "Unsafe" because it does not check or set the
    `_is_refreshing_remote_profiles` guard itself; callers are expected to go
    through `kick_off_remote_profile_refresh_process`.
    """
    free_slots = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len(
        self._is_refreshing_remote_profiles_for_servers
    )
    if free_slots <= 0:
        # Nothing to do: we are already refreshing the maximum number of
        # servers at once. Come back later.
        self._refresh_remote_profiles_call_later = self.clock.call_later(
            INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
            self.kick_off_remote_profile_refresh_process,
        )
        return

    refreshable_servers = (
        await self.store.get_remote_servers_with_profiles_to_refresh(
            now_ts=self.clock.time_msec(), limit=free_slots
        )
    )

    if refreshable_servers:
        for refreshable_server in refreshable_servers:
            self.kick_off_remote_profile_refresh_process_for_remote_server(
                refreshable_server
            )

        # Keep topping up with more servers while there is work to do.
        self._refresh_remote_profiles_call_later = self.clock.call_later(
            INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
            self.kick_off_remote_profile_refresh_process,
        )
        return

    # Nothing is refreshable right now. Do we have any backing-off servers
    # that we should try again for eventually?
    # By setting `now` to a point in the far future, we can ask which
    # server/user is next to be refreshed, even though it is not actually
    # refreshable *now*.
    far_future_ts = 1 << 62
    waiting_servers = await self.store.get_remote_servers_with_profiles_to_refresh(
        now_ts=far_future_ts, limit=1
    )
    if not waiting_servers:
        return

    # Find out when the next user is refreshable and schedule a refresh then.
    next_server_name = waiting_servers[0]
    waiting_users = await self.store.get_remote_users_to_refresh_on_server(
        next_server_name, now_ts=far_future_ts, limit=1
    )
    if not waiting_users:
        return

    _, _, next_try_at_ts = waiting_users[0]
    # Convert the remaining wait from ms to whole seconds, plus 2 seconds of
    # slack.
    delay_secs = ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2
    self._refresh_remote_profiles_call_later = self.clock.call_later(
        delay_secs,
        self.kick_off_remote_profile_refresh_process,
    )
646+
def kick_off_remote_profile_refresh_process_for_remote_server(
    self, server_name: str
) -> None:
    """Start a background refresh of stale profiles on one remote server.

    Does nothing if this process does not update the user directory, or if a
    refresh for `server_name` is already in progress.

    Args:
        server_name: the remote server whose users may have stale profiles.
    """
    if (
        not self.update_user_directory
        or server_name in self._is_refreshing_remote_profiles_for_servers
    ):
        return

    async def refresh_then_release_server() -> None:
        try:
            await self._unsafe_refresh_remote_profiles_for_remote_server(
                server_name
            )
        finally:
            # Always release the per-server guard, even if the refresh failed.
            self._is_refreshing_remote_profiles_for_servers.remove(server_name)

    # Mark the server as in progress before the background process starts.
    self._is_refreshing_remote_profiles_for_servers.add(server_name)
    run_as_background_process(
        "user_directory.refresh_remote_profiles_for_remote_server",
        refresh_then_release_server,
    )
670+
async def _unsafe_refresh_remote_profiles_for_remote_server(
    self, server_name: str
) -> None:
    """Refresh stale user directory profiles for users on one remote server.

    Repeatedly asks the store for batches of up to 10 users on `server_name`
    that are due a refresh, and returns once a batch comes back empty. For
    each user the profile is requested via the profile handler:

    * on success, the user directory entry is updated;
    * if the destination is backing off, the user is re-marked stale until
      the destination's back-off ends;
    * if the profile is not found, the stale marker is cleared;
    * on any other failure, the user is re-marked stale with a next attempt
      time from `calculate_time_of_next_retry`.

    "Unsafe" because it does not check or set the per-server guard itself;
    callers are expected to go through
    `kick_off_remote_profile_refresh_process_for_remote_server`.

    Args:
        server_name: the remote server to refresh profiles for.
    """
    logger.info("Refreshing profiles in user directory for %s", server_name)

    while True:
        # Get a handful of users to process.
        stale_users = await self.store.get_remote_users_to_refresh_on_server(
            server_name, now_ts=self.clock.time_msec(), limit=10
        )
        if not stale_users:
            # Finished for now
            return

        for user_id, retry_counter, _ in stale_users:
            # Request the profile of the user.
            try:
                profile = await self._hs.get_profile_handler().get_profile(
                    user_id, ignore_backoff=False
                )
            except NotRetryingDestination as backoff:
                logger.info(
                    "Failed to refresh profile for %r because the destination is undergoing backoff",
                    user_id,
                )
                # As a special-case, we back off until the destination is no
                # longer backed off from.
                backoff_ends_at = backoff.retry_last_ts + backoff.retry_interval
                await self.store.set_remote_user_profile_in_user_dir_stale(
                    user_id,
                    backoff_ends_at,
                    retry_counter=retry_counter + 1,
                )
            except SynapseError as err:
                if err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.NOT_FOUND:
                    # The profile doesn't exist.
                    # TODO Does this mean we should clear it from our user
                    #    directory?
                    await self.store.clear_remote_user_profile_in_user_dir_stale(
                        user_id
                    )
                    logger.warning(
                        "Refresh of remote profile %r: not found (%r)",
                        user_id,
                        err.msg,
                    )
                else:
                    logger.warning(
                        "Failed to refresh profile for %r because %r", user_id, err
                    )
                    next_retry_counter = retry_counter + 1
                    await self.store.set_remote_user_profile_in_user_dir_stale(
                        user_id,
                        calculate_time_of_next_retry(
                            self.clock.time_msec(), next_retry_counter
                        ),
                        retry_counter=next_retry_counter,
                    )
            except Exception:
                # Log and back off rather than propagate, so that one failing
                # user does not abort the rest of the batch.
                logger.error(
                    "Failed to refresh profile for %r due to unhandled exception",
                    user_id,
                    exc_info=True,
                )
                next_retry_counter = retry_counter + 1
                await self.store.set_remote_user_profile_in_user_dir_stale(
                    user_id,
                    calculate_time_of_next_retry(
                        self.clock.time_msec(), next_retry_counter
                    ),
                    retry_counter=next_retry_counter,
                )
            else:
                # The request succeeded: store the fetched profile fields.
                await self.store.update_profile_in_user_dir(
                    user_id,
                    display_name=non_null_str_or_none(profile.get("displayname")),
                    avatar_url=non_null_str_or_none(profile.get("avatar_url")),
                )
0 commit comments