Commit 23e104a

Add special case for inflight tasks without dependents
1 parent d9cb0e9 commit 23e104a

File tree: 1 file changed (+59 −15 lines changed)

distributed/worker.py

Lines changed: 59 additions & 15 deletions
@@ -15,7 +15,7 @@
 from datetime import timedelta
 from inspect import isawaitable
 from pickle import PicklingError
-from typing import Iterable
+from typing import Iterable, Optional

 from tlz import first, keymap, merge, pluck  # noqa: F401
 from tornado import gen
@@ -1477,7 +1477,7 @@ def delete_data(self, comm=None, keys=None, report=True):
                 if ts:
                     ts.scheduler_holds_ref = False
                     self.log.append((key, "delete"))
-                    self.release_key(key, cause="delete data")
+                    self.release_key(key, reason="delete data")

             logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys))
         return "OK"
@@ -2314,9 +2314,16 @@ async def gather_dep(
             self.log.append(("receive-dep", worker, list(response["data"])))
         except EnvironmentError as e:
             logger.exception("Worker stream died during communication: %s", worker)
-            self.log.append(("receive-dep-failed", worker))
-            for d in self.has_what.pop(worker):
-                self.tasks[d].who_has.remove(worker)
+            has_what = self.has_what.pop(worker)
+            self.log.append(("receive-dep-failed", worker, has_what))
+            for d in has_what:
+                ts = self.tasks[d]
+                # FIXME: We might break the "invariant" that a task in state
+                # 'fetch' either has a set attribute who_has or is tracked
+                # in missing_flight_dep and is "handled as missing". This
+                # leaves the task for a while in an ill defined state
+                # What about `pending_data_per_worker`?
+                ts.who_has.remove(worker)

         except Exception as e:
             logger.exception(e)
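
Aside (not part of the commit): a minimal, self-contained sketch of the bookkeeping this except-branch now performs, using plain dictionaries instead of the real Worker and TaskState objects. On a failed comm the worker's has_what entry is popped once, logged together with its keys, and the failed worker is stripped from each affected task's who_has set.

from collections import defaultdict

tasks = {"x": {"who_has": {"w1", "w2"}}, "y": {"who_has": {"w1"}}}
has_what = defaultdict(set, {"w1": {"x", "y"}, "w2": {"x"}})
log = []

def on_comm_failure(worker):
    keys = has_what.pop(worker, set())
    log.append(("receive-dep-failed", worker, keys))
    for key in keys:
        # As the FIXME above notes, a task left with an empty who_has stays in
        # an ill-defined "fetch" state until it is corrected later.
        tasks[key]["who_has"].discard(worker)

on_comm_failure("w1")
print(tasks)  # {'x': {'who_has': {'w2'}}, 'y': {'who_has': set()}}
print(log)    # [('receive-dep-failed', 'w1', {...})]
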
@@ -2330,23 +2337,33 @@ async def gather_dep(
             busy = response.get("status", "") == "busy"
             data = response.get("data", {})

+            assert to_gather_keys == self.in_flight_workers.get(worker)
             for d in self.in_flight_workers.pop(worker):

                 ts = self.tasks.get(d)

                 if not busy and d in data:
                     self.transition(ts, "memory", value=data[d])
                 elif ts is None or ts.state == "executing":
-                    self.release_key(d, cause="already executing at gather")
-                    continue
+                    self.log.append(("already-executing", d))
+                    self.release_key(d, reason="already executing at gather")
+                elif ts.state == "flight" and not ts.dependents:
+                    self.log.append(("flight no-dependents", d))
+                    self.release_key(
+                        d, reason="In-flight task no longer has dependents."
+                    )
                 elif ts.state not in ("ready", "memory"):
+                    self.log.append(("busy?", d, busy))
                     self.transition(ts, "fetch")
-
-                if not busy and d not in data and ts.dependents:
+                elif not busy and d not in data and ts.dependents:
                     self.log.append(("missing-dep", d))
                     self.batched_stream.send(
                         {"op": "missing-data", "errant_worker": worker, "key": d}
                     )
+                else:
+                    raise RuntimeError(
+                        f"Unexpected state for {ts} encountered after gather_dep"
+                    )

             if self.validate:
                 self.validate_state()
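
Aside (not part of the commit): a rough, illustrative sketch of the dispatch the reworked if/elif chain implements for each key the worker tried to gather. With everything folded into one chain, each key takes exactly one branch, and any combination not covered now raises instead of passing silently; the function and its string results below are stand-ins, not worker API.

def dispatch(state, in_data, busy, has_dependents):
    if not busy and in_data:
        return "transition to memory"
    elif state is None or state == "executing":
        return "release: already executing at gather"
    elif state == "flight" and not has_dependents:
        return "release: in-flight task no longer has dependents"
    elif state not in ("ready", "memory"):
        return "transition back to fetch"
    elif not busy and not in_data and has_dependents:
        return "report missing-data to the scheduler"
    else:
        raise RuntimeError("unexpected state after gather_dep")

print(dispatch("flight", in_data=True, busy=False, has_dependents=True))
print(dispatch("flight", in_data=False, busy=False, has_dependents=False))
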
@@ -2373,7 +2390,7 @@ def bad_dep(self, dep):
             ts.exception = msg["exception"]
             ts.traceback = msg["traceback"]
             self.transition(ts, "error")
-        self.release_key(dep.key, cause="bad dep")
+        self.release_key(dep.key, reason="bad dep")

     async def handle_missing_dep(self, *deps, **kwargs):
         self.log.append(("handle-missing", deps))
@@ -2472,16 +2489,29 @@ def steal_request(self, key):
             # If task is marked as "constrained" we haven't yet assigned it an
             # `available_resources` to run on, that happens in
             # `transition_constrained_executing`
-            self.release_key(ts.key, cause="stolen")
+            self.release_key(ts.key, reason="stolen")
             if self.validate:
                 assert ts.key not in self.tasks

-    def release_key(self, key, cause=None, reason=None, report=True):
+    def release_key(
+        self,
+        key: str,
+        cause: Optional[TaskState] = None,
+        reason: Optional[str] = None,
+        report: bool = True,
+    ):
         try:
             if self.validate:
                 assert isinstance(key, str)
             ts = self.tasks.get(key, TaskState(key=key))
-
+            logger.debug(
+                "Release key %s",
+                {
+                    "key": key,
+                    "cause": cause,
+                    "reason": reason,
+                },
+            )
             if cause:
                 self.log.append((key, "release-key", {"cause": cause}))
             else:
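
Aside (not part of the commit): an illustrative sketch of the distinction the new signature encodes, with a toy stand-in for distributed.worker.TaskState. `cause` is now typed as the TaskState whose release triggered this one and still drives the "release-key" log entry, while free-form human-readable strings move to the `reason` argument, which in this diff only feeds the debug log.

from typing import Optional

class TaskState:  # minimal stand-in for distributed.worker.TaskState
    def __init__(self, key: str):
        self.key = key

    def __repr__(self):
        return f"<TaskState {self.key!r}>"

def release_key(
    key: str,
    cause: Optional[TaskState] = None,
    reason: Optional[str] = None,
    report: bool = True,
):
    # Toy body: record what the real method logs for each argument.
    print("Release key", {"key": key, "cause": cause, "reason": reason})
    if cause:
        return (key, "release-key", {"cause": cause})
    return (key, "release-key")

release_key("x", reason="stolen")                 # call sites updated in this commit
release_key("y", cause=TaskState("y-dependent"))  # TaskState-valued cause
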
@@ -2515,7 +2545,7 @@ def release_key(self, key, cause=None, reason=None, report=True):
                     # scheduler allow us to. See also handle_delete_data and
                     and not dependency.scheduler_holds_ref
                 ):
-                    self.release_key(dependency.key, cause=f"Dependent {ts} released")
+                    self.release_key(dependency.key, reason=f"Dependent {ts} released")

             for worker in ts.who_has:
                 self.has_what[worker].discard(ts.key)
@@ -3184,7 +3214,21 @@ def validate_task_flight(self, ts):
     def validate_task_fetch(self, ts):
         assert ts.runspec is None
         assert ts.key not in self.data
-        assert ts.who_has
+        # FIXME This is currently not an invariant since upon comm failure we
+        # remove the erroneous worker from all who_has and correct the state
+        # upon the next ensure_communicate
+
+        # if not ts.who_has:
+        #     # If we do not know who_has for a fetch task, it must be logged in
+        #     # the missing dep. There should be a handle_missing_dep running for
+        #     # all of these keys
+
+        #     assert ts.key in self._missing_dep_flight, (
+        #         ts.key,
+        #         self.story(ts),
+        #         self._missing_dep_flight.copy(),
+        #         self.in_flight_workers.copy(),
+        #     )
         assert ts.dependents

         for w in ts.who_has:
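
Aside (not part of the commit): a toy sketch of the invariant the commented-out assertion would enforce once restored, namely that a task in state "fetch" either has a non-empty who_has or is tracked in _missing_dep_flight. Plain dictionaries stand in for the worker's real state.

def fetch_invariant_violations(tasks, missing_dep_flight):
    violations = []
    for key, ts in tasks.items():
        if ts["state"] == "fetch" and not ts["who_has"] and key not in missing_dep_flight:
            violations.append(key)
    return violations

tasks = {
    "a": {"state": "fetch", "who_has": {"w1"}},
    "b": {"state": "fetch", "who_has": set()},  # acceptable only if tracked as missing
}
print(fetch_invariant_violations(tasks, missing_dep_flight={"b"}))  # []
print(fetch_invariant_violations(tasks, missing_dep_flight=set()))  # ['b']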
