I've experienced a deadlock that occurs when a worker's connection to the scheduler breaks and the worker tries to reconnect. I'm opening this general issue to track all the problems and explain the situation; I'll open sub-issues for specific things to fix.
I believe the flow is something like this:
- An unhandled error occurs in a stream handler on the worker (`handle_compute_task` in this case). `Server.handle_stream` terminates, and closes the comm as its final act.
- The closing of the comm causes the `BatchedSend` to abort itself, setting `self.please_stop = True`.
- The error from `handle_stream` causes `handle_scheduler` to try to restart the connection to the scheduler.
- The connection appears to be restarted (because the `BatchedSend` has been given a new, working comm), but the `_background_send` coroutine isn't actually running (because `please_stop` was never set back to `False`)!
- So every `batched_stream.send` appears to succeed, but the messages actually just sit in the buffer forever.
- The scheduler thinks nothing is happening.
**Full narrative of me trying to figure out what's going on**
The scheduler thinks a couple of workers are processing some tasks, but those workers aren't actually running any tasks and have nothing in `ready`. Additionally, both workers have 40-60 messages in their `BatchedSend` buffer but are not sending them. Those messages include things like `'task-finished'` and `'task-erred'`. So that's causing the deadlock: the info the scheduler needs to assign new tasks is stuck in the `BatchedSend` buffer.
Why? I did notice `Connection to scheduler broken. Reconnecting...` log messages on those workers. Reading through `Worker.handle_scheduler`, where that message prints, the error recovery seems to be scheduling a new `heartbeat` to run soon. I think the intent is that this will call into `_register_with_scheduler` once again, which will restart the comm and call `self.batched_stream.start` again with the new comm, restarting it?
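In other words, the intended recovery path, as I read it, looks roughly like the following. This is paraphrased pseudocode of my reading, not the actual source in `distributed/worker.py`:

```python
import logging

logger = logging.getLogger(__name__)


# Paraphrase of my reading of the recovery path -- not actual distributed code.
async def handle_scheduler(self, comm):
    try:
        await self.handle_stream(comm)
    except Exception:
        logger.info("Connection to scheduler broken. Reconnecting...")
        self.loop.add_callback(self.heartbeat)  # schedule a new heartbeat


async def heartbeat(self):
    # Eventually calls _register_with_scheduler(), which opens a fresh comm
    # and calls self.batched_stream.start(new_comm) -- expecting the
    # BatchedSend to resume on the new comm.
    ...
```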
However, I don't think a `BatchedSend` is actually restartable. When the `_background_send` coroutine hits an exception, it calls `self.abort()`. This sets `please_stop = True` (and drops the buffer containing potentially critical messages!). If you call `BatchedSend.start` again after this, though, `please_stop` is still `True`, so the `_background_send` coroutine will terminate immediately without sending anything.
Any subsequent messages written to the `BatchedSend` will just sit there and never be sent. However, `send` won't actually raise an error, because `send` only checks whether the comm is closed, not whether the `BatchedSend` itself is closed.
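To make that failure mode concrete, here's a minimal, self-contained sketch of the control flow as I understand it. This is not the real `BatchedSend` code; `MiniBatchedSend` and `FakeComm` are toy stand-ins I've written for illustration:

```python
import asyncio


class FakeComm:
    """Toy comm whose writes always fail, simulating a broken stream."""

    def closed(self):
        return False

    async def write(self, payload):
        raise OSError("stream broke")


class MiniBatchedSend:
    """Toy model of BatchedSend's control flow, not the real implementation."""

    def __init__(self):
        self.buffer = []
        self.please_stop = False
        self.comm = None

    def start(self, comm):
        # Note: please_stop is never reset here -- this is the problem.
        self.comm = comm
        asyncio.ensure_future(self._background_send())

    async def _background_send(self):
        # After abort(), please_stop is True, so a "restarted" coroutine
        # exits immediately without draining the buffer.
        while not self.please_stop:
            if self.buffer:
                payload, self.buffer = self.buffer, []
                try:
                    await self.comm.write(payload)
                except Exception:
                    self.abort()
                    return
            await asyncio.sleep(0.01)

    def abort(self):
        self.please_stop = True
        self.buffer = []  # buffered messages are dropped!

    def send(self, msg):
        # Only the comm is checked, not whether the BatchedSend itself
        # was aborted -- so send() "succeeds" even when nothing will drain.
        if self.comm is not None and self.comm.closed():
            raise RuntimeError("comm closed")
        self.buffer.append(msg)


async def main():
    bs = MiniBatchedSend()
    bs.send({"op": "task-finished"})
    bs.start(FakeComm())       # first connection; write fails -> abort()
    await asyncio.sleep(0.05)
    bs.start(FakeComm())       # "reconnect": coroutine exits instantly
    bs.send({"op": "task-erred"})
    await asyncio.sleep(0.05)
    print(bs.buffer)           # [{'op': 'task-erred'}] -- stuck forever


asyncio.run(main())
```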
I'm a little confused by this because, if I'm reading it right, how was reconnection ever supposed to work? The `BatchedSend` doesn't seem at all restartable, yet we're acting like it is.
As for why the connection broke in the first place: I see this in the logs:
```
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1237, in handle_scheduler
    await self.handle_stream(
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/core.py", line 564, in handle_stream
    handler(**merge(extra, msg))
  File "/opt/conda/envs/coiled/lib/python3.9/site-packages/distributed/worker.py", line 1937, in handle_compute_task
    self.tasks[key].nbytes = value
KeyError: "('slices-40d6794b777d639e9440f8f518224cfd', 2, 1)"
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
```
The worker's `handle_compute_task` is trying to set `nbytes` for a dependency that has no task state yet. This seems broken, but I don't know why it can happen yet.
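For illustration only: a defensive guard like the following would avoid the crash, though it's a hypothetical band-aid rather than a fix, since the real question (tracked in #5482 below) is why the task state is missing at all:

```python
# Hypothetical guard around the failing line in handle_compute_task.
# Whether silently skipping is correct depends on why the state is missing.
ts = self.tasks.get(key)
if ts is None:
    logger.warning("Received nbytes for unknown task %r; ignoring", key)
else:
    ts.nbytes = value
```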
But anyway, this exception propagates up to the core `Server.handle_stream` method. And interestingly, we don't try-except each call to the stream handlers. If a stream handler ever fails, we abort stream handling, close the comm, and raise an error.
And that error from `handle_stream` is what causes `handle_scheduler` to try to reconnect, which doesn't actually work, because the `BatchedSend` is stopped.
There are two problems here:
- General: Properly support restarting `BatchedSend` #5481. I believe that any unhandled error in a stream handler would put `Worker.batched_stream` into a broken state upon reconnection, where no `send`s actually get sent.
- Specific: KeyError in `Worker.handle_compute_task` (causes deadlock) #5482
I also notice a couple of other issues:
- `BatchedSend` is dropping all buffered messages when it hits an error. Those messages still could (and must!) be sent upon reconnection. Dropping these messages will cause other deadlocks. Do not drop BatchedSend payload if worker reconnects #5457 addresses this.
- Is it intended that any error from a handler makes `Server.handle_stream` close the comm? #5483 Should we not have try-excepts around each `handler()` call? (A sketch of that alternative follows below.)
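For that last point, here's a hypothetical sketch of what per-handler error handling could look like. This is not the current `Server.handle_stream`, and it's only one possible answer to #5483:

```python
import logging

logger = logging.getLogger(__name__)


async def handle_stream_sketch(comm, handlers):
    """Hypothetical variant of stream handling that survives handler errors."""
    while True:
        msgs = await comm.read()
        for msg in msgs:
            op = msg.pop("op")
            try:
                handlers[op](**msg)
            except Exception:
                # Today, an exception here propagates, handle_stream exits,
                # and the comm is closed. Wrapping each call instead would
                # log the failure and keep the stream alive.
                logger.exception("Stream handler %r failed", op)
```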
As it currently stands, I think #5457 will fix this deadlock. However, I'm concerned about the overall design of `BatchedSend`. It has neither an interface nor tests to support restarting after it's been closed, yet we're using it as though it does. If we want it to support reconnection, we should refactor and test it to do so. It also doesn't seem to have clear invariants about what its internal state means, and doesn't validate parts of its state when it probably should. It's a very old piece of code, still using Tornado. Overall, it might be due for a refresh.
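To sketch what restart-friendly semantics might look like (my guess at what #5481 and #5457 imply, not a proposed implementation), reusing the toy `MiniBatchedSend` from above:

```python
class RestartableMiniBatchedSend(MiniBatchedSend):
    """Toy sketch of restartable semantics; not the actual fix."""

    def abort(self):
        self.please_stop = True
        # Crucially, do NOT clear self.buffer: messages queued while the
        # connection was down must still go out after reconnecting.

    def start(self, comm):
        # Reset the stop flag so the background coroutine actually runs
        # again on the new comm.
        self.please_stop = False
        super().start(comm)
```

A real fix would also need to requeue any payload that was in flight when the write failed, and define clear invariants distinguishing closed, aborted, and restarted states.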