Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Migrate the user directory initial population to a background task (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkowl authored Mar 18, 2019
1 parent 651ad8b commit 282c973
Show file tree
Hide file tree
Showing 8 changed files with 405 additions and 301 deletions.
1 change: 1 addition & 0 deletions changelog.d/4864.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.
173 changes: 13 additions & 160 deletions synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
world_readable or publicly joinable room. We keep a database table up to date
by streaming changes of the current state and recalculating whether users should
be in the directory or not when necessary.
For each user in the directory we also store a room_id which is public and that the
user is joined to. This allows us to ignore history_visibility and join_rules changes
for that user in all other public rooms, as we know they'll still be in at least
one public room.
"""

INITIAL_ROOM_SLEEP_MS = 50
INITIAL_ROOM_SLEEP_COUNT = 100
INITIAL_ROOM_BATCH_SIZE = 100
INITIAL_USER_SLEEP_MS = 10

def __init__(self, hs):
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
Expand All @@ -59,17 +49,6 @@ def __init__(self, hs):
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
self.search_all_users = hs.config.user_directory_search_all_users

# If we're a worker, don't sleep when doing the initial room work, as it
# won't monopolise the master's CPU.
if hs.config.worker_app:
self.INITIAL_ROOM_SLEEP_MS = 0
self.INITIAL_USER_SLEEP_MS = 0

# When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already
self.initially_handled_users = set()

# The current position in the current_state_delta stream
self.pos = None

Expand Down Expand Up @@ -132,7 +111,7 @@ def handle_local_profile_change(self, user_id, profile):
# Support users are for diagnostics and should not appear in the user directory.
if not is_support:
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None
user_id, profile.display_name, profile.avatar_url
)

@defer.inlineCallbacks
Expand All @@ -149,10 +128,9 @@ def _unsafe_process(self):
if self.pos is None:
self.pos = yield self.store.get_user_directory_stream_pos()

# If still None then we need to do the initial fill of directory
# If still None then the initial background update hasn't happened yet
if self.pos is None:
yield self._do_initial_spam()
self.pos = yield self.store.get_user_directory_stream_pos()
defer.returnValue(None)

# Loop round handling deltas until we're up to date
while True:
Expand All @@ -173,133 +151,6 @@ def _unsafe_process(self):

yield self.store.update_user_directory_stream_pos(self.pos)

@defer.inlineCallbacks
def _do_initial_spam(self):
    """Populate the user_directory from the current state of the DB.

    Used when synapse first starts with user_directory support: wipes any
    existing directory rows, walks every room (and, when
    ``search_all_users`` is set, every local user), then records the
    stream position so incremental processing can take over.
    """
    new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()

    # Wipe any stale rows before repopulating from scratch.
    yield self.store.delete_all_from_user_dir()

    # Process one existing room at a time.
    all_room_ids = yield self.store.get_all_rooms()
    total_rooms = len(all_room_ids)

    logger.info("Doing initial update of user directory. %d rooms", total_rooms)

    for room_index, room_id in enumerate(all_room_ids, start=1):
        logger.info("Handling room %d/%d", room_index, total_rooms)
        yield self._handle_initial_room(room_id)
        # Sleep between rooms so we don't starve other work.
        yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)

    logger.info("Processed all rooms.")

    if self.search_all_users:
        all_user_ids = yield self.store.get_all_local_users()
        total_users = len(all_user_ids)
        logger.info(
            "Doing initial update of user directory. %d users", total_users
        )
        for user_index, user_id in enumerate(all_user_ids, start=1):
            # Profiles are added for every user even when they don't match
            # the include pattern, in case the pattern changes later.
            logger.info(
                "Handling user %d/%d", user_index, total_users
            )
            yield self._handle_local_user(user_id)
            yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)

        logger.info("Processed all users")

    # The dedupe set is only needed during the initial fill; drop it.
    self.initially_handled_users = None

    yield self.store.update_user_directory_stream_pos(new_pos)

@defer.inlineCallbacks
def _handle_initial_room(self, room_id):
    """Called when we initially fill out user_directory one room at a time.

    Adds directory profile entries for any users in the room we haven't
    seen in an earlier room, then records room-sharing: members of public
    rooms go into the users-in-public-rooms table; for private rooms we
    record, for each of our own users, every other member they share the
    room with.

    Args:
        room_id (str): The room to process.
    """
    # Nothing to do unless at least one of our users is joined.
    is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
    if not is_in_room:
        return

    is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
        room_id
    )

    users_with_profile = yield self.state.get_current_user_in_room(room_id)
    user_ids = set(users_with_profile)
    # Only insert profiles for users not already handled in an earlier room.
    unhandled_users = user_ids - self.initially_handled_users

    yield self.store.add_profiles_to_user_dir(
        {user_id: users_with_profile[user_id] for user_id in unhandled_users}
    )

    self.initially_handled_users |= unhandled_users

    # We now go and figure out the new users who share rooms with user entries
    # We sleep aggressively here as otherwise it can starve resources.
    # We also batch up inserts/updates, but try to avoid too many at once.
    to_insert = set()
    count = 0

    if is_public:
        for user_id in user_ids:
            if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)

            # Appservice users are not shown in the directory.
            if self.store.get_if_app_services_interested_in_user(user_id):
                count += 1
                continue

            to_insert.add(user_id)
            if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
                yield self.store.add_users_in_public_rooms(room_id, to_insert)
                to_insert.clear()

        if to_insert:
            yield self.store.add_users_in_public_rooms(room_id, to_insert)
            to_insert.clear()
    else:

        for user_id in user_ids:
            if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)

            # We only need to map sharing for our own users.
            if not self.is_mine_id(user_id):
                count += 1
                continue

            if self.store.get_if_app_services_interested_in_user(user_id):
                count += 1
                continue

            for other_user_id in user_ids:
                if user_id == other_user_id:
                    continue

                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
                count += 1

                user_set = (user_id, other_user_id)
                to_insert.add(user_set)

                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
                    # BUGFIX: this call previously passed a stray
                    # `not is_public` second argument, left over from the
                    # old add_users_who_share_room(room_id, share_private,
                    # tuples) signature. It didn't match the two-argument
                    # flush call below and would have raised a TypeError
                    # the first time a batch exceeded
                    # INITIAL_ROOM_BATCH_SIZE.
                    yield self.store.add_users_who_share_private_room(
                        room_id, to_insert
                    )
                    to_insert.clear()

        if to_insert:
            yield self.store.add_users_who_share_private_room(room_id, to_insert)
            to_insert.clear()

@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
Expand Down Expand Up @@ -449,7 +300,9 @@ def _handle_local_user(self, user_id):

row = yield self.store.get_user_in_directory(user_id)
if not row:
yield self.store.add_profiles_to_user_dir({user_id: profile})
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)

@defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
Expand All @@ -461,9 +314,9 @@ def _handle_new_user(self, room_id, user_id, profile):
"""
logger.debug("Adding new user to dir, %r", user_id)

row = yield self.store.get_user_in_directory(user_id)
if not row:
yield self.store.add_profiles_to_user_dir({user_id: profile})
yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url
)

