Skip to content

Commit 4b00be1

Browse files
authored
Polish parsing of worker-saturation from config (#7255)
1 parent eb96593 commit 4b00be1

File tree

3 files changed

+27
-11
lines changed

3 files changed

+27
-11
lines changed

distributed/distributed-schema.yaml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,13 @@ properties:
118118
How frequently to balance worker loads
119119
120120
worker-saturation:
121-
type: number
122-
exclusiveMinimum: 0
121+
oneOf:
122+
- type: number
123+
exclusiveMinimum: 0
124+
# String "inf", not to be confused with .inf which in YAML means float
125+
# infinity. This is necessary because there's no way to parse a float
126+
# infinity from a DASK_* environment variable.
127+
- enum: [inf]
123128
description: |
124129
Controls how many root tasks are sent to workers (like a `readahead`).
125130

distributed/scheduler.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1646,13 +1646,22 @@ def __init__(
16461646
/ 2.0
16471647
)
16481648

1649-
sat = dask.config.get("distributed.scheduler.worker-saturation")
1650-
try:
1651-
self.WORKER_SATURATION = float(sat)
1652-
except ValueError:
1653-
raise ValueError(
1654-
f"Unsupported `distributed.scheduler.worker-saturation` value {sat!r}. Must be a float."
1649+
self.WORKER_SATURATION = dask.config.get(
1650+
"distributed.scheduler.worker-saturation"
1651+
)
1652+
if self.WORKER_SATURATION == "inf":
1653+
# Special case necessary because there's no way to parse a float infinity
1654+
# from a DASK_* environment variable
1655+
self.WORKER_SATURATION = math.inf
1656+
if (
1657+
not isinstance(self.WORKER_SATURATION, (int, float))
1658+
or self.WORKER_SATURATION <= 0
1659+
):
1660+
raise ValueError( # pragma: nocover
1661+
"`distributed.scheduler.worker-saturation` must be a float > 0; got "
1662+
+ repr(self.WORKER_SATURATION)
16551663
)
1664+
16561665
self.transition_counter = 0
16571666
self._idle_transition_counter = 0
16581667
self.transition_counter_max = transition_counter_max

distributed/tests/test_scheduler.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -505,12 +505,14 @@ def func(first, second):
505505
"saturation_config, expected_task_counts",
506506
[
507507
(2.5, (5, 3)),
508-
("2.5", (5, 3)),
509508
(2.0, (4, 2)),
510509
(1.1, (3, 2)),
511510
(1.0, (2, 1)),
512-
(-1.0, (1, 1)),
513-
(float("inf"), (6, 4))
511+
(0.1, (1, 1)),
512+
# This is necessary because there's no way to parse a float infinite from
513+
# a DASK_* environment variable
514+
("inf", (6, 4)),
515+
(float("inf"), (6, 4)),
514516
# ^ depends on root task assignment logic; ok if changes, just needs to add up to 10
515517
],
516518
)

0 commit comments

Comments
 (0)