This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
User directory background update speedup #15435
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Speed up the user directory background update. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -102,44 +102,34 @@ async def _populate_user_directory_createtables( | |||
) -> int: | ||||
# Get all the rooms that we want to process. | ||||
def _make_staging_area(txn: LoggingTransaction) -> None: | ||||
sql = ( | ||||
"CREATE TABLE IF NOT EXISTS " | ||||
+ TEMP_TABLE | ||||
+ "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" | ||||
) | ||||
txn.execute(sql) | ||||
|
||||
sql = ( | ||||
"CREATE TABLE IF NOT EXISTS " | ||||
+ TEMP_TABLE | ||||
+ "_position(position TEXT NOT NULL)" | ||||
) | ||||
txn.execute(sql) | ||||
|
||||
# Get rooms we want to process from the database | ||||
sql = """ | ||||
SELECT room_id, count(*) FROM current_state_events | ||||
sql = f""" | ||||
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS | ||||
SELECT room_id, count(*) AS events | ||||
FROM current_state_events | ||||
GROUP BY room_id | ||||
""" | ||||
txn.execute(sql) | ||||
rooms = list(txn.fetchall()) | ||||
self.db_pool.simple_insert_many_txn( | ||||
txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms | ||||
txn.execute( | ||||
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_rm ON {TEMP_TABLE}_rooms (room_id)" | ||||
) | ||||
del rooms | ||||
|
||||
sql = ( | ||||
"CREATE TABLE IF NOT EXISTS " | ||||
+ TEMP_TABLE | ||||
+ "_users(user_id TEXT NOT NULL)" | ||||
txn.execute( | ||||
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_evs ON {TEMP_TABLE}_rooms (events)" | ||||
) | ||||
txn.execute(sql) | ||||
|
||||
txn.execute("SELECT name FROM users") | ||||
users = list(txn.fetchall()) | ||||
sql = f""" | ||||
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position ( | ||||
position TEXT NOT NULL | ||||
) | ||||
""" | ||||
txn.execute(sql) | ||||
|
||||
self.db_pool.simple_insert_many_txn( | ||||
txn, TEMP_TABLE + "_users", keys=("user_id",), values=users | ||||
sql = f""" | ||||
CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS | ||||
SELECT name AS user_id FROM users | ||||
""" | ||||
txn.execute(sql) | ||||
txn.execute( | ||||
f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_users_idx ON {TEMP_TABLE}_users (user_id)" | ||||
) | ||||
|
||||
new_pos = await self.get_max_stream_id_in_current_state_deltas() | ||||
|
@@ -222,12 +212,13 @@ def _get_next_batch( | |||
if not rooms_to_work_on: | ||||
return None | ||||
|
||||
# Get how many are left to process, so we can give status on how | ||||
# far we are in processing | ||||
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") | ||||
result = txn.fetchone() | ||||
assert result is not None | ||||
progress["remaining"] = result[0] | ||||
if "remaining" not in progress: | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand what this is trying to do. Does this mean we write a progress count at the end of the first iteration and never update the counter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah sorry. We fetch the count once, and then afterwards we decrement the count every time we delete a row from the table.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, perfect. |
||||
# Get how many are left to process, so we can give status on how | ||||
# far we are in processing | ||||
txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") | ||||
result = txn.fetchone() | ||||
assert result is not None | ||||
progress["remaining"] = result[0] | ||||
|
||||
return rooms_to_work_on | ||||
|
||||
|
@@ -332,7 +323,14 @@ def _get_next_batch( | |||
|
||||
if processed_event_count > batch_size: | ||||
# Don't process any more rooms, we've hit our batch size. | ||||
return processed_event_count | ||||
break | ||||
|
||||
await self.db_pool.runInteraction( | ||||
"populate_user_directory", | ||||
self.db_pool.updates._background_update_progress_txn, | ||||
"populate_user_directory_process_rooms", | ||||
progress, | ||||
) | ||||
|
||||
return processed_event_count | ||||
|
||||
|
@@ -356,13 +354,14 @@ def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: | |||
|
||||
users_to_work_on = [x[0] for x in user_result] | ||||
|
||||
# Get how many are left to process, so we can give status on how | ||||
# far we are in processing | ||||
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" | ||||
txn.execute(sql) | ||||
count_result = txn.fetchone() | ||||
assert count_result is not None | ||||
progress["remaining"] = count_result[0] | ||||
if "remaining" not in progress: | ||||
# Get how many are left to process, so we can give status on how | ||||
# far we are in processing | ||||
sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" | ||||
txn.execute(sql) | ||||
count_result = txn.fetchone() | ||||
assert count_result is not None | ||||
progress["remaining"] = count_result[0] | ||||
|
||||
return users_to_work_on | ||||
|
||||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't fully sure why we needed an index at all. I guess the point is:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
And for the rooms table we process rooms in order of the
events
column, descending.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there was an oversight here. We did not add these indices to preexisting
TEMP_TABLE
s. So this change will make future user dir rebuild speedy, but won't help anyone who is doing a rebuild before they upgrade.Discussion in synapse-dev: https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$oXx8Qg7YkhIqTZkIBwgBEW8qRtk-_AzA4M-85PSR3zU?via=matrix.org&via=element.io&via=beeper.com