Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 2b7c180

Browse files
authored
Start fewer opentracing spans (#8640)
#8567 started a span for every background process. This is good as it means all Synapse code that gets run should be in a span (unless in the sentinel logging context), but it means we generate about 15x the number of spans as we did previously. This PR attempts to reduce that number by a) not starting a span for sending commands to Redis, and b) deferring starting background processes until after we're sure they're necessary. I don't really know how much this will help.
1 parent 34a5696 commit 2b7c180

File tree

8 files changed

+96
-53
lines changed

8 files changed

+96
-53
lines changed

changelog.d/8640.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce number of OpenTracing spans started.

synapse/handlers/appservice.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
# limitations under the License.
1515

1616
import logging
17-
from typing import Dict, List, Optional
17+
from typing import Dict, List, Optional, Union
1818

1919
from prometheus_client import Counter
2020

@@ -30,7 +30,10 @@
3030
event_processing_loop_counter,
3131
event_processing_loop_room_count,
3232
)
33-
from synapse.metrics.background_process_metrics import run_as_background_process
33+
from synapse.metrics.background_process_metrics import (
34+
run_as_background_process,
35+
wrap_as_background_process,
36+
)
3437
from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
3538
from synapse.util.metrics import Measure
3639

@@ -53,7 +56,7 @@ def __init__(self, hs):
5356
self.current_max = 0
5457
self.is_processing = False
5558

