|
14 | 14 |
|
15 | 15 | import abc
|
16 | 16 | import logging
|
| 17 | +from collections import OrderedDict |
17 | 18 | from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
18 | 19 |
|
| 20 | +import attr |
19 | 21 | from prometheus_client import Counter
|
| 22 | +from typing_extensions import Literal |
20 | 23 |
|
21 | 24 | from twisted.internet import defer
|
22 | 25 |
|
|
33 | 36 | event_processing_loop_room_count,
|
34 | 37 | events_processed_counter,
|
35 | 38 | )
|
36 |
| -from synapse.metrics.background_process_metrics import run_as_background_process |
| 39 | +from synapse.metrics.background_process_metrics import ( |
| 40 | + run_as_background_process, |
| 41 | + wrap_as_background_process, |
| 42 | +) |
37 | 43 | from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
|
| 44 | +from synapse.util import Clock |
38 | 45 | from synapse.util.metrics import Measure
|
39 | 46 |
|
40 | 47 | if TYPE_CHECKING:
|
@@ -137,6 +144,84 @@ async def get_replication_rows(
|
137 | 144 | raise NotImplementedError()
|
138 | 145 |
|
139 | 146 |
|
| 147 | +@attr.s |
| 148 | +class _PresenceQueue: |
| 149 | + """A queue of destinations that need to be woken up due to new presence |
| 150 | + updates. |
| 151 | +
|
| 152 | + Staggers waking up of per destination queues to ensure that we don't attempt |
| 153 | + to start TLS connections with many hosts all at once, leading to pinned CPU. |
| 154 | + """ |
| 155 | + |
| 156 | + # The maximum duration in seconds between queuing up a destination and it |
| 157 | + # being woken up. |
| 158 | + _MAX_TIME_IN_QUEUE = 30.0 |
| 159 | + |
| 160 | + # The maximum duration in seconds between waking up consecutive destination |
| 161 | + # queues. |
| 162 | + _MAX_DELAY = 0.1 |
| 163 | + |
| 164 | + sender: "FederationSender" = attr.ib() |
| 165 | + clock: Clock = attr.ib() |
| 166 | + queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict) |
| 167 | + processing: bool = attr.ib(default=False) |
| 168 | + |
| 169 | + def add_to_queue(self, destination: str) -> None: |
| 170 | + """Add a destination to the queue to be woken up.""" |
| 171 | + |
| 172 | + self.queue[destination] = None |
| 173 | + |
| 174 | + if not self.processing: |
| 175 | + self._handle() |
| 176 | + |
| 177 | + @wrap_as_background_process("_PresenceQueue.handle") |
| 178 | + async def _handle(self) -> None: |
| 179 | + """Background process to drain the queue.""" |
| 180 | + |
| 181 | + if not self.queue: |
| 182 | + return |
| 183 | + |
| 184 | + assert not self.processing |
| 185 | + self.processing = True |
| 186 | + |
| 187 | + try: |
| 188 | + # We start with a delay that should drain the queue quickly enough that |
| 189 | + # we process all destinations in the queue in _MAX_TIME_IN_QUEUE |
| 190 | + # seconds. |
| 191 | + # |
| 192 | + # We also add an upper bound to the delay, to gracefully handle the |
| 193 | + # case where the queue only has a few entries in it. |
| 194 | + current_sleep_seconds = min( |
| 195 | + self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue) |
| 196 | + ) |
| 197 | + |
| 198 | + while self.queue: |
| 199 | + destination, _ = self.queue.popitem(last=False) |
| 200 | + |
| 201 | + queue = self.sender._get_per_destination_queue(destination) |
| 202 | + |
| 203 | + if not queue._new_data_to_send: |
| 204 | + # The per destination queue has already been woken up. |
| 205 | + continue |
| 206 | + |
| 207 | + queue.attempt_new_transaction() |
| 208 | + |
| 209 | + await self.clock.sleep(current_sleep_seconds) |
| 210 | + |
| 211 | + if not self.queue: |
| 212 | + break |
| 213 | + |
| 214 | + # More destinations may have been added to the queue, so we may |
| 215 | + # need to reduce the delay to ensure everything gets processed |
| 216 | + # within _MAX_TIME_IN_QUEUE seconds. |
| 217 | + current_sleep_seconds = min( |
| 218 | + current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue) |
| 219 | + ) |
| 220 | + |
| 221 | + finally: |
| 222 | + self.processing = False |
| 223 | + |
| 224 | + |
140 | 225 | class FederationSender(AbstractFederationSender):
|
141 | 226 | def __init__(self, hs: "HomeServer"):
|
142 | 227 | self.hs = hs
|
@@ -208,6 +293,8 @@ def __init__(self, hs: "HomeServer"):
|
208 | 293 |
|
209 | 294 | self._external_cache = hs.get_external_cache()
|
210 | 295 |
|
| 296 | + self._presence_queue = _PresenceQueue(self, self.clock) |
| 297 | + |
211 | 298 | def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
|
212 | 299 | """Get or create a PerDestinationQueue for the given destination
|
213 | 300 |
|
@@ -517,7 +604,12 @@ def send_presence_to_destinations(
|
517 | 604 | self._instance_name, destination
|
518 | 605 | ):
|
519 | 606 | continue
|
520 |
| - self._get_per_destination_queue(destination).send_presence(states) |
| 607 | + |
| 608 | + self._get_per_destination_queue(destination).send_presence( |
| 609 | + states, start_loop=False |
| 610 | + ) |
| 611 | + |
| 612 | + self._presence_queue.add_to_queue(destination) |
521 | 613 |
|
522 | 614 | def build_and_send_edu(
|
523 | 615 | self,
|
|
0 commit comments