From https://dask.discourse.group/t/only-1-worker-is-running-when-the-dag-is-forking/2192
Non-root tasks that declare resources do not evenly distribute on the cluster, instead piling up on a single worker.
import time

import dask
from distributed import Client


@dask.delayed
def f():
    return 1


@dask.delayed
def g(x, y):
    time.sleep(2)
    return x + y


ops = []
root = f()
#root = 1
for i in range(4):
    with dask.annotate(resources={"cores": 100}):
        nonroot = g(root, i)
    ops.append(nonroot)

with Client(n_workers=2, threads_per_worker=4, resources={"cores": 128}):
    t0 = time.time()
    dask.compute(*ops)
    t1 = time.time()
    print("compute time:", t1 - t0)
Expected: 4s
Actual: 8s
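For reference, the two numbers above follow from simple arithmetic on the resource limits. The sketch below is not part of distributed; `slots_per_worker` and `makespan` are hypothetical helpers written only to illustrate the reasoning, and they ignore the runtime of `f` and any scheduling overhead.

```python
import math


def slots_per_worker(worker_resource: int, task_resource: int) -> int:
    """How many tasks fit on one worker at once under the resource limit."""
    return worker_resource // task_resource


def makespan(n_tasks, task_seconds, n_workers_used, worker_resource, task_resource):
    """Idealized wall time when tasks run in equal-length rounds."""
    concurrent = n_workers_used * slots_per_worker(worker_resource, task_resource)
    rounds = math.ceil(n_tasks / concurrent)
    return rounds * task_seconds


# Each worker has 128 "cores" and each task needs 100, so one task per worker at a time.
# Spread over both workers: 4 tasks / 2 at a time = 2 rounds of 2s.
expected = makespan(4, 2.0, n_workers_used=2, worker_resource=128, task_resource=100)

# Piled up on a single worker: 4 rounds of 2s.
actual = makespan(4, 2.0, n_workers_used=1, worker_resource=128, task_resource=100)

print(expected, actual)
```

With the reproducer's parameters this gives 4.0 and 8.0, which matches the observation that all four tasks run one after another on the same worker.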
Changing distributed.scheduler.worker-saturation does not seem to make a difference. Having fewer or more than 5 tasks (the threshold for is_rootish) doesn't seem to have an impact either (as long as you have fewer tasks than threads).
Uncommenting root = 1, thus making the tasks with resources actually root (not just rootish), makes the issue disappear.
After increasing the number of tasks from 6 to 100, this is what I see on the dashboard: