Update event push action and receipt tables to support threads. #13753

Merged · 9 commits · Sep 14, 2022
1 change: 1 addition & 0 deletions changelog.d/13753.misc
@@ -0,0 +1 @@
Preparatory work for storing thread IDs for notifications and receipts.
29 changes: 15 additions & 14 deletions synapse/push/bulk_push_rule_evaluator.py
@@ -198,15 +198,15 @@ async def _get_power_levels_and_sender_level(
return pl_event.content if pl_event else {}, sender_level

async def _get_mutual_relations(
-        self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
+        self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
) -> Dict[str, Set[Tuple[str, str]]]:
"""
Fetch event metadata for events which relate to the same event as the given event.

If the given event has no relation information, returns an empty dictionary.

Args:
-            event_id: The event ID which is targeted by relations.
+            parent_id: The event ID which is targeted by relations.
rules: The push rules which will be processed for this event.

Returns:
@@ -220,12 +220,6 @@ async def _get_mutual_relations(
if not self._relations_match_enabled:
return {}

-        # If the event does not have a relation, then cannot have any mutual
-        # relations.
-        relation = relation_from_event(event)
-        if not relation:
-            return {}

# Pre-filter to figure out which relation types are interesting.
rel_types = set()
for rule, enabled in rules:
@@ -246,9 +240,7 @@
return {}

# If any valid rules were found, fetch the mutual relations.
-        return await self.store.get_mutual_event_relations(
-            relation.parent_id, rel_types
-        )
+        return await self.store.get_mutual_event_relations(parent_id, rel_types)
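For orientation, a sketch of the shape this returns, going by the `Dict[str, Set[Tuple[str, str]]]` annotation above: relation type mapped to the (sender, event type) pairs of other events sharing the same parent. All identifiers below are invented:

# Hypothetical result of get_mutual_event_relations(parent_id, {"m.annotation", "m.thread"}):
{
    "m.annotation": {("@alice:example.org", "m.reaction")},
    "m.thread": {("@bob:example.org", "m.room.message")},
}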

@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
@@ -281,9 +273,17 @@ async def action_for_event_by_user(
sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context)

-        relations = await self._get_mutual_relations(
-            event, itertools.chain(*rules_by_user.values())
-        )
+        relation = relation_from_event(event)
+        # If the event does not have a relation, then it cannot have any mutual
+        # relations or thread ID.
+        relations = {}
+        thread_id = "main"
+        if relation:
+            relations = await self._get_mutual_relations(
+                relation.parent_id, itertools.chain(*rules_by_user.values())
+            )
+            if relation.rel_type == RelationTypes.THREAD:
+                thread_id = relation.parent_id

evaluator = PushRuleEvaluatorForEvent(
event,
@@ -352,6 +352,7 @@ async def action_for_event_by_user(
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)
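As background for the `thread_id` plumbed through above: a threaded event carries an `m.thread` relation pointing at the thread root, which is what `relation_from_event` surfaces. A sketch with invented identifiers:

# Hypothetical content of a threaded message (event ID invented):
content = {
    "msgtype": "m.text",
    "body": "a reply inside the thread",
    "m.relates_to": {
        "rel_type": "m.thread",  # i.e. RelationTypes.THREAD
        "event_id": "$thread_root:example.org",
    },
}
# relation_from_event(event).parent_id is then "$thread_root:example.org", which
# becomes the thread_id; events with no relation fall back to "main".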


111 changes: 108 additions & 3 deletions synapse/storage/databases/main/event_push_actions.py
@@ -98,6 +98,7 @@
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

@@ -232,6 +233,94 @@ def __init__(
replaces_index="event_push_summary_user_rm",
)

        self.db_pool.updates.register_background_index_update(
            "event_push_summary_unique_index2",
            index_name="event_push_summary_unique_index2",
            table="event_push_summary",
            columns=["user_id", "room_id", "thread_id"],
            unique=True,
        )

        self.db_pool.updates.register_background_update_handler(
            "event_push_backfill_thread_id",
            self._background_backfill_thread_id,
        )
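Assuming `register_background_index_update` does what its arguments suggest, the index built in the background should be roughly equivalent to this (a sketch, not taken from the PR):

CREATE UNIQUE INDEX event_push_summary_unique_index2
    ON event_push_summary (user_id, room_id, thread_id);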

    async def _background_backfill_thread_id(
        self, progress: JsonDict, batch_size: int
    ) -> int:
        event_push_actions_done = progress.get("event_push_actions_done", False)

        def add_thread_id_txn(
            txn: LoggingTransaction, table_name: str, start_stream_ordering: int
        ) -> int:
            sql = f"""
            SELECT stream_ordering
            FROM {table_name}
            WHERE
                thread_id IS NULL
                AND stream_ordering > ?
            ORDER BY stream_ordering
            LIMIT ?
            """
            txn.execute(sql, (start_stream_ordering, batch_size))

            # No more rows to process.
            rows = txn.fetchall()
            if not rows:
                progress[f"{table_name}_done"] = True
                self.db_pool.updates._background_update_progress_txn(
                    txn, "event_push_backfill_thread_id", progress
                )
                return 0

            # Update the thread ID for any of those rows.
            max_stream_ordering = rows[-1][0]

            sql = f"""
            UPDATE {table_name}
            SET thread_id = 'main'
            WHERE stream_ordering <= ? AND thread_id IS NULL
            """
            txn.execute(sql, (max_stream_ordering,))

            # Update progress.
            processed_rows = txn.rowcount
            progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
            self.db_pool.updates._background_update_progress_txn(
                txn, "event_push_backfill_thread_id", progress
            )

            return processed_rows

        # First update the event_push_actions table, then the event_push_summary table.
        #
        # Note that the event_push_actions_staging table is ignored since it is
        # assumed that items in that table will only exist for a short period of
        # time.
        if not event_push_actions_done:
            result = await self.db_pool.runInteraction(
                "event_push_backfill_thread_id",
                add_thread_id_txn,
                "event_push_actions",
                progress.get("max_event_push_actions_stream_ordering", 0),
            )
        else:
            result = await self.db_pool.runInteraction(
                "event_push_backfill_thread_id",
                add_thread_id_txn,
                "event_push_summary",
                progress.get("max_event_push_summary_stream_ordering", 0),
            )

            # Only done after the event_push_summary table is done, so this
            # check lives on the event_push_summary branch.
            if not result:
                await self.db_pool.updates._end_background_update(
                    "event_push_backfill_thread_id"
                )

        return result
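To make the two-phase flow concrete, one possible progression of the `progress` dict across runs (stream orderings invented):

# run 1: {}                                                    -> batch event_push_actions from 0
# run 2: {"max_event_push_actions_stream_ordering": 5000}      -> next event_push_actions batch
# run 3: {"event_push_actions_done": True, ...}                -> switch to event_push_summary
# run 4: {..., "max_event_push_summary_stream_ordering": 800}  -> next event_push_summary batch
# final: the event_push_summary pass finds no NULL thread_id rows, returns 0, and the
#        background update is ended.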

@cached(tree=True, max_entries=5000)
async def get_unread_event_push_actions_by_room_for_user(
self,
@@ -670,6 +759,7 @@ async def add_push_actions_to_staging(
event_id: str,
user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
count_as_unread: bool,
thread_id: str,
) -> None:
"""Add the push actions for the event to the push action staging area.

@@ -678,6 +768,7 @@
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
count_as_unread: Whether this event should increment unread counts.
thread_id: The thread this event is part of, if applicable.
"""
if not user_id_actions:
return
@@ -686,7 +777,7 @@
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(
user_id: str, actions: Collection[Union[Mapping, str]]
-        ) -> Tuple[str, str, str, int, int, int]:
+        ) -> Tuple[str, str, str, int, int, int, str]:
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 1 if "notify" in actions else 0
return (
Expand All @@ -696,11 +787,20 @@ def _gen_entry(
notif, # notif column
is_highlight, # highlight column
int(count_as_unread), # unread column
thread_id, # thread_id column
)

await self.db_pool.simple_insert_many(
"event_push_actions_staging",
keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
keys=(
"event_id",
"user_id",
"actions",
"notif",
"highlight",
"unread",
"thread_id",
),
values=[
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.items()
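For illustration, a hypothetical call into the staging area with the new argument (event ID, user ID, and actions all invented):

await store.add_push_actions_to_staging(
    event_id="$143273582443PhrSn:example.org",
    user_id_actions={
        "@alice:example.org": ["notify", {"set_tweak": "highlight", "value": True}],
    },
    count_as_unread=True,
    thread_id="main",  # or the thread root's event ID for a threaded event
)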
@@ -981,6 +1081,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
)

# Replace the previous summary with the new counts.
#
# TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
@@ -990,6 +1092,7 @@
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
},
)
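A hedged sketch of where that TODO seems headed — `thread_id` moving out of `values` and into the upsert's lookup keys so each thread keeps its own summary row (assumes the `keyvalues`/`values` split used elsewhere in this module; not part of this PR):

self.db_pool.simple_upsert_txn(
    txn,
    table="event_push_summary",
    # thread_id as part of the key rather than a hard-coded value.
    keyvalues={"room_id": room_id, "user_id": user_id, "thread_id": thread_id},
    values={
        "notif_count": notif_count,
        "unread_count": unread_count,
        "stream_ordering": old_rotate_stream_ordering,
        "last_receipt_stream_ordering": stream_ordering,
    },
)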

@@ -1138,17 +1241,19 @@ def _rotate_notifs_before_txn(

logger.info("Rotating notifications, handling %d rows", len(summaries))

# TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
value_values=[
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
for summary in summaries.values()
],
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events.py
@@ -2192,9 +2192,9 @@ def _set_push_actions_for_event_and_users_txn(
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
-                topological_ordering, notif, highlight, unread
+                topological_ordering, notif, highlight, unread, thread_id
)
-            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
+            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
An inline review comment from the PR author is attached here:

Maybe we should be providing the thread_id like we do the room_id? (I.e. not storing it in event_push_actions_staging and only storing it in event_push_actions)

FROM event_push_actions_staging
WHERE event_id = ?
"""
20 changes: 20 additions & 0 deletions synapse/storage/databases/main/receipts.py
@@ -113,6 +113,24 @@ def __init__(
prefilled_cache=receipts_stream_prefill,
)

        self.db_pool.updates.register_background_index_update(
            "receipts_linearized_unique_index",
            index_name="receipts_linearized_unique_index",
            table="receipts_linearized",
            columns=["room_id", "receipt_type", "user_id"],
            where_clause="thread_id IS NULL",
            unique=True,
        )

        self.db_pool.updates.register_background_index_update(
            "receipts_graph_unique_index",
            index_name="receipts_graph_unique_index",
            table="receipts_graph",
            columns=["room_id", "receipt_type", "user_id"],
            where_clause="thread_id IS NULL",
            unique=True,
        )
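Given the `where_clause`, these registrations should come out as partial unique indexes, roughly as follows (a sketch; PostgreSQL syntax, and SQLite partial indexes read the same):

CREATE UNIQUE INDEX receipts_linearized_unique_index
    ON receipts_linearized (room_id, receipt_type, user_id)
    WHERE thread_id IS NULL;

The partial form matters because both databases treat NULLs as distinct in multi-column unique constraints, so the four-column constraints added in the schema delta below cannot by themselves guarantee at most one unthreaded receipt per user, room, and receipt type.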

def get_max_receipt_stream_id(self) -> int:
"""Get the current max stream ID for receipts stream"""
return self._receipts_id_gen.get_current_token()
@@ -676,6 +694,7 @@ def _insert_linearized_receipt_txn(
"stream_id": stream_id,
"event_id": event_id,
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
@@ -823,6 +842,7 @@ def _insert_graph_receipt_txn(
values={
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
30 changes: 30 additions & 0 deletions synapse/storage/schema/main/delta/72/06thread_notifications.sql
@@ -0,0 +1,30 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a nullable column for thread ID to the event push actions tables; this
-- will be filled in with a default value for any previously existing rows.
--
-- After migration this can be made non-nullable.

ALTER TABLE event_push_actions_staging ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_actions ADD COLUMN thread_id TEXT;
ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT;

-- Update the unique index for `event_push_summary`.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7006, 'event_push_summary_unique_index2', '{}');

INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2');
@@ -0,0 +1,30 @@
/* Copyright 2022 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a nullable column for thread ID to the receipts tables; this allows a
-- receipt per user, per room, and per thread, as well as an unthreaded receipt
-- (corresponding to a null thread ID).

ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT;
ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT;

-- Rebuild the unique constraint with the thread_id.
ALTER TABLE receipts_linearized
ADD CONSTRAINT receipts_linearized_uniqueness_thread
UNIQUE (room_id, receipt_type, user_id, thread_id);

ALTER TABLE receipts_graph
ADD CONSTRAINT receipts_graph_uniqueness_thread
UNIQUE (room_id, receipt_type, user_id, thread_id);