
Timed out trying to connect ... : connect() didn't finish in time #4080

Open
@mnarodovitch

Description


While the following is not a bug we can pinpoint, it really bothers us. After weeks of experimentation and investigation we haven't found a real solution.

We run something like

(a_bag
  .repartition(100_000) # outputs small partitions
  .map(very_cpu_heavy_computation) # outputs ~4MB partitions
  .repartition(1000).to_dataframe().to_parquet() # writes ~30MB parquet files from ~400MB in-memory-partitions
)

on 100 workers with 1500 cpus on Kubernetes pods.
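For a sense of scale, the shuffle fan-in at the final repartition step can be estimated from the partition counts above (the helper below is only illustrative, not Dask code):

```python
def repartition_fan_in(partitions_in: int, partitions_out: int) -> int:
    """Approximate number of input partitions each output partition
    must gather during a repartition shuffle (ceiling division)."""
    return -(-partitions_in // partitions_out)

# With the numbers from the snippet above, each of the 1000 output
# partitions pulls data from ~100 inputs, so every reduce task opens
# connections to many workers at once.
print(repartition_fan_in(100_000, 1000))  # -> 100
```

That fan-in is one plausible reason the final step produces a burst of concurrent worker-to-worker connections.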

When it comes to the last repartition step, Dask starts blocking the event loop and the workers start spamming the following logs:

distributed.utils_comm - INFO - Retrying get_data_from_worker after exception in attempt 0/2: Timed out trying to connect to 'tcp://10.2.3.20:38449' after 20 s: Timed out trying to connect to 'tcp://10.2.3.20:38449' after 20 s: connect() didn't finish in time

=> 5000 events

distributed.core - INFO - Event loop was unresponsive in Worker for 4.95s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

=> 26,094 events

We work around this by setting

# fewer connections to avoid potential network saturation and connection timeouts
export DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING=25
export DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING=5
export DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG=16384

# graceful timeout and retry policy
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=20
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=20
export DASK_DISTRIBUTED__COMM__RETRY__COUNT=2
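For reference, Dask translates environment variables like these into nested configuration keys: the DASK_ prefix is stripped, double underscores denote nesting, and segments are lowercased. A minimal sketch of that mapping (the function name is mine; it ignores Dask's additional canonicalization of single underscores to hyphens, e.g. in ALLOWED_FAILURES):

```python
def env_var_to_config_key(name: str) -> str:
    """Sketch of Dask's env-var naming convention: strip the DASK_
    prefix, split on double underscores, lowercase each segment."""
    prefix = "DASK_"
    assert name.startswith(prefix)
    return ".".join(part.lower() for part in name[len(prefix):].split("__"))

print(env_var_to_config_key("DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT"))
# -> distributed.comm.timeouts.connect
```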

so that the job finally finishes, despite the communication issues.

Even so, workers sometimes hang and have to be restarted manually, and progress slows down significantly towards the end of the job.

I'm sharing this with the community in the hope that somebody can offer pointers on what to try, or even ideas for a resolution.

Environment:

  • Dask version: 2.23.0
  • Python version: 3.6
  • Operating System: ubuntu on kubernetes
  • Install method (conda, pip, source): pip
