Skip to content

Add support for sending notification counts in simplified sliding sync #18290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18290.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for sending notification counts and thread notification counts in simplified sliding sync mode.
52 changes: 45 additions & 7 deletions synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@
import itertools
import logging
from itertools import chain
from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple
from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
)

from prometheus_client import Histogram
from typing_extensions import assert_never
Expand All @@ -38,6 +48,7 @@
tag_args,
trace,
)
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.storage.databases.main.stream import PaginateFunction
Expand Down Expand Up @@ -245,11 +256,31 @@ async def current_sync_for_user(
to_token=to_token,
)

# fetch the user's receipts between the two points: these will be factor
# in deciding whether to send the room, since it may have changed their
# notification counts
receipts = await self.store.get_linearized_receipts_for_user_in_rooms(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can share the work with the receipts extension if it's enabled. Seems less than ideal to have to run get_linearized_receipts_for_user_in_rooms(...) twice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle, yes, although this uses a the function get_linearized_receipts_for_rooms which returns a different type for reasons I don't really understand: JsonMapping vs ReceiptInRoom: is one just converting to a real type and the other isn't? Apart from that, I guess it should be possible to either move the receipts logic upwards to fetch all the receipts and pass them into the main sync part.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh actually, the receipts extension calls get_linearized_receipts_for_user_in_rooms too, so presumably we can just pass this in and then we'll be doing the same number of database queries as before. The receipts extension doesn't give a 'from' key, though, which seems weird.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, done.

user_id=user_id,
room_ids=interested_rooms.relevant_room_map.keys(),
from_key=from_token.stream_token.receipt_key if from_token else None,
to_key=to_token.receipt_key,
)

# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = self.room_lists.filter_relevant_rooms_to_send(
sync_config.user,
previous_connection_state,
from_token.stream_token if from_token else None,
to_token,
interested_rooms.relevant_room_map,
receipts,
)

lists = interested_rooms.lists
relevant_room_map = interested_rooms.relevant_room_map
all_rooms = interested_rooms.all_rooms
room_membership_for_user_map = interested_rooms.room_membership_for_user_map
relevant_rooms_to_send_map = interested_rooms.relevant_rooms_to_send_map

# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
Expand All @@ -272,6 +303,7 @@ async def handle_room(room_id: str) -> None:
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
room_receipts=receipts[room_id] if room_id in receipts else None,
)

# Filter out empty room results during incremental sync
Expand All @@ -296,6 +328,7 @@ async def handle_room(room_id: str) -> None:
actual_room_response_map=rooms,
from_token=from_token,
to_token=to_token,
user_receipts=receipts,
)

