Sliding Sync: Speed up getting receipts for initial rooms #17592

Merged · 3 commits · Aug 20, 2024
1 change: 1 addition & 0 deletions changelog.d/17592.misc
@@ -0,0 +1 @@
Correctly track read receipts that should be sent down in experimental sliding sync.
37 changes: 8 additions & 29 deletions synapse/handlers/sliding_sync.py
@@ -3073,38 +3073,17 @@ async def get_receipts_extension_response(
             # from that room but we only want to include receipts for events
             # in the timeline to avoid bloating and blowing up the sync response
             # as the number of users in the room increases. (this behavior is part of the spec)
-            initial_rooms = {
-                room_id
+            initial_rooms_and_event_ids = [
+                (room_id, event.event_id)
                 for room_id in initial_rooms
                 if room_id in actual_room_response_map
-            }
-            if initial_rooms:
-                initial_receipts = await self.store.get_linearized_receipts_for_rooms(
-                    room_ids=initial_rooms,
-                    to_key=to_token.receipt_key,
+                for event in actual_room_response_map[room_id].timeline_events
+            ]
+            if initial_rooms_and_event_ids:
+                initial_receipts = await self.store.get_linearized_receipts_for_events(
Contributor:
can you explain why this is an optimisation? Is it because not all rooms will have events in the timeline?

Member (author):
This is for rooms we've not sent down previously, i.e. the equivalent of an "initial sync" for the room. This optimisation basically just sends down receipts for events we're including, rather than all receipts that have ever been sent.

Member (author):
I think the code comment above is trying to explain that

+                    room_and_event_ids=initial_rooms_and_event_ids,
                 )

-                for receipt in initial_receipts:
-                    relevant_event_ids = {
-                        event.event_id
-                        for event in actual_room_response_map[
-                            receipt["room_id"]
-                        ].timeline_events
-                    }
-
-                    content = {
-                        event_id: content_value
-                        for event_id, content_value in receipt["content"].items()
-                        if event_id in relevant_event_ids
-                    }
-                    if content:
-                        fetched_receipts.append(
-                            {
-                                "type": receipt["type"],
-                                "room_id": receipt["room_id"],
-                                "content": content,
-                            }
-                        )
+                fetched_receipts.extend(initial_receipts)

fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string()
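For illustration, a minimal sketch (room and event IDs invented) of the pairs the new code path builds for rooms being sent down for the first time; only the events actually included in the timeline are queried, instead of fetching every receipt the room has accumulated and filtering afterwards in Python:

    # Hypothetical input for two "initial" rooms, as built by the list
    # comprehension above.
    initial_rooms_and_event_ids = [
        ("!abc:example.org", "$event_1"),
        ("!abc:example.org", "$event_2"),
        ("!def:example.org", "$event_3"),
    ]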
84 changes: 84 additions & 0 deletions synapse/storage/databases/main/receipts.py
@@ -43,6 +43,7 @@
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_tuple_in_list_sql_clause,
)
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import MultiWriterIdGenerator
@@ -481,6 +482,83 @@ def f(
}
return results

async def get_linearized_receipts_for_events(
self,
room_and_event_ids: Collection[Tuple[str, str]],
) -> Sequence[JsonMapping]:
"""Get all receipts for the given set of events.

Arguments:
room_and_event_ids: A collection of (room ID, event ID) tuples to fetch
receipts for

Returns:
A list of receipts, one per room.
"""

def get_linearized_receipts_for_events_txn(
txn: LoggingTransaction,
room_id_event_id_tuples: Collection[Tuple[str, str]],
) -> List[Tuple[str, str, str, str, Optional[str], str]]:
clause, args = make_tuple_in_list_sql_clause(
self.database_engine, ("room_id", "event_id"), room_id_event_id_tuples
)

sql = f"""
SELECT room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE {clause}
"""

txn.execute(sql, args)

return txn.fetchall()

# room_id -> event_id -> receipt_type -> user_id -> receipt data
room_to_content: Dict[str, Dict[str, Dict[str, Dict[str, JsonMapping]]]] = {}
for batch in batch_iter(room_and_event_ids, 1000):
batch_results = await self.db_pool.runInteraction(
"get_linearized_receipts_for_events",
get_linearized_receipts_for_events_txn,
batch,
)

for (
room_id,
receipt_type,
user_id,
event_id,
thread_id,
data,
) in batch_results:
content = room_to_content.setdefault(room_id, {})
user_receipts = content.setdefault(event_id, {}).setdefault(
receipt_type, {}
)

receipt_data = db_to_json(data)
if thread_id is not None:
receipt_data["thread_id"] = thread_id

# MSC4102: always replace threaded receipts with unthreaded ones
# if there is a clash. Specifically:
# - if there is no existing receipt, great, set the data.
# - if there is an existing receipt, is it threaded (thread_id
# present)? YES: replace if this receipt has no thread id.
# NO: do not replace. This means we will drop some receipts, but
# MSC4102 is designed to drop semantically meaningless receipts,
# so this is okay. Previously, we would drop meaningful data!
if user_id in user_receipts:
if "thread_id" in user_receipts[user_id] and not thread_id:
user_receipts[user_id] = receipt_data
else:
user_receipts[user_id] = receipt_data

return [
{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}
for room_id, content in room_to_content.items()
]
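As a rough illustration of the return value described in the docstring (IDs and timestamp invented), each list entry is an m.receipt-shaped dict for one room, whose content nests event_id -> receipt type -> user_id -> receipt data:

    # Hypothetical single entry of the returned sequence.
    {
        "type": "m.receipt",
        "room_id": "!abc:example.org",
        "content": {
            "$event_1": {
                "m.read": {
                    "@alice:example.org": {"ts": 1724100000000},
                },
            },
        },
    }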

@cached(
num_args=2,
)
@@ -996,6 +1074,12 @@ def __init__(
self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
self._background_receipts_graph_unique_index,
)
self.db_pool.updates.register_background_index_update(
update_name="receipts_room_id_event_id_index",
index_name="receipts_linearized_event_id",
table="receipts_linearized",
columns=("room_id", "event_id"),
)

async def _populate_receipt_event_stream_ordering(
self, progress: JsonDict, batch_size: int
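To make the MSC4102 handling in get_linearized_receipts_for_events above concrete, here is a self-contained sketch (not Synapse code; values invented) of the clash rule: the first receipt seen for a (user, event, type) is kept, and a threaded receipt may later be overwritten by an unthreaded one, never the reverse.

    # Standalone illustration of the MSC4102 clash rule used above.
    user_receipts: dict = {}

    def add_receipt(user_id: str, thread_id, data: dict) -> None:
        receipt_data = dict(data)
        if thread_id is not None:
            receipt_data["thread_id"] = thread_id
        if user_id in user_receipts:
            # Only replace an existing *threaded* receipt with an unthreaded one.
            if "thread_id" in user_receipts[user_id] and not thread_id:
                user_receipts[user_id] = receipt_data
        else:
            user_receipts[user_id] = receipt_data

    # A threaded and an unthreaded read receipt from the same user on the same event:
    add_receipt("@alice:example.org", "main", {"ts": 1})
    add_receipt("@alice:example.org", None, {"ts": 2})
    assert user_receipts["@alice:example.org"] == {"ts": 2}  # the unthreaded receipt wins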
15 changes: 15 additions & 0 deletions (new SQL schema delta)
@@ -0,0 +1,15 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8602, 'receipts_room_id_event_id_index', '{}');
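
The ordering value 8602 appears to follow Synapse's usual convention of schema version (86) plus delta number (02), and the update_name matches the registration added in receipts.py above. As a hedged sketch (the exact DDL, e.g. CREATE INDEX CONCURRENTLY on PostgreSQL, is chosen by Synapse's background updater), the end state once the update completes is an index roughly equivalent to the statement below, which is what the new (room_id, event_id) lookups in get_linearized_receipts_for_events benefit from:

    # Approximation only; not the literal statement Synapse executes.
    EXPECTED_INDEX = """
        CREATE INDEX receipts_linearized_event_id
            ON receipts_linearized (room_id, event_id)
    """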