Description
Although the following is not a bug we can pinpoint, it really bothers us. After weeks of experimentation and investigation, we have not found a real solution.
We run something like:
(a_bag
.repartition(100_000) # outputs small partitions
.map(very_cpu_heavy_computation) # outputs ~4MB partitions
.repartition(1000).to_dataframe().to_parquet() # writes ~30MB parquet files from ~400MB in-memory-partitions
)
on 100 workers with 1500 CPUs on Kubernetes pods.
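To make the data volumes in the pipeline above concrete, here is a back-of-the-envelope calculation. It uses only the approximate figures quoted in the code comments; the even spread of data across workers is an assumption made for illustration, not something we measured.

```python
# Approximate figures taken from the comments in the pipeline above.
n_map_partitions = 100_000      # partitions after the first repartition
mb_per_map_partition = 4        # ~4MB per partition after the map step
n_output_partitions = 1_000     # partitions after the final repartition
n_workers = 100

# Total in-memory data that the final repartition has to shuffle.
total_mb = n_map_partitions * mb_per_map_partition

# Size of each output partition, and how many inputs are merged into it.
mb_per_output_partition = total_mb / n_output_partitions
inputs_per_output = n_map_partitions // n_output_partitions

# Assumption: data is spread evenly over the workers.
gb_per_worker = total_mb / 1000 / n_workers

print(f"total data:            ~{total_mb / 1000:.0f} GB")
print(f"per output partition:  ~{mb_per_output_partition:.0f} MB")
print(f"inputs per output:     {inputs_per_output}")
print(f"per worker (if even):  ~{gb_per_worker:.0f} GB")
```

So each of the 1000 output partitions is assembled from about 100 small input partitions, most of which probably live on other workers, which would explain why the last step is so communication-heavy.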
When it comes to the last repartition step, dask starts blocking the event loop and the workers start spamming the following logs:
distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/2: Timed out trying to connect to 'tcp://10.2.3.20:38449' after 20 s: Timed out trying to connect to 'tcp://10.2.3.20:38449' after 20 s: connect() didn't finish in time
=> 5000 events
distributed.core - INFO - Event loop was unresponsive in Worker for 4.95s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
=> 26,094 events
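For anyone who wants to tally the two messages above in their own worker logs, a minimal sketch could look like the following. The function name `summarize` and the regular expressions are our own illustration, written against the two log lines quoted above; they are not part of dask.

```python
import re
from collections import Counter

# Patterns written against the two log messages quoted above.
CONNECT_TIMEOUT = re.compile(r"Timed out trying to connect to '(tcp://[^']+)'")
UNRESPONSIVE = re.compile(
    r"Event loop was unresponsive in Worker for (\d+(?:\.\d+)?)s"
)


def summarize(lines):
    """Count both message types, the peers timed out on, and the longest stall."""
    counts = Counter()
    peers = Counter()
    longest_stall = 0.0
    for line in lines:
        match = CONNECT_TIMEOUT.search(line)
        if match:
            counts["connect_timeout"] += 1
            peers[match.group(1)] += 1
            continue
        match = UNRESPONSIVE.search(line)
        if match:
            counts["unresponsive_loop"] += 1
            longest_stall = max(longest_stall, float(match.group(1)))
    return counts, peers, longest_stall
```

Feeding it the lines of a worker log file (for example `summarize(open(path))`, with `path` being wherever your logs end up) shows whether the timeouts cluster on a few peer addresses or are spread across the whole cluster.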
We are working around this by setting
# fewer connections to avoid potential network saturation and connection timeouts
export DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING=25
export DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING=5
export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG=16384
# graceful timeout and retry policy
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=20
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=20
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=2
so that the job finally finishes despite the communication issues.
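For readers less familiar with how these variables reach dask: dask picks up environment variables that start with `DASK_`, lowercases them, turns double underscores into nesting, and parses the values with `ast.literal_eval`. The sketch below mimics that rule in plain Python for the variables listed above. The helper names `env_to_config_key` and `parse_value` are hypothetical and only illustrate the mapping; they are not dask functions.

```python
import ast


def env_to_config_key(name):
    """Translate a DASK_* environment variable name into a dotted config key."""
    prefix = "DASK_"
    if not name.startswith(prefix):
        raise ValueError(f"not a dask config variable: {name}")
    # Lowercase the rest; double underscores separate nesting levels.
    return name[len(prefix):].lower().replace("__", ".")


def parse_value(raw):
    """Parse a value as a Python literal, falling back to the raw string."""
    try:
        return ast.literal_eval(raw)
    except (SyntaxError, ValueError):
        return raw


# The workaround settings from above, as they appear in the environment.
workaround = {
    "DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING": "25",
    "DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING": "5",
    "DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG": "16384",
    "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "20",
    "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "20",
    "DASK_DISTRIBUTED__COMM__RETRY__COUNT": "2",
}

settings = {env_to_config_key(k): parse_value(v) for k, v in workaround.items()}

for key, value in settings.items():
    print(f"{key} = {value!r}")
```

Single underscores stay as they are here (for example `distributed.comm.socket_backlog`); dask treats underscores and hyphens in config keys as interchangeable, so this refers to the same setting as the documented `socket-backlog` key. A dict like `settings` could presumably also be passed to `dask.config.set`, but it would have to be applied in the worker processes as well, which is why we use environment variables on the pods.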
Still, workers sometimes start hanging and have to be restarted manually. Progress also slows down significantly towards the end of the job.
I'm sharing this with the community in the hope that somebody can give pointers on what to try, or even ideas for a resolution.
Environment:
- Dask version: 2.23.0
- Python version: 3.6
- Operating System: Ubuntu on Kubernetes
- Install method (conda, pip, source): pip