Description
Describe the issue:
I'm sorry I can't share the code (it is confidential), but I'll try to describe the 'crash' scenario as best I can.
I'll start with a brief description of the data.
Data description:
- ~10k parquet files
- ~10k records per file (this results in ~100e6 records in total)
- each file has at most 10 columns; at most 6 columns are loaded at once
- files are stored in S3
100e6 records might seem like quite a small data set, but our process is a bit RAM hungry, e.g. to process 2e6 records we need 2 x ml.m5.xlarge SageMaker machines.
We first faced this problem when we tried to process all records at once (100e6 records stored across ~10k partitions). We used the following cluster setups (we also start a worker on the node that runs the scheduler):
- 10 x ml.m5.4xlarge (640 GB of RAM and an additional 100 GB of disk space per worker)
- 4 x ml.m4.16xlarge (1280 GB of RAM and an additional 100 GB of disk space per worker)
Brief process description:
1. we spawn a SageMaker cluster with a custom image and fully replicated data on each worker
2. we load the parquet data -> we use the delayed API for this, so Dask decides which files to load on which worker and the files are loaded 'locally' without pushing data through the scheduler; this results in the same number of partitions as files (sketched below)
3. we compute some stats on the input data needed for preprocessing
4. we perform preprocessing with the stats from 3.
5. we train an XGBoost model on the preprocessed data
For loading, stats computation, and preprocessing, the DataFrame and Bag APIs are used.
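To make the loading step (2.) concrete, here is a minimal sketch of roughly what it looks like; this is not our actual code, and the bucket name, file paths and column names are made up:

```python
# Rough sketch of the loading step; bucket, paths and columns are placeholders.
import dask
import dask.dataframe as dd
import pandas as pd

paths = [f"s3://some-bucket/data/part-{i:05d}.parquet" for i in range(10_000)]
columns = ["col_a", "col_b", "col_c", "col_d", "col_e", "col_f"]  # at most 6 of the ~10 columns

@dask.delayed
def load_one(path: str) -> pd.DataFrame:
    # each delayed task reads a single file on whichever worker it is scheduled on,
    # so the data never passes through the scheduler
    return pd.read_parquet(path, columns=columns)

# one partition per file, as described above
ddf = dd.from_delayed([load_one(p) for p in paths])
```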
Somewhere during stages 3 and 4, workers start dying without any traceback other than asyncio or TCP connection timeouts/errors, if work stealing is enabled:
Example logs from the 4 x ml.m4.16xlarge cluster:
Logs from the scheduler node:
2023-01-31 02:23:58,347 - distributed.worker - ERROR - failed during get data with tcp://10.0.101.87:42561 -> tcp://10.0.90.223:34087
Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
TimeoutError: [Errno 110] Connection timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data
    response = await comm.read(deserializers=serializers)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out

2023-01-31 02:23:58,348 - distributed.core - INFO - Lost connection to 'tcp://10.0.90.223:42906'
Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
TimeoutError: [Errno 110] Connection timed out

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm
    result = await result
  File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data
    response = await comm.read(deserializers=serializers)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
Log from the worker:
2023-01-31 02:23:58,002 - distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/10: in <TCP (closed) Ephemeral Worker->Worker for gather local=tcp://10.0.90.223:42906 remote=tcp://10.0.101.87:42561>: ConnectionResetError: [Errno 104] Connection reset by peer
Shortly after such a message the worker dies. Sometimes it is restarted and killed again by the scheduler, and sometimes it just dies. The container is stopped once the worker is dead.
With work stealing disabled the problem never occurs. Increasing the timeouts to 120 seconds or the number of retry attempts to 10 does not help. Along with the 'sudden death' problem, increased disk usage can be observed on the other workers, which attempt to complete the task (?). After the first death, the other workers also die one after another with the same errors.
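For reference, this is roughly how we toggle those settings (a sketch, not our exact configuration; the key names are the standard dask/distributed configuration keys, and the values are the ones mentioned above):

```python
# Sketch of the settings we experimented with (standard dask.distributed config keys).
import dask

dask.config.set({
    # disabling work stealing makes the crashes disappear
    "distributed.scheduler.work-stealing": False,
    # raising the comm timeouts to 120s did NOT help
    "distributed.comm.timeouts.connect": "120s",
    "distributed.comm.timeouts.tcp": "120s",
    # neither did raising the retry count to 10
    "distributed.comm.retry.count": 10,
})
```

In practice we set these before the scheduler and workers start, e.g. via environment variables or a dask config file baked into the image on each node.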
I tried a couple of Dask versions (including the latest); the error always persisted.
This seems to be a problem for large clusters and long-running jobs/tasks (?), and it makes long runs costly because they only fail after quite some time, e.g. 4 hours.
Anything else we need to know?:
Unfortunately I can't provide any additional info. I tried to create a self-contained example of this behavior with a small synthetic data set, but I was unable to reproduce the crash.
Environment:
- Dask version: 2023.1.1
- Python version: 3.8
- Operating System: public.ecr.aws/lambda/python:3.8
- Install method (conda, pip, source): pip install during Docker image build