
All tasks without dependencies are root-ish #7221

Open

gjoseph92 wants to merge 25 commits into main from no-deps-is-rootish

Commits (25)
1569df3
All tasks without dependencies are root-ish
gjoseph92 Oct 28, 2022
a75d057
fix `test_retries`
gjoseph92 Oct 28, 2022
1d9fce9
fix `distributed/diagnostics/tests/test_graph_layout.py::test_states`
gjoseph92 Oct 28, 2022
d20717f
fix `distributed/tests/test_cancelled_state.py::test_executing_cancel…
gjoseph92 Oct 28, 2022
841b0f2
fix `distributed/tests/test_worker_memory.py::test_pause_executor_man…
gjoseph92 Oct 28, 2022
0530ec6
driveby: async_wait_for
gjoseph92 Oct 28, 2022
bb33863
fix `distributed/tests/test_worker_memory.py::test_pause_executor_wit…
gjoseph92 Oct 28, 2022
abf36de
WIP update test_priorities
gjoseph92 Oct 28, 2022
9a040d2
Get test_priorities working
gjoseph92 Oct 31, 2022
d56f7f3
fix `test_states` for queuing disabled
gjoseph92 Oct 31, 2022
55818de
fix `test_retries`
gjoseph92 Oct 31, 2022
0e6901b
skip round robin for queuing
gjoseph92 Oct 31, 2022
a47ac74
Fix co-assignment when group smaller than cluster
gjoseph92 Oct 31, 2022
1cd75cb
`test_profile_plot` determinism
gjoseph92 Oct 31, 2022
913b511
Update distributed/tests/test_priorities.py
gjoseph92 Nov 3, 2022
680c538
Merge branch 'main' into no-deps-is-rootish
crusaderky Nov 3, 2022
4c684a2
Refactor for clarity
crusaderky Nov 3, 2022
c901823
remove dead round-robin code
gjoseph92 Nov 3, 2022
3806164
test `c.submit` doesn't cut in line
gjoseph92 Nov 3, 2022
45b9bbd
remove `test_quiet_cluster_round_robin`
gjoseph92 Nov 3, 2022
0db898d
Special-case no-worker tasks that became root-ish
gjoseph92 Nov 5, 2022
e9be596
Copy over tests
gjoseph92 Nov 5, 2022
5985419
Revert "Copy over tests"
gjoseph92 Nov 5, 2022
c556762
Revert "Special-case no-worker tasks that became root-ish"
gjoseph92 Nov 5, 2022
17f1ba4
actors aren't rootish
gjoseph92 Nov 9, 2022
3 changes: 2 additions & 1 deletion distributed/dashboard/tests/test_components.py
@@ -23,10 +23,11 @@ def test_basic(Component):

@gen_cluster(
client=True,
nthreads=[("", 1)],
clean_kwargs={"threads": False},
config={"distributed.worker.profile.enabled": True},
)
async def test_profile_plot(c, s, a, b):
async def test_profile_plot(c, s, a):
p = ProfilePlot()
assert not p.source.data["left"]
while not len(p.source.data["left"]):
7 changes: 6 additions & 1 deletion distributed/diagnostics/tests/test_graph_layout.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import math
import operator

from distributed import wait
@@ -45,9 +46,13 @@ async def test_states(c, s, a, b):
s.add_plugin(gl)
await c.submit(sum, c.map(inc, range(5)))

expected = {"waiting", "processing", "memory", "released"}
if math.isfinite(s.WORKER_SATURATION):
expected.add("queued")

while True:
updates = {state for _, state in gl.state_updates}
if updates == {"waiting", "processing", "memory", "released"}:
if updates == expected:
break
await asyncio.sleep(0.01)

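Context for the new `expected` set above: queuing is active only when the scheduler's saturation factor is finite, and queued tasks pass through an extra "queued" state that the layout plugin records. A minimal sketch of the branch the test now takes (the saturation values below are illustrative, not taken from this diff):

```python
import math

# Illustrative saturation factors: a finite value enables queuing, "inf" disables it.
for saturation in (1.1, float("inf")):
    expected = {"waiting", "processing", "memory", "released"}
    if math.isfinite(saturation):
        # With queuing enabled, root-ish tasks are parked on the scheduler first,
        # so the plugin also sees a "queued" state update.
        expected.add("queued")
    print(saturation, sorted(expected))
```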
69 changes: 26 additions & 43 deletions distributed/scheduler.py
@@ -26,7 +26,6 @@
Iterable,
Iterator,
Mapping,
Sequence,
Set,
)
from contextlib import suppress
@@ -2072,8 +2071,11 @@ def decide_worker_rootish_queuing_disabled(
# Last-used worker is full, unknown, retiring, or paused;
# pick a new worker for the next few tasks
ws = min(pool, key=partial(self.worker_objective, ts))
tg.last_worker_tasks_left = math.floor(
(len(tg) / self.total_nthreads) * ws.nthreads
tg.last_worker_tasks_left = max(
1,
# This can evaluate to 0 for tasks with no dependencies
# (see definition of is_rootish)
int(len(tg) / self.total_nthreads * ws.nthreads),
)

# Record `last_worker`, or clear it on the final task
@@ -2082,9 +2084,10 @@
)
tg.last_worker_tasks_left -= 1

if self.validate and ws is not None:
if self.validate:
assert self.workers.get(ws.address) is ws
assert ws in self.running, (ws, self.running)
assert tg.last_worker_tasks_left >= 0, tg.last_worker_tasks_left

return ws

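To make the off-by-zero concrete, here is the arithmetic the new `max(1, ...)` guard addresses: once dependency-free tasks are root-ish, a group can be much smaller than the cluster, and the old floor-based share then evaluates to 0 (the numbers below are illustrative, not from the PR):

```python
import math

total_nthreads = 16  # threads across the cluster (illustrative)
ws_nthreads = 2      # threads on the worker just picked (illustrative)

# Old assumption: the group is much larger than the cluster, so the share is >= 1.
print(math.floor(40 / total_nthreads * ws_nthreads))  # 5

# New reality: a tiny, dependency-free group can reach this path.
share = int(3 / total_nthreads * ws_nthreads)
print(share)          # 0 -- the case the guard and the new validate-assertion protect against
print(max(1, share))  # 1 -- the guarded value used by the PR
```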
@@ -2165,43 +2168,17 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
# If there were no restrictions, `valid_workers()` didn't subset by `running`.
valid_workers = self.running

if ts.dependencies or valid_workers is not None:
ws = decide_worker(
ts,
self.running,
valid_workers,
partial(self.worker_objective, ts),
)
else:
# TODO if `is_rootish` would always return True for tasks without dependencies,
# we could remove all this logic. The rootish assignment logic would behave
# more or less the same as this, maybe without gauranteed round-robin though?
# This path is only reachable when `ts` doesn't have dependencies, but its
# group is also smaller than the cluster.

# Fastpath when there are no related tasks or restrictions
worker_pool = self.idle or self.workers
# FIXME idle and workers are SortedDict's declared as dicts
# because sortedcontainers is not annotated
wp_vals = cast("Sequence[WorkerState]", worker_pool.values())
n_workers: int = len(wp_vals)
if n_workers < 20: # smart but linear in small case
ws = min(wp_vals, key=operator.attrgetter("occupancy"))
assert ws
if ws.occupancy == 0:
# special case to use round-robin; linear search
# for next worker with zero occupancy (or just
# land back where we started).
wp_i: WorkerState
start: int = self.n_tasks % n_workers
i: int
for i in range(n_workers):
wp_i = wp_vals[(i + start) % n_workers]
if wp_i.occupancy == 0:
ws = wp_i
break
else: # dumb but fast in large case
ws = wp_vals[self.n_tasks % n_workers]
if self.validate:
assert (
ts.dependencies or valid_workers is not None
), f"{ts} should be covered by root-ish case"

ws = decide_worker(
ts,
self.running,
valid_workers,
partial(self.worker_objective, ts),
)

if self.validate and ws is not None:
assert self.workers.get(ws.address) is ws
@@ -2988,10 +2965,16 @@ def is_rootish(self, ts: TaskState) -> bool:
Root-ish tasks are part of a group that's much larger than the cluster,
and have few or no dependencies.
"""
if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
if (
ts.resource_restrictions
or ts.worker_restrictions
or ts.host_restrictions
or ts.actor
):
return False
if not ts.dependencies:
return True
tg = ts.group
# TODO short-circuit to True if `not ts.dependencies`?
return (
len(tg) > self.total_nthreads * 2
and len(tg.dependencies) < 5
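Pulling the fragments of the hunk together, the decision rule after this PR reads roughly as follows (a paraphrase for review purposes, not the actual method body):

```python
def is_rootish_sketch(ts, total_nthreads: int) -> bool:
    """Paraphrase of the updated `Scheduler.is_rootish` (illustrative only)."""
    # Pinned tasks and actors are never root-ish.
    if (
        ts.resource_restrictions
        or ts.worker_restrictions
        or ts.host_restrictions
        or ts.actor
    ):
        return False
    # New: any unrestricted task with no dependencies is root-ish, no matter how
    # small its group -- this is what makes the old round-robin fast path in
    # `decide_worker_non_rootish` unreachable.
    if not ts.dependencies:
        return True
    # Unchanged heuristic: a group much wider than the cluster, with few upstream groups.
    tg = ts.group
    return len(tg) > total_nthreads * 2 and len(tg.dependencies) < 5
```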
3 changes: 2 additions & 1 deletion distributed/tests/test_cancelled_state.py
@@ -13,6 +13,7 @@
BlockedGetData,
_LockedCommPool,
assert_story,
async_wait_for,
gen_cluster,
inc,
lock_inc,
@@ -212,7 +213,7 @@ async def wait_and_raise(*args, **kwargs):
await wait_for_state(f1.key, "executing", w)
# Queue up another task to ensure this is not affected by our error handling
f2 = c.submit(inc, 1, key="f2")
await wait_for_state(f2.key, "ready", w)
await async_wait_for(lambda: len(s.tasks) == 2, 5)

f1.release()
await wait_for_state(f1.key, "cancelled", w)
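For readers who haven't met the helper used above: `async_wait_for(predicate, timeout)` from `distributed.utils_test` polls until the predicate returns true. A minimal sketch of such a helper, under the assumption that the real one is a simple polling loop (the name `async_wait_for_sketch` and the `period` argument are mine):

```python
import asyncio
from time import monotonic


async def async_wait_for_sketch(predicate, timeout, period=0.01):
    """Poll `predicate` every `period` seconds; raise if `timeout` elapses first."""
    deadline = monotonic() + timeout
    while not predicate():
        if monotonic() > deadline:
            raise TimeoutError(f"condition not met within {timeout}s")
        await asyncio.sleep(period)


# Usage mirroring the diff above (`s` would be the scheduler fixture):
#     await async_wait_for_sketch(lambda: len(s.tasks) == 2, 5)
```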
11 changes: 3 additions & 8 deletions distributed/tests/test_client_executor.py
@@ -213,18 +213,13 @@ def test_unsupported_arguments(client, s, a, b):
def test_retries(client):
args = [ZeroDivisionError("one"), ZeroDivisionError("two"), 42]

with client.get_executor(retries=5, pure=False) as e:
with client.get_executor(retries=6, pure=False) as e:
future = e.submit(varying(args))
assert future.result() == 42

with client.get_executor(retries=4) as e:
with client.get_executor(retries=1) as e:
future = e.submit(varying(args))
result = future.result()
assert result == 42

with client.get_executor(retries=2) as e:
future = e.submit(varying(args))
with pytest.raises(ZeroDivisionError, match="two"):
Comment on lines -216 to -227

Member:
Any reason to change these retries?

Collaborator (Author):
This test is extremely sensitive to how workers are selected on an idle cluster. It uses this stateful `varying` utility, which changes its behavior depending on how many times it's been called on that worker.

It had to be changed when the idle-round-robin behavior was added: https://github.com/dask/distributed/pull/4638/files#diff-59af67191283f0c64a3be8ce1f344f49b9d025f8264b77fba5c8250865bde433

So I've had to change it again since it's being removed.

with pytest.raises(ZeroDivisionError):
res = future.result()

with client.get_executor(retries=0) as e:
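To illustrate the author's point about `varying` being stateful: each call advances through the list of items, raising the ones that are exceptions, so how many retries a future needs depends on how many times the callable has already run on that worker. A simplified stand-in (not the real `distributed.utils_test.varying`, which tracks its call count per worker):

```python
from itertools import count


def varying_sketch(items):
    """Return a callable that steps through `items` on successive calls,
    raising any item that is an exception (simplified stand-in)."""
    calls = count()

    def func():
        item = items[min(next(calls), len(items) - 1)]
        if isinstance(item, Exception):
            raise item
        return item

    return func


f = varying_sketch([ZeroDivisionError("one"), ZeroDivisionError("two"), 42])
# The first two calls raise and the third returns 42, so the future only succeeds
# once at least two failed attempts have landed on the same call counter -- which
# is why the retry counts in the test are sensitive to worker selection.
```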
94 changes: 46 additions & 48 deletions distributed/tests/test_priorities.py
@@ -9,8 +9,14 @@
import dask
from dask import delayed

from distributed import Client, Event, Scheduler, Status, Worker, wait
from distributed.utils_test import gen_cluster, inc, slowinc
from distributed import Client, Event, Scheduler, Status, Worker
from distributed.utils_test import (
async_wait_for,
gen_cluster,
inc,
slowinc,
wait_for_state,
)

dinc = delayed(inc)
dslowinc = delayed(slowinc)
@@ -35,8 +41,8 @@ async def block_worker(
pause : bool
True
When entering the context manager, pause the worker. At exit, wait for all
tasks created inside the context manager to be added to Scheduler.unrunnable
and then unpause the worker.
tasks created inside the context manager to be added to ``Scheduler.unrunnable``
or ``Scheduler.queued`` and then unpause the worker.
False
When entering the context manager, send a dummy long task to the worker. At
exit, wait for all tasks created inside the context manager to reach the
@@ -51,48 +57,40 @@
"""
if pause:
w.status = Status.paused
while s.workers[w.address].status != Status.paused:
await asyncio.sleep(0.01)
await async_wait_for(
lambda: s.workers[w.address].status == Status.paused, timeout=5
)
else:
ev = Event()
clog = c.submit(lambda ev: ev.wait(), ev, key="block_worker")
while "block_worker" not in w.state.tasks:
await asyncio.sleep(0.01)
clog = c.submit(ev.wait, key="block_worker")
await wait_for_state(clog.key, "executing", w)

yield

if ntasks_on_scheduler is None:
ntasks_on_scheduler = len(c.futures)
if ntasks_on_worker is None:
ntasks_on_worker = len(c.futures)
while len(s.tasks) < ntasks_on_scheduler:
await asyncio.sleep(0.01)
await async_wait_for(lambda: len(s.tasks) >= ntasks_on_scheduler, timeout=5)

if pause:
assert len(s.unrunnable) == ntasks_on_worker
assert (
len(s.unrunnable) == ntasks_on_worker or len(s.queued) == ntasks_on_worker
)
assert not w.state.tasks
w.status = Status.running
else:
while len(w.state.tasks) < ntasks_on_worker:
await asyncio.sleep(0.01)
await ev.set()
await clog
del clog
while "block_worker" in s.tasks:
await asyncio.sleep(0.01)
await async_wait_for(lambda: "block_worker" not in s.tasks, timeout=5)


def gen_blockable_cluster(test_func):
"""Generate a cluster with 1 worker and disabled memory monitor,
to be used together with ``async with block_worker(...):``.
"""
return pytest.mark.parametrize(
"pause",
[
pytest.param(False, id="queue on worker"),
pytest.param(True, id="queue on scheduler"),
],
Comment on lines -92 to -94

Member:
Why remove the ids?

Collaborator (Author):
They weren't accurate anymore. I could call them `pause` and `clog` or something.
)(
return pytest.mark.parametrize("pause", [False, True])(
gen_cluster(
client=True,
nthreads=[("", 1)],
@@ -109,11 +107,11 @@ async def test_submit(c, s, a, pause):
clog = c.submit(lambda ev: ev.wait(), ev, key="clog")
high = c.submit(inc, 2, key="high", priority=1)

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert s.tasks[low.key].state == "processing"
await high
assert s.tasks[clog.key].state == "processing"
assert s.tasks[low.key].state != "memory"
await ev.set()
await wait(low)
await low


@gen_blockable_cluster
@@ -124,12 +122,12 @@ async def test_map(c, s, a, pause):
clog = c.submit(lambda ev: ev.wait(), ev, key="clog")
high = c.map(inc, [4, 5, 6], key=["h1", "h2", "h3"], priority=1)

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert all(s.tasks[fut.key].state == "processing" for fut in low)
await c.gather(high)
assert s.tasks[clog.key].state == "processing"
assert all(s.tasks[fut.key].state != "memory" for fut in low)
await ev.set()
await clog
await wait(low)
await c.gather(low)


@gen_blockable_cluster
@@ -140,12 +138,12 @@ async def test_compute(c, s, a, pause):
clog = c.submit(lambda ev: ev.wait(), ev, key="clog")
high = c.compute(dinc(2, dask_key_name="high"), priority=1)

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert s.tasks[low.key].state == "processing"
await high
assert s.tasks[clog.key].state == "processing"
assert s.tasks[low.key].state != "memory"
await ev.set()
await clog
await wait(low)
await low


@gen_blockable_cluster
@@ -156,12 +154,12 @@ async def test_persist(c, s, a, pause):
clog = c.submit(lambda ev: ev.wait(), ev, key="clog")
high = dinc(2, dask_key_name="high").persist(priority=1)

await wait(high)
assert all(ws.processing for ws in s.workers.values())
assert s.tasks[low.key].state == "processing"
await high
assert s.tasks[clog.key].state == "processing"
assert s.tasks[low.key].state != "memory"
await ev.set()
await wait(clog)
await wait(low)
await clog
await low


@gen_blockable_cluster
@@ -176,11 +174,11 @@ async def test_annotate_compute(c, s, a, pause):
async with block_worker(c, s, a, pause):
low, clog, high = c.compute([low, clog, high], optimize_graph=False)

await wait(high)
assert s.tasks[low.key].state == "processing"
await high
assert s.tasks[low.key].state != "memory"
await ev.set()
await wait(clog)
await wait(low)
await clog
await low


@gen_blockable_cluster
@@ -195,11 +193,11 @@ async def test_annotate_persist(c, s, a, pause):
async with block_worker(c, s, a, pause):
low, clog, high = c.persist([low, clog, high], optimize_graph=False)

await wait(high)
assert s.tasks[low.key].state == "processing"
await high
assert s.tasks[low.key].state != "memory"
await ev.set()
await wait(clog)
await wait(low)
await clog
await low


@gen_blockable_cluster