Skip to content

Work stealing causes a sudden death of workers without any informative traceback. #7523

Open
@KWiecko

Description

@KWiecko

Describe the issue:

I'm sorry I can't share the code (it is confidential) but I'll try to describe the 'crash' scenario as best I can.

I'll start with brief data description:

Data description:

  • ~10k parquet files
  • ~10k records per file (this results in ~100e6 records in total)
  • each file has 10 columns tops, at most 6 columns are loaded at once
  • files are stored in S3

100e6 records might seem like quite small data set but our process is a bit RAM hungry, e.g. to process 2e6 records we need 2 x ml.m5.xlarge SageMaker machines.

We first faced this problem when we tries to process all records at once (100e6 records stored across ~10k partitions). We used following cluster setups (we also start a worker on the node which runs the scheduler):

  • 10 x ml.m5.4xlarge (640 GBs of RAM and additional 100 GBs of disk space per worker)
  • 4 x ml.m4.16xlarge (1280 GBs of RAM and additional 100 GBs of disk space per worker)

Brief process description:

  1. we spawn a SageMaker cluster with custom image and fully replicated data on each worker
  2. we load parquet data -> we use delayed API for this so dask will decide which files to load on which worker and files will be loaded 'locally' without pushing data through master. This results in the same number of partitions as files
  3. we compute some stats on the input data needed for preprocessing
  4. we perform preprocessing with stats from 3.
  5. we train XGBoost model on preprocessed data

For loading, stats computation and preprocessing DataFrame and Bag APIs are used.
Somewhere during stages 3. and 4. workers start dying without any traceback other than asyncio or TCP connection timeouts / errors if work stealing is enabled:

Example logs from 4 x ml.m4.16xlarge cluster:

Logs from master:

2023-01-31T03:23:58.374+01:00
2023-01-31 02:23:58,347 - distributed.worker - ERROR - failed during get data with tcp://10.0.101.87:42561 -> tcp://10.0.90.223:34087
	2023-01-31 02:23:58,347 - distributed.worker - ERROR - failed during get data with tcp://10.0.101.87:42561 -> tcp://10.0.90.223:34087
	2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
	Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd return self.socket.recv_into(buf, len(buf))
	2023-01-31T03:23:58.374+01:00
TimeoutError: [Errno 110] Connection timed out
	TimeoutError: [Errno 110] Connection timed out
	2023-01-31T03:23:58.374+01:00
The above exception was the direct cause of the following exception:
	The above exception was the direct cause of the following exception:
	2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data
    response = await comm.read(deserializers=serializers)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
	Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data response = await comm.read(deserializers=serializers) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read convert_stream_closed_error(self, e) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
	2023-01-31T03:23:58.374+01:00
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
	distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
	2023-01-31T03:23:58.374+01:00
2023-01-31 02:23:58,348 - distributed.core - INFO - Lost connection to 'tcp://10.0.90.223:42906'
	2023-01-31 02:23:58,348 - distributed.core - INFO - Lost connection to 'tcp://10.0.90.223:42906'
	2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
	Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 869, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/code/venv/lib/python3.8/site-packages/tornado/iostream.py", line 1138, in read_from_fd return self.socket.recv_into(buf, len(buf))
	2023-01-31T03:23:58.374+01:00
TimeoutError: [Errno 110] Connection timed out
	TimeoutError: [Errno 110] Connection timed out
	2023-01-31T03:23:58.374+01:00
The above exception was the direct cause of the following exception:
	The above exception was the direct cause of the following exception:
	2023-01-31T03:23:58.374+01:00
Traceback (most recent call last):
  File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm
    result = await result
  File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data
    response = await comm.read(deserializers=serializers)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
	Traceback (most recent call last): File "/code/venv/lib/python3.8/site-packages/distributed/core.py", line 820, in _handle_comm result = await result File "/code/venv/lib/python3.8/site-packages/distributed/worker.py", line 1767, in get_data response = await comm.read(deserializers=serializers) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 241, in read convert_stream_closed_error(self, e) File "/code/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
	2023-01-31T03:23:58.375+01:00
distributed.comm.core.CommClosedError: in <TCP (closed)  local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out
	distributed.comm.core.CommClosedError: in <TCP (closed) local=tcp://10.0.101.87:42561 remote=tcp://10.0.90.223:42906>: TimeoutError: [Errno 110] Connection timed out

Log from worker:

2023-01-31 02:23:58,002 - distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/10: in <TCP (closed) Ephemeral Worker->Worker for gather local=tcp://10.0.90.223:42906 remote=tcp://10.0.101.87:42561>: ConnectionResetError: [Errno 104] Connection reset by peer

Briefly after such message worker dies. Sometimes it is restarted and killed again by scheduler, sometimes it just dies. Container is stopped once worker is dead.

With work stealing disabled the problem never occurs. Increasing timeouts to 120 seconds or retry attempts to 10 does not help. Along with 'sudden death' problem an increased disk usage can be observed on other workers which attempt to complete the task (?). After first death other workers also die one after another with same errors.

I tried a couple of dask versions (including latest) - error always prevailed.

This seems to be a problem for large clusters and long running jobs / tasks (?) because it makes long runs which fail after quite some time e.g. 4 hours, costly.

Anything else we need to know?:

Unfortunately I can't provide any additional info. I tried to create a self-contained example of this behavior with small synthetic data set but i was unable to reproduce this crash.

Environment:

  • Dask version: 2023.1.1
  • Python version: 3.8
  • Operating System: public.ecr.aws/lambda/python:3.8
  • Install method (conda, pip, source): pip install during docker image build

Metadata

Metadata

Assignees

No one assigned

    Labels

    discussionDiscussing a topic with no specific actions yet

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions