Skip to content

Commit 7c99a5f

Browse files
committed
Add spans to Fine Performance Metrics bokeh dashboard
1 parent 7072e66 commit 7c99a5f

File tree

4 files changed

+143
-3
lines changed

4 files changed

+143
-3
lines changed

distributed/dashboard/components/scheduler.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
from distributed.diagnostics.task_stream import colors as ts_color_lookup
9191
from distributed.metrics import time
9292
from distributed.scheduler import Scheduler
93+
from distributed.spans import SpansSchedulerExtension
9394
from distributed.utils import Log, log_errors
9495

9596
if dask.config.get("distributed.dashboard.export-tool"):
@@ -3392,6 +3393,7 @@ class FinePerformanceMetrics(DashboardComponent):
33923393
visible_functions: list[str]
33933394
visible_activities: list[str]
33943395
function_selector: MultiChoice
3396+
span_tag_selector: MultiChoice
33953397
unit_selector: Select
33963398
task_exec_by_activity_chart: figure | None
33973399
task_exec_by_prefix_chart: figure | None
@@ -3418,12 +3420,19 @@ def __init__(self, scheduler: Scheduler, **kwargs: Any):
34183420
value=[],
34193421
options=[],
34203422
)
3423+
self.span_tag_selector = MultiChoice(
3424+
title="Filter by span tag",
3425+
placeholder="Select specific span tags",
3426+
value=[],
3427+
options=[],
3428+
)
34213429
self.unit_selector = Select(title="Unit selection", options=["seconds"])
34223430
self.unit_selector.value = "seconds"
34233431
self.unit_selector.on_change("value", self._handle_change_unit)
34243432

