
Cannot close cluster after interrupt (ctrl-c) #444

@cagantomer

What happened:

I am using KubeCluster to run an ad-hoc Dask cluster as part of my script.

I am trying to shut down the cluster gracefully when the script is interrupted (SIGINT / Ctrl-C, SIGKILL), using the example below.
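Unlike SIGINT, SIGKILL cannot be caught, blocked, or ignored, so no Python-level cleanup can ever run for it. A quick POSIX-only check (assuming Linux or macOS, run from the main thread) shows that CPython refuses to install a handler for it; `can_install_handler` is a hypothetical helper written for this illustration.

```python
import signal


def can_install_handler(sig):
    """Return True if Python allows a handler to be installed for `sig`."""
    try:
        previous = signal.signal(sig, lambda signum, frame: None)
    except (OSError, ValueError):
        # The OS rejects handlers for SIGKILL (and SIGSTOP).
        return False
    # Put back whatever handler was there before.
    signal.signal(sig, previous if previous is not None else signal.SIG_DFL)
    return True


print("SIGINT handler allowed:", can_install_handler(signal.SIGINT))
print("SIGKILL handler allowed:", can_install_handler(signal.SIGKILL))
```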

When the script runs without interruption, it correctly cleans up the cluster it created. When I interrupt it with Ctrl-C, the KeyboardInterrupt handler catches the exception, but calling client.cluster.close() in the finally clause raises a timeout error (see below). The worker and scheduler pods remain active and are not terminated.
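By default Python turns SIGINT (Ctrl-C) into KeyboardInterrupt, and `finally` blocks still run, so the cleanup code in the script is reached; the failure happens inside close() itself. The sketch below, assuming POSIX signals, uses a hypothetical helper `run_with_cleanup` that additionally maps SIGTERM (what `kill <pid>` sends by default) onto KeyboardInterrupt so both signals share one cleanup path. That mapping is an illustrative choice, not something dask-kubernetes does.

```python
import signal


def _raise_keyboard_interrupt(signum, frame):
    # Treat SIGTERM like Ctrl-C so the same except/finally path runs.
    raise KeyboardInterrupt(f"received signal {signum}")


def run_with_cleanup(work, cleanup):
    """Run work(), always call cleanup() afterwards, and report how it ended."""
    previous = signal.signal(signal.SIGTERM, _raise_keyboard_interrupt)
    try:
        work()
        return "completed"
    except KeyboardInterrupt:
        return "interrupted"
    finally:
        cleanup()  # runs on success and on interrupt alike
        signal.signal(signal.SIGTERM, previous)


events = []
outcome = run_with_cleanup(
    work=lambda: signal.raise_signal(signal.SIGTERM),  # simulate `kill <pid>`
    cleanup=lambda: events.append("cluster closed"),
)
print(outcome, events)  # interrupted ['cluster closed']
```

This only shows that the cleanup hook is reached; it does not address why close() then times out.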

What you expected to happen:
The pods of the ad-hoc cluster should terminate and there should not be any exception.

Minimal Complete Verifiable Example:

import time
import signal
import random

from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client, wait


def get_client() -> Client:
    """Get a (dask) client for remote"""

    num_workers = 5
    image = "daskdev/dask:2022.3.0-py3.9"

    pod_spec = make_pod_spec(
        image=image,
        memory_request="4G",
        cpu_request=1,
        extra_pod_config={"nodeSelector": {"env": "sw"}},
    )

    name = "dask-k8s-example-{uuid}"  # KubeCluster substitutes {uuid} with a random ID

    cluster = KubeCluster(
        pod_spec, namespace="research", name=name, n_workers=num_workers
    )

    print(
        f"started KubeCluster - dashboard address is {cluster.dashboard_link}, scheduler address: {cluster.scheduler.address}"
    )

    client = Client(cluster)

    # TODO: add timeout (and maybe make this optional)
    client.wait_for_workers(num_workers)

    return client


def double(value: int):
    time.sleep(random.random() * 3)
    return value + value

def square(value: int):
    time.sleep(random.random() * 4)
    return value * value

if __name__ == "__main__":
    client = get_client()
    
    def handle_signal(sig, frame):
        print(f"got a signal {sig} with frame {frame}")
        raise Exception("handled ctrl-c")

    # signal.signal(signal.SIGINT, handle_signal)

    try:
        tasks = []
        for i in range(20):
            ipi = client.submit(double, i)
            isq = client.submit(square, ipi)
            tasks.append(isq)

        print("waiting for tasks to finish...")
        wait(tasks)

        for task in tasks:
            if task.status == "error":
                print("error in task", task)
            else:
                print(f"task {task}. results {task.result()}")
    except KeyboardInterrupt:
        print("ctrl-c (through KeyboardInterrupt)")
    except Exception as ex:
        print(ex)
    finally:
        print("closing...") 
        client.cluster.close()
        client.close()
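In the finally clause above, if client.cluster.close() raises (as it does in the output that follows), client.close() is never reached. A minimal sketch using the standard library's contextlib.ExitStack runs every cleanup step even when an earlier one fails and then re-raises the error; `FakeResource` is a hypothetical stand-in for the Client and KubeCluster objects, whose close() here simply simulates a timeout.

```python
from contextlib import ExitStack


class FakeResource:
    """Hypothetical stand-in for a Client or KubeCluster with a close() method."""

    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail
        self.closed = False

    def close(self):
        self.closed = True
        if self.fail:
            raise OSError(f"{self.name}: timed out while closing")


client = FakeResource("client")
cluster = FakeResource("cluster", fail=True)

try:
    with ExitStack() as stack:
        # Callbacks run in reverse order of registration (LIFO), so
        # cluster.close() runs first, then client.close(), matching the
        # order used in the script above.
        stack.callback(client.close)
        stack.callback(cluster.close)
        # ... submit tasks and wait for them here ...
except OSError as exc:
    print("cleanup error:", exc)

print(client.closed, cluster.closed)  # True True
```

The ExitStack still propagates the failure, so the error is reported rather than silently swallowed.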

The output from execution:

python bin/test_dask_kubecluster.py
Creating scheduler pod on cluster. This may take some time.
/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/client.py:1283: VersionMismatchWarning: Mismatched versions found

+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| blosc | 1.10.6 | 1.10.2 | None |
| lz4 | None | 3.1.10 | None |
+---------+--------+-----------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
started KubeCluster - dashboard address is http://localhost:8787/status, scheduler address: tcp://dask-k8s-example-f47db833-b.research:8786
waiting for tasks to finish...
^Cctrl-c (through KeyboardInterrupt)
closing...
2022-04-27 09:13:57,071 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x110361be0>>, <Task finished name='Task-165' coro=<SpecCluster._correct_state_internal() done, defined at /Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py:314> exception=OSError('Timed out trying to connect to tcp://localhost:56654 after 30 s')>)
Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
stream = await self.client.connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
comm = await asyncio.wait_for(
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
await self._correct_state()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
comm = await self.live_comm()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
comm = await connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
stream = await self.client.connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
comm = await asyncio.wait_for(
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/tomercagan/dev/nextresearch/bin/test_dask_kubecluster.py", line 79, in <module>
client.cluster.close()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/cluster.py", line 193, in close
return self.sync(self._close, callback_timeout=timeout)
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 309, in sync
return sync(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 376, in sync
raise exc.with_traceback(tb)
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
result = yield future
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
await self._correct_state()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
comm = await self.live_comm()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
comm = await connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
2022-04-27 09:13:57,171 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
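The output shows close() blocking for the full 30 s connect timeout before failing. The traceback also shows Cluster.close forwarding a `timeout` to `sync` (`callback_timeout=timeout`), so passing a timeout to close() may be the simplest way to bound it; that is an inference from the traceback, not something tested here. A generic, library-agnostic alternative is to run the blocking call in a daemon thread and stop waiting after a deadline. `run_with_deadline` is a hypothetical helper, and the abandoned call is not cancelled: it keeps running in the background.

```python
import threading
import time


def run_with_deadline(fn, seconds):
    """Call fn() in a daemon thread and return (finished, error).

    `finished` is False if fn() was still running after `seconds`. The call is
    not cancelled; daemon threads simply do not keep the interpreter alive.
    """
    errors = []

    def target():
        try:
            fn()
        except Exception as exc:  # report instead of losing it in the thread
            errors.append(exc)

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(seconds)
    finished = not worker.is_alive()
    return finished, (errors[0] if errors else None)


print(run_with_deadline(lambda: time.sleep(0.05), seconds=1))  # (True, None)
print(run_with_deadline(lambda: time.sleep(5), seconds=0.1))   # (False, None)
```

Bounding the wait only gets the script to exit sooner; any pods that were not deleted would still need to be cleaned up separately.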

Anything else we need to know?:

Press Ctrl-C once "waiting for tasks to finish..." appears on the screen.
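Pressing Ctrl-C in a terminal sends SIGINT to every process in the foreground process group, not only to the Python interpreter, so any helper subprocess started by the client is interrupted too. Whether KubeCluster relies on such a helper (for example a port-forward behind tcp://localhost:56654) in this version is an assumption that this issue does not confirm. The POSIX-only sketch below shows that a child started with `subprocess.Popen(..., start_new_session=True)` lands in its own process group, which the terminal's SIGINT does not reach; `sleep` is just a placeholder child and `same_group_as_parent` is a hypothetical helper.

```python
import os
import subprocess


def same_group_as_parent(start_new_session):
    """Start a short-lived child and report whether it shares our process group."""
    child = subprocess.Popen(["sleep", "1"], start_new_session=start_new_session)
    try:
        return os.getpgid(child.pid) == os.getpgrp()
    finally:
        child.terminate()
        child.wait()


print(same_group_as_parent(False))  # True: a terminal Ctrl-C would reach it
print(same_group_as_parent(True))   # False: it has its own session and group
```

This illustrates the mechanism only; it is not a fix for dask-kubernetes.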

Also, unrelated: dashboard_link always shows localhost rather than an address that is reachable from outside the client machine (I recall reading a discussion about this, but it was a while ago).
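In distributed, dashboard_link is built from the `distributed.dashboard.link` config template, whose default is `"{scheme}://{host}:{port}/status"`, and environment variables can also be referenced in that template. If the scheduler is reached through a local port-forward, `host` would be localhost, which would explain the link (an assumption). The sketch below only imitates that template expansion with `str.format` so the mechanism is visible; `format_link` and `DASK_DASHBOARD_HOST` are hypothetical names. In a real setup the template would be set with `dask.config.set({"distributed.dashboard.link": ...})` or the `DASK_DISTRIBUTED__DASHBOARD__LINK` environment variable.

```python
import os


def format_link(template, host, port, scheme="http", env=None):
    """Expand a dashboard-link template (an imitation, not distributed's code)."""
    fields = dict(os.environ if env is None else env)
    # Explicit values win over any same-named environment variables.
    fields.update(scheme=scheme, host=host, port=port)
    return template.format(**fields)


default = "{scheme}://{host}:{port}/status"
print(format_link(default, "localhost", 8787, env={}))
# http://localhost:8787/status

custom = "https://{DASK_DASHBOARD_HOST}/status"
print(format_link(custom, "localhost", 8787,
                  env={"DASK_DASHBOARD_HOST": "dask.example.internal"}))
# https://dask.example.internal/status
```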

Environment:

  • Dask version: 2022.3.0
  • dask-kubernetes: 2022.1.0
  • Python version: 3.9.10
  • Operating System: Linux (based on daskdev/dask image)
  • Install method (conda, pip, source): docker / kubernetes
