|
26 | 26 | Iterable,
|
27 | 27 | Iterator,
|
28 | 28 | Mapping,
|
29 |
| - Sequence, |
30 | 29 | Set,
|
31 | 30 | )
|
32 | 31 | from contextlib import suppress
|
@@ -2169,43 +2168,17 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
|
2169 | 2168 | # If there were no restrictions, `valid_workers()` didn't subset by `running`.
|
2170 | 2169 | valid_workers = self.running
|
2171 | 2170 |
|
2172 |
| - if ts.dependencies or valid_workers is not None: |
2173 |
| - ws = decide_worker( |
2174 |
| - ts, |
2175 |
| - self.running, |
2176 |
| - valid_workers, |
2177 |
| - partial(self.worker_objective, ts), |
2178 |
| - ) |
2179 |
| - else: |
2180 |
| - # TODO if `is_rootish` would always return True for tasks without dependencies, |
2181 |
| - # we could remove all this logic. The rootish assignment logic would behave |
2182 |
| - # more or less the same as this, maybe without gauranteed round-robin though? |
2183 |
| - # This path is only reachable when `ts` doesn't have dependencies, but its |
2184 |
| - # group is also smaller than the cluster. |
2185 |
| - |
2186 |
| - # Fastpath when there are no related tasks or restrictions |
2187 |
| - worker_pool = self.idle or self.workers |
2188 |
| - # FIXME idle and workers are SortedDict's declared as dicts |
2189 |
| - # because sortedcontainers is not annotated |
2190 |
| - wp_vals = cast("Sequence[WorkerState]", worker_pool.values()) |
2191 |
| - n_workers: int = len(wp_vals) |
2192 |
| - if n_workers < 20: # smart but linear in small case |
2193 |
| - ws = min(wp_vals, key=operator.attrgetter("occupancy")) |
2194 |
| - assert ws |
2195 |
| - if ws.occupancy == 0: |
2196 |
| - # special case to use round-robin; linear search |
2197 |
| - # for next worker with zero occupancy (or just |
2198 |
| - # land back where we started). |
2199 |
| - wp_i: WorkerState |
2200 |
| - start: int = self.n_tasks % n_workers |
2201 |
| - i: int |
2202 |
| - for i in range(n_workers): |
2203 |
| - wp_i = wp_vals[(i + start) % n_workers] |
2204 |
| - if wp_i.occupancy == 0: |
2205 |
| - ws = wp_i |
2206 |
| - break |
2207 |
| - else: # dumb but fast in large case |
2208 |
| - ws = wp_vals[self.n_tasks % n_workers] |
| 2171 | + if self.validate: |
| 2172 | + assert ( |
| 2173 | + ts.dependencies or valid_workers is not None |
| 2174 | + ), f"{ts} should be covered by root-ish case" |
| 2175 | + |
| 2176 | + ws = decide_worker( |
| 2177 | + ts, |
| 2178 | + self.running, |
| 2179 | + valid_workers, |
| 2180 | + partial(self.worker_objective, ts), |
| 2181 | + ) |
2209 | 2182 |
|
2210 | 2183 | if self.validate and ws is not None:
|
2211 | 2184 | assert self.workers.get(ws.address) is ws
|
|
0 commit comments