Skip to content

Commit 52e0a88

Browse files
committed
Simplify decide_worker
1 parent 4cf9baf commit 52e0a88

File tree

1 file changed

+6
-30
lines changed

1 file changed

+6
-30
lines changed

distributed/scheduler.py

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,36 +1839,12 @@ def decide_worker(self, ts: TaskState) -> WorkerState | None:
18391839
tg.last_worker_tasks_left -= 1
18401840
return ws
18411841

1842-
if ts.dependencies or valid_workers is not None:
1843-
ws = decide_worker(
1844-
ts,
1845-
self.workers.values(),
1846-
valid_workers,
1847-
partial(self.worker_objective, ts),
1848-
)
1849-
else:
1850-
# Fastpath when there are no related tasks or restrictions
1851-
worker_pool = self.idle or self.workers
1852-
wp_vals = worker_pool.values()
1853-
n_workers: int = len(wp_vals)
1854-
if n_workers < 20: # smart but linear in small case
1855-
ws = min(wp_vals, key=operator.attrgetter("occupancy"))
1856-
assert ws
1857-
if ws.occupancy == 0:
1858-
# special case to use round-robin; linear search
1859-
# for next worker with zero occupancy (or just
1860-
# land back where we started).
1861-
wp_i: WorkerState
1862-
start: int = self.n_tasks % n_workers
1863-
i: int
1864-
for i in range(n_workers):
1865-
wp_i = wp_vals[(i + start) % n_workers]
1866-
if wp_i.occupancy == 0:
1867-
ws = wp_i
1868-
break
1869-
else: # dumb but fast in large case
1870-
ws = wp_vals[self.n_tasks % n_workers]
1871-
1842+
ws = decide_worker(
1843+
ts,
1844+
self.workers.values(),
1845+
valid_workers,
1846+
partial(self.worker_objective, ts),
1847+
)
18721848
if self.validate and ws is not None:
18731849
assert ws.address in self.workers
18741850

0 commit comments

Comments
 (0)