
Conversation

@TomAugspurger (Member)

This updates `Scheduler.broadcast` to reuse (i.e. *not* close) the `Comm` object that sends the message to the worker. This is motivated by the UCX comms, which are relatively expensive to create and destroy. We noticed that a simple `client.run` (which uses `Scheduler.broadcast` internally) took hundreds of milliseconds to complete with UCX, most of which was spent creating and destroying `Comm` objects. With this change, subsequent `client.run` calls take <10 ms.
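To make the idea concrete, here is a toy asyncio sketch of the reuse pattern. This is illustrative only and is not the actual `Scheduler.broadcast` / `Comm` implementation: it just shows "cache one open connection per worker address and don't close it after each message", which is the behavior change this PR makes.

import asyncio


class BroadcastChannel:
    """Toy illustration of comm reuse; not distributed's actual code."""

    def __init__(self):
        # One cached open connection per worker address.
        self._conns = {}

    async def _connect(self, addr):
        if addr not in self._conns:
            # Pay the (potentially expensive) setup cost once per worker.
            host, port = addr
            self._conns[addr] = await asyncio.open_connection(host, port)
        return self._conns[addr]

    async def send_recv(self, addr, msg: bytes) -> bytes:
        reader, writer = await self._connect(addr)
        writer.write(msg + b"\n")
        await writer.drain()
        # Crucially, the connection is *not* closed here, so the next
        # broadcast to this worker skips connection setup entirely.
        return await reader.readline()

    async def broadcast(self, addrs, msg: bytes) -> list:
        # Send the same message to every worker concurrently and collect replies.
        return await asyncio.gather(*(self.send_recv(a, msg) for a in addrs))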

I've added a new config option, `distributed.scheduler.reuse-broadcast-comm`, for users who want the previous behavior. Based on the discussion in https://github.com/dask/distributed/pull/3766/files, folks agreed that reusing the comm is the better default; that earlier change didn't quite achieve it because of the `close=True` passed to `send_recv`.
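For reference, a minimal sketch of how a user would flip that option back (assuming the spelling above; this mirrors what the benchmark below does with `dask.config.set`, and note that it is the scheduler process that reads this option):

import dask

# Restore the previous behavior: open a fresh comm for every broadcast.
dask.config.set({"distributed.scheduler.reuse-broadcast-comm": False})

# Or scope it to a block using the context-manager form:
with dask.config.set({"distributed.scheduler.reuse-broadcast-comm": False}):
    ...  # start the cluster / run broadcast-heavy work here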

TomAugspurger requested a review from fjetter as a code owner on June 2, 2025, 14:52
@TomAugspurger (Member, Author)

This shows a smaller, but still nice improvement for the default TCP comm:

In [1]: import dask; from distributed import Client, LocalCluster

In [2]: cluster = LocalCluster(n_workers=32); client = cluster.get_client()

In [3]: dask.config.set({"distributed.scheduler.reuse-broadcast-comm": False})

In [6]: %time _ = client.run(lambda: None)
CPU times: user 34.4 ms, sys: 8 ms, total: 42.4 ms
Wall time: 41.7 ms

In [7]: %time _ = client.run(lambda: None)
CPU times: user 31.4 ms, sys: 7.68 ms, total: 39.1 ms
Wall time: 38.3 ms

So about 40ms per broadcast.

With comm reuse:

In [10]: dask.config.set({"distributed.scheduler.reuse-broadcast-comm": True})
Out[10]: <dask.config.set at 0x7f81e8cd65a0>

In [11]: %time _ = client.run(lambda: None)
CPU times: user 18.2 ms, sys: 11.9 ms, total: 30 ms
Wall time: 29.2 ms

In [12]: %time _ = client.run(lambda: None)
CPU times: user 7.64 ms, sys: 3.55 ms, total: 11.2 ms
Wall time: 10.6 ms

The initial call is slightly faster (it doesn't have to close the sockets?), and subsequent calls are about 4x faster.
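For anyone who wants to reproduce this locally, here is a rough standalone version of the same measurement as a script. It is not part of the PR, and the timings will vary with the machine, the number of workers, and the comm backend:

# Hypothetical reproduction script for the numbers above (not part of the PR).
import time

import dask
from distributed import LocalCluster


def time_runs(client, n=3):
    # Time repeated client.run broadcasts; the first call may include connection setup.
    for i in range(n):
        start = time.perf_counter()
        client.run(lambda: None)
        print(f"  call {i}: {(time.perf_counter() - start) * 1e3:.1f} ms")


if __name__ == "__main__":
    for reuse in (False, True):
        with dask.config.set({"distributed.scheduler.reuse-broadcast-comm": reuse}):
            with LocalCluster(n_workers=32) as cluster, cluster.get_client() as client:
                print(f"reuse-broadcast-comm={reuse}")
                time_runs(client)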

@github-actions bot (Contributor) commented Jun 2, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    27 files ±0       27 suites ±0     11h 11m 53s ⏱️ -7m 51s
 4 115 tests +1     4 002 ✅ +4       111 💤 ±0    1 ❌ -2    1 🔥 -1
51 595 runs +25    49 308 ✅ +28    2 285 💤 ±0    1 ❌ -2    1 🔥 -1

For more details on these failures and errors, see this check.

Results for commit a14d540. Comparison against base commit 6d3f6eb.

This pull request removes 1 test and adds 2 tests. Note that renamed tests count towards both.

Removed: pytest ‑ internal
Added: distributed.tests.test_scheduler ‑ test_broadcast_reuse_comm[False]
Added: distributed.tests.test_scheduler ‑ test_broadcast_reuse_comm[True]

♻️ This comment has been updated with latest results.

TomAugspurger mentioned this pull request on Jun 2, 2025
jacobtomlinson merged commit 78f5725 into dask:main on Jun 3, 2025 (30 of 33 checks passed)
TomAugspurger deleted the tom/reuse-broadcast-comm branch on June 4, 2025, 14:22