Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ properties:

This is used by the scheduler in some scheduling decisions

reuse-broadcast-comm:
type: boolean
description: |
Whether to reuse the Scheduler-to-Worker Comm for repeated broadcasts.

This can be useful to avoid the overhead of creating and destroying Comms when sending multiple
small messages in methods like ``Client.run``. The scheduler will persist an open
Comm object for each worker. Set this to False if you want to close the Comm after each broadcast.

events-cleanup-delay:
type: string
description: |
Expand Down
2 changes: 2 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ distributed:
blocked-handlers: []
contact-address: null
default-data-size: 1kiB
# Whether to reuse the same Scheduler-to-Worker Comm for repeated broadcasts.
reuse-broadcast-comm: True
# Number of seconds to wait until workers or clients are removed from the events log
# after they have been removed from the scheduler
events-cleanup-delay: 1h
Expand Down
7 changes: 6 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6747,13 +6747,18 @@ async def broadcast(

ERROR = object()

reuse_broadcast_comm = dask.config.get(
"distributed.scheduler.reuse-broadcast-comm", False
)
close = not reuse_broadcast_comm

async def send_message(addr: str) -> Any:
try:
comm = await self.rpc.connect(addr)
comm.name = "Scheduler Broadcast"
try:
resp = await send_recv(
comm, close=True, serializers=serializers, **msg
comm, close=close, serializers=serializers, **msg
)
finally:
self.rpc.reuse(addr, comm)
Expand Down
17 changes: 17 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,23 @@ async def test_broadcast_deprecation(s, a, b):
assert out == {a.address: b"pong", b.address: b"pong"}


@pytest.mark.parametrize("reuse_broadcast_comm", [True, False])
@gen_cluster()
async def test_broadcast_reuse_comm(
    s: Scheduler, a: Worker, b: Worker, reuse_broadcast_comm: bool
) -> None:
    """Check the scheduler's open-comm count after a broadcast.

    The ``distributed.scheduler.reuse-broadcast-comm`` setting is overridden
    with the parametrized value, a ``ping`` is broadcast to both workers, and
    the test asserts that each worker answered ``pong``.  Afterwards
    ``s.rpc.open`` must be 2 when reuse is enabled and 0 when it is disabled.
    """
    config_override = {
        "distributed.scheduler.reuse-broadcast-comm": reuse_broadcast_comm
    }
    # Two workers are passed in (``a`` and ``b``): 2 open comms are expected
    # with reuse on, 0 with reuse off.
    expected_open = 2 if reuse_broadcast_comm else 0

    with dask.config.set(config_override):
        replies = await s.broadcast(msg={"op": "ping"})
        assert replies == {a.address: b"pong", b.address: b"pong"}
        assert s.rpc.open == expected_open


@gen_cluster(nthreads=[])
async def test_worker_name(s):
async with Worker(s.address, name="alice") as w:
Expand Down
Loading