Skip to content

Commit dba7271

Browse files
committed
define latency penalty as global constant
1 parent f1b351a commit dba7271

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

distributed/scheduler.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
from tornado.ioloop import IOLoop
5454

5555
import dask
56+
import dask.config
5657
from dask.core import get_deps, validate_key
5758
from dask.typing import Key, no_default
5859
from dask.utils import (
@@ -188,6 +189,8 @@
188189
"spans": SpansSchedulerExtension,
189190
"stealing": WorkStealing,
190191
}
192+
LATENCY_PENALTY = dask.config.get("distributed.scheduler.latency-penalty", 0.25)
193+
CONSIDER_HAS_WHAT = dask.config.get("distributed.scheduler.consider-has-what", False)
191194

192195

193196
class ClientState:
@@ -3091,7 +3094,10 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple:
30913094
comm_bytes = sum(
30923095
dts.get_nbytes()
30933096
for dts in ts.dependencies
3094-
if (ws not in dts.who_has and dts not in ws.needs_what)
3097+
if (
3098+
ws not in dts.who_has
3099+
and (not CONSIDER_HAS_WHAT or dts not in ws.needs_what)
3100+
)
30953101
)
30963102

30973103
stack_time = ws.occupancy / ws.nthreads
@@ -3111,9 +3117,7 @@ def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple:
31113117
# Note: This coincides with DEFAULT_TASK_DURATION / nthreads==2
31123118
# This means that a 2 Thread worker with one task is a tie on
31133119
# start_time with a worker with zero tasks but a trivial transfer
3114-
import dask
3115-
3116-
start_time += dask.config.get("distributed.scheduler.latency-penalty", 0.25)
3120+
start_time += LATENCY_PENALTY
31173121

31183122
# Differences below 10ms are meaningless and we should rather break ties
31193123
# by nbytes

0 commit comments

Comments
 (0)