Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refresh remote profiles that have been marked as stale, in order to fill the user directory. [rei:userdirpriv] #14756

Merged
merged 11 commits into from
Mar 16, 2023
1 change: 1 addition & 0 deletions changelog.d/14756.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change.
4 changes: 2 additions & 2 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, hs: "HomeServer"):

self._third_party_rules = hs.get_third_party_event_rules()

async def get_profile(self, user_id: str) -> JsonDict:
async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
target_user = UserID.from_string(user_id)

if self.hs.is_mine(target_user):
Expand All @@ -81,7 +81,7 @@ async def get_profile(self, user_id: str) -> JsonDict:
destination=target_user.domain,
query_type="profile",
args={"user_id": user_id},
ignore_backoff=True,
ignore_backoff=ignore_backoff,
)
return result
except RequestSendFailed as e:
Expand Down
242 changes: 242 additions & 0 deletions synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,22 @@
# limitations under the License.

import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple

from twisted.internet.interfaces import IDelayedCall

import synapse.metrics
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
from synapse.types import UserID
from synapse.util.metrics import Measure
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import non_null_str_or_none

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -33,6 +40,25 @@
# then be coalesced such that only one /profile request is made).
USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000

# Maximum number of remote servers that we will attempt to refresh profiles for
# in one go.
MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5

# As long as we have servers to refresh (without backoff), keep adding more
# every 15 seconds.
INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15


def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
"""
Calculates the time of a next retry given `now_ts` in ms and the number
of failures encountered thus far.

Currently the sequence goes:
1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week
"""
return now_ts + 60_000 * (5 ** min(retry_count, 7))


class UserDirectoryHandler(StateDeltasHandler):
"""Handles queries and updates for the user_directory.
Expand Down Expand Up @@ -69,19 +95,36 @@ def __init__(self, hs: "HomeServer"):
self.update_user_directory = hs.config.worker.should_update_user_directory
self.search_all_users = hs.config.userdirectory.user_directory_search_all_users
self.spam_checker = hs.get_spam_checker()
self._hs = hs

# The current position in the current_state_delta stream
self.pos: Optional[int] = None

# Guard to ensure we only process deltas one at a time
self._is_processing = False

# Guard to ensure we only have one process for refreshing remote profiles
self._is_refreshing_remote_profiles = False
# Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None

# Guard to ensure we only have one process for refreshing remote profiles
# for the given servers.
# Set of server names.
self._is_refreshing_remote_profiles_for_servers: Set[str] = set()

if self.update_user_directory:
self.notifier.add_replication_callback(self.notify_new_event)

# We kick this off so that we don't have to wait for a change before
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)

# Kick off the profile refresh process on startup
self._refresh_remote_profiles_call_later = self.clock.call_later(
10, self.kick_off_remote_profile_refresh_process
)

async def search_users(
self, user_id: str, search_term: str, limit: int
) -> SearchResult:
Expand Down Expand Up @@ -483,6 +526,20 @@ async def _handle_possible_remote_profile_change(
next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
retry_counter=0,
)
# Schedule a wake-up to refresh the user directory for this server.
# We intentionally wake up this server directly because we don't want
# other servers ahead of it in the queue to get in the way of updating
# the profile if the server only just sent us an event.
self.clock.call_later(
USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
self.kick_off_remote_profile_refresh_process_for_remote_server,
UserID.from_string(user_id).domain,
)
# Schedule a wake-up to handle any backoffs that may occur in the future.
self.clock.call_later(
2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
self.kick_off_remote_profile_refresh_process,
)
return

prev_name = prev_event.content.get("displayname")
Expand All @@ -505,3 +562,188 @@ async def _handle_possible_remote_profile_change(
# Only update if something has changed, or we didn't have a previous event
# in the first place.
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)

def kick_off_remote_profile_refresh_process(self) -> None:
"""Called when there may be remote users with stale profiles to be refreshed"""
if not self.update_user_directory:
return

if self._is_refreshing_remote_profiles:
return

if self._refresh_remote_profiles_call_later:
if self._refresh_remote_profiles_call_later.active():
self._refresh_remote_profiles_call_later.cancel()
self._refresh_remote_profiles_call_later = None

async def process() -> None:
try:
await self._unsafe_refresh_remote_profiles()
finally:
self._is_refreshing_remote_profiles = False

self._is_refreshing_remote_profiles = True
run_as_background_process("user_directory.refresh_remote_profiles", process)

async def _unsafe_refresh_remote_profiles(self) -> None:
limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len(
self._is_refreshing_remote_profiles_for_servers
)
if limit <= 0:
# nothing to do: already refreshing the maximum number of servers
# at once.
# Come back later.
self._refresh_remote_profiles_call_later = self.clock.call_later(
INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
self.kick_off_remote_profile_refresh_process,
)
return

servers_to_refresh = (
await self.store.get_remote_servers_with_profiles_to_refresh(
now_ts=self.clock.time_msec(), limit=limit
)
)

if not servers_to_refresh:
# Do we have any backing-off servers that we should try again
# for eventually?
# By setting `now` is a point in the far future, we can ask for
# which server/user is next to be refreshed, even though it is
# not actually refreshable *now*.
end_of_time = 1 << 62
backing_off_servers = (
await self.store.get_remote_servers_with_profiles_to_refresh(
now_ts=end_of_time, limit=1
)
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
)
if backing_off_servers:
# Find out when the next user is refreshable and schedule a
# refresh then.
backing_off_server_name = backing_off_servers[0]
users = await self.store.get_remote_users_to_refresh_on_server(
backing_off_server_name, now_ts=end_of_time, limit=1
)
if not users:
return
_, _, next_try_at_ts = users[0]
self._refresh_remote_profiles_call_later = self.clock.call_later(
((next_try_at_ts - self.clock.time_msec()) // 1000) + 2,
self.kick_off_remote_profile_refresh_process,
)

return

for server_to_refresh in servers_to_refresh:
self.kick_off_remote_profile_refresh_process_for_remote_server(
server_to_refresh
)

self._refresh_remote_profiles_call_later = self.clock.call_later(
INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
self.kick_off_remote_profile_refresh_process,
)

def kick_off_remote_profile_refresh_process_for_remote_server(
self, server_name: str
) -> None:
"""Called when there may be remote users with stale profiles to be refreshed
on the given server."""
if not self.update_user_directory:
return

if server_name in self._is_refreshing_remote_profiles_for_servers:
return

async def process() -> None:
try:
await self._unsafe_refresh_remote_profiles_for_remote_server(
server_name
)
finally:
self._is_refreshing_remote_profiles_for_servers.remove(server_name)

self._is_refreshing_remote_profiles_for_servers.add(server_name)
run_as_background_process(
"user_directory.refresh_remote_profiles_for_remote_server", process
)

async def _unsafe_refresh_remote_profiles_for_remote_server(
self, server_name: str
) -> None:
logger.info("Refreshing profiles in user directory for %s", server_name)

while True:
# Get a handful of users to process.
next_batch = await self.store.get_remote_users_to_refresh_on_server(
server_name, now_ts=self.clock.time_msec(), limit=10
)
if not next_batch:
# Finished for now
return

for user_id, retry_counter, _ in next_batch:
# Request the profile of the user.
try:
profile = await self._hs.get_profile_handler().get_profile(
user_id, ignore_backoff=False
)
except NotRetryingDestination as e:
logger.info(
"Failed to refresh profile for %r because the destination is undergoing backoff",
user_id,
)
# As a special-case, we back off until the destination is no longer
# backed off from.
await self.store.set_remote_user_profile_in_user_dir_stale(
user_id,
e.retry_last_ts + e.retry_interval,
retry_counter=retry_counter + 1,
)
continue
except SynapseError as e:
if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND:
# The profile doesn't exist.
# TODO Does this mean we should clear it from our user
# directory?
await self.store.clear_remote_user_profile_in_user_dir_stale(
user_id
)
logger.warning(
"Refresh of remote profile %r: not found (%r)",
user_id,
e.msg,
)
continue

logger.warning(
"Failed to refresh profile for %r because %r", user_id, e
)
await self.store.set_remote_user_profile_in_user_dir_stale(
user_id,
calculate_time_of_next_retry(
self.clock.time_msec(), retry_counter + 1
),
retry_counter=retry_counter + 1,
)
continue
except Exception:
logger.error(
"Failed to refresh profile for %r due to unhandled exception",
user_id,
exc_info=True,
)
await self.store.set_remote_user_profile_in_user_dir_stale(
user_id,
calculate_time_of_next_retry(
self.clock.time_msec(), retry_counter + 1
),
retry_counter=retry_counter + 1,
)
continue

await self.store.update_profile_in_user_dir(
user_id,
display_name=non_null_str_or_none(profile.get("displayname")),
avatar_url=non_null_str_or_none(profile.get("avatar_url")),
)
74 changes: 74 additions & 0 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,80 @@ async def set_remote_user_profile_in_user_dir_stale(
desc="set_remote_user_profile_in_user_dir_stale",
)

async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
"""
Marks a remote user as no longer having a possibly-stale user directory profile.

