
Update the thread_id right before use (in case the bg update hasn't finished) #14222

Merged 5 commits on Oct 18, 2022
1 change: 1 addition & 0 deletions changelog.d/14222.feature
@@ -0,0 +1 @@
Support for thread-specific notifications & receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771) and [MSC3773](https://github.com/matrix-org/matrix-spec-proposals/pull/3773)).
103 changes: 103 additions & 0 deletions synapse/storage/databases/main/event_push_actions.py
@@ -294,6 +294,44 @@ def __init__(
            self._background_backfill_thread_id,
        )

        # Indexes which will be used to quickly make the thread_id column non-null.
        self.db_pool.updates.register_background_index_update(
            "event_push_actions_thread_id_null",
            index_name="event_push_actions_thread_id_null",
            table="event_push_actions",
            columns=["thread_id"],
            where_clause="thread_id IS NULL",
        )
        self.db_pool.updates.register_background_index_update(
            "event_push_summary_thread_id_null",
            index_name="event_push_summary_thread_id_null",
            table="event_push_summary",
            columns=["thread_id"],
            where_clause="thread_id IS NULL",
        )
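
For context, not part of the diff: `register_background_index_update` arranges for the index to be built in the background once its dependencies complete, and on PostgreSQL the build runs `CONCURRENTLY` so the table stays writable. Because of the `where_clause`, the result is a partial index over exactly the rows the backfill still has to touch, which keeps it small and makes the `WHERE thread_id IS NULL` updates below cheap. A rough sketch of the DDL the first registration corresponds to (illustrative; the real statement is assembled by Synapse's background updater):

```python
# Illustrative only: approximately the statement the background updater
# will issue for the first registration above (PostgreSQL path).
create_index_sql = """
    CREATE INDEX CONCURRENTLY event_push_actions_thread_id_null
    ON event_push_actions (thread_id)
    WHERE thread_id IS NULL
"""
```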

        # Check ASAP (and then every 15s thereafter) to see if we have
        # finished the background updates of the event_push_actions and
        # event_push_summary tables.
        self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
        self._event_push_backfill_thread_id_done = False
Comment on lines +313 to +316 (Member, Author):

Maybe overkill, but it seems good to not constantly do those updates if we don't need to?


    @wrap_as_background_process("check_event_push_backfill_thread_id")
    async def _check_event_push_backfill_thread_id(self) -> None:
        """
        Has thread_id finished backfilling?

        If not, we need to just-in-time update it so the queries work.
        """
        done = await self.db_pool.updates.has_completed_background_update(
            "event_push_backfill_thread_id"
        )

        if done:
            self._event_push_backfill_thread_id_done = True
        else:
            # Reschedule to check again in 15 seconds.
            self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
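
The shape here is: check once at startup, cache a positive result in a flag so the per-transaction fix-ups below become no-ops from then on, and otherwise re-poll on a slow 15-second timer. A minimal standalone sketch of that pattern, with illustrative names rather than Synapse's clock API:

```python
import asyncio
from typing import Awaitable, Callable

async def poll_until_done(
    check_done: Callable[[], Awaitable[bool]],
    on_done: Callable[[], None],
    interval: float = 15.0,
) -> None:
    # Poll `check_done` until it reports completion, then set the
    # "done" state exactly once via `on_done` and stop polling.
    while not await check_done():
        await asyncio.sleep(interval)
    on_done()
```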

    async def _background_backfill_thread_id(
        self, progress: JsonDict, batch_size: int
    ) -> int:
@@ -526,6 +564,25 @@ def _get_thread(thread_id: str) -> NotifCounts:
            (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
        )

        # Ensure that the existing rows have an updated thread_id field.
        if not self._event_push_backfill_thread_id_done:
            txn.execute(
                """
                UPDATE event_push_summary
                SET thread_id = ?
                WHERE room_id = ? AND user_id = ? AND thread_id IS NULL
                """,
                (MAIN_TIMELINE, room_id, user_id),
            )
            txn.execute(
                """
                UPDATE event_push_actions
                SET thread_id = ?
                WHERE room_id = ? AND user_id = ? AND thread_id IS NULL
                """,
                (MAIN_TIMELINE, room_id, user_id),
            )

        # First we pull the counts from the summary table.
        #
        # We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1341,6 +1398,25 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
            (room_id, user_id, stream_ordering, *thread_args),
        )

        # Ensure that the existing rows have an updated thread_id field.
        if not self._event_push_backfill_thread_id_done:
            txn.execute(
                """
                UPDATE event_push_summary
                SET thread_id = ?
                WHERE room_id = ? AND user_id = ? AND thread_id IS NULL
                """,
                (MAIN_TIMELINE, room_id, user_id),
            )
            txn.execute(
                """
                UPDATE event_push_actions
                SET thread_id = ?
                WHERE room_id = ? AND user_id = ? AND thread_id IS NULL
                """,
                (MAIN_TIMELINE, room_id, user_id),
            )

        # Fetch the notification counts between the stream ordering of the
        # latest receipt and what was previously summarised.
        unread_counts = self._get_notif_unread_count_for_user_room(
@@ -1475,6 +1551,19 @@ def _rotate_notifs_before_txn(
            rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
        """

        # Ensure that any new actions have an updated thread_id.
        if not self._event_push_backfill_thread_id_done:
            txn.execute(
                """
                UPDATE event_push_actions
                SET thread_id = ?
                WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
                """,
                (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
            )

        # XXX Do we need to update summaries here too?

        # Calculate the new counts that should be upserted into event_push_summary
        sql = """
            SELECT user_id, room_id, thread_id,
@@ -1537,6 +1626,20 @@ def _rotate_notifs_before_txn(

        logger.info("Rotating notifications, handling %d rows", len(summaries))

        # Ensure that any updated threads have the proper thread_id.
        if not self._event_push_backfill_thread_id_done:
            txn.execute_batch(
                """
                UPDATE event_push_summary
                SET thread_id = ?
                WHERE room_id = ? AND user_id = ? AND thread_id IS NULL
                """,
                [
                    (MAIN_TIMELINE, room_id, user_id)
                    for user_id, room_id, _ in summaries
                ],
            )
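
`execute_batch` runs the UPDATE once per parameter tuple. Since `summaries` is keyed by `(user_id, room_id, thread_id)` tuples (as the unpacking above shows), the same room/user pair can appear more than once in the list, which is harmless: after the first execution no row matches `thread_id IS NULL` for that pair, so repeats are no-ops. A toy, runnable illustration of the call shape using sqlite3's analogous `executemany` (hypothetical data, not Synapse's LoggingTransaction):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
txn = conn.cursor()
txn.execute(
    "CREATE TABLE event_push_summary (room_id TEXT, user_id TEXT, thread_id TEXT)"
)
txn.execute("INSERT INTO event_push_summary VALUES ('!r:a', '@u:a', NULL)")

# One statement, executed once per parameter tuple; the duplicate tuple
# is a harmless no-op on its second run.
txn.executemany(
    "UPDATE event_push_summary SET thread_id = ? "
    "WHERE room_id = ? AND user_id = ? AND thread_id IS NULL",
    [("main", "!r:a", "@u:a"), ("main", "!r:a", "@u:a")],
)
```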

        self.db_pool.simple_upsert_many_txn(
            txn,
            table="event_push_summary",

This file was deleted.

@@ -13,7 +13,11 @@
* limitations under the License.
*/

-- The columns can now be made non-nullable.
ALTER TABLE event_push_actions_staging ALTER COLUMN thread_id SET NOT NULL;
ALTER TABLE event_push_actions ALTER COLUMN thread_id SET NOT NULL;
ALTER TABLE event_push_summary ALTER COLUMN thread_id SET NOT NULL;
-- Allow there to be multiple summaries per user/room.
DROP INDEX IF EXISTS event_push_summary_unique_index;
Comment on lines +16 to +17 (Member, Author):

Dropping this index is somewhat scary; I don't know how well this will handle rolling back.

Reply (Member):

Well, we can only roll back to v1.68, which I thought we checked was fine?

Reply (Member, Author):

I believe it will work OK with that, yes.

INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7306, 'event_push_actions_thread_id_null', '{}', 'event_push_backfill_thread_id');

INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7306, 'event_push_summary_thread_id_null', '{}', 'event_push_backfill_thread_id');
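
The `depends_on` column gates these rows: neither index build starts until the `event_push_backfill_thread_id` update has completed, at which point the partial `thread_id IS NULL` indexes should cover few or no rows. Code can then ask about them by name via the same helper the Python changes above use; a sketch (the method name here is hypothetical, the helper call mirrors `_check_event_push_backfill_thread_id`):

```python
async def _is_null_index_built(self) -> bool:
    # Hypothetical helper on the store: reuses the same completion
    # check as _check_event_push_backfill_thread_id above.
    return await self.db_pool.updates.has_completed_background_update(
        "event_push_actions_thread_id_null"
    )
```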
Comment on lines +19 to +23 (clokep, Member, Author, Oct 18, 2022):

I chose not to care about event_push_actions_staging here since it is transient and should be handled OK once the rows get into event_push_actions.

(And the table shouldn't be large so I don't think it can really have non-nulls in it? Maybe I should just do the alter table for that one now?)

Reply (Member):

> (And the table shouldn't be large so I don't think it can really have non-nulls in it? Maybe I should just do the alter table for that one now?)

Yeah, we also have a cleanup task for it to ensure that stale entries get removed, so it really should be quick. Though at this point it might be worth just doing the UPDATE when we do the others too.

Reply (Member, Author):

> Though at this point it might be worth just doing the UPDATE when we do the others too

But we don't need to, since we don't join to the table; we could maybe do something when we pull rows from event_push_actions_staging and insert into event_push_actions? But the data can only be "incorrect" across a restart of Synapse, I believe, which ends up orphaning the data anyway (since the event wouldn't be persisted).

Reply (Member):

> But the data can only be "incorrect" across a restart of Synapse, I believe, which ends up orphaning the data anyway (since the event wouldn't be persisted).

My only mild concern is what happens when we do a rolling restart or something, but I think that's fine?

Reply (Member, Author):

> My only mild concern is what happens when we do a rolling restart or something, but I think that's fine?

(Note that Synapse 1.69 fills in that column in the staging table, so it shouldn't be an issue.)

I don't think it will be a problem regardless:

  1. event_push_actions_staging has a null thread_id value.
  2. It gets moved over to event_push_actions (there's now a null in event_push_actions).
  3. Before we use event_push_actions, the column gets updated to be "main" and things work.

There might be a slight race in here with whether the bg update has run, though. 😟
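
To make step 3 concrete, a tiny runnable sketch of the fix-up the list above relies on (sqlite3 stand-in; the real tables have more columns and the real code runs inside Synapse's transaction wrappers):

```python
import sqlite3

MAIN_TIMELINE = "main"  # matches Synapse's constant for the main timeline

conn = sqlite3.connect(":memory:")
txn = conn.cursor()
txn.execute(
    "CREATE TABLE event_push_actions (room_id TEXT, user_id TEXT, thread_id TEXT)"
)

# Steps 1-2: a row arrives from event_push_actions_staging with a NULL
# thread_id left over from a pre-1.69 writer.
txn.execute("INSERT INTO event_push_actions VALUES ('!r:a', '@u:a', NULL)")

# Step 3: the guarded UPDATE added by this PR rewrites the NULL to the
# main timeline before anything queries the row.
txn.execute(
    "UPDATE event_push_actions SET thread_id = ? "
    "WHERE room_id = ? AND user_id = ? AND thread_id IS NULL",
    (MAIN_TIMELINE, "!r:a", "@u:a"),
)
assert txn.execute("SELECT thread_id FROM event_push_actions").fetchone() == ("main",)
```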

This file was deleted.