Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dashboard: Fine Performance Metrics #7725

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ba43e75
Prototype: Metrics by Execution stacked barchart
milesgranger Mar 24, 2023
52ee692
Add aggregated piechart for execution timings [skip ci]
milesgranger Mar 31, 2023
b598ab8
Working dated metrics, needs function selector enabled
milesgranger Apr 3, 2023
fed73e2
Smoother updating rotating functions
milesgranger Apr 11, 2023
d585786
Properly show selected functions
milesgranger Apr 11, 2023
367ae0b
Show number of functions to select
milesgranger Apr 11, 2023
27e483c
Fix misaligned columns
milesgranger Apr 11, 2023
b1a2810
Update get-data to only show seconds ops
milesgranger Apr 11, 2023
dd5d5ff
Make pre-commit happy
milesgranger Apr 11, 2023
abe5024
Fixup: breakpoint
milesgranger Apr 11, 2023
ebc05a6
Fix color palette when empty ops
milesgranger Apr 12, 2023
da239a0
Drop: Trial blanket rerun setting
milesgranger Apr 12, 2023
6010efc
Revert rerunning failed tests on all tests
milesgranger Apr 14, 2023
c9b5039
Review feedback: Update plot titles
milesgranger May 4, 2023
64f6d8e
Review feedback: Update tooltip for execution by prefix
milesgranger May 4, 2023
cf8bc8b
Send data, by activity to pie chart
milesgranger May 4, 2023
884262d
Add FinePerformanceMetrics dashboard test
milesgranger May 4, 2023
a4feff5
Update task execution, by activity toggle bytes
milesgranger May 9, 2023
f5ee251
Merge fine metrics classes, and refactor
milesgranger May 9, 2023
0ed4cc4
Fix rotating functions in task execution by prefix
milesgranger May 9, 2023
e994cfc
Make unit selection dropdown & dynamic
milesgranger May 10, 2023
d9f98e5
Merge branch 'main' into 7679-perf-metrics-bokeh-dashboard
crusaderky May 10, 2023
846c1a1
Code review
crusaderky May 10, 2023
d32cf0f
lint
crusaderky May 10, 2023
a4e1def
Make all plots dynamic
milesgranger May 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributed/comm/tests/test_ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_registered(ucx_loop):


async def get_comm_pair(
listen_addr="ucx://" + HOST, listen_args=None, connect_args=None, **kwargs
listen_addr=f"ucx://{HOST}", listen_args=None, connect_args=None, **kwargs
):
listen_args = listen_args or {}
connect_args = connect_args or {}
Expand Down
297 changes: 295 additions & 2 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
from collections import OrderedDict, defaultdict
from collections.abc import Iterable
from datetime import datetime
from datetime import datetime, timedelta
from numbers import Number
from typing import Any, TypeVar

Expand All @@ -30,12 +30,14 @@
HelpTool,
HoverTool,
HTMLTemplateFormatter,
MultiChoice,
NumberFormatter,
NumeralTickFormatter,
OpenURL,
PanTool,
Range1d,
ResetTool,
Select,
Tabs,
TapTool,
Title,
Expand All @@ -44,7 +46,7 @@
)
from bokeh.models.widgets import DataTable, TableColumn
from bokeh.models.widgets.markups import Div
from bokeh.palettes import Viridis11
from bokeh.palettes import Viridis11, small_palettes
from bokeh.plotting import figure
from bokeh.themes import Theme
from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack
Expand Down Expand Up @@ -3375,6 +3377,297 @@ def update(self):
)


class FinePerformanceMetrics(DashboardComponent):
"""
The main overview of the Fine Performance Metrics page.
"""

@log_errors
def __init__(self, scheduler, **kwargs):
self.scheduler = scheduler
self.senddata = defaultdict(list)
self.sendsrc = ColumnDataSource(data=dict())
self.task_exec_data = defaultdict(list)
self.task_exec_data_limited = defaultdict(list)
self.task_exec_by_prefix_src = ColumnDataSource(data=dict())
self.task_exec_by_activity_src = ColumnDataSource(data=dict())
self.substantial_change = False
self.task_activities = []
self.init_root()

def init_root(self):
def handle_selector_chng(attr, old, new):
self.unit_selected = new
self.substantial_change = True

self.function_selector = MultiChoice(value=[], options=[])
self.function_selector.placeholder = "Select specific functions"
self.unit_selector = Select(title="Unit selection", options=[])
self.unit_selector.on_change("value", handle_selector_chng)
self.unit_selected = "seconds"
self.task_exec_by_activity_chart = figure()
self.task_exec_by_prefix_chart = figure()
self.senddata_by_activity_chart = figure()
self.root = column(
self.function_selector,
self.unit_selector,
row(
[
self.task_exec_by_prefix_chart,
self.task_exec_by_activity_chart,
self.senddata_by_activity_chart,
],
sizing_mode="stretch_width",
),
sizing_mode="scale_width",
)