Args:
user_id: the remote user who no longer has a stale profile on this server.
"""
await self.db_pool.simple_delete(
table="user_directory_stale_remote_users",
keyvalues={"user_id": user_id},
desc="clear_remote_user_profile_in_user_dir_stale",
)

async def get_remote_servers_with_profiles_to_refresh(
self, now_ts: int, limit: int
) -> List[str]:
"""
Get a list of up to `limit` server names which have users whose
locally-cached profiles we believe to be stale
and are refreshable given the current time `now_ts` in milliseconds.
"""

def _get_remote_servers_with_refreshable_profiles_txn(
txn: LoggingTransaction,
) -> List[str]:
sql = """
SELECT user_server_name
FROM user_directory_stale_remote_users
WHERE next_try_at_ts < ?
GROUP BY user_server_name
ORDER BY MIN(next_try_at_ts), user_server_name
LIMIT ?
"""
txn.execute(sql, (now_ts, limit))
return [row[0] for row in txn]

return await self.db_pool.runInteraction(
"get_remote_servers_with_profiles_to_refresh",
_get_remote_servers_with_refreshable_profiles_txn,
)

async def get_remote_users_to_refresh_on_server(
self, server_name: str, now_ts: int, limit: int
) -> List[Tuple[str, int, int]]:
"""
Get a list of up to `limit` user IDs from the server `server_name`
whose locally-cached profiles we believe to be stale
and are refreshable given the current time `now_ts` in milliseconds.

Returns:
tuple of:
- User ID
- Retry counter (number of failures so far)
- Time the retry is scheduled for, in milliseconds
"""

def _get_remote_users_to_refresh_on_server_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, int, int]]:
sql = """
SELECT user_id, retry_counter, next_try_at_ts
FROM user_directory_stale_remote_users
WHERE user_server_name = ? AND next_try_at_ts < ?
ORDER BY next_try_at_ts
LIMIT ?
"""
txn.execute(sql, (server_name, now_ts, limit))
return cast(List[Tuple[str, int, int]], txn.fetchall())

return await self.db_pool.runInteraction(
"get_remote_users_to_refresh_on_server",
_get_remote_users_to_refresh_on_server_txn,
)

async def update_profile_in_user_dir(
self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
) -> None:
Expand Down
Loading