Closed
Description
If we switch the #6283 script from `client.run(dask_worker.close())`-style to `client.retire_workers`, and release the `break_worker` task so it stops constantly breaking worker comms, it no longer errors during scheduler transitions, but instead deadlocks.
All the tasks eventually end up in memory on the scheduler, but the client isn't notified, and thinks the futures are still pending.
Again, this is not minimized or investigated at all. Maybe it is AMM-related, since `retire_workers` is involved? I'm guessing this is just a scheduler<->client issue, not a worker-related one, since the dashboard shows 10 tasks in memory while it's deadlocked.
import time
import distributed
def break_worker():
    """Abort the batched-stream comm of the worker running this task.

    Meant to be submitted as a task (``client.submit(break_worker)``): it looks
    up the worker it is executing on, reports it, then forcibly aborts that
    worker's ``batched_stream`` comm.
    """
    current = distributed.get_worker()
    print(f"breaking {current}")
    stream_comm = current.batched_stream.comm
    stream_comm.abort()
async def close_worker(dask_worker):
    """Close *dask_worker*, falling back to exiting the process on failure.

    Awaits ``dask_worker.close(report=True, nanny=False)``. If that raises any
    ``Exception``, prints a message and the traceback, then calls
    ``sys.exit(0)``.
    """
    print(f"Shutting down {dask_worker}")
    try:
        await dask_worker.close(report=True, nanny=False)
    except Exception:
        # Imported lazily: these are only needed on the failure path.
        import sys
        from traceback import print_exc

        print("Failed to close worker cleanly, exiting")
        print_exc()
        # NOTE(review): exits with status 0 even though closing failed —
        # confirm a success exit code is really intended here.
        sys.exit(0)
# Per-attempt timeout (seconds) for ``distributed.wait`` below. It equals the
# ideal runtime of the workload: 10 tasks x 2 s of sleep spread over
# 2 workers x 2 threads = 5 s.
EXPECTED_TOTAL_RUNTIME = 5


def find_workers_processing(processing, keys):
    """Return the addresses of workers that are processing any of *keys*.

    Parameters
    ----------
    processing : dict
        Mapping of worker address -> iterable of task keys, as returned by
        ``Client.processing()``.
    keys : iterable
        Task keys to look for.

    Returns
    -------
    set
        Addresses whose processing list shares at least one key with *keys*;
        empty when no worker reports any of them.
    """
    wanted = set(keys)
    return {
        addr for addr, tasks in processing.items() if not wanted.isdisjoint(tasks)
    }


if __name__ == "__main__":
    with distributed.Client(
        n_workers=2, processes=True, threads_per_worker=2
    ) as client:
        print(client.dashboard_link)
        input("Press enter to start")

        fs = client.map(time.sleep, [2] * 10, pure=False)
        # Deliberately abort one worker's batched-stream comm mid-computation.
        b = client.submit(break_worker)

        while True:
            try:
                print("waiting")
                distributed.wait(fs, timeout=EXPECTED_TOTAL_RUNTIME)
            except distributed.TimeoutError:
                print("timed out")
                stuck_keys = {f.key for f in fs if not f.done()}
            else:
                print("success!")
                break

            # Release the break_worker future so that task stops breaking
            # worker comms once we start retiring workers.
            b = None

            print("getting processing")
            processing = client.processing()
            stuck_workers = find_workers_processing(processing, stuck_keys)
            print(f"{stuck_keys} are stuck on {stuck_workers}")

            if stuck_workers:
                client.retire_workers(list(stuck_workers))
            else:
                # The futures are still pending client-side, but no worker is
                # processing them: retiring an empty list would do nothing.
                print(
                    "none of the stuck keys are processing on any worker; "
                    "skipping retire_workers"
                )

        input("Done")
http://127.0.0.1:8787/status
Press enter to start
waiting
breaking <Worker 'tcp://127.0.0.1:51914', name: 1, status: running, stored: 2, running: 2/2, ready: 2, comm: 0, waiting: 0>
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-2', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-3', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-9', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-4'} are stuck on {'tcp://127.0.0.1:51915', 'tcp://127.0.0.1:51914'}
2022-05-05 13:50:50,433 - distributed.active_memory_manager - WARNING - Tried retiring worker tcp://127.0.0.1:51914, but 2 tasks could not be moved as there are no suitable workers to receive them. The worker will not be retired.
2022-05-05 13:50:50,433 - distributed.active_memory_manager - WARNING - Tried retiring worker tcp://127.0.0.1:51915, but 4 tasks could not be moved as there are no suitable workers to receive them. The worker will not be retired.
waiting
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-2', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-3', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on {'tcp://127.0.0.1:51914'}
waiting
2022-05-05 13:50:55,454 - distributed.nanny - ERROR - Worker process died unexpectedly
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on set()
waiting
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on set()
waiting
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on set()
waiting
^CTraceback (most recent call last):
File "/Users/gabe/dev/dask-playground/deadlock/blocker.py", line 40, in <module>
distributed.wait(fs, timeout=EXPECTED_TOTAL_RUNTIME)
File "/Users/gabe/dev/distributed/distributed/client.py", line 4681, in wait
result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
File "/Users/gabe/dev/distributed/distributed/utils.py", line 318, in sync
return sync(
File "/Users/gabe/dev/distributed/distributed/utils.py", line 381, in sync
wait(10)
File "/Users/gabe/dev/distributed/distributed/utils.py", line 370, in wait
return e.wait(timeout)
File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 574, in wait
signaled = self._cond.wait(timeout)
File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 316, in wait
gotit = waiter.acquire(True, timeout)
KeyboardInterrupt