What happened:
I am using KubeCluster
to run an ad-hoc Dask cluster as part of my script.
I am trying to gracefully shut down the cluster in case of an interrupt (SIGINT / ctrl-C, SIGKILL), using the example below.
When the script runs without interruption, it correctly cleans up the cluster it created. When I interrupt it with ctrl-C, the handler catches the exception, but calling client.cluster.close
in the finally clause raises a timeout error (see below). The worker and scheduler pods remain active and are not terminated.
What you expected to happen:
The pods of the ad-hoc cluster should terminate and there should not be any exception.
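For comparison, the teardown I have in mind is what the context-manager form provides (a minimal sketch, reusing the image and namespace from the example below; this is not the code that produced the error, and n_workers=2 is arbitrary):

# Sketch only: the same lifecycle written with context managers, which is the
# teardown I would expect the explicit close() calls in the finally block to match.
from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(image="daskdev/dask:2022.3.0-py3.9")
with KubeCluster(pod_spec, namespace="research", n_workers=2) as cluster:
    with Client(cluster) as client:
        print(client.submit(sum, [1, 2, 3]).result())
# On exit (normal or via an exception such as KeyboardInterrupt), the scheduler
# and worker pods should be removed.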
Minimal Complete Verifiable Example:
import time
import signal
import random
import uuid

from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client, wait


def get_client() -> Client:
    """Get a (dask) client for remote"""
    num_workers = 5
    image = "daskdev/dask:2022.3.0-py3.9"
    pod_spec = make_pod_spec(
        image=image,
        memory_request="4G",
        cpu_request=1,
        extra_pod_config={"nodeSelector": {"env": "sw"}},
    )
    name = f"dask-k8s-example-{uuid.uuid4().hex[:8]}"  # unique name for this run
    cluster = KubeCluster(
        pod_spec, namespace="research", name=name, n_workers=num_workers
    )
    print(
        f"started KubeCluster - dashboard address is {cluster.dashboard_link}, "
        f"scheduler address: {cluster.scheduler.address}"
    )
    client = Client(cluster)
    # TODO: add timeout (and maybe make this optional)
    client.wait_for_workers(num_workers)
    return client


def double(value: int):
    time.sleep(random.random() * 3)
    return value + value


def square(value: int):
    time.sleep(random.random() * 4)
    return value * value


if __name__ == "__main__":
    client = get_client()

    def handle_signal(sig, frame):
        print(f"got a signal {sig} with frame {frame}")
        raise Exception("handled ctrl-c")

    # signal.signal(signal.SIGINT, handle_signal)

    try:
        tasks = []
        for i in range(20):
            ipi = client.submit(double, i)
            isq = client.submit(square, ipi)
            tasks.append(isq)

        print("waiting for tasks to finish...")
        wait(tasks)

        for task in tasks:
            if task.status == "error":
                print("error in task", task)
            else:
                print(f"task {task}. results {task.result()}")
    except KeyboardInterrupt:
        print("ctrl-c (through KeyboardInterrupt)")
    except Exception as ex:
        print(ex)
    finally:
        print("closing...")
        client.cluster.close()
        client.close()
The output from execution:
python bin/test_dask_kubecluster.py
Creating scheduler pod on cluster. This may take some time.
/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/client.py:1283: VersionMismatchWarning: Mismatched versions found
+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| blosc | 1.10.6 | 1.10.2 | None |
| lz4 | None | 3.1.10 | None |
+---------+--------+-----------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
started KubeCluster - dashboard address is http://localhost:8787/status, scheduler address: tcp://dask-k8s-example-f47db833-b.research:8786
waiting for tasks to finish...
^Cctrl-c (through KeyboardInterrupt)
closing...
2022-04-27 09:13:57,071 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x110361be0>>, <Task finished name='Task-165' coro=<SpecCluster._correct_state_internal() done, defined at /Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py:314> exception=OSError('Timed out trying to connect to tcp://localhost:56654 after 30 s')>)
Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
stream = await self.client.connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
comm = await asyncio.wait_for(
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
await self._correct_state()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
comm = await self.live_comm()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
comm = await connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/tcp.py", line 444, in connect
stream = await self.client.connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
addrinfo = await self.resolver.resolve(host, port, af)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
comm = await asyncio.wait_for(
File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/tomercagan/dev/nextresearch/bin/test_dask_kubecluster.py", line 79, in
client.cluster.close()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/cluster.py", line 193, in close
return self.sync(self._close, callback_timeout=timeout)
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 309, in sync
return sync(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 376, in sync
raise exc.with_traceback(tb)
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
result = yield future
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 406, in _close
await self._correct_state()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/deploy/spec.py", line 321, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 844, in send_recv_from_rpc
comm = await self.live_comm()
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/core.py", line 801, in live_comm
comm = await connect(
File "/Users/tomercagan/dev/nsr-env/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
raise OSError(
OSError: Timed out trying to connect to tcp://localhost:56654 after 30 s
2022-04-27 09:13:57,171 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
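For reference, an alternative version of the finally block (a sketch only; I have not verified whether closing the client first or passing an explicit timeout changes the outcome, and the 60-second value is arbitrary):

    finally:
        print("closing...")
        # Sketch: close the client first so it stops its reconnect attempts,
        # then close the cluster with an explicit timeout. Untested; the 60 s
        # value is arbitrary.
        client.close(timeout=60)
        client.cluster.close(timeout=60)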
Anything else we need to know?:
To reproduce, press ctrl-C once "waiting for tasks to finish..." appears on the screen.
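If it helps to reproduce this without a keyboard, the interrupt can also be sent programmatically on a Unix-like system (a sketch; the 5-second delay is arbitrary and assumes the tasks are still running at that point):

# Sketch: deliver SIGINT to this process after a short delay, to simulate
# pressing ctrl-C while wait(tasks) is blocking. The 5 s delay is arbitrary.
import os
import signal
import threading

threading.Timer(5.0, lambda: os.kill(os.getpid(), signal.SIGINT)).start()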
Also, unrelated, but dashboard_link always shows localhost instead of something more useful (I recall reading a discussion about this a while ago); see the config sketch below.
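In case it is just a configuration issue on my side, this is the knob I am aware of for that link (a sketch; the hostname below is a placeholder, not something I have verified against dask-kubernetes):

# Sketch: distributed builds Cluster.dashboard_link from the
# "distributed.dashboard.link" config template, so overriding the template
# can produce a more useful address. The hostname here is a placeholder.
import dask

dask.config.set(
    {"distributed.dashboard.link": "http://dask-scheduler.research:{port}/status"}
)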
Environment:
- Dask version: 2022.3.0
- dask-kubernetes: 2022.1.0
- Python version: 3.9.10
- Operating System: Linux (based on daskdev/dask image)
- Install method (conda, pip, source): docker / kubernetes