Skip to content

Commit e0d2d76

Browse files
committed
Send timeouts in order, and get user
1 parent 7959679 commit e0d2d76

File tree

2 files changed

+24
-26
lines changed

2 files changed

+24
-26
lines changed

synapse/handlers/delayed_events.py

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,11 @@
3030
)
3131

3232
import attr
33-
from twisted.internet import defer
3433
from twisted.internet.interfaces import IDelayedCall
3534

3635
from synapse.api.constants import EventTypes
3736
from synapse.api.errors import Codes, NotFoundError, ShadowBanError, SynapseError
3837
from synapse.events import EventBase
39-
from synapse.logging.context import make_deferred_yieldable
4038
from synapse.logging.opentracing import set_tag
4139
from synapse.metrics.background_process_metrics import run_as_background_process
4240
from synapse.storage.databases.main.delayed_events import (
@@ -92,15 +90,8 @@ def __init__(self, hs: "HomeServer"):
9290
async def _schedule_db_events() -> None:
9391
# TODO: Sync all state first, so that affected delayed state events will be cancelled
9492
events, remaining_timeout_delays = await self.store.process_all_delays(self._get_current_ts())
95-
await make_deferred_yieldable(
96-
defer.gatherResults(
97-
[
98-
run_as_background_process("_send_event", self._send_event, *args)
99-
for args in events
100-
],
101-
consumeErrors=True,
102-
)
103-
)
93+
for args in events:
94+
await self._send_event(*args)
10495

10596
for delay_id, user_localpart, relative_delay in remaining_timeout_delays:
10697
self._schedule(delay_id, user_localpart, relative_delay)
@@ -243,14 +234,11 @@ async def update(self, requester: Requester, delay_id: str, action: str) -> None
243234
self._schedule(delay_id, user_localpart, delay)
244235

245236
elif enum_action == _UpdateDelayedEventAction.SEND:
246-
await self._send_now(delay_id, user_localpart)
247-
248-
async def _send_now(self, delay_id: DelayID, user_localpart: UserLocalpart) -> None:
249-
args, removed_timeout_delay_ids = await self.store.pop_event(delay_id, user_localpart)
237+
args, removed_timeout_delay_ids = await self.store.pop_event(delay_id, user_localpart)
250238

251-
for timeout_delay_id in removed_timeout_delay_ids:
252-
self._unschedule(timeout_delay_id, user_localpart)
253-
await self._send_event(user_localpart, *args)
239+
for timeout_delay_id in removed_timeout_delay_ids:
240+
self._unschedule(timeout_delay_id, user_localpart)
241+
await self._send_event(user_localpart, *args)
254242

255243
async def _send_on_timeout(self, delay_id: DelayID, user_localpart: UserLocalpart) -> None:
256244
del self._delayed_calls[_DelayedCallKey(delay_id, user_localpart)]

synapse/storage/databases/main/delayed_events.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@
6161
JsonDict,
6262
]
6363

64+
# TODO: If a Tuple type hint can be extended, extend the above one
65+
DelayedPartialEventWithUser = Tuple[
66+
UserLocalpart,
67+
RoomID,
68+
EventType,
69+
Optional[StateKey],
70+
Optional[Timestamp],
71+
JsonDict,
72+
]
73+
6474

6575
# TODO: Try to support workers
6676
class DelayedEventsStore(SQLBaseStore):
@@ -295,7 +305,7 @@ async def get_all_for_user(
295305
]
296306

297307
async def process_all_delays(self, current_ts: Timestamp) -> Tuple[
298-
List[DelayedPartialEvent],
308+
List[DelayedPartialEventWithUser],
299309
List[Tuple[DelayID, UserLocalpart, Delay]],
300310
]:
301311
"""
@@ -305,34 +315,34 @@ async def process_all_delays(self, current_ts: Timestamp) -> Tuple[
305315
"""
306316

307317
def process_all_delays_txn(txn: LoggingTransaction) -> Tuple[
308-
List[DelayedPartialEvent],
318+
List[DelayedPartialEventWithUser],
309319
List[Tuple[DelayID, UserLocalpart, Delay]],
310320
]:
311-
events: List[DelayedPartialEvent] = []
321+
events: List[DelayedPartialEventWithUser] = []
312322
removed_timeout_delay_ids: Set[DelayID] = set()
313323

314324
txn.execute(
315325
"""
316326
WITH delay_send_times AS (
317-
SELECT delay_rowid, running_since + delay AS send_ts
327+
SELECT delay_rowid, user_localpart, running_since + delay AS send_ts
318328
FROM delayed_events JOIN delayed_event_timeouts USING (delay_rowid)
319329
)
320-
SELECT delay_rowid
330+
SELECT delay_rowid, user_localpart
321331
FROM delay_send_times
322332
WHERE send_ts < ?
323333
ORDER BY send_ts
324334
""",
325335
(current_ts,),
326336
)
327-
for (delay_rowid,) in txn:
337+
for row in txn:
328338
try:
329339
event, removed_timeout_delay_id = self._pop_event_txn(
330340
txn,
331-
keyvalues={"delay_rowid": delay_rowid},
341+
keyvalues={"delay_rowid": row[0]},
332342
)
333343
except NotFoundError:
334344
pass
335-
events.append(event)
345+
events.append((UserLocalpart(row[1]), *event))
336346
removed_timeout_delay_ids |= removed_timeout_delay_id
337347

338348
txn.execute(

0 commit comments

Comments
 (0)