This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove support for aggregating reactions (#15172)
It turns out that no clients rely on server-side aggregation of `m.annotation`
relationships: it's just not very useful as currently implemented.

It's also non-trivial to calculate.

I want to remove it from MSC2677, so to keep the implementation in line, let's
remove it here.
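
For context, the aggregation being removed is the `m.annotation` entry that Synapse previously bundled under `unsigned.m.relations` when serializing an event. Per MSC2677, the bundled payload looked roughly like the following sketch (the reaction keys and counts are made-up, illustrative values):

# Illustrative shape of the server-side aggregation this commit removes.
# The "chunk" entries mirror what get_annotations_for_events returned.
serialized_aggregations = {
    "m.annotation": {
        "chunk": [
            {"type": "m.reaction", "key": "👍", "count": 3},
            {"type": "m.reaction", "key": "🎉", "count": 1},
        ]
    }
}

After this change, clients that want reaction counts must compute them from the individual `m.annotation` relation events themselves.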
richvdh authored Feb 28, 2023
1 parent b2fd03d commit 2b78981
Showing 8 changed files with 30 additions and 377 deletions.
1 change: 1 addition & 0 deletions changelog.d/15172.feature
@@ -0,0 +1 @@
Remove support for server-side aggregation of reactions.
5 changes: 0 additions & 5 deletions synapse/events/utils.py
@@ -516,11 +516,6 @@ def _inject_bundled_aggregations(
# being serialized.
serialized_aggregations = {}

if event_aggregations.annotations:
serialized_aggregations[
RelationTypes.ANNOTATION
] = event_aggregations.annotations

if event_aggregations.references:
serialized_aggregations[
RelationTypes.REFERENCE
76 changes: 1 addition & 75 deletions synapse/handlers/relations.py
@@ -60,13 +60,12 @@ class BundledAggregations:
Some values require additional processing during serialization.
"""

annotations: Optional[JsonDict] = None
references: Optional[JsonDict] = None
replace: Optional[EventBase] = None
thread: Optional[_ThreadAggregation] = None

def __bool__(self) -> bool:
return bool(self.annotations or self.references or self.replace or self.thread)
return bool(self.references or self.replace or self.thread)


class RelationsHandler:
@@ -227,67 +226,6 @@ async def redact_events_related_to(
e.msg,
)

async def get_annotations_for_events(
self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
) -> Dict[str, List[JsonDict]]:
"""Get a list of annotations to the given events, grouped by event type and
aggregation key, sorted by count.
This is used e.g. to get what reactions have happened on an event and how many.
Args:
event_ids: Fetch events that relate to these event IDs.
ignored_users: The users ignored by the requesting user.
Returns:
A map of event IDs to a list of groups of annotations that match.
Each entry is a dict with `type`, `key` and `count` fields.
"""
# Get the base results for all users.
full_results = await self._main_store.get_aggregation_groups_for_events(
event_ids
)

# Avoid additional logic if there are no ignored users.
if not ignored_users:
return {
event_id: results
for event_id, results in full_results.items()
if results
}

# Then subtract off the results for any ignored users.
ignored_results = await self._main_store.get_aggregation_groups_for_users(
[event_id for event_id, results in full_results.items() if results],
ignored_users,
)

filtered_results = {}
for event_id, results in full_results.items():
# If no annotations, skip.
if not results:
continue

# If there are no ignored results for this event, copy verbatim.
if event_id not in ignored_results:
filtered_results[event_id] = results
continue

# Otherwise, subtract out the ignored results.
event_ignored_results = ignored_results[event_id]
for result in results:
key = (result["type"], result["key"])
if key in event_ignored_results:
# Ensure to not modify the cache.
result = result.copy()
result["count"] -= event_ignored_results[key]
if result["count"] <= 0:
continue
filtered_results.setdefault(event_id, []).append(result)

return filtered_results

async def get_references_for_events(
self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
) -> Dict[str, List[_RelatedEvent]]:
@@ -531,17 +469,6 @@ async def get_bundled_aggregations(
# (as that is what makes it part of the thread).
relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD

async def _fetch_annotations() -> None:
"""Fetch any annotations (ie, reactions) to bundle with this event."""
annotations_by_event_id = await self.get_annotations_for_events(
events_by_id.keys(), ignored_users=ignored_users
)
for event_id, annotations in annotations_by_event_id.items():
if annotations:
results.setdefault(event_id, BundledAggregations()).annotations = {
"chunk": annotations
}

async def _fetch_references() -> None:
"""Fetch any references to bundle with this event."""
references_by_event_id = await self.get_references_for_events(
@@ -575,7 +502,6 @@ async def _fetch_edits() -> None:
await make_deferred_yieldable(
gather_results(
(
run_in_background(_fetch_annotations),
run_in_background(_fetch_references),
run_in_background(_fetch_edits),
)
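
With `_fetch_annotations` removed above, bundled aggregations no longer carry reaction counts, so a client that wants them has to derive them from the raw `m.annotation` relation events (for example, as returned by the `/relations` endpoint). A minimal client-side sketch, assuming each relation event carries the usual `content["m.relates_to"]["key"]` field from MSC2677, might look like this (illustrative code, not part of Synapse):

from collections import Counter
from typing import Dict, Iterable, Tuple

def aggregate_reactions(relation_events: Iterable[dict]) -> Dict[Tuple[str, str], int]:
    """Count annotations by (event type, aggregation key), one vote per sender."""
    seen = set()
    counts: Counter = Counter()
    for ev in relation_events:
        key = ev.get("content", {}).get("m.relates_to", {}).get("key")
        sender = ev.get("sender")
        if key is None or sender is None:
            continue  # skip malformed events
        vote = (sender, ev["type"], key)
        if vote in seen:
            continue  # each sender counts at most once per (type, key)
        seen.add(vote)
        counts[(ev["type"], key)] += 1
    return dict(counts)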
3 changes: 0 additions & 3 deletions synapse/storage/databases/main/cache.py
@@ -266,9 +266,6 @@ def _invalidate_caches_for_event(
if relates_to:
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
self._attempt_to_invalidate_cache(
"get_aggregation_groups_for_event", (relates_to,)
)
self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
4 changes: 0 additions & 4 deletions synapse/storage/databases/main/events.py
@@ -2024,10 +2024,6 @@ def _handle_redact_relations(
self.store._invalidate_cache_and_stream(
txn, self.store.get_relations_for_event, (redacted_relates_to,)
)
if rel_type == RelationTypes.ANNOTATION:
self.store._invalidate_cache_and_stream(
txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
)
if rel_type == RelationTypes.REFERENCE:
self.store._invalidate_cache_and_stream(
txn, self.store.get_references_for_event, (redacted_relates_to,)
3 changes: 0 additions & 3 deletions synapse/storage/databases/main/events_bg_updates.py
@@ -1219,9 +1219,6 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_aggregation_groups_for_event, cache_tuple # type: ignore[attr-defined]
)
self._invalidate_cache_and_stream( # type: ignore[attr-defined]
txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
)
137 changes: 0 additions & 137 deletions synapse/storage/databases/main/relations.py
@@ -397,143 +397,6 @@ async def event_is_target_of_relation(self, parent_id: str) -> bool:
)
return result is not None

@cached()
async def get_aggregation_groups_for_event(
self, event_id: str
) -> Sequence[JsonDict]:
raise NotImplementedError()

@cachedList(
cached_method_name="get_aggregation_groups_for_event", list_name="event_ids"
)
async def get_aggregation_groups_for_events(
self, event_ids: Collection[str]
) -> Mapping[str, Optional[List[JsonDict]]]:
"""Get a list of annotations on the given events, grouped by event type and
aggregation key, sorted by count.
This is used e.g. to get what reactions have happened on an event and how many.
Args:
event_ids: Fetch events that relate to these event IDs.
Returns:
A map of event IDs to a list of groups of annotations that match.
Each entry is a dict with `type`, `key` and `count` fields.
"""
# The number of entries to return per event ID.
limit = 5

clause, args = make_in_list_sql_clause(
self.database_engine, "relates_to_id", event_ids
)
args.append(RelationTypes.ANNOTATION)

sql = f"""
SELECT
relates_to_id,
annotation.type,
aggregation_key,
COUNT(DISTINCT annotation.sender)
FROM events AS annotation
INNER JOIN event_relations USING (event_id)
INNER JOIN events AS parent ON
parent.event_id = relates_to_id
AND parent.room_id = annotation.room_id
WHERE
{clause}
AND relation_type = ?
GROUP BY relates_to_id, annotation.type, aggregation_key
ORDER BY relates_to_id, COUNT(*) DESC
"""

def _get_aggregation_groups_for_events_txn(
txn: LoggingTransaction,
) -> Mapping[str, List[JsonDict]]:
txn.execute(sql, args)

result: Dict[str, List[JsonDict]] = {}
for event_id, type, key, count in cast(
List[Tuple[str, str, str, int]], txn
):
event_results = result.setdefault(event_id, [])

# Limit the number of results per event ID.
if len(event_results) == limit:
continue

event_results.append({"type": type, "key": key, "count": count})

return result

return await self.db_pool.runInteraction(
"get_aggregation_groups_for_events", _get_aggregation_groups_for_events_txn
)

async def get_aggregation_groups_for_users(
self, event_ids: Collection[str], users: FrozenSet[str]
) -> Dict[str, Dict[Tuple[str, str], int]]:
"""Fetch the partial aggregations for an event for specific users.
This is used, in conjunction with get_aggregation_groups_for_event, to
remove information from the results for ignored users.
Args:
event_ids: Fetch events that relate to these event IDs.
users: The users to fetch information for.
Returns:
A map of event ID to a map of (event type, aggregation key) to a
count of users.
"""

if not users:
return {}

events_sql, args = make_in_list_sql_clause(
self.database_engine, "relates_to_id", event_ids
)

users_sql, users_args = make_in_list_sql_clause(
self.database_engine, "annotation.sender", users
)
args.extend(users_args)
args.append(RelationTypes.ANNOTATION)

sql = f"""
SELECT
relates_to_id,
annotation.type,
aggregation_key,
COUNT(DISTINCT annotation.sender)
FROM events AS annotation
INNER JOIN event_relations USING (event_id)
INNER JOIN events AS parent ON
parent.event_id = relates_to_id
AND parent.room_id = annotation.room_id
WHERE {events_sql} AND {users_sql} AND relation_type = ?
GROUP BY relates_to_id, annotation.type, aggregation_key
ORDER BY relates_to_id, COUNT(*) DESC
"""

def _get_aggregation_groups_for_users_txn(
txn: LoggingTransaction,
) -> Dict[str, Dict[Tuple[str, str], int]]:
txn.execute(sql, args)

result: Dict[str, Dict[Tuple[str, str], int]] = {}
for event_id, type, key, count in cast(
List[Tuple[str, str, str, int]], txn
):
result.setdefault(event_id, {})[(type, key)] = count

return result

return await self.db_pool.runInteraction(
"get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn
)

@cached()
async def get_references_for_event(self, event_id: str) -> List[JsonDict]:
raise NotImplementedError()
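
For reference, the removed handler method `get_annotations_for_events` combined the outputs of the two storage methods deleted here. A small worked example of that ignored-user subtraction, with made-up event IDs and counts:

# Aggregated counts across all users, as get_aggregation_groups_for_events returned them:
full_results = {
    "$parent": [
        {"type": "m.reaction", "key": "👍", "count": 3},
        {"type": "m.reaction", "key": "😀", "count": 1},
    ]
}
# Counts contributed by ignored users, keyed by (type, key),
# as get_aggregation_groups_for_users returned them:
ignored_results = {"$parent": {("m.reaction", "👍"): 1, ("m.reaction", "😀"): 1}}

filtered = {}
for event_id, groups in full_results.items():
    for group in groups:
        ignored = ignored_results.get(event_id, {}).get((group["type"], group["key"]), 0)
        remaining = group["count"] - ignored
        if remaining > 0:
            filtered.setdefault(event_id, []).append({**group, "count": remaining})

# Only the 👍 group survives, with the ignored user's vote subtracted:
assert filtered == {"$parent": [{"type": "m.reaction", "key": "👍", "count": 2}]}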