Closed
Description
Hello,
I'm using dask to analyse some images; my code looks like this:
import numpy as np
from scipy import io
from tqdm import tqdm
from dask.distributed import Client, as_completed
from dask_jobqueue import SLURMCluster

def somefunc(n):
    try:
        result = ...  # long calculation on image n
        return result
    except Exception:
        return ...    # a "to be filtered" result

def main():
    num_image = 268828
    X = np.empty((num_image, 7), dtype=np.float32)
    cluster = SLURMCluster(cores=32,
                           processes=32,
                           interface="ib0",
                           walltime="04:10:00",
                           memory="80GB")
    cluster.scale(600)
    client = Client(cluster)
    with tqdm(total=num_image) as pbar:
        futures = client.map(somefunc, range(num_image))
        for future, result in as_completed(futures, with_results=True):
            # result[0] holds the image index, so each row lands in place
            X[result[0], :] = result
            pbar.update(1)
            future.release()
    return X

if __name__ == '__main__':
    x = main()
    io.savemat('/gpfs/home/savedata.mat', {'data': x})
The script runs very well, except that it gets stuck on some iterations and never completes them:
100%|█████████▉| 268711/268828 [3:40:11<00:31, 3.68it/s]
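One thing I'm considering, though I'm not sure it addresses the root cause, is asking the scheduler to rerun tasks that are lost when a worker dies, via the `retries` argument of `client.map`. A minimal sketch (the value 3 is an arbitrary choice):

    # Sketch: let the scheduler retry tasks lost to a dying worker.
    futures = client.map(somefunc, range(num_image), retries=3)
    for future, result in as_completed(futures, with_results=True):
        X[result[0], :] = result
        pbar.update(1)
        future.release()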
Looking at the log files, I found that it first loops on this error:
Traceback (most recent call last):
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/worker.py", line 875, in heartbeat
metrics=await self.get_metrics(),
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/core.py", line 747, in send_recv_from_rpc
comm = await self.pool.connect(self.addr)
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/core.py", line 874, in connect
connection_args=self.connection_args,
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/comm/core.py", line 227, in connect
_raise(error)
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/comm/core.py", line 204, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://192.168.11.3:16892' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x2b1a8a2bc630>: ConnectionRefusedError: [Errno 111] Connection refused
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2b1a32811c88>>, <Task finished coro=<Worker.heartbeat() done, defined at /gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/worker.py:866> exception=OSError("Timed out trying to connect to 'tcp://192.168.11.3:16892' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x2b1a8a346c50>: ConnectionRefusedError: [Errno 111] Connection refused",)>)
Traceback (most recent call last):
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/comm/core.py", line 215, in connect
quiet_exceptions=EnvironmentError,
tornado.util.TimeoutError: Timeout
And then, I think once the walltime has been reached, it loops on another error:
Traceback (most recent call last):
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/tornado/ioloop.py", line 907, in _run
return self.callback()
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/worker.py", line 857, in <lambda>
lambda: self.batched_stream.send({"op": "keep-alive"}),
File "/gpfs/home/maxlavaud/miniconda3/envs/env_dask/lib/python3.6/site-packages/distributed/batched.py", line 117, in send
raise CommClosedError
distributed.comm.core.CommClosedError
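If this second loop really comes from workers hitting the walltime, maybe letting them retire cleanly a few minutes before SLURM kills them would help. A sketch of what I have in mind, assuming a dask-jobqueue version that accepts the `extra` keyword and a distributed version whose dask-worker supports the `--lifetime` flag (the durations are arbitrary):

    # Sketch: workers close themselves ~10 min before the 4h10 walltime,
    # staggered so they don't all leave at once.
    cluster = SLURMCluster(cores=32,
                           processes=32,
                           interface="ib0",
                           walltime="04:10:00",
                           memory="80GB",
                           extra=["--lifetime", "240m",
                                  "--lifetime-stagger", "4m"])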
Also, I've been warned that some processes spawned by dask are using a lot of memory (10-100 MB each) on the login node; I don't quite understand where they come from.