Deadlock: all keys in memory, but Futures not done #6285

Closed
@gjoseph92

If we switch the #6283 script from client.run(dask_worker.close())-style to client.retire_workers, and release the break_worker task so it stops constantly breaking worker comms, it no longer errors during scheduler transitions. Instead, it deadlocks.

All the tasks eventually end up in memory on the scheduler, but the client isn't notified, and thinks the futures are still pending.

Again, this is not minimized or investigated at all. Maybe it's related to AMM (the active memory manager), since retire_workers is involved? I'm guessing this is just a scheduler<->client issue, not worker-related, since the dashboard shows 10 tasks in memory while it's deadlocked.

import time

import distributed


def break_worker():
    worker = distributed.get_worker()
    print(f"breaking {worker}")
    worker.batched_stream.comm.abort()


async def close_worker(dask_worker):
    print(f"Shutting down {dask_worker}")
    try:
        await dask_worker.close(report=True, nanny=False)
    except Exception:
        import sys
        import traceback

        print("Failed to close worker cleanly, exiting")
        traceback.print_exc()
        sys.exit(0)


EXPECTED_TOTAL_RUNTIME = 5

if __name__ == "__main__":
    with distributed.Client(
        n_workers=2, processes=True, threads_per_worker=2
    ) as client:
        print(client.dashboard_link)
        input("Press enter to start")

        fs = client.map(time.sleep, [2] * 10, pure=False)
        b = client.submit(break_worker)

        while True:
            try:
                print("waiting")
                distributed.wait(fs, timeout=EXPECTED_TOTAL_RUNTIME)
            except distributed.TimeoutError:
                print("timed out")
                stuck_keys = {f.key for f in fs if not f.done()}
            else:
                print("success!")
                break

            b = None  # drop the break_worker future so the task is released

            print("getting processing")
            processing = client.processing()
            stuck_workers = {
                addr
                for addr, tasks in processing.items()
                if stuck_keys.intersection(tasks)
            }

            print(f"{stuck_keys} are stuck on {stuck_workers}")
            client.retire_workers(list(stuck_workers))

        input("Done")

Output:

http://127.0.0.1:8787/status
Press enter to start
waiting
breaking <Worker 'tcp://127.0.0.1:51914', name: 1, status: running, stored: 2, running: 2/2, ready: 2, comm: 0, waiting: 0>
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-2', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-3', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-9', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-4'} are stuck on {'tcp://127.0.0.1:51915', 'tcp://127.0.0.1:51914'}
2022-05-05 13:50:50,433 - distributed.active_memory_manager - WARNING - Tried retiring worker tcp://127.0.0.1:51914, but 2 tasks could not be moved as there are no suitable workers to receive them. The worker will not be retired.
2022-05-05 13:50:50,433 - distributed.active_memory_manager - WARNING - Tried retiring worker tcp://127.0.0.1:51915, but 4 tasks could not be moved as there are no suitable workers to receive them. The worker will not be retired.
waiting
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-2', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-3', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on {'tcp://127.0.0.1:51914'}
waiting
2022-05-05 13:50:55,454 - distributed.nanny - ERROR - Worker process died unexpectedly
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on set()
waiting
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on set()
waiting
timed out
getting processing
{'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-1', 'sleep-551598e8-b81f-4a84-97ea-ed4e952e22a6-0'} are stuck on set()
waiting
^CTraceback (most recent call last):
  File "/Users/gabe/dev/dask-playground/deadlock/blocker.py", line 40, in <module>
    distributed.wait(fs, timeout=EXPECTED_TOTAL_RUNTIME)
  File "/Users/gabe/dev/distributed/distributed/client.py", line 4681, in wait
    result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 318, in sync
    return sync(
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 381, in sync
    wait(10)
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 370, in wait
    return e.wait(timeout)
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 574, in wait
    signaled = self._cond.wait(timeout)
  File "/Users/gabe/.pyenv/versions/3.9.1/lib/python3.9/threading.py", line 316, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
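
One way to check the claim that the scheduler holds every key in memory while the client still sees pending futures is to compare both views directly. The following is only a sketch, not something from the original report: `find_unreported_keys`, `_task_states`, and `diagnose` are hypothetical helper names, and it assumes scheduler task states are readable via `Scheduler.tasks[key].state` inside `Client.run_on_scheduler`, and that `Future.status` reports "finished" for completed futures.

```python
from typing import Dict, Hashable, Mapping


def find_unreported_keys(
    scheduler_states: Mapping[Hashable, str],
    client_statuses: Mapping[Hashable, str],
) -> Dict[Hashable, str]:
    """Return keys the scheduler has in memory but the client has not seen finish.

    Each returned key maps to the status the client currently reports for it.
    """
    return {
        key: status
        for key, status in client_statuses.items()
        if scheduler_states.get(key) == "memory" and status != "finished"
    }


def _task_states(dask_scheduler=None):
    # Runs on the scheduler; run_on_scheduler injects `dask_scheduler`.
    return {key: ts.state for key, ts in dask_scheduler.tasks.items()}


def diagnose(client, futures):
    """Compare scheduler-side task states with client-side future statuses.

    `client` is a distributed.Client and `futures` an iterable of its Futures.
    """
    scheduler_states = client.run_on_scheduler(_task_states)
    client_statuses = {f.key: f.status for f in futures}
    return find_unreported_keys(scheduler_states, client_statuses)
```

In the script above, calling `print(diagnose(client, fs))` in the "timed out" branch would list any keys that are in memory on the scheduler but still unfinished from the client's point of view. A non-empty result during the hang would support the scheduler<->client hypothesis; an empty one would point elsewhere.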

Labels: deadlock (The cluster appears to not make any progress)
