Round up saturation-factor #7116

Merged (1 commit) on Oct 6, 2022
4 changes: 3 additions & 1 deletion distributed/distributed-schema.yaml
@@ -125,6 +125,8 @@ properties:

Up to worker-saturation * nthreads root tasks are sent to a
worker at a time. If `.inf`, all runnable tasks are immediately sent to workers.
+The target number is rounded up, so any `worker-saturation` value > 1.0 guarantees
+at least one extra task will be sent to workers.

Allowing oversaturation (> 1.0) means a worker may start running a new root task as
soon as it completes the previous, even if there is a higher-priority downstream task
@@ -360,7 +362,7 @@ properties:
The maximum amount of data for a worker to request from another in a single gather operation

Tasks are gathered in batches, and if the first task in a batch is larger than this value,
-the task will still be gathered to ensure progress. Hence, this limit is not absolute.
+the task will still be gathered to ensure progress. Hence, this limit is not absolute.
Note that this limit applies to a single gather operation and a worker may gather data from
multiple workers in parallel.
connections:
2 changes: 1 addition & 1 deletion distributed/scheduler.py
@@ -8199,7 +8199,7 @@ def _task_slots_available(ws: WorkerState, saturation_factor: float) -> int:
     "Number of tasks that can be sent to this worker without oversaturating it"
     assert not math.isinf(saturation_factor)
     nthreads = ws.nthreads
-    return max(int(saturation_factor * nthreads), 1) - (
+    return max(math.ceil(saturation_factor * nthreads), 1) - (
         len(ws.processing) - len(ws.long_running)
     )
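
The effect of switching from `int` to `math.ceil` can be seen in a small standalone sketch. The helper `slots_available` below is hypothetical: it mirrors the arithmetic of `_task_slots_available` but takes plain integers instead of a `WorkerState`, and its `round_up` flag exists only to compare the old and new behaviour side by side.

```python
import math


def slots_available(
    nthreads: int,
    saturation_factor: float,
    processing: int = 0,
    long_running: int = 0,
    *,
    round_up: bool = True,
) -> int:
    """Hypothetical stand-in for _task_slots_available using plain integers."""
    assert not math.isinf(saturation_factor)
    # round_up=False reproduces the old truncating behaviour (int),
    # round_up=True reproduces the new behaviour (math.ceil).
    rounding = math.ceil if round_up else int
    target = max(rounding(saturation_factor * nthreads), 1)
    # Long-running tasks do not count against the worker's slots.
    return target - (processing - long_running)


# Compare both roundings on an idle worker.
for factor, nthreads in [(1.1, 1), (1.1, 2), (2.5, 1), (1.0, 2)]:
    old = slots_available(nthreads, factor, round_up=False)
    new = slots_available(nthreads, factor, round_up=True)
    print(f"factor={factor} nthreads={nthreads}: int -> {old}, ceil -> {new}")
```

With truncation, a factor of 1.1 on a single-threaded worker yields the same target as 1.0; with rounding up, any factor above 1.0 yields at least one extra slot, which matches the sentence added to the schema description.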

5 changes: 3 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -413,9 +413,10 @@ async def test_queued_remove_add_worker(c, s, a, b):
 @pytest.mark.parametrize(
     "saturation_config, expected_task_counts",
     [
-        (2.5, (5, 2)),
-        ("2.5", (5, 2)),
+        (2.5, (5, 3)),
+        ("2.5", (5, 3)),
         (2.0, (4, 2)),
+        (1.1, (3, 2)),
         (1.0, (2, 1)),
         (-1.0, (1, 1)),
         (float("inf"), (6, 4))
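
The updated expectations can be checked by hand. The sketch below assumes the two test workers have 2 threads and 1 thread respectively; the diff does not show this, but it is consistent with every finite row in the table. `NTHREADS` and `expected_counts` are hypothetical names used only for illustration, and the infinite case is left out because `_task_slots_available` asserts that the factor is finite.

```python
import math
from typing import Tuple

# Assumption (not shown in the diff): the two workers have 2 and 1 threads.
NTHREADS = (2, 1)


def expected_counts(saturation: float) -> Tuple[int, ...]:
    """Per-worker task target: round up, but never go below one task."""
    return tuple(max(math.ceil(saturation * n), 1) for n in NTHREADS)


# Finite rows from the parametrization; "2.5" is covered via float("2.5").
cases = {
    2.5: (5, 3),
    float("2.5"): (5, 3),
    2.0: (4, 2),
    1.1: (3, 2),
    1.0: (2, 1),
    -1.0: (1, 1),
}

for saturation, expected in cases.items():
    assert expected_counts(saturation) == expected, (saturation, expected)
```

Under the old truncating rule the single-threaded worker would get `int(2.5 * 1) == 2` tasks, which is why the second element of the `2.5` rows changed from 2 to 3.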