if has_lists or has_room_subscriptions:
Expand Down Expand Up @@ -543,6 +576,7 @@ async def get_room_sync_data(
to_token: StreamToken,
newly_joined: bool,
is_dm: bool,
room_receipts: Optional[Sequence[ReceiptInRoom]],
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
Expand All @@ -560,6 +594,8 @@ async def get_room_sync_data(
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
is_dm: Whether the room is a DM room
room_receipts: Any read receipts from the in question in that room between
from_token and to_token
"""
user = sync_config.user

Expand Down Expand Up @@ -1312,6 +1348,11 @@ async def get_room_sync_data(

set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)

unread_notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
room_id,
sync_config.user.to_string(),
)

return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
Expand All @@ -1329,11 +1370,8 @@ async def get_room_sync_data(
bump_stamp=bump_stamp,
joined_count=joined_count,
invited_count=invited_count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
Comment on lines -1332 to -1336
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erikjohnston Are we even interested in including notification values?

I've also asked this question on the MSC itself: matrix-org/matrix-spec-proposals#4186 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbkr It would also be useful to understand how notification counts are calculated on Element Web today. I assume the Element X apps are accomplishing something without these counts so could Element Web adopt the same strategy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, EX doesn't do backfill as such but does obviously keep history and generally wake up to receive messages via push, so it calculates them all locally and in practice this seems to be enough. Element Web doesn't do either of these things and so currently does a slightly funky combination of trusting server counts for unencrypted rooms and fixing them up locally for encrypted rooms, except when the server sets them to 0 when we know to clear the count.

Ultimately we would probably like to backfill to calculate the notification counts entirely client side, although probably just for e2e rooms: doing this for large public rooms is arguably a lot of work unnecessarily, so I feel like the path to Doing It Right involves having these counts at least to use in unencrypted rooms (where Doing It Right in this case involves Matrix in its dual role of trying to be a both an e2e messenger and one for large public groups, but that's where we are).

Meanwhile, EW also downloads the entire history of e2e rooms on desktop in the seshat search index, but unfortunately that's it's own thing, so really we would have to replace seshat first, otherwise we'd have to bodge the two together or end up downloading e2e room history twice.

notif_counts=unread_notifs,
room_receipts=room_receipts,
)

@trace
Expand Down
41 changes: 19 additions & 22 deletions synapse/handlers/sliding_sync/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async def get_extensions_response(
actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult],
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
user_receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> SlidingSyncResult.Extensions:
"""Handle extension requests.

Expand All @@ -95,6 +96,7 @@ async def get_extensions_response(
Sliding Sync response.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
user_receipts: Map of room ID to list of the syncing user's receipts in the room.
"""

if sync_config.extensions is None:
Expand Down Expand Up @@ -142,6 +144,7 @@ async def get_extensions_response(
receipts_request=sync_config.extensions.receipts,
to_token=to_token,
from_token=from_token,
user_receipts=user_receipts,
)

typing_coro = None
Expand Down Expand Up @@ -619,6 +622,7 @@ async def get_receipts_extension_response(
receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
user_receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
"""Handle Receipts extension (MSC3960)

Expand All @@ -635,6 +639,7 @@ async def get_receipts_extension_response(
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
user_receipts: Map of room ID to list of the syncing user's receipts in the room.
"""
# Skip if the extension is not enabled
if not receipts_request.enabled:
Expand Down Expand Up @@ -726,15 +731,6 @@ async def handle_previously_room(room_id: str) -> None:
)

if initial_rooms:
# We also always send down receipts for the current user.
user_receipts = (
await self.store.get_linearized_receipts_for_user_in_rooms(
user_id=sync_config.user.to_string(),
room_ids=initial_rooms,
to_key=to_token.receipt_key,
)
)

# For rooms we haven't previously sent down, we could send all receipts
# from that room but we only want to include receipts for events
# in the timeline to avoid bloating and blowing up the sync response
Expand All @@ -752,22 +748,23 @@ async def handle_previously_room(room_id: str) -> None:
# Combine the receipts for a room and add them to
# `fetched_receipts`
for room_id in initial_receipts.keys() | user_receipts.keys():
receipt_content = ReceiptInRoom.merge_to_content(
list(
itertools.chain(
initial_receipts.get(room_id, []),
user_receipts.get(room_id, []),
if room_id in initial_rooms:
receipt_content = ReceiptInRoom.merge_to_content(
list(
itertools.chain(
initial_receipts.get(room_id, []),
user_receipts.get(room_id, []),
)
)
)
)

fetched_receipts.append(
{
"room_id": room_id,
"type": EduTypes.RECEIPT,
"content": receipt_content,
}
)
fetched_receipts.append(
{
"room_id": room_id,
"type": EduTypes.RECEIPT,
"content": receipt_content,
}
)

fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string()
Expand Down
38 changes: 14 additions & 24 deletions synapse/handlers/sliding_sync/room_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
Expand All @@ -44,6 +45,7 @@
from synapse.events import StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import start_active_span, trace
from synapse.storage.databases.main.receipts import ReceiptInRoom
from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
Expand Down Expand Up @@ -102,10 +104,6 @@ class SlidingSyncInterestedRooms:
lists: A mapping from list name to the list result for the response
relevant_room_map: A map from rooms that match the sync request to
their room sync config.
relevant_rooms_to_send_map: Subset of `relevant_room_map` that
includes the rooms that *may* have relevant updates. Rooms not
in this map will definitely not have room updates (though
extensions may have updates in these rooms).
newly_joined_rooms: The set of rooms that were joined in the token range
and the user is still joined to at the end of this range.
newly_left_rooms: The set of rooms that we left in the token range
Expand All @@ -115,7 +113,6 @@ class SlidingSyncInterestedRooms:

lists: Mapping[str, SlidingSyncResult.SlidingWindowList]
relevant_room_map: Mapping[str, RoomSyncConfig]
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig]
all_rooms: Set[str]
room_membership_for_user_map: Mapping[str, RoomsForUserType]

Expand All @@ -128,7 +125,6 @@ def empty() -> "SlidingSyncInterestedRooms":
return SlidingSyncInterestedRooms(
lists={},
relevant_room_map={},
relevant_rooms_to_send_map={},
all_rooms=set(),
room_membership_for_user_map={},
newly_joined_rooms=set(),
Expand Down Expand Up @@ -547,16 +543,9 @@ async def _compute_interested_rooms_new_tables(

relevant_room_map[room_id] = room_sync_config

# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)

return SlidingSyncInterestedRooms(
lists=lists,
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
Expand Down Expand Up @@ -735,35 +724,31 @@ async def _compute_interested_rooms_fallback(

relevant_room_map[room_id] = room_sync_config

# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)

return SlidingSyncInterestedRooms(
lists=lists,
relevant_room_map=relevant_room_map,
relevant_rooms_to_send_map=relevant_rooms_to_send_map,
all_rooms=all_rooms,
room_membership_for_user_map=room_membership_for_user_map,
newly_joined_rooms=newly_joined_room_ids,
newly_left_rooms=newly_left_room_ids,
dm_room_ids=dm_room_ids,
)

async def _filter_relevant_rooms_to_send(
def filter_relevant_rooms_to_send(
self,
user_id: UserID,
previous_connection_state: PerConnectionState,
from_token: Optional[StreamToken],
relevant_room_map: Dict[str, RoomSyncConfig],
) -> Dict[str, RoomSyncConfig]:
to_token: StreamToken,
relevant_room_map: Mapping[str, RoomSyncConfig],
receipts: Mapping[str, Sequence[ReceiptInRoom]],
) -> Mapping[str, RoomSyncConfig]:
"""Filters the `relevant_room_map` down to those rooms that may have
updates we need to fetch and return."""

# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map
relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig] = relevant_room_map
if relevant_room_map:
with start_active_span("filter_relevant_rooms_to_send"):
if from_token:
Expand Down Expand Up @@ -814,6 +799,11 @@ async def _filter_relevant_rooms_to_send(
)
)
rooms_should_send.update(rooms_that_have_updates)

# Any rooms with receipts should be considered for sending as their
# notification counts may have changed.
rooms_should_send.update(receipts.keys())

relevant_rooms_to_send_map = {
room_id: room_sync_config
for room_id, room_sync_config in relevant_room_map.items()
Expand Down
13 changes: 11 additions & 2 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,10 +1066,19 @@ async def encode_rooms(
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
"notification_count": room_result.notif_counts.main_timeline.notify_count,
"highlight_count": room_result.notif_counts.main_timeline.highlight_count,
}

if len(room_result.notif_counts.threads) > 0:
serialized_rooms[room_id]["unread_thread_notifications"] = {
thread_id: {
"notification_count": counts.notify_count,
"highlight_count": counts.highlight_count,
}
for thread_id, counts in room_result.notif_counts.threads.items()
}

if room_result.bump_stamp is not None:
serialized_rooms[room_id]["bump_stamp"] = room_result.bump_stamp

Expand Down
7 changes: 7 additions & 0 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,13 +547,20 @@ def _get_unread_counts_by_receipt_txn(
# If the user has no receipts in the room, retrieve the stream ordering for
# the latest membership event from this user in this room (which we assume is
# a join).
# Sometimes (usually state resets) there can be no membership event either,
# so we allow None and return no notifications which is probably about
# the best we can do short of failing outright.
event_id = self.db_pool.simple_select_one_onecol_txn(
txn=txn,
table="local_current_membership",
keyvalues={"room_id": room_id, "user_id": user_id},
retcol="event_id",
allow_none=True,
)

if event_id is None:
return _EMPTY_ROOM_NOTIF_COUNTS

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit I'm particularly unsure about but was needed to make https://github.com/element-hq/synapse/blob/develop/tests/rest/client/sliding_sync/test_sliding_sync.py#L1111 test pass again since this function just threw in that case. This is basically lying. We could fail the sync, but the test explicitly tests that it works. We could assume the stream ordering is the lowest possible which might be more accurate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in a lot of cases, a normal user would probably have a read receipt in the room and we would return that like normal with the logic above.

But in our tests, we never send any read receipts so we hit this logic path. I think it makes sense to return empty notification counts in this case as they've literally never read anything in the room before and were removed before they even tried.

stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)

return self._get_unread_counts_by_pos_txn(
Expand Down
Loading
Loading