Skip to content

KeyError in Worker.handle_compute_task (causes deadlock) #5482

@gjoseph92

Description

@gjoseph92
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1237, in handle_scheduler
    await self.handle_stream(
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 564, in handle_stream
    handler(**merge(extra, msg))
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1937, in handle_compute_task
    self.tasks[key].nbytes = value
KeyError: "('slices-40d6794b777d639e9440f8f518224cfd', 2, 1)"
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...

I may be able to reproduce this if necessary. I was running a stackstac example notebook on binder against a Coiled cluster over wss, where the particular versions of things were causing a lot of errors (unrelated to dask). So I was frequently rerunning the same tasks, cancelling them, restarting the client, rerunning, etc. Perhaps this cancelling, restarting, rerunning is related?

@fjetter says

The only reason this KeyError can appear is if the compute instruction transitions its own dependency into a forgotten state, which is very, very wrong.

Relevant code, ending at the line where the error occurs:

# Worker-side handler for a "compute-task" message.
#
# Summary of what the statements below do:
#   * append a (key, "compute-task", stimulus_id, time()) record to self.log;
#   * look up the TaskState for `key` in self.tasks, creating it if absent;
#   * store the run spec, priority, duration, restrictions and annotations on it;
#   * make sure a TaskState exists for every key in `who_has` and link it to
#     the task as a dependency;
#   * recommend a transition based on the task's current state, send any
#     queued scheduler messages and run the transitions;
#   * finally record `who_has` and the per-key `nbytes` values.
#
# All parameters are keyword-only (note the bare `*`).
#
# NOTE(review): the indentation of this excerpt was lost when it was pasted,
# so the nesting of the statements (e.g. which lines belong to the `except`
# branch or to the `for` loop) cannot be determined from this text alone --
# consult the original worker.py before relying on it.
def handle_compute_task(
self,
*,
key,
function=None,
args=None,
kwargs=None,
task=no_value,
who_has=None,
nbytes=None,
priority=None,
duration=None,
resource_restrictions=None,
actor=False,
annotations=None,
stimulus_id=None,
):
self.log.append((key, "compute-task", stimulus_id, time()))
# Reuse the TaskState if this key is already known, otherwise create one.
try:
ts = self.tasks[key]
logger.debug(
"Asked to compute an already known task %s",
{"task": ts, "stimulus_id": stimulus_id},
)
except KeyError:
self.tasks[key] = ts = TaskState(key)
ts.runspec = SerializedTask(function, args, kwargs, task)
# Append the current generation counter as the last priority element, then
# decrement the counter so the next compute-task gets a different value.
if priority is not None:
priority = tuple(priority) + (self.generation,)
self.generation -= 1
if actor:
self.actors[ts.key] = None
# Clear any previously recorded error information on the task.
ts.exception = None
ts.traceback = None
ts.exception_text = ""
ts.traceback_text = ""
ts.priority = priority
ts.duration = duration
if resource_restrictions:
ts.resource_restrictions = resource_restrictions
ts.annotations = annotations
recommendations = {}
scheduler_msgs = []
# NOTE(review): `who_has` defaults to None, and iterating None raises
# TypeError; presumably the scheduler always sends a mapping -- confirm.
# The dependencies receive the same (generation-extended) priority as the task.
for dependency in who_has:
dep_ts = self.ensure_task_exists(
key=dependency,
stimulus_id=stimulus_id,
priority=priority,
)
# link up to child / parents
ts.dependencies.add(dep_ts)
dep_ts.dependents.add(ts)
# Choose a recommendation from the task's current state: nothing for states
# in READY or executing/waiting/resumed; "memory" (plus a message for the
# scheduler) if the result is already in memory; "waiting" for the listed
# remaining states; any other state is treated as an error.
if ts.state in READY | {"executing", "waiting", "resumed"}:
pass
elif ts.state == "memory":
recommendations[ts] = "memory"
scheduler_msgs.append(self.get_task_state_for_scheduler(ts))
elif ts.state in {
"released",
"fetch",
"flight",
"missing",
"cancelled",
"error",
}:
recommendations[ts] = "waiting"
else:
raise RuntimeError(f"Unexpected task state encountered {ts} {stimulus_id}")
for msg in scheduler_msgs:
self.batched_stream.send(msg)
self.transitions(recommendations, stimulus_id=stimulus_id)
# We received new info, that's great but not related to the compute-task
# instruction
self.update_who_has(who_has, stimulus_id=stimulus_id)
# NOTE(review): the loop variable below reuses the name `key` and therefore
# shadows the `key` parameter from here on.
# NOTE(review): this is the line reported in the traceback. The lookup
# raises KeyError when a key listed in `nbytes` is not in self.tasks at this
# point; since the dependency loop above calls ensure_task_exists for every
# key in `who_has`, that presumably means the entry was removed again by the
# transitions()/update_who_has() calls above -- not verifiable from this
# excerpt.
if nbytes is not None:
for key, value in nbytes.items():
self.tasks[key].nbytes = value

Scheduler code producing the message which causes this error:

# Scheduler-side helper that builds the "compute-task" message dict for `ts`.
#
# A negative `duration` (the default) means "not supplied"; it is then
# obtained from state.get_task_duration(ts). Returns the message dict.
#
# NOTE(review): the indentation of this excerpt was lost when it was pasted,
# so which statements are nested under `if deps:` / `if state._validate:`
# cannot be determined from this text alone -- consult the original source.
def _task_to_msg(state: SchedulerState, ts: TaskState, duration: double = -1) -> dict:
"""Convert a single computational task to a message"""
ws: WorkerState
dts: TaskState
# FIXME: The duration attribute is not used on worker. We could save ourselves the time to compute and submit this
if duration < 0:
duration = state.get_task_duration(ts)
# Fields that are always present; "who_has" starts out empty and is replaced
# below when the task has dependencies.
msg: dict = {
"op": "compute-task",
"key": ts._key,
"priority": ts._priority,
"duration": duration,
"stimulus_id": f"compute-task-{time()}",
"who_has": {},
}
# Optional fields are only added when they carry information.
if ts._resource_restrictions:
msg["resource_restrictions"] = ts._resource_restrictions
if ts._actor:
msg["actor"] = True
deps: set = ts._dependencies
# "who_has" maps each dependency key to the addresses of the workers in that
# dependency's _who_has; "nbytes" maps each dependency key to its _nbytes.
# Both dicts are built from the same `deps` set, so they have the same keys.
if deps:
msg["who_has"] = {
dts._key: [ws._address for ws in dts._who_has] for dts in deps
}
msg["nbytes"] = {dts._key: dts._nbytes for dts in deps}
# In validate mode, require that every dependency lists at least one worker.
if state._validate:
assert all(msg["who_has"].values())
# A dict run spec is merged into the message as top-level fields; any other
# run spec is sent under the "task" key.
task = ts._run_spec
if type(task) is dict:
msg.update(task)
else:
msg["task"] = task
if ts._annotations:
msg["annotations"] = ts._annotations
return msg

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions