Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 2ad91ec

Browse files
authored
Set thread_id column to non-null for event_push_{actions,actions_staging,summary} (#15597)
Updates the database schema to require a thread_id (by adding a constraint that the column is non-null) for event_push_actions, event_push_actions_staging, and event_push_summary. For PostgreSQL we add the constraint as NOT VALID, then VALIDATE the constraint in a background job to avoid locking the table during an upgrade. Each table is updated as a separate schema delta to avoid deadlocks between them. For SQLite we simply rebuild the table & copy the data.
1 parent a1154df commit 2ad91ec

9 files changed

+292
-223
lines changed

changelog.d/15597.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make the `thread_id` column on `event_push_actions`, `event_push_actions_staging`, and `event_push_summary` non-null.

synapse/storage/background_updates.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,50 @@ async def updater(progress: JsonDict, batch_size: int) -> int:
561561
updater, oneshot=True
562562
)
563563

564+
def register_background_validate_constraint(
565+
self, update_name: str, constraint_name: str, table: str
566+
) -> None:
567+
"""Helper for store classes to do a background validate constraint.
568+
569+
This only applies on PostgreSQL.
570+
571+
To use:
572+
573+
1. use a schema delta file to add a background update. Example:
574+
INSERT INTO background_updates (update_name, progress_json) VALUES
575+
('validate_my_constraint', '{}');
576+
577+
2. In the Store constructor, call this method
578+
579+
Args:
580+
update_name: update_name to register for
581+
constraint_name: name of constraint to validate
582+
table: table the constraint is applied to
583+
"""
584+
585+
def runner(conn: Connection) -> None:
586+
c = conn.cursor()
587+
588+
sql = f"""
589+
ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name};
590+
"""
591+
logger.debug("[SQL] %s", sql)
592+
c.execute(sql)
593+
594+
async def updater(progress: JsonDict, batch_size: int) -> int:
595+
assert isinstance(
596+
self.db_pool.engine, engines.PostgresEngine
597+
), "validate constraint background update registered for non-Postres database"
598+
599+
logger.info("Validating constraint %s to %s", constraint_name, table)
600+
await self.db_pool.runWithConnection(runner)
601+
await self._end_background_update(update_name)
602+
return 1
603+
604+
self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
605+
updater, oneshot=True
606+
)
607+
564608
async def create_index_in_background(
565609
self,
566610
index_name: str,

synapse/storage/databases/main/event_push_actions.py

Lines changed: 31 additions & 223 deletions
Original file line numberDiff line numberDiff line change
@@ -289,179 +289,52 @@ def __init__(
289289
unique=True,
290290
)
291291

292-
self.db_pool.updates.register_background_update_handler(
293-
"event_push_backfill_thread_id",
294-
self._background_backfill_thread_id,
292+
self.db_pool.updates.register_background_validate_constraint(
293+
"event_push_actions_staging_thread_id",
294+
constraint_name="event_push_actions_staging_thread_id",
295+
table="event_push_actions_staging",
295296
)
296-
297-
# Indexes which will be used to quickly make the thread_id column non-null.
298-
self.db_pool.updates.register_background_index_update(
299-
"event_push_actions_thread_id_null",
300-
index_name="event_push_actions_thread_id_null",
297+
self.db_pool.updates.register_background_validate_constraint(
298+
"event_push_actions_thread_id",
299+
constraint_name="event_push_actions_thread_id",
301300
table="event_push_actions",
302-
columns=["thread_id"],
303-
where_clause="thread_id IS NULL",
304301
)
305-
self.db_pool.updates.register_background_index_update(
306-
"event_push_summary_thread_id_null",
307-
index_name="event_push_summary_thread_id_null",
302+
self.db_pool.updates.register_background_validate_constraint(
303+
"event_push_summary_thread_id",
304+
constraint_name="event_push_summary_thread_id",
308305
table="event_push_summary",
309-
columns=["thread_id"],
310-
where_clause="thread_id IS NULL",
311306
)
312307

313-
# Check ASAP (and then later, every 1s) to see if we have finished
314-
# background updates the event_push_actions and event_push_summary tables.
315-
self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
316-
self._event_push_backfill_thread_id_done = False
317-
318-
@wrap_as_background_process("check_event_push_backfill_thread_id")
319-
async def _check_event_push_backfill_thread_id(self) -> None:
320-
"""
321-
Has thread_id finished backfilling?
322-
323-
If not, we need to just-in-time update it so the queries work.
324-
"""
325-
done = await self.db_pool.updates.has_completed_background_update(
326-
"event_push_backfill_thread_id"
308+
self.db_pool.updates.register_background_update_handler(
309+
"event_push_drop_null_thread_id_indexes",
310+
self._background_drop_null_thread_id_indexes,
327311
)
328312

329-
if done:
330-
self._event_push_backfill_thread_id_done = True
331-
else:
332-
# Reschedule to run.
333-
self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
334-
335-
async def _background_backfill_thread_id(
313+
async def _background_drop_null_thread_id_indexes(
336314
self, progress: JsonDict, batch_size: int
337315
) -> int:
338316
"""
339-
Fill in the thread_id field for event_push_actions and event_push_summary.
340-
341-
This is preparatory so that it can be made non-nullable in the future.
342-
343-
Because all current (null) data is done in an unthreaded manner this
344-
simply assumes it is on the "main" timeline. Since event_push_actions
345-
are periodically cleared it is not possible to correctly re-calculate
346-
the thread_id.
317+
Drop the indexes used to find null thread_ids for event_push_actions and
318+
event_push_summary.
347319
"""
348-
event_push_actions_done = progress.get("event_push_actions_done", False)
349320

350-
def add_thread_id_txn(
351-
txn: LoggingTransaction, start_stream_ordering: int
352-
) -> int:
353-
sql = """
354-
SELECT stream_ordering
355-
FROM event_push_actions
356-
WHERE
357-
thread_id IS NULL
358-
AND stream_ordering > ?
359-
ORDER BY stream_ordering
360-
LIMIT ?
361-
"""
362-
txn.execute(sql, (start_stream_ordering, batch_size))
363-
364-
# No more rows to process.
365-
rows = txn.fetchall()
366-
if not rows:
367-
progress["event_push_actions_done"] = True
368-
self.db_pool.updates._background_update_progress_txn(
369-
txn, "event_push_backfill_thread_id", progress
370-
)
371-
return 0
321+
def drop_null_thread_id_indexes_txn(txn: LoggingTransaction) -> None:
322+
sql = "DROP INDEX IF EXISTS event_push_actions_thread_id_null"
323+
logger.debug("[SQL] %s", sql)
324+
txn.execute(sql)
372325

373-
# Update the thread ID for any of those rows.
374-
max_stream_ordering = rows[-1][0]
326+
sql = "DROP INDEX IF EXISTS event_push_summary_thread_id_null"
327+
logger.debug("[SQL] %s", sql)
328+
txn.execute(sql)
375329

376-
sql = """
377-
UPDATE event_push_actions
378-
SET thread_id = 'main'
379-
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
380-
"""
381-
txn.execute(
382-
sql,
383-
(
384-
start_stream_ordering,
385-
max_stream_ordering,
386-
),
387-
)
388-
389-
# Update progress.
390-
processed_rows = txn.rowcount
391-
progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
392-
self.db_pool.updates._background_update_progress_txn(
393-
txn, "event_push_backfill_thread_id", progress
394-
)
395-
396-
return processed_rows
397-
398-
def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
399-
min_user_id = progress.get("max_summary_user_id", "")
400-
min_room_id = progress.get("max_summary_room_id", "")
401-
402-
# Slightly overcomplicated query for getting the Nth user ID / room
403-
# ID tuple, or the last if there are less than N remaining.
404-
sql = """
405-
SELECT user_id, room_id FROM (
406-
SELECT user_id, room_id FROM event_push_summary
407-
WHERE (user_id, room_id) > (?, ?)
408-
AND thread_id IS NULL
409-
ORDER BY user_id, room_id
410-
LIMIT ?
411-
) AS e
412-
ORDER BY user_id DESC, room_id DESC
413-
LIMIT 1
414-
"""
415-
416-
txn.execute(sql, (min_user_id, min_room_id, batch_size))
417-
row = txn.fetchone()
418-
if not row:
419-
return 0
420-
421-
max_user_id, max_room_id = row
422-
423-
sql = """
424-
UPDATE event_push_summary
425-
SET thread_id = 'main'
426-
WHERE
427-
(?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
428-
AND thread_id IS NULL
429-
"""
430-
txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
431-
processed_rows = txn.rowcount
432-
433-
progress["max_summary_user_id"] = max_user_id
434-
progress["max_summary_room_id"] = max_room_id
435-
self.db_pool.updates._background_update_progress_txn(
436-
txn, "event_push_backfill_thread_id", progress
437-
)
438-
439-
return processed_rows
440-
441-
# First update the event_push_actions table, then the event_push_summary table.
442-
#
443-
# Note that the event_push_actions_staging table is ignored since it is
444-
# assumed that items in that table will only exist for a short period of
445-
# time.
446-
if not event_push_actions_done:
447-
result = await self.db_pool.runInteraction(
448-
"event_push_backfill_thread_id",
449-
add_thread_id_txn,
450-
progress.get("max_event_push_actions_stream_ordering", 0),
451-
)
452-
else:
453-
result = await self.db_pool.runInteraction(
454-
"event_push_backfill_thread_id",
455-
add_thread_id_summary_txn,
456-
)
457-
458-
# Only done after the event_push_summary table is done.
459-
if not result:
460-
await self.db_pool.updates._end_background_update(
461-
"event_push_backfill_thread_id"
462-
)
463-
464-
return result
330+
await self.db_pool.runInteraction(
331+
"drop_null_thread_id_indexes_txn",
332+
drop_null_thread_id_indexes_txn,
333+
)
334+
await self.db_pool.updates._end_background_update(
335+
"event_push_drop_null_thread_id_indexes"
336+
)
337+
return 0
465338

466339
async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
467340
"""Get the notification count by room for a user. Only considers notifications,
@@ -711,25 +584,6 @@ def _get_thread(thread_id: str) -> NotifCounts:
711584
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
712585
)
713586

714-
# First ensure that the existing rows have an updated thread_id field.
715-
if not self._event_push_backfill_thread_id_done:
716-
txn.execute(
717-
"""
718-
UPDATE event_push_summary
719-
SET thread_id = ?
720-
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
721-
""",
722-
(MAIN_TIMELINE, room_id, user_id),
723-
)
724-
txn.execute(
725-
"""
726-
UPDATE event_push_actions
727-
SET thread_id = ?
728-
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
729-
""",
730-
(MAIN_TIMELINE, room_id, user_id),
731-
)
732-
733587
# First we pull the counts from the summary table.
734588
#
735589
# We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1545,25 +1399,6 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
15451399
(room_id, user_id, stream_ordering, *thread_args),
15461400
)
15471401

1548-
# First ensure that the existing rows have an updated thread_id field.
1549-
if not self._event_push_backfill_thread_id_done:
1550-
txn.execute(
1551-
"""
1552-
UPDATE event_push_summary
1553-
SET thread_id = ?
1554-
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
1555-
""",
1556-
(MAIN_TIMELINE, room_id, user_id),
1557-
)
1558-
txn.execute(
1559-
"""
1560-
UPDATE event_push_actions
1561-
SET thread_id = ?
1562-
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
1563-
""",
1564-
(MAIN_TIMELINE, room_id, user_id),
1565-
)
1566-
15671402
# Fetch the notification counts between the stream ordering of the
15681403
# latest receipt and what was previously summarised.
15691404
unread_counts = self._get_notif_unread_count_for_user_room(
@@ -1698,19 +1533,6 @@ def _rotate_notifs_before_txn(
16981533
rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
16991534
"""
17001535

1701-
# Ensure that any new actions have an updated thread_id.
1702-
if not self._event_push_backfill_thread_id_done:
1703-
txn.execute(
1704-
"""
1705-
UPDATE event_push_actions
1706-
SET thread_id = ?
1707-
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
1708-
""",
1709-
(MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
1710-
)
1711-
1712-
# XXX Do we need to update summaries here too?
1713-
17141536
# Calculate the new counts that should be upserted into event_push_summary
17151537
sql = """
17161538
SELECT user_id, room_id, thread_id,
@@ -1773,20 +1595,6 @@ def _rotate_notifs_before_txn(
17731595

17741596
logger.info("Rotating notifications, handling %d rows", len(summaries))
17751597

1776-
# Ensure that any updated threads have the proper thread_id.
1777-
if not self._event_push_backfill_thread_id_done:
1778-
txn.execute_batch(
1779-
"""
1780-
UPDATE event_push_summary
1781-
SET thread_id = ?
1782-
WHERE room_id = ? AND user_id = ? AND thread_id is NULL
1783-
""",
1784-
[
1785-
(MAIN_TIMELINE, room_id, user_id)
1786-
for user_id, room_id, _ in summaries
1787-
],
1788-
)
1789-
17901598
self.db_pool.simple_upsert_many_txn(
17911599
txn,
17921600
table="event_push_summary",

synapse/storage/schema/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@
110110
# Queries against `event_stream_ordering` columns in membership tables must
111111
# be disambiguated.
112112
#
113+
# The thread_id column must be written to with non-null values for the
114+
# event_push_actions, event_push_actions_staging, and event_push_summary tables.
115+
#
113116
# insertions to the column `full_user_id` of tables profiles and user_filters can no
114117
# longer be null
115118
76
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/* Copyright 2023 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
-- Force the background updates from 06thread_notifications.sql to run in the
17+
-- foreground as code will now require those to be "done".
18+
19+
DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
20+
21+
-- Overwrite any null thread_id values.
22+
UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
23+
UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
24+
UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
25+
26+
-- Drop the background updates to calculate the indexes used to find null thread_ids.
27+
DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
28+
DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null';

0 commit comments

Comments
 (0)