34253433
selectors_row = row(
34263434
children=[
3435+
self.span_tag_selector,
34273436
self.function_selector,
34283437
self.unit_selector,
34293438
],
@@ -3501,6 +3510,7 @@ def _update_selectors(self) -> None:
35013510
35023511
- self.unit_selector
35033512
- self.function_selector
3513+
- self.span_tag_selector
35043514
"""
35053515
units = set()
35063516
functions = set()
@@ -3529,6 +3539,18 @@ def _update_selectors(self) -> None:
35293539
f"Filter by function ({len(self.function_selector.options)}):"
35303540
)
35313541

3542+
spans_ext: SpansSchedulerExtension | None = self.scheduler.extensions.get(
3543+
"spans"
3544+
)
3545+
if spans_ext:
3546+
tags = set(spans_ext.spans_search_by_tag)
3547+
tags.difference_update(self.span_tag_selector.options)
3548+
if tags:
3549+
self.span_tag_selector.options.extend(tags)
3550+
self.span_tag_selector.title = (
3551+
f"Filter by span tag ({len(self.span_tag_selector.options)}):"
3552+
)
3553+
35323554
def _format(self, val: float) -> str:
35333555
unit = self.unit_selector.value
35343556
assert isinstance(unit, str)
@@ -3573,17 +3595,28 @@ def _build_data_sources(self) -> None:
35733595

35743596
function_sel = set(self.function_selector.value)
35753597

3576-
for k, v in self.scheduler.cumulative_worker_metrics.items():
3598+
if self.span_tag_selector.value:
3599+
spans_ext: SpansSchedulerExtension = self.scheduler.extensions["spans"]
3600+
metrics = spans_ext.merge_by_tags(
3601+
*self.span_tag_selector.value
3602+
).cumulative_worker_metrics
3603+
has_span_id = False
3604+
else:
3605+
metrics = self.scheduler.cumulative_worker_metrics
3606+
has_span_id = True
3607+
3608+
for k, v in metrics.items():
35773609
if not isinstance(k, tuple):
3578-
continue
3610+
# Only happens in global metrics
3611+
continue # type: ignore[unreachable]
35793612
context, *other, activity, unit = k
35803613
assert isinstance(unit, str)
35813614
assert self.unit_selector.value
35823615
if unit != self.unit_selector.value:
35833616
continue
35843617

35853618
if context == "execute":
3586-
_, function = other
3619+
function = other[1 if has_span_id else 0]
35873620
assert isinstance(function, str)
35883621
if not function_sel or function in function_sel:
35893622
# Custom metrics can provide any hashable as the label
@@ -3595,6 +3628,7 @@ def _build_data_sources(self) -> None:
35953628
execute[activity] += v
35963629

35973630
elif context == "get-data" and not function_sel:
3631+
# Note: this will always be empty when a span is selected
35983632
assert isinstance(activity, str)
35993633
visible_activities.add(activity)
36003634
get_data[activity] += v

distributed/dashboard/tests/test_scheduler_bokeh.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,10 @@ def f():
405405
cl.update()
406406
assert cl.visible_activities == orig_activities
407407

408+
cl.span_tag_selector.value = ["foo"]
409+
cl.update()
410+
assert sorted(cl.visible_functions) == ["y", "z"]
411+
408412

409413
@gen_cluster(client=True)
410414
async def test_ClusterMemory(c, s, a, b):

distributed/spans.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,16 @@ def cumulative_worker_metrics(self) -> defaultdict[tuple[str, ...], float]:
282282
out[k] += v
283283
return out
284284

285+
@staticmethod
286+
def merge(*items: Span) -> Span:
287+
"""Merge multiple spans into a synthetic one.
288+
The input spans must not be related with each other.
289+
"""
290+
out = Span(name=("(merged)",), id_="(merged)", parent=None)
291+
out.children.extend(items)
292+
out.enqueued = min(child.enqueued for child in items)
293+
return out
294+
285295

286296
class SpansSchedulerExtension:
287297
"""Scheduler extension for spans support"""
@@ -300,6 +310,11 @@ class SpansSchedulerExtension:
300310
#: All spans, keyed by the individual tags that make up their name and sorted by
301311
#: creation time.
302312
#: This is a convenience helper structure to speed up searches.
313+
#:
314+
#: See Also
315+
#: --------
316+
#: find_by_tags
317+
#: merge_by_tags
303318
spans_search_by_tag: defaultdict[str, list[Span]]
304319

305320
def __init__(self, scheduler: Scheduler):
@@ -379,6 +394,29 @@ def _ensure_span(self, name: tuple[str, ...], ids: tuple[str, ...]) -> Span:
379394

380395
return span
381396

397+
def find_by_tags(self, *tags: str) -> Iterator[Span]:
398+
"""Yield all spans that contain any of the given tags.
399+
When a tag is shared both by a span and its (grand)children, only return the
400+
parent.
401+
"""
402+
by_level = defaultdict(list)
403+
for tag in tags:
404+
for sp in self.spans_search_by_tag[tag]:
405+
by_level[len(sp.name)].append(sp)
406+
407+
seen = set()
408+
for _, level in sorted(by_level.items()):
409+
seen.update(level)
410+
for sp in level:
411+
if sp.parent not in seen:
412+
yield sp
413+
414+
def merge_by_tags(self, *tags: str) -> Span:
415+
"""Return a synthetic Span which is the sum of all spans containing the given
416+
tags
417+
"""
418+
return Span.merge(*self.find_by_tags(*tags))
419+
382420
def heartbeat(
383421
self, ws: WorkerState, data: dict[str, dict[tuple[str, ...], float]]
384422
) -> None:

distributed/tests/test_spans.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
async_poll_for,
1414
gen_cluster,
1515
inc,
16+
slowinc,
1617
wait_for_state,
1718
)
1819

@@ -530,3 +531,66 @@ async def test_worker_metrics(c, s, a, b):
530531
]
531532
for k, v in cum_metrics.items():
532533
assert v == sum(sp._cumulative_worker_metrics[k] for sp in ext.spans.values())
534+
535+
536+
@gen_cluster(client=True)
537+
async def test_merge_by_tags(c, s, a, b):
538+
with span("foo") as foo1:
539+
await c.submit(inc, 1, key="x1")
540+
with span("bar") as bar1: # foo, bar
541+
await c.submit(inc, 2, key="x2")
542+
with span("foo") as foo2: # foo, bar, foo
543+
await c.submit(inc, 3, key="x3")
544+
with span("foo") as foo3: # foo, foo
545+
await c.submit(inc, 4, key="x4")
546+
with span("bar") as bar2: # bar
547+
await c.submit(inc, 5, key="x5")
548+
549+
ext = s.extensions["spans"]
550+
assert {s.id for s in ext.find_by_tags("foo")} == {foo1}
551+
assert {s.id for s in ext.find_by_tags("foo", "bar")} == {foo1, bar2}
552+
assert {s.id for s in ext.find_by_tags("bar", "foo")} == {foo1, bar2}
553+
assert {s.id for s in ext.find_by_tags("bar")} == {bar1, bar2}
554+
555+
def tgnames(*tags):
556+
return [tg.name for tg in ext.merge_by_tags(*tags).traverse_groups()]
557+
558+
assert tgnames("foo") == ["x1", "x2", "x3", "x4"]
559+
assert tgnames("foo", "bar") == ["x1", "x2", "x3", "x4", "x5"]
560+
assert tgnames("bar", "foo") == ["x5", "x1", "x2", "x3", "x4"]
561+
assert tgnames("bar") == ["x5", "x2", "x3"]
562+
563+
564+
@gen_cluster(client=True)
565+
async def test_merge_by_tags_metrics(c, s, a, b):
566+
with span("foo") as foo1:
567+
await c.submit(slowinc, 1, delay=0.05, key="x-1")
568+
await async_poll_for(lambda: not s.task_groups, timeout=5)
569+
570+
with span("foo") as foo2:
571+
await c.submit(slowinc, 2, delay=0.06, key="x-2")
572+
await async_poll_for(lambda: not s.task_groups, timeout=5)
573+
574+
with span("bar") as bar1:
575+
await c.submit(slowinc, 3, delay=0.07, key="x-3")
576+
await async_poll_for(lambda: not s.task_groups, timeout=5)
577+
578+
await a.heartbeat()
579+
await b.heartbeat()
580+
581+
ext = s.extensions["spans"]
582+
k = ("execute", "x", "thread-noncpu", "seconds")
583+
t_foo = ext.merge_by_tags("foo").cumulative_worker_metrics[k]
584+
t_bar = ext.merge_by_tags("bar").cumulative_worker_metrics[k]
585+
t_foo1 = ext.spans[foo1]._cumulative_worker_metrics[k]
586+
t_foo2 = ext.spans[foo2]._cumulative_worker_metrics[k]
587+
t_bar1 = ext.spans[bar1]._cumulative_worker_metrics[k]
588+
assert t_foo1 > 0
589+
assert t_foo2 > 0
590+
assert t_bar1 > 0
591+
assert t_foo == t_foo1 + t_foo2
592+
assert t_bar == t_bar1
593+
594+
assert ext.merge_by_tags("foo").enqueued == min(
595+
ext.spans[foo1].enqueued, ext.spans[foo2].enqueued
596+
)

0 commit comments

Comments
 (0)