is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
room_id
Expand All @@ -479,7 +332,9 @@ def _handle_new_user(self, room_id, user_id, profile):
# First, if they're our user then we need to update for every user
if self.is_mine_id(user_id):

is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
is_appservice = self.store.get_if_app_services_interested_in_user(
user_id
)

# We don't care about appservice users.
if not is_appservice:
Expand Down Expand Up @@ -546,9 +401,7 @@ def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
new_avatar = event.content.get("avatar_url")

if prev_name != new_name or prev_avatar != new_avatar:
yield self.store.update_profile_in_user_dir(
user_id, new_name, new_avatar, room_id
)
yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)

@defer.inlineCallbacks
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
Expand Down
8 changes: 6 additions & 2 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def average_items_per_ms(self):
Returns:
A duration in ms as a float
"""
if self.total_item_count == 0:
if self.avg_duration_ms == 0:
return 0
elif self.total_item_count == 0:
return None
else:
# Use the exponential moving average so that we can adapt to
Expand All @@ -64,7 +66,9 @@ def total_items_per_ms(self):
Returns:
A duration in ms as a float
"""
if self.total_item_count == 0:
if self.total_duration_ms == 0:
return 0
elif self.total_item_count == 0:
return None
else:
return float(self.total_item_count) / float(self.total_duration_ms)
Expand Down
30 changes: 30 additions & 0 deletions synapse/storage/schema/delta/53/user_dir_populate.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* Copyright 2019 New Vector Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Registers the background updates that (re)populate the user directory.
-- Each phase declares the previous phase in depends_on so they run in order.

-- Set up staging tables
INSERT INTO background_updates (update_name, progress_json) VALUES
  ('populate_user_directory_createtables', '{}');

-- Run through each room and update the user directory according to who is in it
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
  ('populate_user_directory_process_rooms', '{}', 'populate_user_directory_createtables');

-- Insert all users, if search_all_users is on
-- BUGFIX: depends_on previously read 'populate_user_directory_rooms', which
-- matches no registered update_name, so this phase would never be unblocked.
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
  ('populate_user_directory_process_users', '{}', 'populate_user_directory_process_rooms');

-- Clean up staging tables
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
  ('populate_user_directory_cleanup', '{}', 'populate_user_directory_process_users');
Loading

0 comments on commit 282c973

Please sign in to comment.