Expose message-bytes-limit in config #7074

Merged 2 commits on Sep 27, 2022
11 changes: 11 additions & 0 deletions distributed/distributed-schema.yaml
@@ -339,6 +339,17 @@ properties:
              To avoid this, Dask usually uses a file-based lock.
              However, on some systems file-based locks don't work.
              This is particularly common on HPC NFS systems, where users may want to set this to false.
+          transfer:
Member Author:
XREF: #6977

I went with singular transfer to be coherent with variable naming.

+            type: object
+            description: |
+              Configuration setting for data transfer between workers
+            properties:
+              message-bytes-limit:
+                type:
+                - string
+                - integer
+                description: |
+                  The maximum size of a message sent between workers
Contributor:
This description is not correct, no? Rather, IIUC, this is the maximum amount of "extra" data we'll ask for from a worker once we've picked a worker to transfer a task from.

So an individual message may be (much) larger than message-bytes-limit, but if we decide to glue a bunch of messages together into one gather the sum of the size of those messages will never exceed message-bytes-limit.
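
To make that concrete, here is a minimal sketch of the batching rule being described (illustrative only — batch_keys is a hypothetical helper, not the actual WorkerState code):

def batch_keys(sizes: dict[str, int], limit: int) -> list[str]:
    """Greedily group keys into one gather message, stopping once the
    combined size would exceed `limit`. The first key is always taken,
    so a single message may still be larger than `limit`."""
    batch: list[str] = []
    total = 0
    for key, nbytes in sizes.items():
        if batch and total + nbytes > limit:
            break
        batch.append(key)
        total += nbytes
    return batch

# One 80 MB value still travels alone, even though it exceeds the 50 MB limit:
print(batch_keys({"x": 80_000_000}, 50_000_000))  # ['x']
# Several 20 MB values are glued together only up to the limit:
print(batch_keys({f"x{i}": 20_000_000 for i in range(5)}, 50_000_000))  # ['x0', 'x1']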

Member Author:
It does not explain the fine print, that's a fair point. I'll file a follow-up PR that highlights that this is not an absolute maximum.

          connections:
            type: object
            description: |
2 changes: 2 additions & 0 deletions distributed/distributed.yaml
@@ -78,6 +78,8 @@ distributed:
    blocked-handlers: []
    multiprocessing-method: spawn
    use-file-locking: True
+    transfer:
+      message-bytes-limit: 50MB
    connections:          # Maximum concurrent connections for data
      outgoing: 50        # This helps to control network saturation
      incoming: 10
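
For users reading along: the new key behaves like any other Dask config value, so the 50MB default can be overridden without touching this file. A small sketch (the 100MB figure is arbitrary, not a recommendation):

import dask

# Override the new 50MB default; per the schema above, either a
# human-readable string or a plain integer byte count is accepted.
with dask.config.set({"distributed.worker.transfer.message-bytes-limit": "100MB"}):
    assert dask.config.get("distributed.worker.transfer.message-bytes-limit") == "100MB"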
16 changes: 8 additions & 8 deletions distributed/tests/test_worker.py
@@ -673,8 +673,8 @@ async def test_clean(c, s, a, b):
@gen_cluster(client=True)
async def test_message_breakup(c, s, a, b):
    n = 100_000
-    a.state.transfer_message_target_bytes = 10 * n
-    b.state.transfer_message_target_bytes = 10 * n
+    a.state.transfer_message_bytes_limit = 10 * n
+    b.state.transfer_message_bytes_limit = 10 * n
    xs = [
        c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
    ]
@@ -809,10 +809,10 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_share_communication(c, s, w1, w2, w3):
    x = c.submit(
-        mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
+        mul, b"1", int(w3.transfer_message_bytes_limit + 1), workers=w1.address
    )
    y = c.submit(
-        mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
+        mul, b"2", int(w3.transfer_message_bytes_limit + 1), workers=w2.address
    )
    await wait([x, y])
    await c._replicate([x, y], workers=[w1.address, w2.address])
@@ -826,8 +826,8 @@ async def test_share_communication(c, s, w1, w2, w3):
@pytest.mark.xfail(reason="very high flakiness")
@gen_cluster(client=True)
async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
-    x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
-    y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
+    x = c.submit(mul, b"1", int(b.transfer_message_bytes_limit + 1), workers=a.address)
+    y = c.submit(mul, b"2", int(b.transfer_message_bytes_limit + 1), workers=a.address)
    await wait([x, y])
    z = c.submit(add, x, y, workers=b.address)
    await wait(z)
@@ -3001,9 +3001,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_acquire_replicas_large_data(c, s, a):
    """When acquire-replicas is used to acquire multiple sizeable tasks, it respects
-    transfer_message_target_bytes and acquires them over multiple iterations.
+    transfer_message_bytes_limit and acquires them over multiple iterations.
    """
-    size = a.state.transfer_message_target_bytes // 5 - 10_000
+    size = a.state.transfer_message_bytes_limit // 5 - 10_000

    class C:
        def __sizeof__(self):
4 changes: 2 additions & 2 deletions distributed/tests/test_worker_state_machine.py
@@ -1013,13 +1013,13 @@ async def test_deprecated_worker_attributes(s, a, b):
@pytest.mark.parametrize(
    "nbytes,n_in_flight",
    [
-        # Note: transfer_message_target_bytes = 50e6 bytes
        (int(10e6), 3),
        (int(20e6), 2),
        (int(30e6), 1),
    ],
)
def test_aggregate_gather_deps(ws, nbytes, n_in_flight):
+    ws.transfer_message_bytes_limit = int(50e6)
    ws2 = "127.0.0.1:2"
    instructions = ws.handle_stimulus(
        AcquireReplicasEvent(
@@ -1066,7 +1066,7 @@ def test_gather_priority(ws):
        },
        # Substantial nbytes prevents transfer_incoming_count_limit to be
        # overridden by transfer_incoming_bytes_throttle_threshold,
-        # but it's less than transfer_message_target_bytes
+        # but it's less than transfer_message_bytes_limit
        nbytes={f"x{i}": 4 * 2**20 for i in range(1, 9)},
        stimulus_id="compute1",
    ),
7 changes: 5 additions & 2 deletions distributed/worker.py
@@ -528,7 +528,9 @@ def __init__(
        self.transfer_outgoing_count_limit = dask.config.get(
            "distributed.worker.connections.incoming"
        )
-
+        transfer_message_bytes_limit = parse_bytes(
+            dask.config.get("distributed.worker.transfer.message-bytes-limit")
+        )
        self.threads = {}

        self.active_threads_lock = threading.Lock()
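
parse_bytes here comes from dask.utils; it is what lets the config key accept either spelling the schema allows. For reference:

from dask.utils import parse_bytes

parse_bytes("50MB")    # 50000000
parse_bytes("50 MiB")  # 52428800
parse_bytes(123)       # 123 -- plain integers pass through unchanged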
@@ -767,6 +769,7 @@ def __init__(
            validate=validate,
            transition_counter_max=transition_counter_max,
            transfer_incoming_bytes_limit=transfer_incoming_bytes_limit,
+            transfer_message_bytes_limit=transfer_message_bytes_limit,
        )
        BaseWorker.__init__(self, state)

@@ -889,7 +892,7 @@ def data(self) -> MutableMapping[str, Any]:
    ready = DeprecatedWorkerStateAttribute()
    tasks = DeprecatedWorkerStateAttribute()
    target_message_size = DeprecatedWorkerStateAttribute(
-        target="transfer_message_target_bytes"
+        target="transfer_message_bytes_limit"
    )
    total_out_connections = DeprecatedWorkerStateAttribute(
        target="transfer_incoming_count_limit"
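
The shim above keeps the old attribute readable. A hedged sketch of what that means for downstream code (`w` stands for a live Worker; the FutureWarning class is an assumption based on how the other deprecated attributes here behave, not verified):

import warnings

# `w` is assumed to be a running distributed.Worker instance.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    old = w.target_message_size  # deprecated spelling still works
assert old == w.state.transfer_message_bytes_limit  # forwards to the new name
assert any(issubclass(c.category, FutureWarning) for c in caught)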
12 changes: 7 additions & 5 deletions distributed/worker_state_machine.py
@@ -4,6 +4,7 @@
import asyncio
import heapq
import logging
+import math
import operator
import random
import sys
@@ -1129,7 +1130,7 @@ class WorkerState:
    #: :meth:`BaseWorker.gather_dep`. Multiple small tasks that can be fetched from the
    #: same worker will be clustered in a single instruction as long as their combined
    #: size doesn't exceed this value.
-    transfer_message_target_bytes: int
+    transfer_message_bytes_limit: float
Member Author:
Driveby: Renamed to match other attribute names (e.g., transfer_incoming_bytes_limit). cc @crusaderky


    #: All and only tasks with ``TaskState.state == 'missing'``.
    missing_dep_flight: set[TaskState]
@@ -1254,6 +1255,7 @@ def __init__(
        validate: bool = True,
        transition_counter_max: int | Literal[False] = False,
        transfer_incoming_bytes_limit: int | None = None,
+        transfer_message_bytes_limit: float = math.inf,
    ):
        self.nthreads = nthreads

@@ -1292,7 +1294,7 @@
        self.in_flight_tasks = set()
        self.executed_count = 0
        self.long_running = set()
-        self.transfer_message_target_bytes = int(50e6)  # 50 MB
+        self.transfer_message_bytes_limit = transfer_message_bytes_limit
        self.log = deque(maxlen=100_000)
        self.stimulus_log = deque(maxlen=10_000)
        self.transition_counter = 0
@@ -1635,7 +1637,7 @@ def _select_keys_for_gather(
"""Helper of _ensure_communicating.

Fetch all tasks that are replicated on the target worker within a single
message, up to transfer_message_target_bytes or until we reach the limit
message, up to transfer_message_bytes_limit or until we reach the limit
for the size of incoming data transfers.
"""
to_gather: list[TaskState] = []
@@ -1644,10 +1646,10 @@
        if self.transfer_incoming_bytes_limit is not None:
            bytes_left_to_fetch = min(
                self.transfer_incoming_bytes_limit - self.transfer_incoming_bytes,
-                self.transfer_message_target_bytes,
+                self.transfer_message_bytes_limit,
            )
        else:
-            bytes_left_to_fetch = self.transfer_message_target_bytes
+            bytes_left_to_fetch = self.transfer_message_bytes_limit

        while available:
            ts = available.peek()
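
A quick worked example of the min() above, with made-up numbers: if the worker may hold at most 100MB of incoming transfers, 80MB are already in flight, and the message limit is the default 50MB, the next gather may only request 20MB:

transfer_incoming_bytes_limit = 100_000_000  # illustrative cap on in-flight data
transfer_incoming_bytes = 80_000_000         # already being transferred
transfer_message_bytes_limit = 50_000_000    # the 50MB config default

# Same arithmetic as _select_keys_for_gather above.
bytes_left_to_fetch = min(
    transfer_incoming_bytes_limit - transfer_incoming_bytes,
    transfer_message_bytes_limit,
)
assert bytes_left_to_fetch == 20_000_000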
7 changes: 4 additions & 3 deletions docs/source/worker-state.rst
@@ -98,9 +98,10 @@ These :class:`TaskState` objects have their state set to ``fetch``, are put in t
network. For each dependency we select a worker at random that has that data and collect
the dependency from that worker. To improve bandwidth, we opportunistically gather other
dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
-of data (:attr:`~WorkerState.transfer_message_target_bytes`) - too little data and
-bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of 50
-connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
+of data (:attr:`~WorkerState.transfer_message_bytes_limit`, which is acquired from the
+configuration key ``distributed.worker.transfer.message-bytes-limit``) - too little data
+and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of
+50 connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
acquired from the configuration key ``distributed.worker.connections.outgoing``) so as
to avoid overly-fragmenting our network bandwidth.
