Skip to content

Commit

Permalink
Expose message-bytes-limit in config (#7074)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Sep 27, 2022
1 parent 9038c7a commit 8c4133c
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 20 deletions.
11 changes: 11 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,17 @@ properties:
To avoid this, Dask usually used a file-based lock.
However, on some systems file-based locks don't work.
This is particularly common on HPC NFS systems, where users may want to set this to false.
transfer:
type: object
description: |
Configuration setting for data transfer between workers
properties:
message-bytes-limit:
type:
- string
- integer
description: |
The maximum size of a message sent between workers
connections:
type: object
description: |
Expand Down
2 changes: 2 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ distributed:
blocked-handlers: []
multiprocessing-method: spawn
use-file-locking: True
transfer:
message-bytes-limit: 50MB
connections: # Maximum concurrent connections for data
outgoing: 50 # This helps to control network saturation
incoming: 10
Expand Down
16 changes: 8 additions & 8 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,8 @@ async def test_clean(c, s, a, b):
@gen_cluster(client=True)
async def test_message_breakup(c, s, a, b):
n = 100_000
a.state.transfer_message_target_bytes = 10 * n
b.state.transfer_message_target_bytes = 10 * n
a.state.transfer_message_bytes_limit = 10 * n
b.state.transfer_message_bytes_limit = 10 * n
xs = [
c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
]
Expand Down Expand Up @@ -809,10 +809,10 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_share_communication(c, s, w1, w2, w3):
x = c.submit(
mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
mul, b"1", int(w3.transfer_message_bytes_limit + 1), workers=w1.address
)
y = c.submit(
mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
mul, b"2", int(w3.transfer_message_bytes_limit + 1), workers=w2.address
)
await wait([x, y])
await c._replicate([x, y], workers=[w1.address, w2.address])
Expand All @@ -826,8 +826,8 @@ async def test_share_communication(c, s, w1, w2, w3):
@pytest.mark.xfail(reason="very high flakiness")
@gen_cluster(client=True)
async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
x = c.submit(mul, b"1", int(b.transfer_message_bytes_limit + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.transfer_message_bytes_limit + 1), workers=a.address)
await wait([x, y])
z = c.submit(add, x, y, workers=b.address)
await wait(z)
Expand Down Expand Up @@ -3001,9 +3001,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_acquire_replicas_large_data(c, s, a):
"""When acquire-replicas is used to acquire multiple sizeable tasks, it respects
transfer_message_target_bytes and acquires them over multiple iterations.
transfer_message_bytes_limit and acquires them over multiple iterations.
"""
size = a.state.transfer_message_target_bytes // 5 - 10_000
size = a.state.transfer_message_bytes_limit // 5 - 10_000

class C:
def __sizeof__(self):
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,13 +1013,13 @@ async def test_deprecated_worker_attributes(s, a, b):
@pytest.mark.parametrize(
"nbytes,n_in_flight",
[
# Note: transfer_message_target_bytes = 50e6 bytes
(int(10e6), 3),
(int(20e6), 2),
(int(30e6), 1),
],
)
def test_aggregate_gather_deps(ws, nbytes, n_in_flight):
ws.transfer_message_bytes_limit = int(50e6)
ws2 = "127.0.0.1:2"
instructions = ws.handle_stimulus(
AcquireReplicasEvent(
Expand Down Expand Up @@ -1066,7 +1066,7 @@ def test_gather_priority(ws):
},
# Substantial nbytes prevents transfer_incoming_count_limit to be
# overridden by transfer_incoming_bytes_throttle_threshold,
# but it's less than transfer_message_target_bytes
# but it's less than transfer_message_bytes_limit
nbytes={f"x{i}": 4 * 2**20 for i in range(1, 9)},
stimulus_id="compute1",
),
Expand Down
7 changes: 5 additions & 2 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,9 @@ def __init__(
self.transfer_outgoing_count_limit = dask.config.get(
"distributed.worker.connections.incoming"
)

transfer_message_bytes_limit = parse_bytes(
dask.config.get("distributed.worker.transfer.message-bytes-limit")
)
self.threads = {}

self.active_threads_lock = threading.Lock()
Expand Down Expand Up @@ -767,6 +769,7 @@ def __init__(
validate=validate,
transition_counter_max=transition_counter_max,
transfer_incoming_bytes_limit=transfer_incoming_bytes_limit,
transfer_message_bytes_limit=transfer_message_bytes_limit,
)
BaseWorker.__init__(self, state)

Expand Down Expand Up @@ -889,7 +892,7 @@ def data(self) -> MutableMapping[str, Any]:
ready = DeprecatedWorkerStateAttribute()
tasks = DeprecatedWorkerStateAttribute()
target_message_size = DeprecatedWorkerStateAttribute(
target="transfer_message_target_bytes"
target="transfer_message_bytes_limit"
)
total_out_connections = DeprecatedWorkerStateAttribute(
target="transfer_incoming_count_limit"
Expand Down
12 changes: 7 additions & 5 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import heapq
import logging
import math
import operator
import random
import sys
Expand Down Expand Up @@ -1129,7 +1130,7 @@ class WorkerState:
#: :meth:`BaseWorker.gather_dep`. Multiple small tasks that can be fetched from the
#: same worker will be clustered in a single instruction as long as their combined
#: size doesn't exceed this value.
transfer_message_target_bytes: int
transfer_message_bytes_limit: float

#: All and only tasks with ``TaskState.state == 'missing'``.
missing_dep_flight: set[TaskState]
Expand Down Expand Up @@ -1254,6 +1255,7 @@ def __init__(
validate: bool = True,
transition_counter_max: int | Literal[False] = False,
transfer_incoming_bytes_limit: int | None = None,
transfer_message_bytes_limit: float = math.inf,
):
self.nthreads = nthreads

Expand Down Expand Up @@ -1292,7 +1294,7 @@ def __init__(
self.in_flight_tasks = set()
self.executed_count = 0
self.long_running = set()
self.transfer_message_target_bytes = int(50e6) # 50 MB
self.transfer_message_bytes_limit = transfer_message_bytes_limit
self.log = deque(maxlen=100_000)
self.stimulus_log = deque(maxlen=10_000)
self.transition_counter = 0
Expand Down Expand Up @@ -1635,7 +1637,7 @@ def _select_keys_for_gather(
"""Helper of _ensure_communicating.
Fetch all tasks that are replicated on the target worker within a single
message, up to transfer_message_target_bytes or until we reach the limit
message, up to transfer_message_bytes_limit or until we reach the limit
for the size of incoming data transfers.
"""
to_gather: list[TaskState] = []
Expand All @@ -1644,10 +1646,10 @@ def _select_keys_for_gather(
if self.transfer_incoming_bytes_limit is not None:
bytes_left_to_fetch = min(
self.transfer_incoming_bytes_limit - self.transfer_incoming_bytes,
self.transfer_message_target_bytes,
self.transfer_message_bytes_limit,
)
else:
bytes_left_to_fetch = self.transfer_message_target_bytes
bytes_left_to_fetch = self.transfer_message_bytes_limit

while available:
ts = available.peek()
Expand Down
7 changes: 4 additions & 3 deletions docs/source/worker-state.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ These :class:`TaskState` objects have their state set to ``fetch``, are put in t
network. For each dependency we select a worker at random that has that data and collect
the dependency from that worker. To improve bandwidth, we opportunistically gather other
dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
of data (:attr:`~WorkerState.transfer_message_target_bytes`) - too little data and
bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of 50
connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
of data (:attr:`~WorkerState.transfer_message_bytes_limit`, which is acquired from the
configuration key ``distributed.worker.transfer.message-bytes-limit``) - too little data
and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of
50 connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
acquired from the configuration key ``distributed.worker.connections.outgoing``) so as
to avoid overly-fragmenting our network bandwidth.

Expand Down

0 comments on commit 8c4133c

Please sign in to comment.