56-
async def notify_interested_services(self, max_token: RoomStreamToken):
59+
def notify_interested_services(self, max_token: RoomStreamToken):
5760
"""Notifies (pushes) all application services interested in this event.
5861
5962
Pushing is done asynchronously, so this method won't block for any
@@ -72,6 +75,12 @@ async def notify_interested_services(self, max_token: RoomStreamToken):
7275
if self.is_processing:
7376
return
7477

78+
# We only start a new background process if necessary rather than
79+
# optimistically (to cut down on overhead).
80+
self._notify_interested_services(max_token)
81+
82+
@wrap_as_background_process("notify_interested_services")
83+
async def _notify_interested_services(self, max_token: RoomStreamToken):
7584
with Measure(self.clock, "notify_interested_services"):
7685
self.is_processing = True
7786
try:
@@ -166,8 +175,11 @@ async def handle_room_events(events):
166175
finally:
167176
self.is_processing = False
168177

169-
async def notify_interested_services_ephemeral(
170-
self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
178+
def notify_interested_services_ephemeral(
179+
self,
180+
stream_key: str,
181+
new_token: Optional[int],
182+
users: Collection[Union[str, UserID]] = [],
171183
):
172184
"""This is called by the notifier in the background
173185
when an ephemeral event is handled by the homeserver.
@@ -183,13 +195,34 @@ async def notify_interested_services_ephemeral(
183195
new_token: The latest stream token
184196
users: The user(s) involved with the event.
185197
"""
198+
if not self.notify_appservices:
199+
return
200+
201+
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
202+
return
203+
186204
services = [
187205
service
188206
for service in self.store.get_app_services()
189207
if service.supports_ephemeral
190208
]
191-
if not services or not self.notify_appservices:
209+
if not services:
192210
return
211+
212+
# We only start a new background process if necessary rather than
213+
# optimistically (to cut down on overhead).
214+
self._notify_interested_services_ephemeral(
215+
services, stream_key, new_token, users
216+
)
217+
218+
@wrap_as_background_process("notify_interested_services_ephemeral")
219+
async def _notify_interested_services_ephemeral(
220+
self,
221+
services: List[ApplicationService],
222+
stream_key: str,
223+
new_token: Optional[int],
224+
users: Collection[Union[str, UserID]],
225+
):
193226
logger.info("Checking interested services for %s" % (stream_key))
194227
with Measure(self.clock, "notify_interested_services_ephemeral"):
195228
for service in services:
@@ -237,14 +270,17 @@ async def _handle_receipts(self, service: ApplicationService):
237270
return receipts
238271

239272
async def _handle_presence(
240-
self, service: ApplicationService, users: Collection[UserID]
273+
self, service: ApplicationService, users: Collection[Union[str, UserID]]
241274
):
242275
events = [] # type: List[JsonDict]
243276
presence_source = self.event_sources.sources["presence"]
244277
from_key = await self.store.get_type_stream_id_for_appservice(
245278
service, "presence"
246279
)
247280
for user in users:
281+
if isinstance(user, str):
282+
user = UserID.from_string(user)
283+
248284
interested = await service.is_interested_in_presence(user, self.store)
249285
if not interested:
250286
continue

synapse/logging/opentracing.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ def ensure_active_span_inner_2(*args, **kwargs):
317317

318318

319319
@contextlib.contextmanager
320-
def _noop_context_manager(*args, **kwargs):
320+
def noop_context_manager(*args, **kwargs):
321321
"""Does exactly what it says on the tin"""
322322
yield
323323

@@ -413,7 +413,7 @@ def start_active_span(
413413
"""
414414

415415
if opentracing is None:
416-
return _noop_context_manager()
416+
return noop_context_manager()
417417

418418
return opentracing.tracer.start_active_span(
419419
operation_name,
@@ -428,7 +428,7 @@ def start_active_span(
428428

429429
def start_active_span_follows_from(operation_name, contexts):
430430
if opentracing is None:
431-
return _noop_context_manager()
431+
return noop_context_manager()
432432

433433
references = [opentracing.follows_from(context) for context in contexts]
434434
scope = start_active_span(operation_name, references=references)
@@ -459,7 +459,7 @@ def start_active_span_from_request(
459459
# Also, twisted uses byte arrays while opentracing expects strings.
460460

461461
if opentracing is None:
462-
return _noop_context_manager()
462+
return noop_context_manager()
463463

464464
header_dict = {
465465
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
@@ -497,7 +497,7 @@ def start_active_span_from_edu(
497497
"""
498498

499499
if opentracing is None:
500-
return _noop_context_manager()
500+
return noop_context_manager()
501501

502502
carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
503503
"opentracing", {}

synapse/metrics/background_process_metrics.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from twisted.internet import defer
2525

2626
from synapse.logging.context import LoggingContext, PreserveLoggingContext
27-
from synapse.logging.opentracing import start_active_span
27+
from synapse.logging.opentracing import noop_context_manager, start_active_span
2828

2929
if TYPE_CHECKING:
3030
import resource
@@ -167,7 +167,7 @@ def update_metrics(self):
167167
)
168168

169169

170-
def run_as_background_process(desc: str, func, *args, **kwargs):
170+
def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
171171
"""Run the given function in its own logcontext, with resource metrics
172172
173173
This should be used to wrap processes which are fired off to run in the
@@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
181181
Args:
182182
desc: a description for this background process type
183183
func: a function, which may return a Deferred or a coroutine
184+
bg_start_span: Whether to start an opentracing span. Defaults to True.
185+
Should only be disabled for processes that will not log to or tag
186+
a span.
184187
args: positional args for func
185188
kwargs: keyword args for func
186189
@@ -199,7 +202,10 @@ async def run():
199202
with BackgroundProcessLoggingContext(desc) as context:
200203
context.request = "%s-%i" % (desc, count)
201204
try:
202-
with start_active_span(desc, tags={"request_id": context.request}):
205+
ctx = noop_context_manager()
206+
if bg_start_span:
207+
ctx = start_active_span(desc, tags={"request_id": context.request})
208+
with ctx:
203209
result = func(*args, **kwargs)
204210

205211
if inspect.isawaitable(result):

synapse/notifier.py

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
from synapse.logging.context import PreserveLoggingContext
4141
from synapse.logging.utils import log_function
4242
from synapse.metrics import LaterGauge
43-
from synapse.metrics.background_process_metrics import run_as_background_process
4443
from synapse.streams.config import PaginationConfig
4544
from synapse.types import (
4645
Collection,
@@ -310,44 +309,37 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
310309
"""
311310

312311
# poke any interested application service.
313-
run_as_background_process(
314-
"_notify_app_services", self._notify_app_services, max_room_stream_token
315-
)
316-
317-
run_as_background_process(
318-
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
319-
)
312+
self._notify_app_services(max_room_stream_token)
313+
self._notify_pusher_pool(max_room_stream_token)
320314

321315
if self.federation_sender:
322316
self.federation_sender.notify_new_events(max_room_stream_token)
323317

324-
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
318+
def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
325319
try:
326-
await self.appservice_handler.notify_interested_services(
327-
max_room_stream_token
328-
)
320+
self.appservice_handler.notify_interested_services(max_room_stream_token)
329321
except Exception:
330322
logger.exception("Error notifying application services of event")
331323

332-
async def _notify_app_services_ephemeral(
324+
def _notify_app_services_ephemeral(
333325
self,
334326
stream_key: str,
335327
new_token: Union[int, RoomStreamToken],
336-
users: Collection[UserID] = [],
328+
users: Collection[Union[str, UserID]] = [],
337329
):
338330
try:
339331
stream_token = None
340332
if isinstance(new_token, int):
341333
stream_token = new_token
342-
await self.appservice_handler.notify_interested_services_ephemeral(
334+
self.appservice_handler.notify_interested_services_ephemeral(
343335
stream_key, stream_token, users
344336
)
345337
except Exception:
346338
logger.exception("Error notifying application services of event")
347339

348-
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
340+
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
349341
try:
350-
await self._pusher_pool.on_new_notifications(max_room_stream_token)
342+
self._pusher_pool.on_new_notifications(max_room_stream_token)
351343
except Exception:
352344
logger.exception("Error pusher pool of event")
353345

@@ -384,12 +376,8 @@ def on_new_event(
384376
self.notify_replication()
385377

386378
# Notify appservices
387-
run_as_background_process(
388-
"_notify_app_services_ephemeral",
389-
self._notify_app_services_ephemeral,
390-
stream_key,
391-
new_token,
392-
users,
379+
self._notify_app_services_ephemeral(
380+
stream_key, new_token, users,
393381
)
394382

395383
def on_new_replication_data(self) -> None:

synapse/push/pusherpool.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
from prometheus_client import Gauge
2121

22-
from synapse.metrics.background_process_metrics import run_as_background_process
22+
from synapse.metrics.background_process_metrics import (
23+
run_as_background_process,
24+
wrap_as_background_process,
25+
)
2326
from synapse.push import PusherConfigException
2427
from synapse.push.emailpusher import EmailPusher
2528
from synapse.push.httppusher import HttpPusher
@@ -187,7 +190,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
187190
)
188191
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
189192

190-
async def on_new_notifications(self, max_token: RoomStreamToken):
193+
def on_new_notifications(self, max_token: RoomStreamToken):
191194
if not self.pushers:
192195
# nothing to do here.
193196
return
@@ -201,6 +204,17 @@ async def on_new_notifications(self, max_token: RoomStreamToken):
201204
# Nothing to do
202205
return
203206

207+
# We only start a new background process if necessary rather than
208+
# optimistically (to cut down on overhead).
209+
self._on_new_notifications(max_token)
210+
211+
@wrap_as_background_process("on_new_notifications")
212+
async def _on_new_notifications(self, max_token: RoomStreamToken):
213+
# We just use the minimum stream ordering and ignore the vector clock
214+
# component. This is safe to do as long as we *always* ignore the vector
215+
# clock components.
216+
max_stream_id = max_token.stream
217+
204218
prev_stream_id = self._last_room_stream_id_seen
205219
self._last_room_stream_id_seen = max_stream_id
206220

synapse/replication/tcp/redis.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ def send_command(self, cmd: Command):
166166
Args:
167167
cmd (Command)
168168
"""
169-
run_as_background_process("send-cmd", self._async_send_command, cmd)
169+
run_as_background_process(
170+
"send-cmd", self._async_send_command, cmd, bg_start_span=False
171+
)
170172

171173
async def _async_send_command(self, cmd: Command):
172174
"""Encode a replication command and send it over our outbound connection"""

tests/handlers/test_appservice.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ def setUp(self):
4242
hs.get_clock.return_value = MockClock()
4343
self.handler = ApplicationServicesHandler(hs)
4444

45-
@defer.inlineCallbacks
4645
def test_notify_interested_services(self):
4746
interested_service = self._mkservice(is_interested=True)
4847
services = [
@@ -62,14 +61,12 @@ def test_notify_interested_services(self):
6261
defer.succeed((0, [event])),
6362
defer.succeed((0, [])),
6463
]
65-
yield defer.ensureDeferred(
66-
self.handler.notify_interested_services(RoomStreamToken(None, 0))
67-
)
64+
self.handler.notify_interested_services(RoomStreamToken(None, 0))
65+
6866
self.mock_scheduler.submit_event_for_as.assert_called_once_with(
6967
interested_service, event
7068
)
7169

72-
@defer.inlineCallbacks
7370
def test_query_user_exists_unknown_user(self):
7471
user_id = "@someone:anywhere"
7572
services = [self._mkservice(is_interested=True)]
@@ -83,12 +80,11 @@ def test_query_user_exists_unknown_user(self):
8380
defer.succeed((0, [event])),
8481
defer.succeed((0, [])),
8582
]
86-
yield defer.ensureDeferred(
87-
self.handler.notify_interested_services(RoomStreamToken(None, 0))
88-
)
83+
84+
self.handler.notify_interested_services(RoomStreamToken(None, 0))
85+
8986
self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
9087

91-
@defer.inlineCallbacks
9288
def test_query_user_exists_known_user(self):
9389
user_id = "@someone:anywhere"
9490
services = [self._mkservice(is_interested=True)]
@@ -102,9 +98,9 @@ def test_query_user_exists_known_user(self):
10298
defer.succeed((0, [event])),
10399
defer.succeed((0, [])),
104100
]
105-
yield defer.ensureDeferred(
106-
self.handler.notify_interested_services(RoomStreamToken(None, 0))
107-
)
101+
102+
self.handler.notify_interested_services(RoomStreamToken(None, 0))
103+
108104
self.assertFalse(
109105
self.mock_as_api.query_user.called,
110106
"query_user called when it shouldn't have been.",

0 commit comments

Comments
 (0)