Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 7e5d3b0

Browse files
authored
Collect information for PushRuleEvaluator in parallel. (#16590)
Fetch the information needed for push rule evaluation in parallel. Ideally this would use query pipelining, but that is not available in psycopg2. Because the queries still go through the database thread pool, this may result in little to no actual parallelization.
1 parent 1dd3074 commit 7e5d3b0

File tree

4 files changed

+87
-34
lines changed

4 files changed

+87
-34
lines changed

changelog.d/16590.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Run push rule evaluator setup in parallel.

synapse/push/bulk_push_rule_evaluator.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525
Sequence,
2626
Tuple,
2727
Union,
28+
cast,
2829
)
2930

3031
from prometheus_client import Counter
3132

33+
from twisted.internet.defer import Deferred
34+
3235
from synapse.api.constants import (
3336
MAIN_TIMELINE,
3437
EventContentFields,
@@ -40,11 +43,15 @@
4043
from synapse.event_auth import auth_types_for_event, get_user_power_level
4144
from synapse.events import EventBase, relation_from_event
4245
from synapse.events.snapshot import EventContext
46+
from synapse.logging.context import make_deferred_yieldable, run_in_background
4347
from synapse.state import POWER_KEY
4448
from synapse.storage.databases.main.roommember import EventIdMembership
49+
from synapse.storage.roommember import ProfileInfo
4550
from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator
4651
from synapse.types import JsonValue
4752
from synapse.types.state import StateFilter
53+
from synapse.util import unwrapFirstError
54+
from synapse.util.async_helpers import gather_results
4855
from synapse.util.caches import register_cache
4956
from synapse.util.metrics import measure_func
5057
from synapse.visibility import filter_event_for_clients_with_state
@@ -342,15 +349,41 @@ async def _action_for_event_by_user(
342349
rules_by_user = await self._get_rules_for_event(event)
343350
actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
344351

345-
room_member_count = await self.store.get_number_joined_users_in_room(
346-
event.room_id
347-
)
348-
352+
# Gather a bunch of info in parallel.
353+
#
354+
# This has a lot of ignored types and casting due to the use of @cached
355+
# decorated functions passed into run_in_background.
356+
#
357+
# See https://github.com/matrix-org/synapse/issues/16606
349358
(
350-
power_levels,
351-
sender_power_level,
352-
) = await self._get_power_levels_and_sender_level(
353-
event, context, event_id_to_event
359+
room_member_count,
360+
(power_levels, sender_power_level),
361+
related_events,
362+
profiles,
363+
) = await make_deferred_yieldable(
364+
cast(
365+
"Deferred[Tuple[int, Tuple[dict, Optional[int]], Dict[str, Dict[str, JsonValue]], Mapping[str, ProfileInfo]]]",
366+
gather_results(
367+
(
368+
run_in_background( # type: ignore[call-arg]
369+
self.store.get_number_joined_users_in_room, event.room_id # type: ignore[arg-type]
370+
),
371+
run_in_background(
372+
self._get_power_levels_and_sender_level,
373+
event,
374+
context,
375+
event_id_to_event,
376+
),
377+
run_in_background(self._related_events, event),
378+
run_in_background( # type: ignore[call-arg]
379+
self.store.get_subset_users_in_room_with_profiles,
380+
event.room_id, # type: ignore[arg-type]
381+
rules_by_user.keys(), # type: ignore[arg-type]
382+
),
383+
),
384+
consumeErrors=True,
385+
).addErrback(unwrapFirstError),
386+
)
354387
)
355388

356389
# Find the event's thread ID.
@@ -366,8 +399,6 @@ async def _action_for_event_by_user(
366399
# the parent is part of a thread.
367400
thread_id = await self.store.get_thread_id(relation.parent_id)
368401

369-
related_events = await self._related_events(event)
370-
371402
# It's possible that old room versions have non-integer power levels (floats or
372403
# strings; even the occasional `null`). For old rooms, we interpret these as if
373404
# they were integers. Do this here for the `@room` power level threshold.
@@ -400,11 +431,6 @@ async def _action_for_event_by_user(
400431
self.hs.config.experimental.msc1767_enabled, # MSC3931 flag
401432
)
402433

403-
users = rules_by_user.keys()
404-
profiles = await self.store.get_subset_users_in_room_with_profiles(
405-
event.room_id, users
406-
)
407-
408434
for uid, rules in rules_by_user.items():
409435
if event.sender == uid:
410436
continue

synapse/storage/databases/main/push_rule.py

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@
2828
cast,
2929
)
3030

31+
from twisted.internet import defer
32+
3133
from synapse.api.errors import StoreError
3234
from synapse.config.homeserver import ExperimentalConfig
35+
from synapse.logging.context import make_deferred_yieldable, run_in_background
3336
from synapse.replication.tcp.streams import PushRulesStream
3437
from synapse.storage._base import SQLBaseStore
3538
from synapse.storage.database import (
@@ -51,7 +54,8 @@
5154
)
5255
from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
5356
from synapse.types import JsonDict
54-
from synapse.util import json_encoder
57+
from synapse.util import json_encoder, unwrapFirstError
58+
from synapse.util.async_helpers import gather_results
5559
from synapse.util.caches.descriptors import cached, cachedList
5660
from synapse.util.caches.stream_change_cache import StreamChangeCache
5761

@@ -249,23 +253,33 @@ async def bulk_get_push_rules(
249253
user_id: [] for user_id in user_ids
250254
}
251255

252-
rows = cast(
253-
List[Tuple[str, str, int, int, str, str]],
254-
await self.db_pool.simple_select_many_batch(
255-
table="push_rules",
256-
column="user_name",
257-
iterable=user_ids,
258-
retcols=(
259-
"user_name",
260-
"rule_id",
261-
"priority_class",
262-
"priority",
263-
"conditions",
264-
"actions",
256+
# gatherResults loses all type information.
257+
rows, enabled_map_by_user = await make_deferred_yieldable(
258+
gather_results(
259+
(
260+
cast(
261+
"defer.Deferred[List[Tuple[str, str, int, int, str, str]]]",
262+
run_in_background(
263+
self.db_pool.simple_select_many_batch,
264+
table="push_rules",
265+
column="user_name",
266+
iterable=user_ids,
267+
retcols=(
268+
"user_name",
269+
"rule_id",
270+
"priority_class",
271+
"priority",
272+
"conditions",
273+
"actions",
274+
),
275+
desc="bulk_get_push_rules",
276+
batch_size=1000,
277+
),
278+
),
279+
run_in_background(self.bulk_get_push_rules_enabled, user_ids),
265280
),
266-
desc="bulk_get_push_rules",
267-
batch_size=1000,
268-
),
281+
consumeErrors=True,
282+
).addErrback(unwrapFirstError)
269283
)
270284

271285
# Sort by highest priority_class, then highest priority.
@@ -276,8 +290,6 @@ async def bulk_get_push_rules(
276290
(rule_id, priority_class, conditions, actions)
277291
)
278292

279-
enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids)
280-
281293
results: Dict[str, FilteredPushRules] = {}
282294

283295
for user_id, rules in raw_rules.items():

synapse/util/async_helpers.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ async def yieldable_gather_results_delaying_cancellation(
345345
T1 = TypeVar("T1")
346346
T2 = TypeVar("T2")
347347
T3 = TypeVar("T3")
348+
T4 = TypeVar("T4")
348349

349350

350351
@overload
@@ -380,6 +381,19 @@ def gather_results(
380381
...
381382

382383

384+
@overload
385+
def gather_results(
386+
deferredList: Tuple[
387+
"defer.Deferred[T1]",
388+
"defer.Deferred[T2]",
389+
"defer.Deferred[T3]",
390+
"defer.Deferred[T4]",
391+
],
392+
consumeErrors: bool = ...,
393+
) -> "defer.Deferred[Tuple[T1, T2, T3, T4]]":
394+
...
395+
396+
383397
def gather_results( # type: ignore[misc]
384398
deferredList: Tuple["defer.Deferred[T1]", ...],
385399
consumeErrors: bool = False,

0 commit comments

Comments
 (0)