Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit c801245

Browse files
clokep authored and H-Shay committed
Batch fetch bundled annotations (#14491)
Avoid an n+1 query problem and fetch the bundled aggregations for m.annotation relations in a single query instead of a query per event. This applies logic similar to what was previously done for edits in 8b309ad (#11660) and threads in b65acea (#11752).
1 parent ce8877b commit c801245

File tree

5 files changed

+202
-141
lines changed

5 files changed

+202
-141
lines changed

changelog.d/14491.feature

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.4/client-server-api/#aggregations) which return bundled aggregations.

synapse/handlers/relations.py

+113-84
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,16 @@
1313
# limitations under the License.
1414
import enum
1515
import logging
16-
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple
16+
from typing import (
17+
TYPE_CHECKING,
18+
Collection,
19+
Dict,
20+
FrozenSet,
21+
Iterable,
22+
List,
23+
Optional,
24+
Tuple,
25+
)
1726

1827
import attr
1928

@@ -259,48 +268,64 @@ async def redact_events_related_to(
259268
e.msg,
260269
)
261270

262-
async def get_annotations_for_event(
263-
self,
264-
event_id: str,
265-
room_id: str,
266-
limit: int = 5,
267-
ignored_users: FrozenSet[str] = frozenset(),
268-
) -> List[JsonDict]:
269-
"""Get a list of annotations on the event, grouped by event type and
271+
async def get_annotations_for_events(
272+
self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
273+
) -> Dict[str, List[JsonDict]]:
274+
"""Get a list of annotations to the given events, grouped by event type and
270275
aggregation key, sorted by count.
271276
272-
This is used e.g. to get the what and how many reactions have happend
277+
This is used e.g. to get the what and how many reactions have happened
273278
on an event.
274279
275280
Args:
276-
event_id: Fetch events that relate to this event ID.
277-
room_id: The room the event belongs to.
278-
limit: Only fetch the `limit` groups.
281+
event_ids: Fetch events that relate to these event IDs.
279282
ignored_users: The users ignored by the requesting user.
280283
281284
Returns:
282-
List of groups of annotations that match. Each row is a dict with
283-
`type`, `key` and `count` fields.
285+
A map of event IDs to a list of groups of annotations that match.
286+
Each entry is a dict with `type`, `key` and `count` fields.
284287
"""
285288
# Get the base results for all users.
286-
full_results = await self._main_store.get_aggregation_groups_for_event(
287-
event_id, room_id, limit
289+
full_results = await self._main_store.get_aggregation_groups_for_events(
290+
event_ids
288291
)
289292

293+
# Avoid additional logic if there are no ignored users.
294+
if not ignored_users:
295+
return {
296+
event_id: results
297+
for event_id, results in full_results.items()
298+
if results
299+
}
300+
290301
# Then subtract off the results for any ignored users.
291302
ignored_results = await self._main_store.get_aggregation_groups_for_users(
292-
event_id, room_id, limit, ignored_users
303+
[event_id for event_id, results in full_results.items() if results],
304+
ignored_users,
293305
)
294306

295-
filtered_results = []
296-
for result in full_results:
297-
key = (result["type"], result["key"])
298-
if key in ignored_results:
299-
result = result.copy()
300-
result["count"] -= ignored_results[key]
301-
if result["count"] <= 0:
302-
continue
303-
filtered_results.append(result)
307+
filtered_results = {}
308+
for event_id, results in full_results.items():
309+
# If no annotations, skip.
310+
if not results:
311+
continue
312+
313+
# If there are not ignored results for this event, copy verbatim.
314+
if event_id not in ignored_results:
315+
filtered_results[event_id] = results
316+
continue
317+
318+
# Otherwise, subtract out the ignored results.
319+
event_ignored_results = ignored_results[event_id]
320+
for result in results:
321+
key = (result["type"], result["key"])
322+
if key in event_ignored_results:
323+
# Ensure to not modify the cache.
324+
result = result.copy()
325+
result["count"] -= event_ignored_results[key]
326+
if result["count"] <= 0:
327+
continue
328+
filtered_results.setdefault(event_id, []).append(result)
304329

305330
return filtered_results
306331

@@ -366,59 +391,62 @@ async def _get_threads_for_events(
366391
results = {}
367392

368393
for event_id, summary in summaries.items():
369-
if summary:
370-
thread_count, latest_thread_event = summary
371-
372-
# Subtract off the count of any ignored users.
373-
for ignored_user in ignored_users:
374-
thread_count -= ignored_results.get((event_id, ignored_user), 0)
375-
376-
# This is gnarly, but if the latest event is from an ignored user,
377-
# attempt to find one that isn't from an ignored user.
378-
if latest_thread_event.sender in ignored_users:
379-
room_id = latest_thread_event.room_id
380-
381-
# If the root event is not found, something went wrong, do
382-
# not include a summary of the thread.
383-
event = await self._event_handler.get_event(user, room_id, event_id)
384-
if event is None:
385-
continue
394+
# If no thread, skip.
395+
if not summary:
396+
continue
386397

387-
potential_events, _ = await self.get_relations_for_event(
388-
event_id,
389-
event,
390-
room_id,
391-
RelationTypes.THREAD,
392-
ignored_users,
393-
)
398+
thread_count, latest_thread_event = summary
394399

395-
# If all found events are from ignored users, do not include
396-
# a summary of the thread.
397-
if not potential_events:
398-
continue
400+
# Subtract off the count of any ignored users.
401+
for ignored_user in ignored_users:
402+
thread_count -= ignored_results.get((event_id, ignored_user), 0)
399403

400-
# The *last* event returned is the one that is cared about.
401-
event = await self._event_handler.get_event(
402-
user, room_id, potential_events[-1].event_id
403-
)
404-
# It is unexpected that the event will not exist.
405-
if event is None:
406-
logger.warning(
407-
"Unable to fetch latest event in a thread with event ID: %s",
408-
potential_events[-1].event_id,
409-
)
410-
continue
411-
latest_thread_event = event
412-
413-
results[event_id] = _ThreadAggregation(
414-
latest_event=latest_thread_event,
415-
count=thread_count,
416-
# If there's a thread summary it must also exist in the
417-
# participated dictionary.
418-
current_user_participated=events_by_id[event_id].sender == user_id
419-
or participated[event_id],
404+
# This is gnarly, but if the latest event is from an ignored user,
405+
# attempt to find one that isn't from an ignored user.
406+
if latest_thread_event.sender in ignored_users:
407+
room_id = latest_thread_event.room_id
408+
409+
# If the root event is not found, something went wrong, do
410+
# not include a summary of the thread.
411+
event = await self._event_handler.get_event(user, room_id, event_id)
412+
if event is None:
413+
continue
414+
415+
potential_events, _ = await self.get_relations_for_event(
416+
event_id,
417+
event,
418+
room_id,
419+
RelationTypes.THREAD,
420+
ignored_users,
420421
)
421422

423+
# If all found events are from ignored users, do not include
424+
# a summary of the thread.
425+
if not potential_events:
426+
continue
427+
428+
# The *last* event returned is the one that is cared about.
429+
event = await self._event_handler.get_event(
430+
user, room_id, potential_events[-1].event_id
431+
)
432+
# It is unexpected that the event will not exist.
433+
if event is None:
434+
logger.warning(
435+
"Unable to fetch latest event in a thread with event ID: %s",
436+
potential_events[-1].event_id,
437+
)
438+
continue
439+
latest_thread_event = event
440+
441+
results[event_id] = _ThreadAggregation(
442+
latest_event=latest_thread_event,
443+
count=thread_count,
444+
# If there's a thread summary it must also exist in the
445+
# participated dictionary.
446+
current_user_participated=events_by_id[event_id].sender == user_id
447+
or participated[event_id],
448+
)
449+
422450
return results
423451

424452
@trace
@@ -496,17 +524,18 @@ async def get_bundled_aggregations(
496524
# (as that is what makes it part of the thread).
497525
relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD
498526

499-
# Fetch other relations per event.
500-
for event in events_by_id.values():
501-
# Fetch any annotations (ie, reactions) to bundle with this event.
502-
annotations = await self.get_annotations_for_event(
503-
event.event_id, event.room_id, ignored_users=ignored_users
504-
)
527+
# Fetch any annotations (ie, reactions) to bundle with this event.
528+
annotations_by_event_id = await self.get_annotations_for_events(
529+
events_by_id.keys(), ignored_users=ignored_users
530+
)
531+
for event_id, annotations in annotations_by_event_id.items():
505532
if annotations:
506-
results.setdefault(
507-
event.event_id, BundledAggregations()
508-
).annotations = {"chunk": annotations}
533+
results.setdefault(event_id, BundledAggregations()).annotations = {
534+
"chunk": annotations
535+
}
509536

537+
# Fetch other relations per event.
538+
for event in events_by_id.values():
510539
# Fetch any references to bundle with this event.
511540
references, next_token = await self.get_relations_for_event(
512541
event.event_id,

0 commit comments

Comments
 (0)