Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use threaded receipts when fetching events for push #13878

Merged
merged 38 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9cb167c
Update filtering to include the thread notifications flag.
clokep Sep 8, 2022
8ac2f32
Ensure that the thread_id column is non-null and then require it to b…
clokep Sep 9, 2022
111fe57
Add infrastructure to pass notifications per thread.
clokep Sep 8, 2022
62aa85b
Calculate thread specific notification counts.
clokep Sep 8, 2022
cb679e2
Clarify comment.
clokep Sep 16, 2022
ba00c5f
Simplify handling of summaries with neither notifications or unread c…
clokep Sep 16, 2022
eb56567
Delete old push summaries.
clokep Sep 16, 2022
e6f97ec
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 16, 2022
4f4711a
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 19, 2022
8b63c5b
Fix postgres compatibility.
clokep Sep 19, 2022
c3783df
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 20, 2022
c4f2d50
Create a constant for "main".
clokep Sep 20, 2022
6927e59
Reduce duplicated code.
clokep Sep 20, 2022
1d05975
Lint
clokep Sep 20, 2022
28b5a1f
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 22, 2022
55d15a3
Threads must already be summarized between the stream orderings that …
clokep Sep 22, 2022
56c21e4
Don't delete empty push summaries.
clokep Sep 22, 2022
241b40c
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 23, 2022
a04258f
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 26, 2022
ddbb644
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 27, 2022
f20620f
Update constraints and indexes now that thread ID is used.
clokep Sep 12, 2022
52b0a3d
Mark threads as read separately.
clokep Sep 22, 2022
fb50244
Use MAIN_TIMELINE constant in more places.
clokep Sep 28, 2022
79452e9
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 28, 2022
b0d9008
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 28, 2022
d6d7788
Combine logic for processing receipts.
clokep Sep 28, 2022
162bd8d
Expand comment and rename variables for clarity.
clokep Sep 28, 2022
e7b5421
Clarify comment.
clokep Sep 28, 2022
5f5e9ad
Improve docstrings.
clokep Sep 28, 2022
16a60b9
Rename function.
clokep Sep 28, 2022
f6a99c8
Lint
clokep Sep 28, 2022
f279a15
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Sep 29, 2022
ef37de2
Merge remote-tracking branch 'origin/clokep/threads-notif-2' into clo…
clokep Sep 29, 2022
6b2384d
Fix typo.
clokep Sep 29, 2022
041fe7f
Only attempt to find threaded receipts newer than the latest unthread…
clokep Sep 29, 2022
4f687b2
Handle threads when fetching events for push.
clokep Sep 22, 2022
f59421c
Merge remote-tracking branch 'origin/develop' into clokep/threads-not…
clokep Oct 4, 2022
0d675df
Remove files from bad merge.
clokep Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13878.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
80 changes: 57 additions & 23 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,32 @@
]


@attr.s(slots=True, auto_attribs=True)
class _RoomReceipt:
"""
HttpPushAction instances include the information used to generate HTTP
requests to a push gateway.
"""

unthreaded_stream_ordering: int = 0
# threaded_stream_ordering includes the main pseudo-thread.
threaded_stream_ordering: Dict[str, int] = attr.Factory(dict)

def is_unread(self, thread_id: str, stream_ordering: int) -> bool:
"""Returns True if the stream ordering is unread according to the receipt information."""

# Only include push actions with a stream ordering after both the unthreaded
# and threaded receipt. Properly handles a user without any receipts present.
return (
self.unthreaded_stream_ordering < stream_ordering
and self.threaded_stream_ordering.get(thread_id, 0) < stream_ordering
)


# A _RoomReceipt with no receipts in it.
MISSING_ROOM_RECEIPT = _RoomReceipt()


