-
-
Notifications
You must be signed in to change notification settings - Fork 750
Description
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1237, in handle_scheduler
    await self.handle_stream(
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 564, in handle_stream
    handler(**merge(extra, msg))
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1937, in handle_compute_task
    self.tasks[key].nbytes = value
KeyError: "('slices-40d6794b777d639e9440f8f518224cfd', 2, 1)"
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
I may be able to reproduce this if necessary. I was running a stackstac example notebook on Binder against a Coiled cluster over wss, where the particular versions of things were causing a lot of errors (unrelated to Dask). As a result, I was frequently rerunning the same tasks, cancelling them, restarting the client, rerunning, etc. Perhaps this cycle of cancelling, restarting, and rerunning is related?
@fjetter says
The only reason this KeyError can appear is if the compute instruction transitions its own dependency into a forgotten state, which is very, very wrong.
Relevant code, ending at the line where the error occurs:
distributed/distributed/worker.py
Lines 1852 to 1937 in 11c41b5
| def handle_compute_task( | |
| self, | |
| *, | |
| key, | |
| function=None, | |
| args=None, | |
| kwargs=None, | |
| task=no_value, | |
| who_has=None, | |
| nbytes=None, | |
| priority=None, | |
| duration=None, | |
| resource_restrictions=None, | |
| actor=False, | |
| annotations=None, | |
| stimulus_id=None, | |
| ): | |
| self.log.append((key, "compute-task", stimulus_id, time())) | |
| try: | |
| ts = self.tasks[key] | |
| logger.debug( | |
| "Asked to compute an already known task %s", | |
| {"task": ts, "stimulus_id": stimulus_id}, | |
| ) | |
| except KeyError: | |
| self.tasks[key] = ts = TaskState(key) | |
| ts.runspec = SerializedTask(function, args, kwargs, task) | |
| if priority is not None: | |
| priority = tuple(priority) + (self.generation,) | |
| self.generation -= 1 | |
| if actor: | |
| self.actors[ts.key] = None | |
| ts.exception = None | |
| ts.traceback = None | |
| ts.exception_text = "" | |
| ts.traceback_text = "" | |
| ts.priority = priority | |
| ts.duration = duration | |
| if resource_restrictions: | |
| ts.resource_restrictions = resource_restrictions | |
| ts.annotations = annotations | |
| recommendations = {} | |
| scheduler_msgs = [] | |
| for dependency in who_has: | |
| dep_ts = self.ensure_task_exists( | |
| key=dependency, | |
| stimulus_id=stimulus_id, | |
| priority=priority, | |
| ) | |
| # link up to child / parents | |
| ts.dependencies.add(dep_ts) | |
| dep_ts.dependents.add(ts) | |
| if ts.state in READY | {"executing", "waiting", "resumed"}: | |
| pass | |
| elif ts.state == "memory": | |
| recommendations[ts] = "memory" | |
| scheduler_msgs.append(self.get_task_state_for_scheduler(ts)) | |
| elif ts.state in { | |
| "released", | |
| "fetch", | |
| "flight", | |
| "missing", | |
| "cancelled", | |
| "error", | |
| }: | |
| recommendations[ts] = "waiting" | |
| else: | |
| raise RuntimeError(f"Unexpected task state encountered {ts} {stimulus_id}") | |
| for msg in scheduler_msgs: | |
| self.batched_stream.send(msg) | |
| self.transitions(recommendations, stimulus_id=stimulus_id) | |
| # We received new info, that's great but not related to the compute-task | |
| # instruction | |
| self.update_who_has(who_has, stimulus_id=stimulus_id) | |
| if nbytes is not None: | |
| for key, value in nbytes.items(): | |
| self.tasks[key].nbytes = value |
Scheduler code producing the message which causes this error:
distributed/distributed/scheduler.py
Lines 7953 to 7994 in 11c41b5
| def _task_to_msg(state: SchedulerState, ts: TaskState, duration: double = -1) -> dict: | |
| """Convert a single computational task to a message""" | |
| ws: WorkerState | |
| dts: TaskState | |
| # FIXME: The duration attribute is not used on worker. We could safe ourselves the time to compute and submit this | |
| if duration < 0: | |
| duration = state.get_task_duration(ts) | |
| msg: dict = { | |
| "op": "compute-task", | |
| "key": ts._key, | |
| "priority": ts._priority, | |
| "duration": duration, | |
| "stimulus_id": f"compute-task-{time()}", | |
| "who_has": {}, | |
| } | |
| if ts._resource_restrictions: | |
| msg["resource_restrictions"] = ts._resource_restrictions | |
| if ts._actor: | |
| msg["actor"] = True | |
| deps: set = ts._dependencies | |
| if deps: | |
| msg["who_has"] = { | |
| dts._key: [ws._address for ws in dts._who_has] for dts in deps | |
| } | |
| msg["nbytes"] = {dts._key: dts._nbytes for dts in deps} | |
| if state._validate: | |
| assert all(msg["who_has"].values()) | |
| task = ts._run_spec | |
| if type(task) is dict: | |
| msg.update(task) | |
| else: | |
| msg["task"] = task | |
| if ts._annotations: | |
| msg["annotations"] = ts._annotations | |
| return msg |