Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Insert a thread_id when inserting new data.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Sep 12, 2022
1 parent 59e2789 commit fc8e0fa
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 19 deletions.
29 changes: 15 additions & 14 deletions synapse/push/bulk_push_rule_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,15 @@ async def _get_power_levels_and_sender_level(
return pl_event.content if pl_event else {}, sender_level

async def _get_mutual_relations(
self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
) -> Dict[str, Set[Tuple[str, str]]]:
"""
Fetch event metadata for events which related to the same event as the given event.
If the given event has no relation information, returns an empty dictionary.
Args:
event_id: The event ID which is targeted by relations.
parent_id: The event ID which is targeted by relations.
rules: The push rules which will be processed for this event.
Returns:
Expand All @@ -220,12 +220,6 @@ async def _get_mutual_relations(
if not self._relations_match_enabled:
return {}

# If the event does not have a relation, then cannot have any mutual
# relations.
relation = relation_from_event(event)
if not relation:
return {}

# Pre-filter to figure out which relation types are interesting.
rel_types = set()
for rule, enabled in rules:
Expand All @@ -246,9 +240,7 @@ async def _get_mutual_relations(
return {}

# If any valid rules were found, fetch the mutual relations.
return await self.store.get_mutual_event_relations(
relation.parent_id, rel_types
)
return await self.store.get_mutual_event_relations(parent_id, rel_types)

@measure_func("action_for_event_by_user")
async def action_for_event_by_user(
Expand Down Expand Up @@ -281,9 +273,17 @@ async def action_for_event_by_user(
sender_power_level,
) = await self._get_power_levels_and_sender_level(event, context)

relations = await self._get_mutual_relations(
event, itertools.chain(*rules_by_user.values())
)
relation = relation_from_event(event)
# If the event does not have a relation, then cannot have any mutual
# relations or thread ID.
relations = {}
thread_id = "main"
if relation:
relations = await self._get_mutual_relations(
relation.parent_id, itertools.chain(*rules_by_user.values())
)
if relation.rel_type == RelationTypes.THREAD:
thread_id = relation.parent_id

evaluator = PushRuleEvaluatorForEvent(
event,
Expand Down Expand Up @@ -352,6 +352,7 @@ async def action_for_event_by_user(
event.event_id,
actions_by_user,
count_as_unread,
thread_id,
)


Expand Down
22 changes: 19 additions & 3 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ async def add_push_actions_to_staging(
event_id: str,
user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
count_as_unread: bool,
thread_id: str,
) -> None:
"""Add the push actions for the event to the push action staging area.
Expand All @@ -767,6 +768,7 @@ async def add_push_actions_to_staging(
user_id_actions: A mapping of user_id to list of push actions, where
an action can either be a string or dict.
count_as_unread: Whether this event should increment unread counts.
thread_id: The thread this event is parent of, if applicable.
"""
if not user_id_actions:
return
Expand All @@ -775,7 +777,7 @@ async def add_push_actions_to_staging(
# can be used to insert into the `event_push_actions_staging` table.
def _gen_entry(
user_id: str, actions: Collection[Union[Mapping, str]]
) -> Tuple[str, str, str, int, int, int]:
) -> Tuple[str, str, str, int, int, int, str]:
is_highlight = 1 if _action_has_highlight(actions) else 0
notif = 1 if "notify" in actions else 0
return (
Expand All @@ -785,11 +787,20 @@ def _gen_entry(
notif, # notif column
is_highlight, # highlight column
int(count_as_unread), # unread column
thread_id, # thread_id column
)

await self.db_pool.simple_insert_many(
"event_push_actions_staging",
keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
keys=(
"event_id",
"user_id",
"actions",
"notif",
"highlight",
"unread",
"thread_id",
),
values=[
_gen_entry(user_id, actions)
for user_id, actions in user_id_actions.items()
Expand Down Expand Up @@ -1070,6 +1081,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
)

# Replace the previous summary with the new counts.
#
# TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
Expand All @@ -1079,6 +1092,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
"unread_count": unread_count,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
},
)

Expand Down Expand Up @@ -1227,17 +1241,19 @@ def _rotate_notifs_before_txn(

logger.info("Rotating notifications, handling %d rows", len(summaries))

# TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
value_values=[
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
for summary in summaries.values()
],
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2192,9 +2192,9 @@ def _set_push_actions_for_event_and_users_txn(
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
topological_ordering, notif, highlight, unread
topological_ordering, notif, highlight, unread, thread_id
)
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
FROM event_push_actions_staging
WHERE event_id = ?
"""
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ def _insert_linearized_receipt_txn(
"stream_id": stream_id,
"event_id": event_id,
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
Expand Down Expand Up @@ -841,6 +842,7 @@ def _insert_graph_receipt_txn(
values={
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
"thread_id": None,
},
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
Expand Down
1 change: 1 addition & 0 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ def build_event(
event.event_id,
{user_id: actions for user_id, actions in push_actions},
False,
"main",
)
)
return event, context

0 comments on commit fc8e0fa

Please sign in to comment.