def format(self, unit: str, val: Any) -> str:
formatters = {"bytes": format_bytes, "seconds": format_time}
return formatters.get(unit, str)(val)

@without_property_validation
@log_errors
def update(self):
items = sorted(
(k, v)
for k, v in self.scheduler.cumulative_worker_metrics.items()
if isinstance(k, tuple)
)
for (context, *parts), val in items:
if context == "get-data":
activity, unit = parts

if unit not in self.unit_selector.options:
# note append doesn't work here
self.unit_selector.options += [unit]

if activity not in self.senddata["activity"]:
self.substantial_change = True
self.senddata["activity"].append(activity)

idx = self.senddata["activity"].index(activity)
while idx >= len(self.senddata[f"{activity}_{unit}"]):
self.senddata[f"{activity}_{unit}"].append(0)
self.senddata[f"{activity}_{unit}_text"].append("")
self.senddata[f"{activity}_{unit}_text"][idx] = self.format(unit, val)
self.senddata[f"{activity}_{unit}"][idx] = val

elif context == "execute":
prefix, activity, unit = parts

if unit not in self.unit_selector.options:
# note append doesn't work here
self.unit_selector.options += [unit]

if activity not in self.task_activities:
self.substantial_change = True
self.task_activities.append(activity)

if prefix not in self.task_exec_data["functions"]:
self.substantial_change = True
self.function_selector.options.append(prefix)
self.task_exec_data["functions"].append(prefix)
self.task_exec_data["timestamp"].append(datetime.utcnow())
idx = self.task_exec_data["functions"].index(prefix)

# Some function/activity combos missing, so need to keep columns aligned
for op in self.task_activities:
while len(self.task_exec_data[f"{op}_{unit}"]) != len(
self.task_exec_data["functions"]
):
self.task_exec_data[f"{op}_{unit}"].append(0)
self.task_exec_data[f"{op}_{unit}_text"].append("")

self.task_exec_data[f"{activity}_{unit}"][idx] = val
self.task_exec_data[f"{activity}_{unit}_text"][idx] = self.format(
unit, val
)

data = self.task_exec_data.copy()
# If user has manually selected function(s) then we are only showing them.
if self.function_selector.value:
indexes = [data["functions"].index(f) for f in self.function_selector.value]
for key, values in data.items():
data[key] = [values[idx] for idx in indexes]

# Otherwise limit those being shown which have 'expired' to be displayed
else:
cutoff = datetime.utcnow() - timedelta(seconds=10)
n_show = len([d for d in data["timestamp"] if d > cutoff]) or 5
for key in data:
data[key] = data[key][-n_show:]
self.task_exec_data_limited = data.copy()

# Show total number of functions to choose from
self.function_selector.title = (
f"Filter by function ({len(self.function_selector.options)}):"
)

task_exec_piechart = self._build_task_execution_by_activity_chart(
self.task_exec_data_limited.copy()
)
task_exec_barchart = self._build_task_execution_by_prefix_chart(
self.task_exec_data_limited.copy()
)
senddata_piechart = self._build_senddata_chart(self.senddata.copy())

# Replacing the child causes small blips if done every iteration vs updating
# renderers, but it's needed when new functions and/or activities show up to
# rerender plot
if self.substantial_change:
self.root.children[-1].children[0] = task_exec_barchart
self.root.children[-1].children[1] = task_exec_piechart
self.root.children[-1].children[2] = senddata_piechart
self.substantial_change = False
else:
self.task_exec_by_prefix_chart.renderers = task_exec_piechart.renderers
self.task_exec_by_activity_chart.renderers = task_exec_barchart.renderers
self.senddata_by_activity_chart.renderers = senddata_piechart.renderers

def _build_task_execution_by_activity_chart(
self, task_exec_data: defaultdict[str, list]
) -> figure:
piechart_data = {}
piechart_data["value"] = [
sum(task_exec_data[f"{op}_{self.unit_selected}"])
for op in self.task_activities
]
piechart_data["text"] = [
self.format(self.unit_selected, v) for v in piechart_data["value"]
]
piechart_data["angle"] = [
(
sum(task_exec_data[f"{activity}_{self.unit_selected}"])
/ sum(piechart_data["value"])
if sum(piechart_data["value"])
else 0 # may not have any bytes movement reported, avoid divide by zero
)
* 2
* math.pi
for activity in self.task_activities
]
piechart_data["color"] = small_palettes["YlGnBu"].get(
len(self.task_activities), []
)
piechart_data["activity"] = self.task_activities
self.task_exec_by_activity_src.data = piechart_data

piechart = figure(
height=500,
sizing_mode="scale_both",
title="Task execution, by activity",
tools="hover",
tooltips="@{activity}: @text",
x_range=(-0.5, 1.0),
)
piechart.axis.axis_label = None
piechart.axis.visible = False
piechart.grid.grid_line_color = None

piechart.wedge(
x=0,
y=1,
radius=0.4,
start_angle=cumsum("angle", include_zero=True),
end_angle=cumsum("angle"),
line_color="white",
fill_color="color",
legend_field="activity",
source=self.task_exec_by_activity_src,
)
return piechart

