
Round-robin worker selection makes poor choices with worker-saturation > 1.0 #7197

@gjoseph92

`test_wait_first_completed` is failing in #7191 with the `worker-saturation` value set to 1.1:

```python
@gen_cluster(client=True)
async def test_wait_first_completed(c, s, a, b):
    event = Event()
    x = c.submit(block_on_event, event)
    y = c.submit(block_on_event, event)
    z = c.submit(inc, 2)
    done, not_done = await wait([x, y, z], return_when="FIRST_COMPLETED")
    assert done == {z}
    assert not_done == {x, y}
    assert z.status == "finished"
    assert x.status == "pending"
    assert y.status == "pending"
    await event.set()
```

It works fine with 1.0, but fails with 1.1 because the round-up logic from #7116 allows workers to be oversaturated.
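To see why 1.1 behaves differently from 1.0 on this cluster, here is a minimal sketch of the rounding. It assumes the #7116 rule is roughly "a worker may hold `ceil(worker-saturation * nthreads)` tasks, and at least one"; `task_slots` is a hypothetical helper, not the scheduler's API.

```python
import math


def task_slots(nthreads: int, saturation: float) -> int:
    """Hypothetical helper: how many tasks a worker may hold before it is
    considered saturated, assuming the round-up rule from #7116."""
    # Round up so fractional capacity becomes an extra slot,
    # but never allow fewer than one task per worker.
    return max(math.ceil(saturation * nthreads), 1)


if __name__ == "__main__":
    # The test's two workers have 1 and 2 threads.
    for saturation in (1.0, 1.1):
        slots = {nthreads: task_slots(nthreads, saturation) for nthreads in (1, 2)}
        print(saturation, slots)
```

With 1.0 the 1-thread worker is full after one task. With 1.1, `ceil(1.1 * 1) = 2`, so under this assumed rule it still has room for a second task.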

It blocks forever because the worker with 1 thread gets assigned `[block_on_event, inc]`, so `inc` is queued behind a task that never finishes, while the worker with 2 threads gets only `[block_on_event]`. It should be the other way around.
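The hang follows from thread counts alone. A minimal sketch, assuming each worker starts its assigned tasks in order, one per thread, and that `block_on_event` holds its thread until the event is set. `runnable_now` is a hypothetical helper, and the worker names `a`/`b` are illustrative.

```python
def runnable_now(assignment: dict[str, list[str]], nthreads: dict[str, int]) -> set[str]:
    """Hypothetical helper: tasks that get a thread immediately, assuming each
    worker starts its tasks in order and none of them has finished yet."""
    started: set[str] = set()
    for worker, tasks in assignment.items():
        # Only the first `nthreads` tasks fit; the rest wait in the worker's queue.
        started.update(tasks[: nthreads[worker]])
    return started


nthreads = {"a": 1, "b": 2}
observed = {"a": ["x", "z"], "b": ["y"]}  # 1-thread worker gets block + inc
expected = {"a": ["x"], "b": ["y", "z"]}  # 2-thread worker gets block + inc

print(sorted(runnable_now(observed, nthreads)))
print(sorted(runnable_now(expected, nthreads)))
```

In the observed case `z` never starts, so `wait(..., return_when="FIRST_COMPLETED")` has nothing that can complete before `event.set()` is reached.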

The culprit has something to do with the round-robin logic, which only applies in rare situations like this one, where the cluster is small but larger than the TaskGroup being assigned:

```python
# TODO if `is_rootish` would always return True for tasks without dependencies,
# we could remove all this logic. The rootish assignment logic would behave
# more or less the same as this, maybe without guaranteed round-robin though?
# This path is only reachable when `ts` doesn't have dependencies, but its
# group is also smaller than the cluster.
# Fastpath when there are no related tasks or restrictions
worker_pool = self.idle or self.workers
# FIXME idle and workers are SortedDict's declared as dicts
# because sortedcontainers is not annotated
wp_vals = cast("Sequence[WorkerState]", worker_pool.values())
n_workers: int = len(wp_vals)
if n_workers < 20:  # smart but linear in small case
    ws = min(wp_vals, key=operator.attrgetter("occupancy"))
    assert ws
    if ws.occupancy == 0:
        # special case to use round-robin; linear search
        # for next worker with zero occupancy (or just
        # land back where we started).
        wp_i: WorkerState
        start: int = self.n_tasks % n_workers
        i: int
        for i in range(n_workers):
            wp_i = wp_vals[(i + start) % n_workers]
            if wp_i.occupancy == 0:
                ws = wp_i
                break
```
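This branch chooses by occupancy and a task counter only; it never looks at `nthreads`. A minimal, self-contained port of it, assuming a simplified, hypothetical `Worker` record in place of `WorkerState`, a made-up fixed occupancy of 0.5 added per assignment, and that both workers stay in the candidate pool throughout (the `self.idle or self.workers` choice and the `n_workers < 20` guard are not modelled):

```python
import operator
from dataclasses import dataclass, field


@dataclass
class Worker:
    """Simplified, hypothetical stand-in for WorkerState."""
    name: str
    nthreads: int
    occupancy: float = 0.0
    tasks: list[str] = field(default_factory=list)


def pick_worker(workers: list[Worker], n_tasks: int) -> Worker:
    """Port of the small-cluster fastpath: least occupancy, with a
    round-robin among zero-occupancy workers."""
    n_workers = len(workers)
    # min() returns the first worker among ties; nthreads plays no part.
    ws = min(workers, key=operator.attrgetter("occupancy"))
    if ws.occupancy == 0:
        start = n_tasks % n_workers
        for i in range(n_workers):
            candidate = workers[(i + start) % n_workers]
            if candidate.occupancy == 0:
                ws = candidate
                break
    return ws


def assign(task_names: list[str], workers: list[Worker],
           first_n_tasks: int = 0, cost: float = 0.5) -> dict[str, list[str]]:
    """Assign tasks one by one; `cost` is an assumed per-task occupancy."""
    for offset, name in enumerate(task_names):
        ws = pick_worker(workers, first_n_tasks + offset)
        ws.tasks.append(name)
        ws.occupancy += cost
    return {w.name: w.tasks for w in workers}


workers = [Worker("a", nthreads=1), Worker("b", nthreads=2)]
print(assign(["x", "y", "z"], workers))
```

Under these assumptions, whichever value the counter starts at, the third task lands on the 1-thread worker: once both workers have equal occupancy, the tie goes to whichever comes first in the pool. Whether this matches the scheduler's real occupancy estimates is an assumption; the sketch only shows that nothing in this path prefers the worker with more free threads.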

If I update `is_rootish` like so:

```diff
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index cf240240..802df12d 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3043,6 +3043,8 @@ class SchedulerState:
         """
         if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
             return False
+        if not ts.dependencies:
+            return True
         tg = ts.group
         # TODO short-circuit to True if `not ts.dependencies`?
         return (
```

the test passes.
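A self-contained sketch of the patched check, assuming simplified, hypothetical `Task`/`Group` records in place of `TaskState`/`TaskGroup`. The diff cuts off after `return (`, so the group-size test below (`group.size > 2 * total_nthreads`) is a hypothetical placeholder, not the scheduler's actual heuristic.

```python
from dataclasses import dataclass, field


@dataclass
class Group:
    """Hypothetical stand-in for TaskGroup: only its size matters here."""
    size: int


@dataclass
class Task:
    """Hypothetical stand-in for TaskState."""
    group: Group
    dependencies: set[str] = field(default_factory=set)
    has_restrictions: bool = False


def is_rootish(ts: Task, total_nthreads: int) -> bool:
    # Restricted tasks keep going through the regular placement logic.
    if ts.has_restrictions:
        return False
    # The short-circuit from the diff: no dependencies means root-ish.
    if not ts.dependencies:
        return True
    # Placeholder for the elided group-size heuristic.
    return ts.group.size > 2 * total_nthreads


total_nthreads = 3  # 1 + 2 threads, as in the test
inc_task = Task(group=Group(size=1))
print(is_rootish(inc_task, total_nthreads))
```

With the short-circuit, `inc` (a group of one with no dependencies) counts as root-ish even though its group is much smaller than the cluster. Per the TODO comment in the excerpt, the round-robin fastpath is only reachable for tasks without dependencies whose group is smaller than the cluster, so such tasks would no longer reach it.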

cc @fjetter @crusaderky
