Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Speed up getting receipts for initial rooms #17592

Merged
merged 3 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 8 additions & 29 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3073,38 +3073,17 @@ async def get_receipts_extension_response(
# from that room but we only want to include receipts for events
# in the timeline to avoid bloating and blowing up the sync response
# as the number of users in the room increases. (this behavior is part of the spec)
initial_rooms = {
room_id
initial_rooms_and_event_ids = [
(room_id, event.event_id)
for room_id in initial_rooms
if room_id in actual_room_response_map
}
if initial_rooms:
initial_receipts = await self.store.get_linearized_receipts_for_rooms(
room_ids=initial_rooms,
to_key=to_token.receipt_key,
for event in actual_room_response_map[room_id].timeline_events
]
if initial_rooms_and_event_ids:
initial_receipts = await self.store.get_linearized_receipts_for_events(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this is an optimisation? Is it because not all rooms will have events in the timeline?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for rooms we've not sent down previously, i.e. the equivalent of an "initial sync" for the room. This optimisation basically just sends down receipts for events we're including, rather than all receipts that have ever been sent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code comment above is trying to explain that.

room_and_event_ids=initial_rooms_and_event_ids,
)

for receipt in initial_receipts:
relevant_event_ids = {
event.event_id
for event in actual_room_response_map[
receipt["room_id"]
].timeline_events
}

content = {
event_id: content_value
for event_id, content_value in receipt["content"].items()
if event_id in relevant_event_ids
}
if content:
fetched_receipts.append(
{
"type": receipt["type"],
"room_id": receipt["room_id"],
"content": content,
}
)
fetched_receipts.extend(initial_receipts)

fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string()
Expand Down
78 changes: 78 additions & 0 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_tuple_in_list_sql_clause,
)
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import MultiWriterIdGenerator
Expand Down Expand Up @@ -481,6 +482,83 @@ def f(
}
return results

async def get_linearized_receipts_for_events(
    self,
    room_and_event_ids: Collection[Tuple[str, str]],
) -> Sequence[JsonMapping]:
    """Fetch the receipts attached to the given events, grouped by room.

    Arguments:
        room_and_event_ids: `(room_id, event_id)` pairs identifying the
            events whose receipts should be looked up.

    Returns:
        A list with one receipt dict per room that had at least one
        matching row. Each dict has `type`, `room_id` and `content` keys,
        where `content` maps event ID -> receipt type -> user ID ->
        receipt data.
    """

    def _select_receipts_txn(
        txn: LoggingTransaction,
        pairs: Collection[Tuple[str, str]],
    ) -> List[Tuple[str, str, str, str, Optional[str], str]]:
        # Restrict the query to the (room_id, event_id) pairs of this batch.
        where_clause, params = make_tuple_in_list_sql_clause(
            self.database_engine, ("room_id", "event_id"), pairs
        )

        query = f"""
SELECT room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE {where_clause}
"""

        txn.execute(query, params)
        return txn.fetchall()

    # room_id -> event_id -> receipt_type -> user_id -> receipt data
    by_room: Dict[str, Dict[str, Dict[str, Dict[str, JsonMapping]]]] = {}

    # Query in batches of 1000 pairs, one database interaction per batch.
    for chunk in batch_iter(room_and_event_ids, 1000):
        rows = await self.db_pool.runInteraction(
            "get_linearized_receipts_for_events",
            _select_receipts_txn,
            chunk,
        )

        for row in rows:
            room_id, receipt_type, user_id, event_id, thread_id, raw_data = row

            per_user = (
                by_room.setdefault(room_id, {})
                .setdefault(event_id, {})
                .setdefault(receipt_type, {})
            )

            receipt = db_to_json(raw_data)
            if thread_id is not None:
                receipt["thread_id"] = thread_id

            # MSC4102: when a user has both a threaded and an unthreaded
            # receipt on the same event, the unthreaded one wins.
            #  - Nothing stored yet for this user: store this receipt.
            #  - Stored receipt is threaded and this one is unthreaded:
            #    overwrite it.
            #  - Otherwise keep what is already stored and drop this row.
            # Dropping is fine here: MSC4102 is designed to discard only
            # semantically meaningless receipts.
            if user_id in per_user:
                keep_existing = "thread_id" not in per_user[user_id] or thread_id
                if keep_existing:
                    continue
            per_user[user_id] = receipt

    receipts: List[JsonMapping] = []
    for room, content in by_room.items():
        receipts.append(
            {"type": EduTypes.RECEIPT, "room_id": room, "content": content}
        )
    return receipts

@cached(
num_args=2,
)
Expand Down