def _build_task_execution_by_prefix_chart(
self, task_exec_data: defaultdict[str, list]
) -> figure:
barchart = figure(
x_range=task_exec_data["functions"],
height=500,
sizing_mode="scale_both",
title="Task execution, by function",
tools="pan,wheel_zoom,box_zoom,reset",
)
barchart.yaxis.visible = False
barchart.xaxis.major_label_orientation = 0.2
barchart.grid.grid_line_color = None
stackers = [
name for name in task_exec_data if name.endswith(self.unit_selected)
]
if stackers:
renderers = barchart.vbar_stack(
stackers,
x="functions",
width=0.9,
source=self.task_exec_by_prefix_src,
color=small_palettes["YlGnBu"].get(len(self.task_activities), []),
legend_label=self.task_activities,
)
for vbar in renderers:
tooltips = [
(
vbar.name,
f"@{{{vbar.name}_text}}",
),
("function", "@functions"),
]
barchart.add_tools(HoverTool(tooltips=tooltips, renderers=[vbar]))

if any(
len(self.task_exec_by_prefix_src.data[k]) != len(task_exec_data[k])
for k in self.task_exec_by_prefix_src.data
):
self.substantial_change = True

self.task_exec_by_prefix_src.data = dict(task_exec_data)
barchart.renderers = renderers
return barchart

def _build_senddata_chart(self, senddata: defaultdict[str, list]) -> figure:
piedata = {}
piedata["activity"] = senddata["activity"]
piedata["value"] = [
(sum(senddata[f"{op}_{self.unit_selected}"])) for op in senddata["activity"]
]
piedata["text"] = [self.format(self.unit_selected, v) for v in piedata["value"]]
piedata["angle"] = [
(
(sum(senddata[f"{op}_{self.unit_selected}"]) / sum(piedata["value"]))
if sum(piedata["value"])
else 0.0
)
* 2
* math.pi
for op in piedata["activity"]
]
piedata["color"] = small_palettes["YlGnBu"].get(len(piedata["activity"]), [])

self.sendsrc.data = piedata
senddata_piechart = figure(
height=500,
sizing_mode="scale_both",
title="Send data, by activity",
tools="hover",
tooltips="@{activity}: @text",
x_range=(-0.5, 1.0),
)
senddata_piechart.wedge(
x=0,
y=1,
radius=0.4,
start_angle=cumsum("angle", include_zero=True),
end_angle=cumsum("angle"),
line_color="white",
fill_color="color",
legend_field="activity",
source=self.sendsrc,
)
senddata_piechart.axis.axis_label = None
senddata_piechart.axis.visible = False
senddata_piechart.grid.grid_line_color = None
return senddata_piechart


class Contention(DashboardComponent):
"""
Event Loop Health (and GIL Contention, if configured)
Expand Down
2 changes: 2 additions & 0 deletions distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Contention,
CurrentLoad,
ExceptionsTable,
FinePerformanceMetrics,
MemoryByKey,
Occupancy,
SystemMonitor,
Expand Down Expand Up @@ -112,6 +113,7 @@
"/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),
"/individual-scheduler-system": individual_doc(SystemMonitor, 500),
"/individual-contention": individual_doc(Contention, 500),
"/individual-fine-performance-metrics": individual_doc(FinePerformanceMetrics, 500),
"/individual-profile": individual_profile_doc,
"/individual-profile-server": individual_profile_server_doc,
"/individual-gpu-memory": gpu_memory_doc,
Expand Down
16 changes: 16 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Contention,
CurrentLoad,
Events,
FinePerformanceMetrics,
Hardware,
MemoryByKey,
MemoryColor,
Expand Down Expand Up @@ -328,6 +329,21 @@ async def test_WorkersMemory(c, s, a, b):
assert all(d["width"])


@gen_cluster(client=True)
async def test_FinePerformanceMetrics(c, s, a, b):
cl = FinePerformanceMetrics(s)

futures = c.map(slowinc, range(10), delay=0.001)
await wait(futures)
await asyncio.sleep(1) # wait for metrics to arrive
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit (not worthy of holding off the merge): I'd like us to be very conservative with this kind of sleeping since it artificially slows down our test suite.

Ideally, there was a hook we could listen to to know when we can check our assertions. This hook often doesn't exist but we can still implement the test more forgivingly, e.g.

start = time()
while True:
    try:
        assert not cl.task_exec_data

        cl.update()
        assert cl.task_exec_data
        assert cl.task_exec_data["functions"] == ["slowinc"]
        break
    except AssertionError:
        if time() - start > 1:
            raise
        await asyncio.sleep(0.01)


assert not cl.task_exec_data

cl.update()
assert cl.task_exec_data
assert cl.task_exec_data["functions"] == ["slowinc"]


@gen_cluster(client=True)
async def test_ClusterMemory(c, s, a, b):
cl = ClusterMemory(s)
Expand Down
Loading