@attr.s(slots=True, frozen=True, auto_attribs=True)
class HttpPushAction:
"""
Expand Down Expand Up @@ -716,7 +742,7 @@ def f(txn: LoggingTransaction) -> List[str]:

def _get_receipts_by_room_txn(
self, txn: LoggingTransaction, user_id: str
) -> Dict[str, int]:
) -> Dict[str, _RoomReceipt]:
"""
Generate a map of room ID to the latest stream ordering that has been
read by the given user.
Expand All @@ -726,7 +752,8 @@ def _get_receipts_by_room_txn(
user_id: The user to fetch receipts for.

Returns:
A map of room ID to stream ordering for all rooms the user has a receipt in.
A map including all rooms the user is in with a receipt. It maps
room IDs to _RoomReceipt instances
"""
receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
Expand All @@ -735,20 +762,26 @@ def _get_receipts_by_room_txn(
)

sql = f"""
SELECT room_id, MAX(stream_ordering)
SELECT room_id, thread_id, MAX(stream_ordering)
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE {receipt_types_clause}
AND user_id = ?
GROUP BY room_id
GROUP BY room_id, thread_id
"""

args.extend((user_id,))
txn.execute(sql, args)
return {
room_id: latest_stream_ordering
for room_id, latest_stream_ordering in txn.fetchall()
}

result: Dict[str, _RoomReceipt] = {}
for room_id, thread_id, stream_ordering in txn:
room_receipt = result.setdefault(room_id, _RoomReceipt())
if thread_id is None:
room_receipt.unthreaded_stream_ordering = stream_ordering
else:
room_receipt.threaded_stream_ordering[thread_id] = stream_ordering

return result

async def get_unread_push_actions_for_user_in_range_for_http(
self,
Expand Down Expand Up @@ -781,9 +814,10 @@ async def get_unread_push_actions_for_user_in_range_for_http(

def get_push_actions_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool]]:
) -> List[Tuple[str, str, str, int, str, bool]]:
sql = """
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
ep.actions, ep.highlight
FROM event_push_actions AS ep
WHERE
ep.user_id = ?
Expand All @@ -793,7 +827,7 @@ def get_push_actions_txn(
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
return cast(List[Tuple[str, str, str, int, str, bool]], txn.fetchall())

push_actions = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_http", get_push_actions_txn
Expand All @@ -806,10 +840,10 @@ def get_push_actions_txn(
stream_ordering=stream_ordering,
actions=_deserialize_action(actions, highlight),
)
for event_id, room_id, stream_ordering, actions, highlight in push_actions
# Only include push actions with a stream ordering after any receipt, or without any
# receipt present (invited to but never read rooms).
if stream_ordering > receipts_by_room.get(room_id, 0)
for event_id, room_id, thread_id, stream_ordering, actions, highlight in push_actions
if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread(
thread_id, stream_ordering
)
]

# Now sort it so it's ordered correctly, since currently it will
Expand Down Expand Up @@ -853,10 +887,10 @@ async def get_unread_push_actions_for_user_in_range_for_email(

def get_push_actions_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
) -> List[Tuple[str, str, str, int, str, bool, int]]:
sql = """
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
ep.actions, ep.highlight, e.received_ts
FROM event_push_actions AS ep
INNER JOIN events AS e USING (room_id, event_id)
WHERE
Expand All @@ -867,7 +901,7 @@ def get_push_actions_txn(
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
return cast(List[Tuple[str, str, str, int, str, bool, int]], txn.fetchall())

push_actions = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
Expand All @@ -882,10 +916,10 @@ def get_push_actions_txn(
actions=_deserialize_action(actions, highlight),
received_ts=received_ts,
)
for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions
# Only include push actions with a stream ordering after any receipt, or without any
# receipt present (invited to but never read rooms).
if stream_ordering > receipts_by_room.get(room_id, 0)
for event_id, room_id, thread_id, stream_ordering, actions, highlight, received_ts in push_actions
if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread(
thread_id, stream_ordering
)
]

# Now sort it so it's ordered correctly, since currently it will
Expand Down
57 changes: 39 additions & 18 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from twisted.test.proto_helpers import MemoryReactor

from synapse.api.constants import MAIN_TIMELINE
from synapse.api.constants import MAIN_TIMELINE, RelationTypes
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
Expand Down Expand Up @@ -66,16 +66,23 @@ def test_get_unread_push_actions_for_user_in_range(self) -> None:
user_id, token, _, other_token, room_id = self._create_users_and_room()

# Create two events, one of which is a highlight.
self.helper.send_event(
first_event_id = self.helper.send_event(
room_id,
type="m.room.message",
content={"msgtype": "m.text", "body": "msg"},
tok=other_token,
)
event_id = self.helper.send_event(
)["event_id"]
second_event_id = self.helper.send_event(
room_id,
type="m.room.message",
content={"msgtype": "m.text", "body": user_id},
content={
"msgtype": "m.text",
"body": user_id,
"m.relates_to": {
"rel_type": RelationTypes.THREAD,
"event_id": first_event_id,
},
},
tok=other_token,
)["event_id"]

Expand All @@ -95,13 +102,13 @@ def test_get_unread_push_actions_for_user_in_range(self) -> None:
)
self.assertEqual(2, len(email_actions))

# Send a receipt, which should clear any actions.
# Send a receipt, which should clear the first action.
self.get_success(
self.store.insert_receipt(
room_id,
"m.read",
user_id=user_id,
event_ids=[event_id],
event_ids=[first_event_id],
thread_id=None,
data={},
)
Expand All @@ -111,6 +118,30 @@ def test_get_unread_push_actions_for_user_in_range(self) -> None:
user_id, 0, 1000, 20
)
)
self.assertEqual(1, len(http_actions))
email_actions = self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_email(
user_id, 0, 1000, 20
)
)
self.assertEqual(1, len(email_actions))

# Send a thread receipt to clear the thread action.
self.get_success(
self.store.insert_receipt(
room_id,
"m.read",
user_id=user_id,
event_ids=[second_event_id],
thread_id=first_event_id,
data={},
)
)
http_actions = self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_http(
user_id, 0, 1000, 20
)
)
self.assertEqual([], http_actions)
email_actions = self.get_success(
self.store.get_unread_push_actions_for_user_in_range_for_email(
Expand Down Expand Up @@ -417,17 +448,7 @@ def test_count_aggregation_mixed(self) -> None:
sends both unthreaded and threaded receipts.
"""

# Create a user to receive notifications and send receipts.
user_id = self.register_user("user1235", "pass")
token = self.login("user1235", "pass")

# And another users to send events.
other_id = self.register_user("other", "pass")
other_token = self.login("other", "pass")

# Create a room and put both users in it.
room_id = self.helper.create_room_as(user_id, tok=token)
self.helper.join(room_id, other_id, tok=other_token)
user_id, token, _, other_token, room_id = self._create_users_and_room()
thread_id: str

last_event_id: str
Expand Down